make things work a bit more

This commit is contained in:
Serge Smertin 2023-08-07 14:36:32 +02:00
parent a686542b1f
commit 771d7bc5f8
No known key found for this signature in database
GPG Key ID: 92A95A66446BCE3F
11 changed files with 411 additions and 147 deletions

View File

@ -13,6 +13,9 @@ func LoadAll(ctx context.Context) (features []*Feature, err error) {
return nil, err return nil, err
} }
labsDir, err := os.ReadDir(filepath.Join(home, ".databricks", "labs")) labsDir, err := os.ReadDir(filepath.Join(home, ".databricks", "labs"))
if os.IsNotExist(err) {
return nil, nil
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -24,6 +27,10 @@ func LoadAll(ctx context.Context) (features []*Feature, err error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("%s: %w", v.Name(), err) return nil, fmt.Errorf("%s: %w", v.Name(), err)
} }
err = feature.loadMetadata()
if err != nil {
return nil, fmt.Errorf("%s metadata: %w", v.Name(), err)
}
features = append(features, feature) features = append(features, feature)
} }
return features, nil return features, nil

View File

@ -1,20 +1,17 @@
package feature package feature
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io"
"os" "os"
"os/exec"
"path/filepath" "path/filepath"
"sort"
"strings" "strings"
"time" "time"
"github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/git"
"github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/log"
"golang.org/x/mod/semver" "github.com/databricks/cli/libs/process"
"github.com/databricks/cli/libs/python"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
) )
@ -25,7 +22,7 @@ type Feature struct {
Hooks struct { Hooks struct {
Install string `json:"install,omitempty"` Install string `json:"install,omitempty"`
Uninstall string `json:"uninstall,omitempty"` Uninstall string `json:"uninstall,omitempty"`
} } `json:"hooks,omitempty"`
Entrypoint string `json:"entrypoint"` Entrypoint string `json:"entrypoint"`
Commands []struct { Commands []struct {
Name string `json:"name"` Name string `json:"name"`
@ -36,6 +33,7 @@ type Feature struct {
} `json:"flags,omitempty"` } `json:"flags,omitempty"`
} `json:"commands,omitempty"` } `json:"commands,omitempty"`
version string
path string path string
checkout *git.Repository checkout *git.Repository
} }
@ -45,25 +43,26 @@ func NewFeature(name string) (*Feature, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
version := "latest"
split := strings.Split(name, "@")
if len(split) > 2 {
return nil, fmt.Errorf("invalid coordinates: %s", name)
}
if len(split) == 2 {
name = split[0]
version = split[1]
}
path := filepath.Join(home, ".databricks", "labs", name) path := filepath.Join(home, ".databricks", "labs", name)
checkout, err := git.NewRepository(path) checkout, err := git.NewRepository(path)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return nil, err return nil, err
} }
feat := &Feature{ return &Feature{
Name: name, Name: name,
path: path, path: path,
version: version,
checkout: checkout, checkout: checkout,
} }, nil
raw, err := os.ReadFile(filepath.Join(path, "labs.yml"))
if err != nil {
return nil, fmt.Errorf("read labs.yml: %w", err)
}
err = yaml.Unmarshal(raw, feat)
if err != nil {
return nil, fmt.Errorf("parse labs.yml: %w", err)
}
return feat, nil
} }
type release struct { type release struct {
@ -73,7 +72,19 @@ type release struct {
PublishedAt time.Time `json:"published_at"` PublishedAt time.Time `json:"published_at"`
} }
func (i *Feature) LatestVersion(ctx context.Context) (*release, error) { func (i *Feature) loadMetadata() error {
raw, err := os.ReadFile(filepath.Join(i.path, "labs.yml"))
if err != nil {
return fmt.Errorf("read labs.yml: %w", err)
}
err = yaml.Unmarshal(raw, i)
if err != nil {
return fmt.Errorf("parse labs.yml: %w", err)
}
return nil
}
func (i *Feature) fetchLatestVersion(ctx context.Context) (*release, error) {
var tags []release var tags []release
url := fmt.Sprintf("https://api.github.com/repos/databrickslabs/%s/releases", i.Name) url := fmt.Sprintf("https://api.github.com/repos/databrickslabs/%s/releases", i.Name)
err := httpCall(ctx, url, &tags) err := httpCall(ctx, url, &tags)
@ -83,136 +94,105 @@ func (i *Feature) LatestVersion(ctx context.Context) (*release, error) {
return &tags[0], nil return &tags[0], nil
} }
const CacheDir = ".databricks" func (i *Feature) requestedVersion(ctx context.Context) (string, error) {
if i.version == "latest" {
type pythonInstallation struct { release, err := i.fetchLatestVersion(ctx)
Version string
Binary string
}
func (i *Feature) pythonExecutables(ctx context.Context) ([]pythonInstallation, error) {
found := []pythonInstallation{}
paths := strings.Split(os.Getenv("PATH"), string(os.PathListSeparator))
for _, candidate := range paths {
bin := filepath.Join(candidate, "python3")
_, err := os.Stat(bin)
if err != nil && os.IsNotExist(err) {
continue
}
out, err := i.cmd(ctx, bin, "--version")
if err != nil { if err != nil {
return nil, err return "", err
} }
words := strings.Split(out, " ") return release.TagName, nil
found = append(found, pythonInstallation{
Version: words[len(words)-1],
Binary: bin,
})
} }
if len(found) == 0 { return i.version, nil
return nil, fmt.Errorf("no python3 executables found")
}
sort.Slice(found, func(i, j int) bool {
a := found[i].Version
b := found[j].Version
cmp := semver.Compare(a, b)
if cmp != 0 {
return cmp < 0
}
return a < b
})
return found, nil
}
func (i *Feature) installVirtualEnv(ctx context.Context) error {
_, err := os.Stat(filepath.Join(i.path, "setup.py"))
if err != nil {
return err
}
pys, err := i.pythonExecutables(ctx)
if err != nil {
return err
}
python3 := pys[0].Binary
log.Debugf(ctx, "Creating python virtual environment in %s/%s", i.path, CacheDir)
_, err = i.cmd(ctx, python3, "-m", "venv", CacheDir)
if err != nil {
return fmt.Errorf("create venv: %w", err)
}
log.Debugf(ctx, "Installing dependencies from setup.py")
venvPip := filepath.Join(i.path, CacheDir, "bin", "pip")
_, err = i.cmd(ctx, venvPip, "install", ".")
if err != nil {
return fmt.Errorf("pip install: %w", err)
}
return nil
}
func (i *Feature) Run(ctx context.Context, raw []byte) error {
err := i.installVirtualEnv(ctx)
if err != nil {
return err
}
// TODO: detect virtual env (also create it on installation),
// because here we just assume that virtual env is installed.
python3 := filepath.Join(i.path, CacheDir, "bin", "python")
// make sure to sync on writing to stdout
reader, writer := io.Pipe()
go io.CopyBuffer(os.Stdout, reader, make([]byte, 128))
defer reader.Close()
defer writer.Close()
// pass command parameters down to script as the first arg
cmd := exec.Command(python3, i.Entrypoint, string(raw))
cmd.Dir = i.path
cmd.Stdout = writer
cmd.Stderr = writer
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
go io.CopyBuffer(stdin, os.Stdin, make([]byte, 128))
defer stdin.Close()
err = cmd.Start()
if err != nil {
return err
}
return cmd.Wait()
}
func (i *Feature) cmd(ctx context.Context, args ...string) (string, error) {
commandStr := strings.Join(args, " ")
log.Debugf(ctx, "running: %s", commandStr)
cmd := exec.Command(args[0], args[1:]...)
stdout := &bytes.Buffer{}
cmd.Dir = i.path
cmd.Stdin = os.Stdin
cmd.Stdout = stdout
cmd.Stderr = stdout
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("%s: %s", commandStr, stdout.String())
}
return strings.TrimSpace(stdout.String()), nil
} }
func (i *Feature) Install(ctx context.Context) error { func (i *Feature) Install(ctx context.Context) error {
if i.checkout != nil { if i.hasFile(".git/HEAD") {
curr, err := i.cmd(ctx, "git", "tag", "--points-at", "HEAD") curr, err := process.Background(ctx, []string{
"git", "tag", "--points-at", "HEAD",
}, process.WithDir(i.path))
if err != nil { if err != nil {
return err return err
} }
return fmt.Errorf("%s (%s) is already installed", i.Name, curr) return fmt.Errorf("%s (%s) is already installed", i.Name, curr)
} }
url := fmt.Sprintf("https://github.com/databrickslabs/%s", i.Name) url := fmt.Sprintf("https://github.com/databrickslabs/%s", i.Name)
release, err := i.LatestVersion(ctx) version, err := i.requestedVersion(ctx)
if err != nil { if err != nil {
return err return err
} }
log.Infof(ctx, "Installing %s (%s) into %s", url, release.TagName, i.path) log.Infof(ctx, "Installing %s (%s) into %s", url, version, i.path)
return git.Clone(ctx, url, release.TagName, i.path) err = git.Clone(ctx, url, version, i.path)
if err != nil {
return err
}
err = i.loadMetadata()
if err != nil {
return fmt.Errorf("labs.yml: %w", err)
}
if i.isPython() {
err := i.installPythonTool(ctx)
if err != nil {
return err
}
}
return nil
}
const CacheDir = ".databricks"
func (i *Feature) Run(ctx context.Context, raw []byte) error {
// raw is a JSON-encoded payload that holds things like command name and flags
return i.forwardPython(ctx, filepath.Join(i.path, i.Entrypoint), string(raw))
}
func (i *Feature) hasFile(name string) bool {
_, err := os.Stat(filepath.Join(i.path, name))
return err == nil
}
func (i *Feature) isPython() bool {
return i.hasFile("setup.py") || i.hasFile("pyproject.toml")
}
func (i *Feature) venvBinDir() string {
return filepath.Join(i.path, CacheDir, "bin")
}
func (i *Feature) forwardPython(ctx context.Context, pythonArgs ...string) error {
args := []string{filepath.Join(i.venvBinDir(), "python")}
args = append(args, pythonArgs...)
return process.Forwarded(ctx, args,
process.WithDir(i.path), // we may need to skip it for install step
process.WithEnv("PYTHONPATH", i.path))
}
func (i *Feature) installPythonTool(ctx context.Context) error {
pythons, err := python.DetectInterpreters(ctx)
if err != nil {
return err
}
interpreter := pythons.Latest()
log.Debugf(ctx, "Creating Python %s virtual environment in %s", interpreter.Version, i.path)
_, err = process.Background(ctx, []string{
interpreter.Binary, "-m", "venv", CacheDir,
}, process.WithDir(i.path))
if err != nil {
return fmt.Errorf("create venv: %w", err)
}
log.Debugf(ctx, "Installing dependencies via PIP")
venvPip := filepath.Join(i.venvBinDir(), "pip")
_, err = process.Background(ctx, []string{
venvPip, "install", ".",
}, process.WithDir(i.path))
if err != nil {
return fmt.Errorf("pip install: %w", err)
}
if i.Hooks.Install != "" {
installer := filepath.Join(i.path, i.Hooks.Install)
err = i.forwardPython(ctx, installer)
if err != nil {
return fmt.Errorf("%s: %w", i.Hooks.Install, err)
}
}
return nil
} }

View File

@ -14,6 +14,15 @@ func newInstallCommand() *cobra.Command {
PreRunE: root.MustWorkspaceClient, PreRunE: root.MustWorkspaceClient,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context() ctx := cmd.Context()
// TODO: context can be on both command and feature level
err := root.MustWorkspaceClient(cmd, args)
if err != nil {
return err
}
// TODO: add account-level init as well
w := root.WorkspaceClient(cmd.Context())
propagateEnvConfig(w.Config)
state, err := feature.NewFeature(args[0]) state, err := feature.NewFeature(args[0])
if err != nil { if err != nil {
return err return err

View File

@ -10,6 +10,6 @@ import (
func TestInstallDbx(t *testing.T) { func TestInstallDbx(t *testing.T) {
ctx := context.Background() ctx := context.Background()
_, err := internal.RunGetOutput(ctx, "labs", "install", "dbx") _, err := internal.RunGetOutput(ctx, "labs", "install", "dbx@metadata", "--profile", "bogdan")
assert.NoError(t, err) assert.NoError(t, err)
} }

View File

@ -50,6 +50,21 @@ type commandInput struct {
OutputType string `json:"output_type"` OutputType string `json:"output_type"`
} }
func propagateEnvConfig(cfg *config.Config) error {
for _, a := range config.ConfigAttributes {
if a.IsZero(cfg) {
continue
}
for _, ev := range a.EnvVars {
err := os.Setenv(ev, a.GetString(cfg))
if err != nil {
return fmt.Errorf("set %s: %w", a.Name, err)
}
}
}
return nil
}
func infuse(cmd *cobra.Command) error { func infuse(cmd *cobra.Command) error {
ctx := cmd.Context() ctx := cmd.Context()
all, err := feature.LoadAll(ctx) all, err := feature.LoadAll(ctx)
@ -79,17 +94,7 @@ func infuse(cmd *cobra.Command) error {
} }
// TODO: add account-level init as well // TODO: add account-level init as well
w := root.WorkspaceClient(cmd.Context()) w := root.WorkspaceClient(cmd.Context())
for _, a := range config.ConfigAttributes { propagateEnvConfig(w.Config)
if a.IsZero(w.Config) {
continue
}
for _, ev := range a.EnvVars {
err = os.Setenv(ev, a.GetString(w.Config))
if err != nil {
return fmt.Errorf("set %s: %w", a.Name, err)
}
}
}
} }
ci := &commandInput{ ci := &commandInput{
Command: l.Name, Command: l.Name,

View File

@ -0,0 +1,32 @@
package process
import (
"bytes"
"context"
"fmt"
"os"
"os/exec"
"strings"
"github.com/databricks/cli/libs/log"
)
func Background(ctx context.Context, args []string, opts ...execOption) (string, error) {
commandStr := strings.Join(args, " ")
log.Debugf(ctx, "running: %s", commandStr)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
stdout := &bytes.Buffer{}
cmd.Stdin = os.Stdin
cmd.Stdout = stdout
cmd.Stderr = stdout
for _, o := range opts {
err := o(cmd)
if err != nil {
return "", err
}
}
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("%s: %s", commandStr, stdout.String())
}
return strings.TrimSpace(stdout.String()), nil
}

47
libs/process/forwarded.go Normal file
View File

@ -0,0 +1,47 @@
package process
import (
"context"
"io"
"os"
"os/exec"
"strings"
"github.com/databricks/cli/libs/log"
)
func Forwarded(ctx context.Context, args []string, opts ...execOption) error {
commandStr := strings.Join(args, " ")
log.Debugf(ctx, "starting: %s", commandStr)
cmd := exec.CommandContext(ctx, args[0], args[1:]...)
// make sure to sync on writing to stdout
reader, writer := io.Pipe()
go io.CopyBuffer(os.Stdout, reader, make([]byte, 128))
defer reader.Close()
defer writer.Close()
cmd.Stdout = writer
cmd.Stderr = writer
// apply common options
for _, o := range opts {
err := o(cmd)
if err != nil {
return err
}
}
stdin, err := cmd.StdinPipe()
if err != nil {
return err
}
go io.CopyBuffer(stdin, os.Stdin, make([]byte, 128))
defer stdin.Close()
err = cmd.Start()
if err != nil {
return err
}
return cmd.Wait()
}

39
libs/process/opts.go Normal file
View File

@ -0,0 +1,39 @@
package process
import (
"fmt"
"os"
"os/exec"
)
type execOption func(*exec.Cmd) error
func WithEnv(key, value string) execOption {
return func(c *exec.Cmd) error {
if c.Env == nil {
c.Env = os.Environ()
}
v := fmt.Sprintf("%s=%s", key, value)
c.Env = append(c.Env, v)
return nil
}
}
func WithEnvs(envs map[string]string) execOption {
return func(c *exec.Cmd) error {
for k, v := range envs {
err := WithEnv(k, v)(c)
if err != nil {
return err
}
}
return nil
}
}
func WithDir(dir string) execOption {
return func(c *exec.Cmd) error {
c.Dir = dir
return nil
}
}

View File

@ -0,0 +1,95 @@
package python
import (
"context"
"fmt"
"os"
"path/filepath"
"sort"
"strings"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/process"
"golang.org/x/mod/semver"
)
type Interpreter struct {
Version string
Binary string
}
func (i Interpreter) String() string {
return fmt.Sprintf("%s (%s)", i.Version, i.Binary)
}
type AllInterpreters []Interpreter
func (a AllInterpreters) Latest() Interpreter {
return a[len(a)-1]
}
func DetectInterpreters(ctx context.Context) (AllInterpreters, error) {
found := AllInterpreters{}
paths := strings.Split(os.Getenv("PATH"), string(os.PathListSeparator))
seen := map[string]bool{}
for _, prefix := range paths {
entries, err := os.ReadDir(prefix)
if os.IsNotExist(err) {
// some directories in $PATH may not exist
continue
}
if err != nil {
return nil, fmt.Errorf("listing %s: %w", prefix, err)
}
for _, v := range entries {
if v.IsDir() {
continue
}
if strings.Contains(v.Name(), "-") {
// skip python3-config, python3.10-config, etc
continue
}
if !strings.HasPrefix(v.Name(), "python3") {
continue
}
bin := filepath.Join(prefix, v.Name())
resolved, err := filepath.EvalSymlinks(bin)
if err != nil {
log.Debugf(ctx, "cannot resolve symlink for %s: %s", bin, resolved)
continue
}
if seen[resolved] {
continue
}
seen[resolved] = true
out, err := process.Background(ctx, []string{resolved, "--version"})
if err != nil {
// TODO: skip-and-log or return?
return nil, err
}
words := strings.Split(out, " ")
if words[0] != "Python" {
continue
}
lastWord := words[len(words)-1]
version := semver.Canonical("v" + lastWord)
found = append(found, Interpreter{
Version: version,
Binary: bin,
})
}
}
if len(found) == 0 {
return nil, fmt.Errorf("no python3 executables found")
}
sort.Slice(found, func(i, j int) bool {
a := found[i].Version
b := found[j].Version
cmp := semver.Compare(a, b)
if cmp != 0 {
return cmp < 0
}
return a < b
})
return found, nil
}

View File

@ -0,0 +1,17 @@
package python
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
func TestAtLeastOnePythonInstalled(t *testing.T) {
ctx := context.Background()
all, err := DetectInterpreters(ctx)
assert.NoError(t, err)
a := all.Latest()
t.Logf("latest is: %s", a)
assert.True(t, len(all) > 0)
}

33
libs/python/venv.go Normal file
View File

@ -0,0 +1,33 @@
package python
import (
"errors"
"os"
"path/filepath"
)
// DetectVirtualEnv scans direct subfolders in path to get a valid
// Virtual Environment installation, that is marked by pyvenv.cfg file.
//
// See: https://packaging.python.org/en/latest/tutorials/packaging-projects/
func DetectVirtualEnvPath(path string) (string, error) {
files, err := os.ReadDir(path)
if err != nil {
return "", err
}
for _, v := range files {
if !v.IsDir() {
continue
}
candidate := filepath.Join(path, v.Name())
_, err = os.Stat(filepath.Join(candidate, "pyvenv.cfg"))
if errors.Is(err, os.ErrNotExist) {
continue
}
if err != nil {
return "", err
}
return candidate, nil
}
return "", nil
}