// databricks-cli/bundle/config/mutator/python/python_mutator.go
// (537 lines, 17 KiB, Go)

package python
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"github.com/databricks/cli/bundle/config/mutator/paths"
"github.com/databricks/databricks-sdk-go/logger"
"github.com/fatih/color"
"strings"
"github.com/databricks/cli/libs/python"
"github.com/databricks/cli/bundle/env"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/dyn/merge"
"github.com/databricks/cli/libs/dyn/yamlloader"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/process"
)
// phase identifies when the Python mutator runs. Its string value is passed to
// the Python process through the --phase argument.
type phase string

const (
	// PythonMutatorPhaseLoad is the phase in which bundle configuration is loaded.
	//
	// In this phase, PyDABs adds statically defined resources to the bundle
	// configuration. Which resources are added must be deterministic and must
	// not depend on the bundle configuration.
	//
	// Appending other sections of the bundle configuration (for example, new
	// variables) may become possible in the future. It is not supported yet,
	// and the CLI rejects such changes.
	PythonMutatorPhaseLoad phase = "load"

	// PythonMutatorPhaseInit is the phase that runs after the bundle
	// configuration has been loaded, once the list of statically declared
	// resources is known.
	//
	// In this phase, PyDABs adds resources defined using generators and mutates
	// existing resources, including the ones defined in YAML.
	//
	// Within generators and mutators, PyDABs has access to:
	//   - the selected deployment target
	//   - bundle variable values
	//   - variables provided through CLI arguments or environment variables
	//
	// Limitations:
	//   - variables that reference other variables are still unresolved
	//
	// PyDABs can output YAML containing references to variables, and the CLI
	// should resolve them.
	//
	// Existing resources can't be removed; the CLI rejects such changes.
	PythonMutatorPhaseInit phase = "init"
)
// pythonMutator runs the PyDABs Python process for a single phase.
type pythonMutator struct {
	phase phase
}

// PythonMutator returns a bundle mutator that runs PyDABs in the given phase.
func PythonMutator(phase phase) bundle.Mutator {
	m := new(pythonMutator)
	m.phase = phase
	return m
}

// Name returns the mutator name, which embeds the phase, e.g. "PythonMutator(load)".
func (m *pythonMutator) Name() string {
	name := fmt.Sprintf("PythonMutator(%s)", m.phase)
	return name
}
// getExperimental returns the bundle's experimental settings, or their zero
// value when the experimental section is not set.
func getExperimental(b *bundle.Bundle) config.Experimental {
	if exp := b.Config.Experimental; exp != nil {
		return *exp
	}
	return config.Experimental{}
}
// Apply runs PyDABs for m.phase and merges the configuration it returns into
// the bundle. It does nothing unless Experimental.PyDABs.Enabled is set.
//
// The Python interpreter is taken from Experimental.PyDABs.VEnvPath, or looked
// up in PATH when that is empty. The returned configuration is merged using a
// phase-specific visitor (see createOverrideVisitor) that rejects disallowed
// changes.
//
// Diagnostics produced while running the Python process, including warnings,
// are returned to the caller. If they contain an error, no merge is attempted
// and those diagnostics are returned as-is.
func (m *pythonMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
	experimental := getExperimental(b)

	if !experimental.PyDABs.Enabled {
		return nil
	}

	// mutateDiags is used because Mutate returns 'error' instead of 'diag.Diagnostics'
	var mutateDiags diag.Diagnostics
	// mutateDiagsHasError is returned from the Mutate callback when the actual
	// errors are already recorded in mutateDiags, so they aren't reported twice.
	var mutateDiagsHasError = errors.New("unexpected error")

	err := b.Config.Mutate(func(leftRoot dyn.Value) (dyn.Value, error) {
		pythonPath, err := detectExecutable(ctx, experimental.PyDABs.VEnvPath)
		if err != nil {
			return dyn.InvalidValue, fmt.Errorf("failed to get Python interpreter path: %w", err)
		}

		cacheDir, err := createCacheDir(ctx)
		if err != nil {
			return dyn.InvalidValue, fmt.Errorf("failed to create cache dir: %w", err)
		}

		// Without a configured bundle temp dir, createCacheDir creates a brand-new
		// directory under the OS temp dir on every call. runPythonMutator reads
		// everything it needs from it into memory, so remove it afterwards
		// instead of leaking one directory per run. A configured temp dir is
		// kept so its files remain available for inspection.
		if _, exists := env.TempDir(ctx); !exists {
			defer func() {
				if err := os.RemoveAll(cacheDir); err != nil {
					log.Debugf(ctx, "failed to remove cache dir %s: %s", cacheDir, err)
				}
			}()
		}

		rightRoot, diags := m.runPythonMutator(ctx, leftRoot, runPythonMutatorOpts{
			cacheDir:      cacheDir,
			rootPath:      b.BundleRootPath,
			pythonPath:    pythonPath,
			loadLocations: experimental.PyDABs.LoadLocations,
		})
		mutateDiags = diags
		if diags.HasError() {
			return dyn.InvalidValue, mutateDiagsHasError
		}

		visitor, err := createOverrideVisitor(ctx, m.phase)
		if err != nil {
			return dyn.InvalidValue, err
		}

		return merge.Override(leftRoot, rightRoot, visitor)
	})

	// Match the sentinel with errors.Is so this keeps working if Mutate ever
	// wraps the callback's error.
	if errors.Is(err, mutateDiagsHasError) {
		if !mutateDiags.HasError() {
			panic("mutateDiags has no error, but error is expected")
		}

		return mutateDiags
	}

	return mutateDiags.Extend(diag.FromErr(err))
}
// createCacheDir returns the directory used to exchange files with the Python
// process, creating it if necessary.
//
// b.CacheDir can't be used here because the target isn't selected yet. Like
// b.CacheDir, the bundle temp dir from the environment is honored: when it is
// set, the fixed subdirectory "default/pydabs" is used. Otherwise a new
// directory is created under the OS temp dir.
func createCacheDir(ctx context.Context) (string, error) {
	tempDir, exists := env.TempDir(ctx)
	if !exists {
		return os.MkdirTemp("", "-pydabs")
	}

	// 'default' stands in for the (not yet selected) target name.
	cacheDir := filepath.Join(tempDir, "default", "pydabs")
	if err := os.MkdirAll(cacheDir, 0700); err != nil {
		return "", err
	}

	return cacheDir, nil
}
// runPythonMutatorOpts holds the inputs of runPythonMutator.
type runPythonMutatorOpts struct {
	// cacheDir holds the input, output, diagnostics and locations files
	// exchanged with the Python process.
	cacheDir string
	// rootPath is the bundle root; the Python process runs in this directory.
	rootPath string
	// pythonPath is the Python interpreter to execute.
	pythonPath string
	// loadLocations asks the Python process to also write a locations file.
	loadLocations bool
}
// runPythonMutator runs `python -m databricks.bundles.build` for m.phase and
// returns the configuration it produced, together with diagnostics.
//
// Data is exchanged through files in opts.cacheDir:
//   - root is written to input.json
//   - the process writes output.json and diagnostics.json
//   - it also writes locations.json when opts.loadLocations is set
//
// The process runs with opts.rootPath as its working directory, and its
// stdout/stderr are logged.
//
// dyn.InvalidValue and error diagnostics are returned in three cases: the
// Python diagnostics contain an error, the process fails, or the diagnostics
// file can't be loaded. Otherwise the loaded output is returned together with
// the Python warnings and any diagnostics from loading the output.
func (m *pythonMutator) runPythonMutator(ctx context.Context, root dyn.Value, opts runPythonMutatorOpts) (dyn.Value, diag.Diagnostics) {
	inputPath := filepath.Join(opts.cacheDir, "input.json")
	outputPath := filepath.Join(opts.cacheDir, "output.json")
	diagnosticsPath := filepath.Join(opts.cacheDir, "diagnostics.json")
	locationsPath := filepath.Join(opts.cacheDir, "locations.json")

	args := []string{
		opts.pythonPath,
		"-m",
		"databricks.bundles.build",
		"--phase",
		string(m.phase),
		"--input",
		inputPath,
		"--output",
		outputPath,
		"--diagnostics",
		diagnosticsPath,
	}

	if opts.loadLocations {
		args = append(args, "--locations", locationsPath)
	}

	// When a bundle temp dir is configured, the cache dir is the same for every
	// run, including the load and init phases. Remove results of a previous run
	// so that a process which doesn't (re)write them can't make us silently
	// consume stale output, diagnostics or locations.
	for _, stale := range []string{outputPath, diagnosticsPath, locationsPath} {
		if err := os.Remove(stale); err != nil && !errors.Is(err, os.ErrNotExist) {
			return dyn.InvalidValue, diag.Errorf("failed to remove stale file %s: %s", stale, err)
		}
	}

	if err := writeInputFile(inputPath, root); err != nil {
		return dyn.InvalidValue, diag.Errorf("failed to write input file: %s", err)
	}

	// stderr is both logged and captured, so it can be attached to the error.
	stderrBuf := bytes.Buffer{}
	stderrWriter := io.MultiWriter(
		newLogWriter(ctx, "stderr: "),
		&stderrBuf,
	)
	stdoutWriter := newLogWriter(ctx, "stdout: ")

	_, processErr := process.Background(
		ctx,
		args,
		process.WithDir(opts.rootPath),
		process.WithStderrWriter(stderrWriter),
		process.WithStdoutWriter(stdoutWriter),
	)
	if processErr != nil {
		logger.Debugf(ctx, "python mutator process failed: %s", processErr)
	}

	pythonDiagnostics, pythonDiagnosticsErr := loadDiagnosticsFile(diagnosticsPath)
	if pythonDiagnosticsErr != nil {
		logger.Debugf(ctx, "failed to load diagnostics: %s", pythonDiagnosticsErr)
	}

	// if diagnostics file exists, it gives the most descriptive errors
	// if there is any error, we treat it as fatal error, and stop processing
	if pythonDiagnostics.HasError() {
		return dyn.InvalidValue, pythonDiagnostics
	}

	// process can fail without reporting errors in diagnostics file or creating it, for instance,
	// venv doesn't have PyDABs library installed
	if processErr != nil {
		diagnostic := diag.Diagnostic{
			Severity: diag.Error,
			Summary:  fmt.Sprintf("python mutator process failed: %q, use --debug to enable logging", processErr),
			Detail:   explainProcessErr(stderrBuf.String()),
		}

		return dyn.InvalidValue, diag.Diagnostics{diagnostic}
	}

	// or we can fail to read diagnostics file, that should always be created
	if pythonDiagnosticsErr != nil {
		return dyn.InvalidValue, diag.Errorf("failed to load diagnostics: %s", pythonDiagnosticsErr)
	}

	locations, err := loadLocationsFile(locationsPath)
	if err != nil {
		return dyn.InvalidValue, diag.Errorf("failed to load locations: %s", err)
	}

	output, outputDiags := loadOutputFile(opts.rootPath, outputPath, locations)
	pythonDiagnostics = pythonDiagnostics.Extend(outputDiags)

	// we pass through pythonDiagnostic because it contains warnings
	return output, pythonDiagnostics
}
// installExplanation is shown to users when the 'databricks-pydabs' library is
// missing from the Python environment (see explainProcessErr).
//
// The shell command and the databricks.yml snippet are indented and set apart
// by blank lines. The indentation is significant: it keeps the YAML nesting
// experimental -> pydabs -> venv_path intact, so the snippet can be copied as-is.
const installExplanation = `If using Python wheels, ensure that 'databricks-pydabs' is included in the dependencies,
and that the wheel is installed in the Python environment:

  $ .venv/bin/pip install -e .

If using a virtual environment, ensure it is specified as the venv_path property in databricks.yml,
or activate the environment before running CLI commands:

  experimental:
    pydabs:
      venv_path: .venv
`
// explainProcessErr returns stderr, extended with an explanation when it
// matches a known failure. Currently the only recognized case is that the
// 'databricks.bundles.build' module can't be found, which means
// 'databricks-pydabs' isn't installed in the Python environment.
//
// This is best effort and doesn't cover all errors. The output should be used
// only for error reporting.
func explainProcessErr(stderr string) string {
	// Message produced by cpython/Lib/runpy.py; portable across Python 3.x, including PyPy.
	const moduleNotFound = "Error while finding module specification for 'databricks.bundles.build'"

	if !strings.Contains(stderr, moduleNotFound) {
		return stderr
	}

	summary := color.CyanString("Explanation: ") + "'databricks-pydabs' library is not installed in the Python environment.\n"
	return stderr + "\n" + summary + "\n" + installExplanation
}
// writeInputFile serializes input as JSON and writes it to inputPath with
// permissions 0600.
//
// The dyn.Value is marshaled (via AsAny) instead of bundle.Config, so that
// non-string fields holding bundle variable references can be represented.
func writeInputFile(inputPath string, input dyn.Value) error {
	data, err := json.Marshal(input.AsAny())
	if err != nil {
		return fmt.Errorf("failed to marshal input: %w", err)
	}

	return os.WriteFile(inputPath, data, 0600)
}
// loadLocationsFile loads locations.json containing source locations for generated YAML.
//
// The file is optional. If it doesn't exist, newPythonLocations() is returned
// instead. Any other error opening it is returned wrapped.
func loadLocationsFile(locationsPath string) (*pythonLocations, error) {
	// Open directly and inspect the error, instead of Stat-then-Open. That
	// avoids an extra syscall and the race between the two calls.
	locationsFile, err := os.Open(locationsPath)
	if errors.Is(err, os.ErrNotExist) {
		return newPythonLocations(), nil
	}
	if err != nil {
		return nil, fmt.Errorf("failed to open locations file: %w", err)
	}
	defer locationsFile.Close()

	return parsePythonLocations(locationsFile)
}
// loadOutputFile parses the YAML that the Python process wrote to outputPath,
// attaches source locations, and normalizes the result against config.Root.
// During normalization, warnings are promoted to errors.
//
// Values are loaded as if they came from a virtual file, generatedFileName
// inside rootPath, and then merged with the Python source locations in
// locations.
func loadOutputFile(rootPath string, outputPath string, locations *pythonLocations) (dyn.Value, diag.Diagnostics) {
	f, err := os.Open(outputPath)
	if err != nil {
		return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to open output file: %w", err))
	}
	defer f.Close()

	// The location must be absolute, because later parts of the pipeline assume
	// all paths are absolute, and relative paths are resolved against it.
	//
	// It also has to stay inside rootPath, because locations outside the bundle
	// root are rejected:
	//
	//   Error: path /var/folders/.../pydabs/dist/*.whl is not contained in bundle root path
	//
	// For that reason, the virtual path is used as the file location instead of outputPath.
	virtual, err := filepath.Abs(filepath.Join(rootPath, generatedFileName))
	if err != nil {
		return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to get absolute path: %w", err))
	}

	loaded, err := yamlloader.LoadYAML(virtual, f)
	if err != nil {
		return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to parse output file: %w", err))
	}

	// Paths are resolved relative to the location of their values. Changing that
	// location would require rewriting each path, so for job paths the location
	// from the virtual file is recorded as-is, keeping path resolution unchanged.
	recordJobPathLocation := func(p dyn.Path, _ paths.PathKind, v dyn.Value) (dyn.Value, error) {
		putPythonLocation(locations, p, v.Location())
		return v, nil
	}
	if _, err := paths.VisitJobPaths(loaded, recordJobPathLocation); err != nil {
		return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to update locations: %w", err))
	}

	// loaded carries locations pointing at the virtual file. Replace them with
	// the Python source locations read from locations.json.
	withLocations, err := mergePythonLocations(loaded, locations)
	if err != nil {
		return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to update locations: %w", err))
	}

	return strictNormalize(config.Root{}, withLocations)
}
// strictNormalize normalizes generated against the type of dst and reports
// every normalization warning as an error.
//
// Output of the Python mutator is expected to be normalized already. A warning
// therefore indicates a bug in the mutator and is treated as an error.
func strictNormalize(dst any, generated dyn.Value) (dyn.Value, diag.Diagnostics) {
	normalized, normalizeDiags := convert.Normalize(dst, generated)

	result := diag.Diagnostics{}
	for i := range normalizeDiags {
		entry := normalizeDiags[i]
		if entry.Severity == diag.Warning {
			entry.Severity = diag.Error
		}
		result = result.Append(entry)
	}

	return normalized, result
}
// loadDiagnosticsFile reads and parses the diagnostics file written by the
// Python mutator: the warnings and errors that should be shown to users.
//
// A missing file is an error, not an empty set of diagnostics. The Python
// mutator is expected to always create the file, so its absence points to an
// integration problem where the diagnostics were lost. Treating it as empty
// would risk losing errors and warnings.
func loadDiagnosticsFile(path string) (diag.Diagnostics, error) {
	f, openErr := os.Open(path)
	if openErr != nil {
		return nil, fmt.Errorf("failed to open diagnostics file: %w", openErr)
	}
	defer f.Close()

	return parsePythonDiagnostics(f)
}
// createOverrideVisitor returns the merge visitor that enforces which changes
// the Python mutator may make in the given phase. It returns an error for an
// unknown phase.
func createOverrideVisitor(ctx context.Context, phase phase) (merge.OverrideVisitor, error) {
	if phase == PythonMutatorPhaseLoad {
		return createLoadOverrideVisitor(ctx), nil
	}
	if phase == PythonMutatorPhaseInit {
		return createInitOverrideVisitor(ctx), nil
	}
	return merge.OverrideVisitor{}, fmt.Errorf("unknown phase: %s", phase)
}
// createLoadOverrideVisitor creates an override visitor for the load phase.
//
// During load, only new jobs may be added. The 'resources' and
// 'resources.jobs' mappings may also be created if they are missing. Every
// update and every other insert or delete is rejected. The exception is the
// deletion of an empty value (see isOmitemptyDelete), which is undone instead.
func createLoadOverrideVisitor(ctx context.Context) merge.OverrideVisitor {
	resourcesPath := dyn.NewPath(dyn.Key("resources"))
	jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))

	// unexpectedChange builds the error reported for a disallowed change.
	unexpectedChange := func(valuePath dyn.Path, op string) error {
		return fmt.Errorf("unexpected change at %q (%s)", valuePath.String(), op)
	}

	visitDelete := func(valuePath dyn.Path, left dyn.Value) error {
		if !isOmitemptyDelete(left) {
			return unexpectedChange(valuePath, "delete")
		}
		return merge.ErrOverrideUndoDelete
	}

	visitInsert := func(valuePath dyn.Path, right dyn.Value) (dyn.Value, error) {
		switch {
		case valuePath.Equal(resourcesPath), valuePath.Equal(jobsPath):
			// 'resources' or 'resources.jobs' didn't exist before
			return right, nil
		case valuePath.HasPrefix(jobsPath) && len(valuePath) == len(jobsPath)+1:
			// a whole new job; adding a property to an existing job would change it
			log.Debugf(ctx, "Insert value at %q", valuePath.String())
			return right, nil
		default:
			return dyn.InvalidValue, unexpectedChange(valuePath, "insert")
		}
	}

	visitUpdate := func(valuePath dyn.Path, _ dyn.Value, _ dyn.Value) (dyn.Value, error) {
		return dyn.InvalidValue, unexpectedChange(valuePath, "update")
	}

	return merge.OverrideVisitor{
		VisitDelete: visitDelete,
		VisitInsert: visitInsert,
		VisitUpdate: visitUpdate,
	}
}
// createInitOverrideVisitor creates an override visitor for the init phase.
//
// During the init phase, new jobs may be created and existing jobs modified,
// including deleting their properties. Existing jobs must not be deleted,
// whether one at a time or by dropping the whole 'resources.jobs' mapping.
// Changes outside 'resources.jobs' are rejected, except for two cases:
//   - creating the 'resources' and 'resources.jobs' mappings when missing
//   - deleting empty values (see isOmitemptyDelete), which is undone
func createInitOverrideVisitor(ctx context.Context) merge.OverrideVisitor {
	resourcesPath := dyn.NewPath(dyn.Key("resources"))
	jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))

	return merge.OverrideVisitor{
		VisitDelete: func(valuePath dyn.Path, left dyn.Value) error {
			if isOmitemptyDelete(left) {
				return merge.ErrOverrideUndoDelete
			}

			if !valuePath.HasPrefix(jobsPath) {
				return fmt.Errorf("unexpected change at %q (delete)", valuePath.String())
			}

			// Reject deleting a single job (resources.jobs.<name>), and also deleting
			// the (non-empty) 'resources.jobs' mapping itself. A path is its own
			// prefix, so that mapping passes the check above. Both would remove
			// existing resources.
			deleteResources := len(valuePath) <= len(jobsPath)+1
			if deleteResources {
				return fmt.Errorf("unexpected change at %q (delete)", valuePath.String())
			}

			// deleting properties is allowed because it only changes an existing resource
			log.Debugf(ctx, "Delete value at %q", valuePath.String())
			return nil
		},
		VisitInsert: func(valuePath dyn.Path, right dyn.Value) (dyn.Value, error) {
			// insert 'resources' or 'resources.jobs' if it didn't exist before
			if valuePath.Equal(resourcesPath) || valuePath.Equal(jobsPath) {
				return right, nil
			}

			if !valuePath.HasPrefix(jobsPath) {
				return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (insert)", valuePath.String())
			}

			log.Debugf(ctx, "Insert value at %q", valuePath.String())
			return right, nil
		},
		VisitUpdate: func(valuePath dyn.Path, left dyn.Value, right dyn.Value) (dyn.Value, error) {
			if !valuePath.HasPrefix(jobsPath) {
				return dyn.InvalidValue, fmt.Errorf("unexpected change at %q (update)", valuePath.String())
			}

			log.Debugf(ctx, "Update value at %q", valuePath.String())
			return right, nil
		},
	}
}
// isOmitemptyDelete reports whether the deleted value left is empty: an empty
// mapping, an empty sequence, or nil.
//
// PyDABs can omit empty sequences/mappings in its output, because they are not
// tracked as optional. Empty and missing are semantically the same, so such
// values are kept as they were before PyDABs deleted them.
func isOmitemptyDelete(left dyn.Value) bool {
	kind := left.Kind()

	if kind == dyn.KindNil {
		// a mapping or sequence can be nil, for instance, bad YAML like: `foo:<eof>`
		return true
	}
	if kind == dyn.KindMap {
		return left.MustMap().Len() == 0
	}
	if kind == dyn.KindSequence {
		return len(left.MustSequence()) == 0
	}

	return false
}
// detectExecutable looks up the Python interpreter in the virtual environment
// at venvPath, or in PATH when venvPath is empty.
func detectExecutable(ctx context.Context, venvPath string) (string, error) {
	if venvPath != "" {
		return python.DetectVEnvExecutable(venvPath)
	}

	found, err := python.DetectExecutable(ctx)
	if err != nil {
		return "", err
	}
	return found, nil
}