package python import ( "bytes" "context" "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "reflect" "strings" "github.com/databricks/databricks-sdk-go/logger" "github.com/fatih/color" "github.com/databricks/cli/libs/python" "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn/convert" "github.com/databricks/cli/libs/dyn/merge" "github.com/databricks/cli/libs/dyn/yamlloader" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/process" ) type phase string const ( // PythonMutatorPhaseLoad is the phase in which bundle configuration is loaded. // // At this stage, PyDABs adds statically defined resources to the bundle configuration. // Which resources are added should be deterministic and not depend on the bundle configuration. // // We also open for possibility of appending other sections of bundle configuration, // for example, adding new variables. However, this is not supported yet, and CLI rejects // such changes. // // Deprecated, left for backward-compatibility with PyDABs. PythonMutatorPhaseLoad phase = "load" // PythonMutatorPhaseInit is the phase after bundle configuration was loaded, and // the list of statically declared resources is known. // // At this stage, PyDABs adds resources defined using generators, or mutates existing resources, // including the ones defined using YAML. // // During this process, within generator and mutators, PyDABs can access: // - selected deployment target // - bundle variables values // - variables provided through CLI arguments or environment variables // // The following is not available: // - variables referencing other variables are in unresolved format // // PyDABs can output YAML containing references to variables, and CLI should resolve them. // // Existing resources can't be removed, and CLI rejects such changes. // // Deprecated, left for backward-compatibility with PyDABs. PythonMutatorPhaseInit phase = "init" // PythonMutatorPhaseLoadResources is the phase in which YAML configuration was loaded. // // At this stage, we execute Python code to load resources defined in Python. // // During this process, Python code can access: // - selected deployment target // - bundle variable values // - variables provided through CLI argument or environment variables // // The following is not available: // - variables referencing other variables are in unresolved format // // Python code can output YAML referencing variables, and CLI should resolve them. // // Existing resources can't be removed or modified, and CLI rejects such changes. // While it's called 'load_resources', this phase is executed in 'init' phase of mutator pipeline. PythonMutatorPhaseLoadResources phase = "load_resources" // PythonMutatorPhaseApplyMutators is the phase in which resources defined in YAML or Python // are already loaded. // // At this stage, we execute Python code to mutate resources defined in YAML or Python. // // During this process, Python code can access: // - selected deployment target // - bundle variable values // - variables provided through CLI argument or environment variables // // The following is not available: // - variables referencing other variables are in unresolved format // // Python code can output YAML referencing variables, and CLI should resolve them. // // Resources can't be added or removed, and CLI rejects such changes. Python code is // allowed to modify existing resources, but not other parts of bundle configuration. PythonMutatorPhaseApplyMutators phase = "apply_mutators" ) type pythonMutator struct { phase phase } func PythonMutator(phase phase) bundle.Mutator { return &pythonMutator{ phase: phase, } } func (m *pythonMutator) Name() string { return fmt.Sprintf("PythonMutator(%s)", m.phase) } // opts is a common structure for deprecated PyDABs and upcoming Python // configuration sections type opts struct { enabled bool venvPath string } // getOpts adapts deprecated PyDABs and upcoming Python configuration // into a common structure. func getOpts(b *bundle.Bundle, phase phase) (opts, error) { experimental := b.Config.Experimental if experimental == nil { return opts{}, nil } // using reflect.DeepEquals in case we add more fields pydabsEnabled := !reflect.DeepEqual(experimental.PyDABs, config.PyDABs{}) pythonEnabled := !reflect.DeepEqual(experimental.Python, config.Python{}) if pydabsEnabled && pythonEnabled { return opts{}, errors.New("both experimental/pydabs and experimental/python are enabled, only one can be enabled") } else if pydabsEnabled { if !experimental.PyDABs.Enabled { return opts{}, nil } // don't execute for phases for 'python' section if phase == PythonMutatorPhaseInit || phase == PythonMutatorPhaseLoad { return opts{ enabled: true, venvPath: experimental.PyDABs.VEnvPath, }, nil } else { return opts{}, nil } } else if pythonEnabled { // don't execute for phases for 'pydabs' section if phase == PythonMutatorPhaseLoadResources || phase == PythonMutatorPhaseApplyMutators { return opts{ enabled: true, venvPath: experimental.Python.VEnvPath, }, nil } else { return opts{}, nil } } else { return opts{}, nil } } func (m *pythonMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { opts, err := getOpts(b, m.phase) if err != nil { return diag.Errorf("failed to apply python mutator: %s", err) } if !opts.enabled { return nil } // mutateDiags is used because Mutate returns 'error' instead of 'diag.Diagnostics' var mutateDiags diag.Diagnostics mutateDiagsHasError := errors.New("unexpected error") err = b.Config.Mutate(func(leftRoot dyn.Value) (dyn.Value, error) { pythonPath, err := detectExecutable(ctx, opts.venvPath) if err != nil { return dyn.InvalidValue, fmt.Errorf("failed to get Python interpreter path: %w", err) } cacheDir, err := createCacheDir(ctx) if err != nil { return dyn.InvalidValue, fmt.Errorf("failed to create cache dir: %w", err) } rightRoot, diags := m.runPythonMutator(ctx, cacheDir, b.BundleRootPath, pythonPath, leftRoot) mutateDiags = diags if diags.HasError() { return dyn.InvalidValue, mutateDiagsHasError } visitor, err := createOverrideVisitor(ctx, m.phase) if err != nil { return dyn.InvalidValue, err } return merge.Override(leftRoot, rightRoot, visitor) }) if err == mutateDiagsHasError { if !mutateDiags.HasError() { panic("mutateDiags has no error, but error is expected") } return mutateDiags } return mutateDiags.Extend(diag.FromErr(err)) } func createCacheDir(ctx context.Context) (string, error) { // b.CacheDir doesn't work because target isn't yet selected // support the same env variable as in b.CacheDir if tempDir, exists := env.TempDir(ctx); exists { // use 'default' as target name cacheDir := filepath.Join(tempDir, "default", "python") err := os.MkdirAll(cacheDir, 0o700) if err != nil { return "", err } return cacheDir, nil } return os.MkdirTemp("", "-python") } func (m *pythonMutator) runPythonMutator(ctx context.Context, cacheDir, rootPath, pythonPath string, root dyn.Value) (dyn.Value, diag.Diagnostics) { inputPath := filepath.Join(cacheDir, "input.json") outputPath := filepath.Join(cacheDir, "output.json") diagnosticsPath := filepath.Join(cacheDir, "diagnostics.json") args := []string{ pythonPath, "-m", "databricks.bundles.build", "--phase", string(m.phase), "--input", inputPath, "--output", outputPath, "--diagnostics", diagnosticsPath, } if err := writeInputFile(inputPath, root); err != nil { return dyn.InvalidValue, diag.Errorf("failed to write input file: %s", err) } stderrBuf := bytes.Buffer{} stderrWriter := io.MultiWriter( newLogWriter(ctx, "stderr: "), &stderrBuf, ) stdoutWriter := newLogWriter(ctx, "stdout: ") _, processErr := process.Background( ctx, args, process.WithDir(rootPath), process.WithStderrWriter(stderrWriter), process.WithStdoutWriter(stdoutWriter), ) if processErr != nil { logger.Debugf(ctx, "python mutator process failed: %s", processErr) } pythonDiagnostics, pythonDiagnosticsErr := loadDiagnosticsFile(diagnosticsPath) if pythonDiagnosticsErr != nil { logger.Debugf(ctx, "failed to load diagnostics: %s", pythonDiagnosticsErr) } // if diagnostics file exists, it gives the most descriptive errors // if there is any error, we treat it as fatal error, and stop processing if pythonDiagnostics.HasError() { return dyn.InvalidValue, pythonDiagnostics } // process can fail without reporting errors in diagnostics file or creating it, for instance, // venv doesn't have 'databricks-bundles' library installed if processErr != nil { diagnostic := diag.Diagnostic{ Severity: diag.Error, Summary: fmt.Sprintf("python mutator process failed: %q, use --debug to enable logging", processErr), Detail: explainProcessErr(stderrBuf.String()), } return dyn.InvalidValue, diag.Diagnostics{diagnostic} } // or we can fail to read diagnostics file, that should always be created if pythonDiagnosticsErr != nil { return dyn.InvalidValue, diag.Errorf("failed to load diagnostics: %s", pythonDiagnosticsErr) } output, outputDiags := loadOutputFile(rootPath, outputPath) pythonDiagnostics = pythonDiagnostics.Extend(outputDiags) // we pass through pythonDiagnostic because it contains warnings return output, pythonDiagnostics } const pythonInstallExplanation = `Ensure that 'databricks-bundles' is installed in Python environment: $ .venv/bin/pip install databricks-bundles If using a virtual environment, ensure it is specified as the venv_path property in databricks.yml, or activate the environment before running CLI commands: experimental: python: venv_path: .venv ` // explainProcessErr provides additional explanation for common errors. // It's meant to be the best effort, and not all errors are covered. // Output should be used only used for error reporting. func explainProcessErr(stderr string) string { // implemented in cpython/Lib/runpy.py and portable across Python 3.x, including pypy if strings.Contains(stderr, "Error while finding module specification for 'databricks.bundles.build'") { summary := color.CyanString("Explanation: ") + "'databricks-bundles' library is not installed in the Python environment.\n" return stderr + "\n" + summary + "\n" + pythonInstallExplanation } return stderr } func writeInputFile(inputPath string, input dyn.Value) error { // we need to marshal dyn.Value instead of bundle.Config to JSON to support // non-string fields assigned with bundle variables rootConfigJson, err := json.Marshal(input.AsAny()) if err != nil { return fmt.Errorf("failed to marshal input: %w", err) } return os.WriteFile(inputPath, rootConfigJson, 0o600) } func loadOutputFile(rootPath, outputPath string) (dyn.Value, diag.Diagnostics) { outputFile, err := os.Open(outputPath) if err != nil { return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to open output file: %w", err)) } defer outputFile.Close() // we need absolute path because later parts of pipeline assume all paths are absolute // and this file will be used as location to resolve relative paths. // // virtualPath has to stay in rootPath, because locations outside root path are not allowed: // // Error: path /var/folders/.../python/dist/*.whl is not contained in bundle root path // // for that, we pass virtualPath instead of outputPath as file location virtualPath, err := filepath.Abs(filepath.Join(rootPath, "__generated_by_python__.yml")) if err != nil { return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to get absolute path: %w", err)) } generated, err := yamlloader.LoadYAML(virtualPath, outputFile) if err != nil { return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to parse output file: %w", err)) } return strictNormalize(config.Root{}, generated) } func strictNormalize(dst any, generated dyn.Value) (dyn.Value, diag.Diagnostics) { normalized, diags := convert.Normalize(dst, generated) // warnings shouldn't happen because output should be already normalized // when it happens, it's a bug in the mutator, and should be treated as an error strictDiags := diag.Diagnostics{} for _, d := range diags { if d.Severity == diag.Warning { d.Severity = diag.Error } strictDiags = strictDiags.Append(d) } return normalized, strictDiags } // loadDiagnosticsFile loads diagnostics from a file. // // It contains a list of warnings and errors that we should print to users. // // If the file doesn't exist, we return an error. We expect the file to always be // created by the Python mutator, and it's absence means there are integration problems, // and the diagnostics file was lost. If we treat non-existence as an empty diag.Diagnostics // we risk loosing errors and warnings. func loadDiagnosticsFile(path string) (diag.Diagnostics, error) { file, err := os.Open(path) if err != nil { return nil, fmt.Errorf("failed to open diagnostics file: %w", err) } defer file.Close() return parsePythonDiagnostics(file) } func createOverrideVisitor(ctx context.Context, phase phase) (merge.OverrideVisitor, error) { switch phase { case PythonMutatorPhaseLoad: return createLoadResourcesOverrideVisitor(ctx), nil case PythonMutatorPhaseInit: return createInitOverrideVisitor(ctx, insertResourceModeAllow), nil case PythonMutatorPhaseLoadResources: return createLoadResourcesOverrideVisitor(ctx), nil case PythonMutatorPhaseApplyMutators: return createInitOverrideVisitor(ctx, insertResourceModeDisallow), nil default: return merge.OverrideVisitor{}, fmt.Errorf("unknown phase: %s", phase) } } // createLoadResourcesOverrideVisitor creates an override visitor for the load_resources phase. // // During load_resources, it's only possible to create new resources, and not modify or // delete existing ones. func createLoadResourcesOverrideVisitor(ctx context.Context) merge.OverrideVisitor { resourcesPath := dyn.NewPath(dyn.Key("resources")) jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs")) return merge.OverrideVisitor{ VisitDelete: func(valuePath dyn.Path, left dyn.Value) error { if isOmitemptyDelete(left) { return merge.ErrOverrideUndoDelete } return fmt.Errorf("unexpected change at %q (delete)", valuePath.String()) }, VisitInsert: func(valuePath dyn.Path, right dyn.Value) (dyn.Value, error) { // insert 'resources' or 'resources.jobs' if it didn't exist before if valuePath.Equal(resourcesPath) || valuePath.Equal(jobsPath) { return right, nil } if !valuePath.HasPrefix(jobsPath) { return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (insert)", valuePath.String()) } insertResource := len(valuePath) == len(jobsPath)+1 // adding a property into an existing resource is not allowed, because it changes it if !insertResource { return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (insert)", valuePath.String()) } log.Debugf(ctx, "Insert value at %q", valuePath.String()) return right, nil }, VisitUpdate: func(valuePath dyn.Path, left, right dyn.Value) (dyn.Value, error) { return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (update)", valuePath.String()) }, } } // insertResourceMode controls whether createInitOverrideVisitor allows or disallows inserting new resources. type insertResourceMode int const ( insertResourceModeDisallow insertResourceMode = iota insertResourceModeAllow insertResourceMode = iota ) // createInitOverrideVisitor creates an override visitor for the init phase. // // During the init phase it's possible to create new resources, modify existing // resources, but not delete existing resources. // // If mode is insertResourceModeDisallow, it matching expected behaviour of apply_mutators func createInitOverrideVisitor(ctx context.Context, mode insertResourceMode) merge.OverrideVisitor { resourcesPath := dyn.NewPath(dyn.Key("resources")) jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs")) return merge.OverrideVisitor{ VisitDelete: func(valuePath dyn.Path, left dyn.Value) error { if isOmitemptyDelete(left) { return merge.ErrOverrideUndoDelete } if !valuePath.HasPrefix(jobsPath) { return fmt.Errorf("unexpected change at %q (delete)", valuePath.String()) } deleteResource := len(valuePath) == len(jobsPath)+1 if deleteResource { return fmt.Errorf("unexpected change at %q (delete)", valuePath.String()) } // deleting properties is allowed because it only changes an existing resource log.Debugf(ctx, "Delete value at %q", valuePath.String()) return nil }, VisitInsert: func(valuePath dyn.Path, right dyn.Value) (dyn.Value, error) { // insert 'resources' or 'resources.jobs' if it didn't exist before if valuePath.Equal(resourcesPath) || valuePath.Equal(jobsPath) { return right, nil } if !valuePath.HasPrefix(jobsPath) { return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (insert)", valuePath.String()) } insertResource := len(valuePath) == len(jobsPath)+1 if mode == insertResourceModeDisallow && insertResource { return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (insert)", valuePath.String()) } log.Debugf(ctx, "Insert value at %q", valuePath.String()) return right, nil }, VisitUpdate: func(valuePath dyn.Path, left, right dyn.Value) (dyn.Value, error) { if !valuePath.HasPrefix(jobsPath) { return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (update)", valuePath.String()) } log.Debugf(ctx, "Update value at %q", valuePath.String()) return right, nil }, } } func isOmitemptyDelete(left dyn.Value) bool { // Python output can omit empty sequences/mappings, because we don't track them as optional, // there is no semantic difference between empty and missing, so we keep them as they were before // Python mutator deleted them. switch left.Kind() { case dyn.KindMap: return left.MustMap().Len() == 0 case dyn.KindSequence: return len(left.MustSequence()) == 0 case dyn.KindNil: // map/sequence can be nil, for instance, bad YAML like: `foo:` return true default: return false } } // detectExecutable lookups Python interpreter in virtual environment, or if not set, in PATH. func detectExecutable(ctx context.Context, venvPath string) (string, error) { if venvPath == "" { interpreter, err := python.DetectExecutable(ctx) if err != nil { return "", err } return interpreter, nil } return python.DetectVEnvExecutable(venvPath) }