mirror of https://github.com/databricks/cli.git
Add ApplyPythonMutator (#1430)
## Changes Add ApplyPythonMutator, which will fork the Python subprocess and process pipe bundle configuration through it. It's enabled through `experimental` section, for example: ```yaml experimental: pydabs: enable: true venv_path: .venv ``` For now, it's limited to two phases in the mutator pipeline: - `load`: adds new jobs - `init`: adds new jobs, or modifies existing ones It's enforced that no jobs are modified in `load` and not jobs are deleted in `load/init`, because, otherwise, it will break existing assumptions. ## Tests Unit tests
This commit is contained in:
parent
b2c03ea54c
commit
57a5a65f87
|
@ -23,6 +23,22 @@ type Experimental struct {
|
||||||
// be removed in the future once we have a proper workaround like allowing IS_OWNER
|
// be removed in the future once we have a proper workaround like allowing IS_OWNER
|
||||||
// as a top-level permission in the DAB.
|
// as a top-level permission in the DAB.
|
||||||
UseLegacyRunAs bool `json:"use_legacy_run_as,omitempty"`
|
UseLegacyRunAs bool `json:"use_legacy_run_as,omitempty"`
|
||||||
|
|
||||||
|
// PyDABs determines whether to load the 'databricks-pydabs' package.
|
||||||
|
//
|
||||||
|
// PyDABs allows to define bundle configuration using Python.
|
||||||
|
PyDABs PyDABs `json:"pydabs,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type PyDABs struct {
|
||||||
|
// Enabled is a flag to enable the feature.
|
||||||
|
Enabled bool `json:"enabled,omitempty"`
|
||||||
|
|
||||||
|
// VEnvPath is path to the virtual environment.
|
||||||
|
//
|
||||||
|
// Required if PyDABs is enabled. PyDABs will load the code in the specified
|
||||||
|
// environment.
|
||||||
|
VEnvPath string `json:"venv_path,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Command string
|
type Command string
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"github.com/databricks/cli/bundle"
|
"github.com/databricks/cli/bundle"
|
||||||
"github.com/databricks/cli/bundle/config"
|
"github.com/databricks/cli/bundle/config"
|
||||||
"github.com/databricks/cli/bundle/config/loader"
|
"github.com/databricks/cli/bundle/config/loader"
|
||||||
|
pythonmutator "github.com/databricks/cli/bundle/config/mutator/python"
|
||||||
"github.com/databricks/cli/bundle/scripts"
|
"github.com/databricks/cli/bundle/scripts"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -24,5 +25,6 @@ func DefaultMutators() []bundle.Mutator {
|
||||||
InitializeVariables(),
|
InitializeVariables(),
|
||||||
DefineDefaultTarget(),
|
DefineDefaultTarget(),
|
||||||
LoadGitDetails(),
|
LoadGitDetails(),
|
||||||
|
pythonmutator.ApplyPythonMutator(pythonmutator.ApplyPythonMutatorPhaseLoad),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,268 @@
|
||||||
|
package python
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle"
|
||||||
|
"github.com/databricks/cli/bundle/config"
|
||||||
|
"github.com/databricks/cli/libs/diag"
|
||||||
|
"github.com/databricks/cli/libs/dyn"
|
||||||
|
"github.com/databricks/cli/libs/dyn/convert"
|
||||||
|
"github.com/databricks/cli/libs/dyn/merge"
|
||||||
|
"github.com/databricks/cli/libs/dyn/yamlloader"
|
||||||
|
"github.com/databricks/cli/libs/log"
|
||||||
|
"github.com/databricks/cli/libs/process"
|
||||||
|
)
|
||||||
|
|
||||||
|
type phase string
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ApplyPythonMutatorPhaseLoad is the phase in which bundle configuration is loaded.
|
||||||
|
//
|
||||||
|
// At this stage, PyDABs adds statically defined resources to the bundle configuration.
|
||||||
|
// Which resources are added should be deterministic and not depend on the bundle configuration.
|
||||||
|
//
|
||||||
|
// We also open for possibility of appending other sections of bundle configuration,
|
||||||
|
// for example, adding new variables. However, this is not supported yet, and CLI rejects
|
||||||
|
// such changes.
|
||||||
|
ApplyPythonMutatorPhaseLoad phase = "load"
|
||||||
|
|
||||||
|
// ApplyPythonMutatorPhaseInit is the phase after bundle configuration was loaded, and
|
||||||
|
// the list of statically declared resources is known.
|
||||||
|
//
|
||||||
|
// At this stage, PyDABs adds resources defined using generators, or mutates existing resources,
|
||||||
|
// including the ones defined using YAML.
|
||||||
|
//
|
||||||
|
// During this process, within generator and mutators, PyDABs can access:
|
||||||
|
// - selected deployment target
|
||||||
|
// - bundle variables values
|
||||||
|
// - variables provided through CLI arguments or environment variables
|
||||||
|
//
|
||||||
|
// The following is not available:
|
||||||
|
// - variables referencing other variables are in unresolved format
|
||||||
|
//
|
||||||
|
// PyDABs can output YAML containing references to variables, and CLI should resolve them.
|
||||||
|
//
|
||||||
|
// Existing resources can't be removed, and CLI rejects such changes.
|
||||||
|
ApplyPythonMutatorPhaseInit phase = "init"
|
||||||
|
)
|
||||||
|
|
||||||
|
type applyPythonMutator struct {
|
||||||
|
phase phase
|
||||||
|
}
|
||||||
|
|
||||||
|
func ApplyPythonMutator(phase phase) bundle.Mutator {
|
||||||
|
return &applyPythonMutator{
|
||||||
|
phase: phase,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *applyPythonMutator) Name() string {
|
||||||
|
return fmt.Sprintf("ApplyPythonMutator(%s)", m.phase)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getExperimental(b *bundle.Bundle) config.Experimental {
|
||||||
|
if b.Config.Experimental == nil {
|
||||||
|
return config.Experimental{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return *b.Config.Experimental
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *applyPythonMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||||
|
experimental := getExperimental(b)
|
||||||
|
|
||||||
|
if !experimental.PyDABs.Enabled {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if experimental.PyDABs.VEnvPath == "" {
|
||||||
|
return diag.Errorf("\"experimental.pydabs.enabled\" can only be used when \"experimental.pydabs.venv_path\" is set")
|
||||||
|
}
|
||||||
|
|
||||||
|
err := b.Config.Mutate(func(leftRoot dyn.Value) (dyn.Value, error) {
|
||||||
|
pythonPath := interpreterPath(experimental.PyDABs.VEnvPath)
|
||||||
|
|
||||||
|
if _, err := os.Stat(pythonPath); err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("can't find %q, check if venv is created", pythonPath)
|
||||||
|
} else {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("can't find %q: %w", pythonPath, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rightRoot, err := m.runPythonMutator(ctx, b.RootPath, pythonPath, leftRoot)
|
||||||
|
if err != nil {
|
||||||
|
return dyn.InvalidValue, err
|
||||||
|
}
|
||||||
|
|
||||||
|
visitor, err := createOverrideVisitor(ctx, m.phase)
|
||||||
|
if err != nil {
|
||||||
|
return dyn.InvalidValue, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return merge.Override(leftRoot, rightRoot, visitor)
|
||||||
|
})
|
||||||
|
|
||||||
|
return diag.FromErr(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *applyPythonMutator) runPythonMutator(ctx context.Context, rootPath string, pythonPath string, root dyn.Value) (dyn.Value, error) {
|
||||||
|
args := []string{
|
||||||
|
pythonPath,
|
||||||
|
"-m",
|
||||||
|
"databricks.bundles.build",
|
||||||
|
"--phase",
|
||||||
|
string(m.phase),
|
||||||
|
}
|
||||||
|
|
||||||
|
// we need to marshal dyn.Value instead of bundle.Config to JSON to support
|
||||||
|
// non-string fields assigned with bundle variables
|
||||||
|
rootConfigJson, err := json.Marshal(root.AsAny())
|
||||||
|
if err != nil {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("failed to marshal root config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
logWriter := newLogWriter(ctx, "stderr: ")
|
||||||
|
|
||||||
|
stdout, err := process.Background(
|
||||||
|
ctx,
|
||||||
|
args,
|
||||||
|
process.WithDir(rootPath),
|
||||||
|
process.WithStderrWriter(logWriter),
|
||||||
|
process.WithStdinReader(bytes.NewBuffer(rootConfigJson)),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("python mutator process failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// we need absolute path, or because later parts of pipeline assume all paths are absolute
|
||||||
|
// and this file will be used as location
|
||||||
|
virtualPath, err := filepath.Abs(filepath.Join(rootPath, "__generated_by_pydabs__.yml"))
|
||||||
|
if err != nil {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("failed to get absolute path: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
generated, err := yamlloader.LoadYAML(virtualPath, bytes.NewReader([]byte(stdout)))
|
||||||
|
if err != nil {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("failed to parse Python mutator output: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
normalized, diagnostic := convert.Normalize(config.Root{}, generated)
|
||||||
|
if diagnostic.Error() != nil {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("failed to normalize Python mutator output: %w", diagnostic.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// warnings shouldn't happen because output should be already normalized
|
||||||
|
// when it happens, it's a bug in the mutator, and should be treated as an error
|
||||||
|
|
||||||
|
for _, d := range diagnostic.Filter(diag.Warning) {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("failed to normalize Python mutator output: %s", d.Summary)
|
||||||
|
}
|
||||||
|
|
||||||
|
return normalized, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func createOverrideVisitor(ctx context.Context, phase phase) (merge.OverrideVisitor, error) {
|
||||||
|
switch phase {
|
||||||
|
case ApplyPythonMutatorPhaseLoad:
|
||||||
|
return createLoadOverrideVisitor(ctx), nil
|
||||||
|
case ApplyPythonMutatorPhaseInit:
|
||||||
|
return createInitOverrideVisitor(ctx), nil
|
||||||
|
default:
|
||||||
|
return merge.OverrideVisitor{}, fmt.Errorf("unknown phase: %s", phase)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createLoadOverrideVisitor creates an override visitor for the load phase.
|
||||||
|
//
|
||||||
|
// During load, it's only possible to create new resources, and not modify or
|
||||||
|
// delete existing ones.
|
||||||
|
func createLoadOverrideVisitor(ctx context.Context) merge.OverrideVisitor {
|
||||||
|
jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))
|
||||||
|
|
||||||
|
return merge.OverrideVisitor{
|
||||||
|
VisitDelete: func(valuePath dyn.Path, left dyn.Value) error {
|
||||||
|
return fmt.Errorf("unexpected change at %q (delete)", valuePath.String())
|
||||||
|
},
|
||||||
|
VisitInsert: func(valuePath dyn.Path, right dyn.Value) (dyn.Value, error) {
|
||||||
|
if !valuePath.HasPrefix(jobsPath) {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (insert)", valuePath.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
insertResource := len(valuePath) == len(jobsPath)+1
|
||||||
|
|
||||||
|
// adding a property into an existing resource is not allowed, because it changes it
|
||||||
|
if !insertResource {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (insert)", valuePath.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf(ctx, "Insert value at %q", valuePath.String())
|
||||||
|
|
||||||
|
return right, nil
|
||||||
|
},
|
||||||
|
VisitUpdate: func(valuePath dyn.Path, left dyn.Value, right dyn.Value) (dyn.Value, error) {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (update)", valuePath.String())
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// createInitOverrideVisitor creates an override visitor for the init phase.
|
||||||
|
//
|
||||||
|
// During the init phase it's possible to create new resources, modify existing
|
||||||
|
// resources, but not delete existing resources.
|
||||||
|
func createInitOverrideVisitor(ctx context.Context) merge.OverrideVisitor {
|
||||||
|
jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))
|
||||||
|
|
||||||
|
return merge.OverrideVisitor{
|
||||||
|
VisitDelete: func(valuePath dyn.Path, left dyn.Value) error {
|
||||||
|
if !valuePath.HasPrefix(jobsPath) {
|
||||||
|
return fmt.Errorf("unexpected change at %q (delete)", valuePath.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
deleteResource := len(valuePath) == len(jobsPath)+1
|
||||||
|
|
||||||
|
if deleteResource {
|
||||||
|
return fmt.Errorf("unexpected change at %q (delete)", valuePath.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleting properties is allowed because it only changes an existing resource
|
||||||
|
log.Debugf(ctx, "Delete value at %q", valuePath.String())
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
VisitInsert: func(valuePath dyn.Path, right dyn.Value) (dyn.Value, error) {
|
||||||
|
if !valuePath.HasPrefix(jobsPath) {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (insert)", valuePath.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf(ctx, "Insert value at %q", valuePath.String())
|
||||||
|
|
||||||
|
return right, nil
|
||||||
|
},
|
||||||
|
VisitUpdate: func(valuePath dyn.Path, left dyn.Value, right dyn.Value) (dyn.Value, error) {
|
||||||
|
if !valuePath.HasPrefix(jobsPath) {
|
||||||
|
return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (update)", valuePath.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf(ctx, "Update value at %q", valuePath.String())
|
||||||
|
|
||||||
|
return right, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// interpreterPath returns platform-specific path to Python interpreter in the virtual environment.
|
||||||
|
func interpreterPath(venvPath string) string {
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
return filepath.Join(venvPath, "Scripts", "python3.exe")
|
||||||
|
} else {
|
||||||
|
return filepath.Join(venvPath, "bin", "python3")
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,450 @@
|
||||||
|
package python
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"path/filepath"
|
||||||
|
"reflect"
|
||||||
|
"runtime"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"golang.org/x/exp/maps"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/libs/dyn"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/bundle"
|
||||||
|
"github.com/databricks/cli/bundle/config"
|
||||||
|
assert "github.com/databricks/cli/libs/dyn/dynassert"
|
||||||
|
"github.com/databricks/cli/libs/process"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_Name_load(t *testing.T) {
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseLoad)
|
||||||
|
|
||||||
|
assert.Equal(t, "ApplyPythonMutator(load)", mutator.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_Name_init(t *testing.T) {
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseInit)
|
||||||
|
|
||||||
|
assert.Equal(t, "ApplyPythonMutator(init)", mutator.Name())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_load(t *testing.T) {
|
||||||
|
withFakeVEnv(t, ".venv")
|
||||||
|
|
||||||
|
b := loadYaml("databricks.yml", `
|
||||||
|
experimental:
|
||||||
|
pydabs:
|
||||||
|
enabled: true
|
||||||
|
venv_path: .venv
|
||||||
|
resources:
|
||||||
|
jobs:
|
||||||
|
job0:
|
||||||
|
name: job_0`)
|
||||||
|
|
||||||
|
ctx := withProcessStub(
|
||||||
|
[]string{
|
||||||
|
interpreterPath(".venv"),
|
||||||
|
"-m",
|
||||||
|
"databricks.bundles.build",
|
||||||
|
"--phase",
|
||||||
|
"load",
|
||||||
|
},
|
||||||
|
`{
|
||||||
|
"experimental": {
|
||||||
|
"pydabs": {
|
||||||
|
"enabled": true,
|
||||||
|
"venv_path": ".venv"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"resources": {
|
||||||
|
"jobs": {
|
||||||
|
"job0": {
|
||||||
|
name: "job_0"
|
||||||
|
},
|
||||||
|
"job1": {
|
||||||
|
name: "job_1"
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseLoad)
|
||||||
|
diag := bundle.Apply(ctx, b, mutator)
|
||||||
|
|
||||||
|
assert.NoError(t, diag.Error())
|
||||||
|
|
||||||
|
assert.ElementsMatch(t, []string{"job0", "job1"}, maps.Keys(b.Config.Resources.Jobs))
|
||||||
|
|
||||||
|
if job0, ok := b.Config.Resources.Jobs["job0"]; ok {
|
||||||
|
assert.Equal(t, "job_0", job0.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
if job1, ok := b.Config.Resources.Jobs["job1"]; ok {
|
||||||
|
assert.Equal(t, "job_1", job1.Name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_load_disallowed(t *testing.T) {
|
||||||
|
withFakeVEnv(t, ".venv")
|
||||||
|
|
||||||
|
b := loadYaml("databricks.yml", `
|
||||||
|
experimental:
|
||||||
|
pydabs:
|
||||||
|
enabled: true
|
||||||
|
venv_path: .venv
|
||||||
|
resources:
|
||||||
|
jobs:
|
||||||
|
job0:
|
||||||
|
name: job_0`)
|
||||||
|
|
||||||
|
ctx := withProcessStub(
|
||||||
|
[]string{
|
||||||
|
interpreterPath(".venv"),
|
||||||
|
"-m",
|
||||||
|
"databricks.bundles.build",
|
||||||
|
"--phase",
|
||||||
|
"load",
|
||||||
|
},
|
||||||
|
`{
|
||||||
|
"experimental": {
|
||||||
|
"pydabs": {
|
||||||
|
"enabled": true,
|
||||||
|
"venv_path": ".venv"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"resources": {
|
||||||
|
"jobs": {
|
||||||
|
"job0": {
|
||||||
|
name: "job_0",
|
||||||
|
description: "job description"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseLoad)
|
||||||
|
diag := bundle.Apply(ctx, b, mutator)
|
||||||
|
|
||||||
|
assert.EqualError(t, diag.Error(), "unexpected change at \"resources.jobs.job0.description\" (insert)")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_init(t *testing.T) {
|
||||||
|
withFakeVEnv(t, ".venv")
|
||||||
|
|
||||||
|
b := loadYaml("databricks.yml", `
|
||||||
|
experimental:
|
||||||
|
pydabs:
|
||||||
|
enabled: true
|
||||||
|
venv_path: .venv
|
||||||
|
resources:
|
||||||
|
jobs:
|
||||||
|
job0:
|
||||||
|
name: job_0`)
|
||||||
|
|
||||||
|
ctx := withProcessStub(
|
||||||
|
[]string{
|
||||||
|
interpreterPath(".venv"),
|
||||||
|
"-m",
|
||||||
|
"databricks.bundles.build",
|
||||||
|
"--phase",
|
||||||
|
"init",
|
||||||
|
},
|
||||||
|
`{
|
||||||
|
"experimental": {
|
||||||
|
"pydabs": {
|
||||||
|
"enabled": true,
|
||||||
|
"venv_path": ".venv"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"resources": {
|
||||||
|
"jobs": {
|
||||||
|
"job0": {
|
||||||
|
name: "job_0",
|
||||||
|
description: "my job"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseInit)
|
||||||
|
diag := bundle.Apply(ctx, b, mutator)
|
||||||
|
|
||||||
|
assert.NoError(t, diag.Error())
|
||||||
|
|
||||||
|
assert.ElementsMatch(t, []string{"job0"}, maps.Keys(b.Config.Resources.Jobs))
|
||||||
|
assert.Equal(t, "job_0", b.Config.Resources.Jobs["job0"].Name)
|
||||||
|
assert.Equal(t, "my job", b.Config.Resources.Jobs["job0"].Description)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_badOutput(t *testing.T) {
|
||||||
|
withFakeVEnv(t, ".venv")
|
||||||
|
|
||||||
|
b := loadYaml("databricks.yml", `
|
||||||
|
experimental:
|
||||||
|
pydabs:
|
||||||
|
enabled: true
|
||||||
|
venv_path: .venv
|
||||||
|
resources:
|
||||||
|
jobs:
|
||||||
|
job0:
|
||||||
|
name: job_0`)
|
||||||
|
|
||||||
|
ctx := withProcessStub(
|
||||||
|
[]string{
|
||||||
|
interpreterPath(".venv"),
|
||||||
|
"-m",
|
||||||
|
"databricks.bundles.build",
|
||||||
|
"--phase",
|
||||||
|
"load",
|
||||||
|
},
|
||||||
|
`{
|
||||||
|
"resources": {
|
||||||
|
"jobs": {
|
||||||
|
"job0": {
|
||||||
|
unknown_property: "my job"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}`)
|
||||||
|
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseLoad)
|
||||||
|
diag := bundle.Apply(ctx, b, mutator)
|
||||||
|
|
||||||
|
assert.EqualError(t, diag.Error(), "failed to normalize Python mutator output: unknown field: unknown_property")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_disabled(t *testing.T) {
|
||||||
|
b := loadYaml("databricks.yml", ``)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseLoad)
|
||||||
|
diag := bundle.Apply(ctx, b, mutator)
|
||||||
|
|
||||||
|
assert.NoError(t, diag.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_venvRequired(t *testing.T) {
|
||||||
|
b := loadYaml("databricks.yml", `
|
||||||
|
experimental:
|
||||||
|
pydabs:
|
||||||
|
enabled: true`)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseLoad)
|
||||||
|
diag := bundle.Apply(ctx, b, mutator)
|
||||||
|
|
||||||
|
assert.Error(t, diag.Error(), "\"experimental.enable_pydabs\" is enabled, but \"experimental.venv.path\" is not set")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestApplyPythonMutator_venvNotFound(t *testing.T) {
|
||||||
|
expectedError := fmt.Sprintf("can't find %q, check if venv is created", interpreterPath("bad_path"))
|
||||||
|
|
||||||
|
b := loadYaml("databricks.yml", `
|
||||||
|
experimental:
|
||||||
|
pydabs:
|
||||||
|
enabled: true
|
||||||
|
venv_path: bad_path`)
|
||||||
|
|
||||||
|
mutator := ApplyPythonMutator(ApplyPythonMutatorPhaseInit)
|
||||||
|
diag := bundle.Apply(context.Background(), b, mutator)
|
||||||
|
|
||||||
|
assert.EqualError(t, diag.Error(), expectedError)
|
||||||
|
}
|
||||||
|
|
||||||
|
type createOverrideVisitorTestCase struct {
|
||||||
|
name string
|
||||||
|
updatePath dyn.Path
|
||||||
|
deletePath dyn.Path
|
||||||
|
insertPath dyn.Path
|
||||||
|
phase phase
|
||||||
|
updateError error
|
||||||
|
deleteError error
|
||||||
|
insertError error
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreateOverrideVisitor(t *testing.T) {
|
||||||
|
left := dyn.NewValue(42, dyn.Location{})
|
||||||
|
right := dyn.NewValue(1337, dyn.Location{})
|
||||||
|
|
||||||
|
testCases := []createOverrideVisitorTestCase{
|
||||||
|
{
|
||||||
|
name: "load: can't change an existing job",
|
||||||
|
phase: ApplyPythonMutatorPhaseLoad,
|
||||||
|
updatePath: dyn.MustPathFromString("resources.jobs.job0.name"),
|
||||||
|
deletePath: dyn.MustPathFromString("resources.jobs.job0.name"),
|
||||||
|
insertPath: dyn.MustPathFromString("resources.jobs.job0.name"),
|
||||||
|
deleteError: fmt.Errorf("unexpected change at \"resources.jobs.job0.name\" (delete)"),
|
||||||
|
insertError: fmt.Errorf("unexpected change at \"resources.jobs.job0.name\" (insert)"),
|
||||||
|
updateError: fmt.Errorf("unexpected change at \"resources.jobs.job0.name\" (update)"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "load: can't delete an existing job",
|
||||||
|
phase: ApplyPythonMutatorPhaseLoad,
|
||||||
|
deletePath: dyn.MustPathFromString("resources.jobs.job0"),
|
||||||
|
deleteError: fmt.Errorf("unexpected change at \"resources.jobs.job0\" (delete)"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "load: can insert a job",
|
||||||
|
phase: ApplyPythonMutatorPhaseLoad,
|
||||||
|
insertPath: dyn.MustPathFromString("resources.jobs.job0"),
|
||||||
|
insertError: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "load: can't change include",
|
||||||
|
phase: ApplyPythonMutatorPhaseLoad,
|
||||||
|
deletePath: dyn.MustPathFromString("include[0]"),
|
||||||
|
insertPath: dyn.MustPathFromString("include[0]"),
|
||||||
|
updatePath: dyn.MustPathFromString("include[0]"),
|
||||||
|
deleteError: fmt.Errorf("unexpected change at \"include[0]\" (delete)"),
|
||||||
|
insertError: fmt.Errorf("unexpected change at \"include[0]\" (insert)"),
|
||||||
|
updateError: fmt.Errorf("unexpected change at \"include[0]\" (update)"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "init: can change an existing job",
|
||||||
|
phase: ApplyPythonMutatorPhaseInit,
|
||||||
|
updatePath: dyn.MustPathFromString("resources.jobs.job0.name"),
|
||||||
|
deletePath: dyn.MustPathFromString("resources.jobs.job0.name"),
|
||||||
|
insertPath: dyn.MustPathFromString("resources.jobs.job0.name"),
|
||||||
|
deleteError: nil,
|
||||||
|
insertError: nil,
|
||||||
|
updateError: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "init: can't delete an existing job",
|
||||||
|
phase: ApplyPythonMutatorPhaseInit,
|
||||||
|
deletePath: dyn.MustPathFromString("resources.jobs.job0"),
|
||||||
|
deleteError: fmt.Errorf("unexpected change at \"resources.jobs.job0\" (delete)"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "init: can insert a job",
|
||||||
|
phase: ApplyPythonMutatorPhaseInit,
|
||||||
|
insertPath: dyn.MustPathFromString("resources.jobs.job0"),
|
||||||
|
insertError: nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "init: can't change include",
|
||||||
|
phase: ApplyPythonMutatorPhaseInit,
|
||||||
|
deletePath: dyn.MustPathFromString("include[0]"),
|
||||||
|
insertPath: dyn.MustPathFromString("include[0]"),
|
||||||
|
updatePath: dyn.MustPathFromString("include[0]"),
|
||||||
|
deleteError: fmt.Errorf("unexpected change at \"include[0]\" (delete)"),
|
||||||
|
insertError: fmt.Errorf("unexpected change at \"include[0]\" (insert)"),
|
||||||
|
updateError: fmt.Errorf("unexpected change at \"include[0]\" (update)"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
visitor, err := createOverrideVisitor(context.Background(), tc.phase)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("create visitor failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.updatePath != nil {
|
||||||
|
t.Run(tc.name+"-update", func(t *testing.T) {
|
||||||
|
out, err := visitor.VisitUpdate(tc.updatePath, left, right)
|
||||||
|
|
||||||
|
if tc.updateError != nil {
|
||||||
|
assert.Equal(t, tc.updateError, err)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, right, out)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.deletePath != nil {
|
||||||
|
t.Run(tc.name+"-delete", func(t *testing.T) {
|
||||||
|
err := visitor.VisitDelete(tc.deletePath, left)
|
||||||
|
|
||||||
|
if tc.deleteError != nil {
|
||||||
|
assert.Equal(t, tc.deleteError, err)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if tc.insertPath != nil {
|
||||||
|
t.Run(tc.name+"-insert", func(t *testing.T) {
|
||||||
|
out, err := visitor.VisitInsert(tc.insertPath, right)
|
||||||
|
|
||||||
|
if tc.insertError != nil {
|
||||||
|
assert.Equal(t, tc.insertError, err)
|
||||||
|
} else {
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, right, out)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestInterpreterPath(t *testing.T) {
|
||||||
|
if runtime.GOOS == "windows" {
|
||||||
|
assert.Equal(t, "venv\\Scripts\\python3.exe", interpreterPath("venv"))
|
||||||
|
} else {
|
||||||
|
assert.Equal(t, "venv/bin/python3", interpreterPath("venv"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func withProcessStub(args []string, stdout string) context.Context {
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, stub := process.WithStub(ctx)
|
||||||
|
|
||||||
|
stub.WithCallback(func(actual *exec.Cmd) error {
|
||||||
|
if reflect.DeepEqual(actual.Args, args) {
|
||||||
|
_, err := actual.Stdout.Write([]byte(stdout))
|
||||||
|
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("unexpected command: %v", actual.Args)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
func loadYaml(name string, content string) *bundle.Bundle {
|
||||||
|
v, diag := config.LoadFromBytes(name, []byte(content))
|
||||||
|
|
||||||
|
if diag.Error() != nil {
|
||||||
|
panic(diag.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
return &bundle.Bundle{
|
||||||
|
Config: *v,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func withFakeVEnv(t *testing.T, path string) {
|
||||||
|
interpreterPath := interpreterPath(path)
|
||||||
|
|
||||||
|
cwd, err := os.Getwd()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := os.Chdir(t.TempDir()); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = os.MkdirAll(filepath.Dir(interpreterPath), 0755)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = os.WriteFile(interpreterPath, []byte(""), 0755)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Cleanup(func() {
|
||||||
|
if err := os.Chdir(cwd); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -0,0 +1,42 @@
|
||||||
|
package python
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/databricks/cli/libs/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
type logWriter struct {
|
||||||
|
ctx context.Context
|
||||||
|
prefix string
|
||||||
|
buf bytes.Buffer
|
||||||
|
}
|
||||||
|
|
||||||
|
// newLogWriter creates a new io.Writer that writes to log with specified prefix.
|
||||||
|
func newLogWriter(ctx context.Context, prefix string) io.Writer {
|
||||||
|
return &logWriter{
|
||||||
|
ctx: ctx,
|
||||||
|
prefix: prefix,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *logWriter) Write(bytes []byte) (n int, err error) {
|
||||||
|
p.buf.Write(bytes)
|
||||||
|
|
||||||
|
scanner := bufio.NewScanner(&p.buf)
|
||||||
|
|
||||||
|
for scanner.Scan() {
|
||||||
|
line := scanner.Text()
|
||||||
|
|
||||||
|
log.Debugf(p.ctx, "%s%s", p.prefix, line)
|
||||||
|
}
|
||||||
|
|
||||||
|
remaining := p.buf.Bytes()
|
||||||
|
p.buf.Reset()
|
||||||
|
p.buf.Write(remaining)
|
||||||
|
|
||||||
|
return len(bytes), nil
|
||||||
|
}
|
|
@ -74,6 +74,10 @@ func Load(path string) (*Root, diag.Diagnostics) {
|
||||||
return nil, diag.FromErr(err)
|
return nil, diag.FromErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return LoadFromBytes(path, raw)
|
||||||
|
}
|
||||||
|
|
||||||
|
func LoadFromBytes(path string, raw []byte) (*Root, diag.Diagnostics) {
|
||||||
r := Root{}
|
r := Root{}
|
||||||
|
|
||||||
// Load configuration tree from YAML.
|
// Load configuration tree from YAML.
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"github.com/databricks/cli/bundle"
|
"github.com/databricks/cli/bundle"
|
||||||
"github.com/databricks/cli/bundle/config"
|
"github.com/databricks/cli/bundle/config"
|
||||||
"github.com/databricks/cli/bundle/config/mutator"
|
"github.com/databricks/cli/bundle/config/mutator"
|
||||||
|
pythonmutator "github.com/databricks/cli/bundle/config/mutator/python"
|
||||||
"github.com/databricks/cli/bundle/deploy/metadata"
|
"github.com/databricks/cli/bundle/deploy/metadata"
|
||||||
"github.com/databricks/cli/bundle/deploy/terraform"
|
"github.com/databricks/cli/bundle/deploy/terraform"
|
||||||
"github.com/databricks/cli/bundle/permissions"
|
"github.com/databricks/cli/bundle/permissions"
|
||||||
|
@ -28,6 +29,9 @@ func Initialize() bundle.Mutator {
|
||||||
mutator.ExpandWorkspaceRoot(),
|
mutator.ExpandWorkspaceRoot(),
|
||||||
mutator.DefineDefaultWorkspacePaths(),
|
mutator.DefineDefaultWorkspacePaths(),
|
||||||
mutator.SetVariables(),
|
mutator.SetVariables(),
|
||||||
|
// Intentionally placed before ResolveVariableReferencesInLookup, ResolveResourceReferences
|
||||||
|
// and ResolveVariableReferences. See what is expected in ApplyPythonMutatorPhaseInit doc
|
||||||
|
pythonmutator.ApplyPythonMutator(pythonmutator.ApplyPythonMutatorPhaseInit),
|
||||||
mutator.ResolveVariableReferencesInLookup(),
|
mutator.ResolveVariableReferencesInLookup(),
|
||||||
mutator.ResolveResourceReferences(),
|
mutator.ResolveResourceReferences(),
|
||||||
mutator.ResolveVariableReferences(
|
mutator.ResolveVariableReferences(
|
||||||
|
|
|
@ -48,6 +48,27 @@ func WithStdoutPipe(dst *io.ReadCloser) execOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithStdinReader(src io.Reader) execOption {
|
||||||
|
return func(_ context.Context, c *exec.Cmd) error {
|
||||||
|
c.Stdin = src
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithStderrWriter(dst io.Writer) execOption {
|
||||||
|
return func(_ context.Context, c *exec.Cmd) error {
|
||||||
|
c.Stderr = dst
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func WithStdoutWriter(dst io.Writer) execOption {
|
||||||
|
return func(_ context.Context, c *exec.Cmd) error {
|
||||||
|
c.Stdout = dst
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithCombinedOutput(buf *bytes.Buffer) execOption {
|
func WithCombinedOutput(buf *bytes.Buffer) execOption {
|
||||||
return func(_ context.Context, c *exec.Cmd) error {
|
return func(_ context.Context, c *exec.Cmd) error {
|
||||||
c.Stdout = io.MultiWriter(buf, c.Stdout)
|
c.Stdout = io.MultiWriter(buf, c.Stdout)
|
||||||
|
|
Loading…
Reference in New Issue