Compare commits

...

8 Commits

Author SHA1 Message Date
Gleb Kanterov 886762c82d
Merge 96a6cef0d6 into b323703c1b 2024-11-22 16:55:31 +00:00
shreyas-goenka b323703c1b
Add validation for single node clusters (#1909)
## Changes
This PR adds a warning validating that the configuration for a single
node cluster is valid for interactive, job, job-task, and pipeline
clusters.

Note: We skip the validation if a cluster policy is configured because
the policy is likely to configure `spark_conf` / `custom_tags` itself.

Note: Terrform originally only had validation for interactive, job, and
job-task clusters. This PR adding the validation for pipeline clusters
as well is new.

This PR follows the same logic as we used to have in Terraform. The
validation was removed from Terraform because we had no way to demote
the error to a warning:
https://github.com/databricks/terraform-provider-databricks/pull/4222

### Background
Single-node clusters require `spark_conf` and `custom_tags` to be
correctly set in the cluster definition for them to function optimally.
The cluster will be created even if incorrectly configured, but its
performance will not be great.

For example, if both `spark_conf` and `custom_tags` are not set and
`num_workers` is 0, then only the driver process will be launched on the
cluster compute instance thus leading to sub-optimal utilization of
available compute resources and no parallelization across worker
processes when processing a spark query.

### Issue

This PR addresses some issues reported in
https://github.com/databricks/cli/issues/1546

## Tests
Unit tests and manually.

Example output of the warning:
```
➜  bundle-playground git:(master) ✗ cli bundle validate
Warning: Single node cluster is not correctly configured
  at resources.pipelines.bar.clusters[0]
  in databricks.yml:29:11

num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:

  spark_conf:
    spark.databricks.cluster.profile: singleNode
    spark.master: local[*]

  custom_tags:
    ResourceClass: SingleNode
  

Name: foobar
Target: default
Workspace:
  User: shreyas.goenka@databricks.com
  Path: /Workspace/Users/shreyas.goenka@databricks.com/.bundle/foobar/default

Found 1 warning
```
2024-11-22 15:48:09 +00:00
Ilya Kuznetsov 490dd058aa
Extended message for warning when source-linked mode is used outside of the workspace (#1929)
## Changes

Added path and locations to the warning which displayed when
source-linked mode is used outside of the workspace
2024-11-22 14:44:33 +00:00
Gleb Kanterov 96a6cef0d6
Address feedbacK 2024-10-08 10:28:51 +02:00
Gleb Kanterov bfb13afa8e
Address more feedback 2024-10-08 10:26:53 +02:00
Gleb Kanterov 43ce278299
Rename bundle root path 2024-10-08 10:18:52 +02:00
Gleb Kanterov df61375995
Address CR comments 2024-10-08 10:18:37 +02:00
Gleb Kanterov 3438455459
PythonMutator: propagate source locations 2024-10-08 10:18:36 +02:00
11 changed files with 1161 additions and 32 deletions

View File

@ -45,6 +45,12 @@ type PyDABs struct {
// These packages are imported to discover resources, resource generators, and mutators.
// This list can include namespace packages, which causes the import of nested packages.
Import []string `json:"import,omitempty"`
// LoadLocations is a flag to enable loading Python source locations from the PyDABs.
//
// Locations are only supported since PyDABs 0.6.0, and because of that,
// this flag is disabled by default.
LoadLocations bool `json:"load_locations,omitempty"`
}
type Command string

View File

@ -225,9 +225,21 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
if config.IsExplicitlyEnabled((b.Config.Presets.SourceLinkedDeployment)) {
isDatabricksWorkspace := dbr.RunsOnRuntime(ctx) && strings.HasPrefix(b.SyncRootPath, "/Workspace/")
if !isDatabricksWorkspace {
target := b.Config.Bundle.Target
path := dyn.NewPath(dyn.Key("targets"), dyn.Key(target), dyn.Key("presets"), dyn.Key("source_linked_deployment"))
diags = diags.Append(
diag.Diagnostic{
Severity: diag.Warning,
Summary: "source-linked deployment is available only in the Databricks Workspace",
Paths: []dyn.Path{
path,
},
Locations: b.Config.GetLocations(path[2:].String()),
},
)
disabled := false
b.Config.Presets.SourceLinkedDeployment = &disabled
diags = diags.Extend(diag.Warningf("source-linked deployment is available only in the Databricks Workspace"))
}
}

View File

@ -9,7 +9,9 @@ import (
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
@ -435,6 +437,7 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
},
}
bundletest.SetLocation(b, "presets.source_linked_deployment", []dyn.Location{{File: "databricks.yml"}})
diags := bundle.Apply(tt.ctx, b, mutator.ApplyPresets())
if diags.HasError() {
t.Fatalf("unexpected error: %v", diags)
@ -442,6 +445,7 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
if tt.expectedWarning != "" {
require.Equal(t, tt.expectedWarning, diags[0].Summary)
require.NotEmpty(t, diags[0].Locations)
}
require.Equal(t, tt.expectedValue, b.Config.Presets.SourceLinkedDeployment)

View File

@ -9,6 +9,7 @@ import (
"github.com/databricks/cli/libs/dyn"
)
// pythonDiagnostic is a single entry in diagnostics.json
type pythonDiagnostic struct {
Severity pythonSeverity `json:"severity"`
Summary string `json:"summary"`

View File

@ -0,0 +1,181 @@
package python
import (
"encoding/json"
"fmt"
"io"
"path/filepath"
"github.com/databricks/cli/libs/dyn"
)
// generatedFileName is used as the virtual file name for YAML generated by PyDABs.
//
// mergePythonLocations replaces dyn.Location with generatedFileName with locations loaded
// from locations.json
const generatedFileName = "__generated_by_pydabs__.yml"
// pythonLocations is data structure for efficient location lookup for a given path
//
// Locations form a tree, and we assign locations of the closest ancestor to each dyn.Value based on its path.
// We implement it as a trie (prefix tree) where keys are components of the path. With that, lookups are O(n)
// where n is the number of components in the path.
//
// For example, with locations.json:
//
// {"path": "resources.jobs.job_0", "file": "src/examples/job_0.py", "line": 3, "column": 5}
// {"path": "resources.jobs.job_0.tasks[0].task_key", "file": "src/examples/job_0.py", "line": 10, "column": 5}
// {"path": "resources.jobs.job_1", "file": "src/examples/job_1.py", "line": 5, "column": 7}
//
// - resources.jobs.job_0.tasks[0].task_key is located at job_0.py:10:5
//
// - resources.jobs.job_0.tasks[0].email_notifications is located at job_0.py:3:5,
// because we use the location of the job as the most precise approximation.
type pythonLocations struct {
// descendants referenced by index, e.g. '.foo'
keys map[string]*pythonLocations
// descendants referenced by key, e.g. '[0]'
indexes map[int]*pythonLocations
// location for the current node if it exists
location dyn.Location
// if true, location is present
exists bool
}
// pythonLocationEntry is a single entry in locations.json
type pythonLocationEntry struct {
Path string `json:"path"`
File string `json:"file"`
Line int `json:"line"`
Column int `json:"column"`
}
// mergePythonLocations applies locations from Python mutator into given dyn.Value
//
// The primary use-case is to merge locations.json with output.json, so that any
// validation errors will point to Python source code instead of generated YAML.
func mergePythonLocations(value dyn.Value, locations *pythonLocations) (dyn.Value, error) {
return dyn.Walk(value, func(path dyn.Path, value dyn.Value) (dyn.Value, error) {
newLocation, ok := findPythonLocation(locations, path)
if !ok {
return value, nil
}
var newLocations []dyn.Location
// the first item in the list is the "last" location used for error reporting
newLocations = append(newLocations, newLocation)
for _, location := range value.Locations() {
// When loaded, dyn.Value created by PyDABs use the virtual file path as their location,
// we replace it with newLocation.
if filepath.Base(location.File) == generatedFileName {
continue
}
newLocations = append(newLocations, location)
}
return value.WithLocations(newLocations), nil
})
}
// parsePythonLocations parses locations.json from the Python mutator.
//
// locations file is newline-separated JSON objects with pythonLocationEntry structure.
func parsePythonLocations(input io.Reader) (*pythonLocations, error) {
decoder := json.NewDecoder(input)
locations := newPythonLocations()
for decoder.More() {
var entry pythonLocationEntry
err := decoder.Decode(&entry)
if err != nil {
return nil, fmt.Errorf("failed to parse python location: %s", err)
}
path, err := dyn.NewPathFromString(entry.Path)
if err != nil {
return nil, fmt.Errorf("failed to parse python location: %s", err)
}
location := dyn.Location{
File: entry.File,
Line: entry.Line,
Column: entry.Column,
}
putPythonLocation(locations, path, location)
}
return locations, nil
}
// putPythonLocation puts the location to the trie for the given path
func putPythonLocation(trie *pythonLocations, path dyn.Path, location dyn.Location) {
var currentNode = trie
for _, component := range path {
if key := component.Key(); key != "" {
if _, ok := currentNode.keys[key]; !ok {
currentNode.keys[key] = newPythonLocations()
}
currentNode = currentNode.keys[key]
} else {
index := component.Index()
if _, ok := currentNode.indexes[index]; !ok {
currentNode.indexes[index] = newPythonLocations()
}
currentNode = currentNode.indexes[index]
}
}
currentNode.location = location
currentNode.exists = true
}
// newPythonLocations creates a new trie node
func newPythonLocations() *pythonLocations {
return &pythonLocations{
keys: make(map[string]*pythonLocations),
indexes: make(map[int]*pythonLocations),
}
}
// findPythonLocation finds the location or closest ancestor location in the trie for the given path
// if no ancestor or exact location is found, false is returned.
func findPythonLocation(locations *pythonLocations, path dyn.Path) (dyn.Location, bool) {
var currentNode = locations
var lastLocation = locations.location
var exists = locations.exists
for _, component := range path {
if key := component.Key(); key != "" {
if _, ok := currentNode.keys[key]; !ok {
break
}
currentNode = currentNode.keys[key]
} else {
index := component.Index()
if _, ok := currentNode.indexes[index]; !ok {
break
}
currentNode = currentNode.indexes[index]
}
if currentNode.exists {
lastLocation = currentNode.location
exists = true
}
}
return lastLocation, exists
}

View File

@ -0,0 +1,124 @@
package python
import (
"bytes"
"testing"
"github.com/databricks/cli/libs/dyn"
assert "github.com/databricks/cli/libs/dyn/dynassert"
)
func TestMergeLocations(t *testing.T) {
pythonLocation := dyn.Location{File: "foo.py", Line: 1, Column: 1}
generatedLocation := dyn.Location{File: generatedFileName, Line: 1, Column: 1}
yamlLocation := dyn.Location{File: "foo.yml", Line: 1, Column: 1}
locations := newPythonLocations()
putPythonLocation(locations, dyn.MustPathFromString("foo"), pythonLocation)
input := dyn.NewValue(
map[string]dyn.Value{
"foo": dyn.NewValue(
map[string]dyn.Value{
"baz": dyn.NewValue("baz", []dyn.Location{yamlLocation}),
"qux": dyn.NewValue("baz", []dyn.Location{generatedLocation, yamlLocation}),
},
[]dyn.Location{},
),
"bar": dyn.NewValue("baz", []dyn.Location{generatedLocation}),
},
[]dyn.Location{yamlLocation},
)
expected := dyn.NewValue(
map[string]dyn.Value{
"foo": dyn.NewValue(
map[string]dyn.Value{
// pythonLocation is appended to the beginning of the list if absent
"baz": dyn.NewValue("baz", []dyn.Location{pythonLocation, yamlLocation}),
// generatedLocation is replaced by pythonLocation
"qux": dyn.NewValue("baz", []dyn.Location{pythonLocation, yamlLocation}),
},
[]dyn.Location{pythonLocation},
),
// if location is unknown, we keep it as-is
"bar": dyn.NewValue("baz", []dyn.Location{generatedLocation}),
},
[]dyn.Location{yamlLocation},
)
actual, err := mergePythonLocations(input, locations)
assert.NoError(t, err)
assert.Equal(t, expected, actual)
}
func TestFindLocation(t *testing.T) {
location0 := dyn.Location{File: "foo.py", Line: 1, Column: 1}
location1 := dyn.Location{File: "foo.py", Line: 2, Column: 1}
locations := newPythonLocations()
putPythonLocation(locations, dyn.MustPathFromString("foo"), location0)
putPythonLocation(locations, dyn.MustPathFromString("foo.bar"), location1)
actual, exists := findPythonLocation(locations, dyn.MustPathFromString("foo.bar"))
assert.True(t, exists)
assert.Equal(t, location1, actual)
}
func TestFindLocation_indexPathComponent(t *testing.T) {
location0 := dyn.Location{File: "foo.py", Line: 1, Column: 1}
location1 := dyn.Location{File: "foo.py", Line: 2, Column: 1}
location2 := dyn.Location{File: "foo.py", Line: 3, Column: 1}
locations := newPythonLocations()
putPythonLocation(locations, dyn.MustPathFromString("foo"), location0)
putPythonLocation(locations, dyn.MustPathFromString("foo.bar"), location1)
putPythonLocation(locations, dyn.MustPathFromString("foo.bar[0]"), location2)
actual, exists := findPythonLocation(locations, dyn.MustPathFromString("foo.bar[0]"))
assert.True(t, exists)
assert.Equal(t, location2, actual)
}
func TestFindLocation_closestAncestorLocation(t *testing.T) {
location0 := dyn.Location{File: "foo.py", Line: 1, Column: 1}
location1 := dyn.Location{File: "foo.py", Line: 2, Column: 1}
locations := newPythonLocations()
putPythonLocation(locations, dyn.MustPathFromString("foo"), location0)
putPythonLocation(locations, dyn.MustPathFromString("foo.bar"), location1)
actual, exists := findPythonLocation(locations, dyn.MustPathFromString("foo.bar.baz"))
assert.True(t, exists)
assert.Equal(t, location1, actual)
}
func TestFindLocation_unknownLocation(t *testing.T) {
location0 := dyn.Location{File: "foo.py", Line: 1, Column: 1}
location1 := dyn.Location{File: "foo.py", Line: 2, Column: 1}
locations := newPythonLocations()
putPythonLocation(locations, dyn.MustPathFromString("foo"), location0)
putPythonLocation(locations, dyn.MustPathFromString("foo.bar"), location1)
_, exists := findPythonLocation(locations, dyn.MustPathFromString("bar"))
assert.False(t, exists)
}
func TestParsePythonLocations(t *testing.T) {
expected := dyn.Location{File: "foo.py", Line: 1, Column: 2}
input := `{"path": "foo", "file": "foo.py", "line": 1, "column": 2}`
reader := bytes.NewReader([]byte(input))
locations, err := parsePythonLocations(reader)
assert.NoError(t, err)
assert.True(t, locations.keys["foo"].exists)
assert.Equal(t, expected, locations.keys["foo"].location)
}

View File

@ -7,9 +7,12 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"github.com/databricks/cli/bundle/config/mutator/paths"
"github.com/databricks/databricks-sdk-go/logger"
"github.com/fatih/color"
@ -108,7 +111,12 @@ func (m *pythonMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagno
return dyn.InvalidValue, fmt.Errorf("failed to create cache dir: %w", err)
}
rightRoot, diags := m.runPythonMutator(ctx, cacheDir, b.BundleRootPath, pythonPath, leftRoot)
rightRoot, diags := m.runPythonMutator(ctx, leftRoot, runPythonMutatorOpts{
cacheDir: cacheDir,
bundleRootPath: b.BundleRootPath,
pythonPath: pythonPath,
loadLocations: experimental.PyDABs.LoadLocations,
})
mutateDiags = diags
if diags.HasError() {
return dyn.InvalidValue, mutateDiagsHasError
@ -152,13 +160,21 @@ func createCacheDir(ctx context.Context) (string, error) {
return os.MkdirTemp("", "-pydabs")
}
func (m *pythonMutator) runPythonMutator(ctx context.Context, cacheDir string, rootPath string, pythonPath string, root dyn.Value) (dyn.Value, diag.Diagnostics) {
inputPath := filepath.Join(cacheDir, "input.json")
outputPath := filepath.Join(cacheDir, "output.json")
diagnosticsPath := filepath.Join(cacheDir, "diagnostics.json")
type runPythonMutatorOpts struct {
cacheDir string
bundleRootPath string
pythonPath string
loadLocations bool
}
func (m *pythonMutator) runPythonMutator(ctx context.Context, root dyn.Value, opts runPythonMutatorOpts) (dyn.Value, diag.Diagnostics) {
inputPath := filepath.Join(opts.cacheDir, "input.json")
outputPath := filepath.Join(opts.cacheDir, "output.json")
diagnosticsPath := filepath.Join(opts.cacheDir, "diagnostics.json")
locationsPath := filepath.Join(opts.cacheDir, "locations.json")
args := []string{
pythonPath,
opts.pythonPath,
"-m",
"databricks.bundles.build",
"--phase",
@ -171,6 +187,10 @@ func (m *pythonMutator) runPythonMutator(ctx context.Context, cacheDir string, r
diagnosticsPath,
}
if opts.loadLocations {
args = append(args, "--locations", locationsPath)
}
if err := writeInputFile(inputPath, root); err != nil {
return dyn.InvalidValue, diag.Errorf("failed to write input file: %s", err)
}
@ -185,7 +205,7 @@ func (m *pythonMutator) runPythonMutator(ctx context.Context, cacheDir string, r
_, processErr := process.Background(
ctx,
args,
process.WithDir(rootPath),
process.WithDir(opts.bundleRootPath),
process.WithStderrWriter(stderrWriter),
process.WithStdoutWriter(stdoutWriter),
)
@ -221,7 +241,12 @@ func (m *pythonMutator) runPythonMutator(ctx context.Context, cacheDir string, r
return dyn.InvalidValue, diag.Errorf("failed to load diagnostics: %s", pythonDiagnosticsErr)
}
output, outputDiags := loadOutputFile(rootPath, outputPath)
locations, err := loadLocationsFile(locationsPath)
if err != nil {
return dyn.InvalidValue, diag.Errorf("failed to load locations: %s", err)
}
output, outputDiags := loadOutputFile(opts.bundleRootPath, outputPath, locations)
pythonDiagnostics = pythonDiagnostics.Extend(outputDiags)
// we pass through pythonDiagnostic because it contains warnings
@ -266,7 +291,21 @@ func writeInputFile(inputPath string, input dyn.Value) error {
return os.WriteFile(inputPath, rootConfigJson, 0600)
}
func loadOutputFile(rootPath string, outputPath string) (dyn.Value, diag.Diagnostics) {
// loadLocationsFile loads locations.json containing source locations for generated YAML.
func loadLocationsFile(locationsPath string) (*pythonLocations, error) {
locationsFile, err := os.Open(locationsPath)
if errors.Is(err, fs.ErrNotExist) {
return newPythonLocations(), nil
} else if err != nil {
return nil, fmt.Errorf("failed to open locations file: %w", err)
}
defer locationsFile.Close()
return parsePythonLocations(locationsFile)
}
func loadOutputFile(rootPath string, outputPath string, locations *pythonLocations) (dyn.Value, diag.Diagnostics) {
outputFile, err := os.Open(outputPath)
if err != nil {
return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to open output file: %w", err))
@ -277,12 +316,12 @@ func loadOutputFile(rootPath string, outputPath string) (dyn.Value, diag.Diagnos
// we need absolute path because later parts of pipeline assume all paths are absolute
// and this file will be used as location to resolve relative paths.
//
// virtualPath has to stay in rootPath, because locations outside root path are not allowed:
// virtualPath has to stay in bundleRootPath, because locations outside root path are not allowed:
//
// Error: path /var/folders/.../pydabs/dist/*.whl is not contained in bundle root path
//
// for that, we pass virtualPath instead of outputPath as file location
virtualPath, err := filepath.Abs(filepath.Join(rootPath, "__generated_by_pydabs__.yml"))
virtualPath, err := filepath.Abs(filepath.Join(rootPath, generatedFileName))
if err != nil {
return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to get absolute path: %w", err))
}
@ -292,7 +331,25 @@ func loadOutputFile(rootPath string, outputPath string) (dyn.Value, diag.Diagnos
return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to parse output file: %w", err))
}
return strictNormalize(config.Root{}, generated)
// paths are resolved relative to locations of their values, if we change location
// we have to update each path, until we simplify that, we don't update locations
// for such values, so we don't change how paths are resolved
_, err = paths.VisitJobPaths(generated, func(p dyn.Path, kind paths.PathKind, v dyn.Value) (dyn.Value, error) {
putPythonLocation(locations, p, v.Location())
return v, nil
})
if err != nil {
return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to update locations: %w", err))
}
// generated has dyn.Location as if it comes from generated YAML file
// earlier we loaded locations.json with source locations in Python code
generatedWithLocations, err := mergePythonLocations(generated, locations)
if err != nil {
return dyn.InvalidValue, diag.FromErr(fmt.Errorf("failed to update locations: %w", err))
}
return strictNormalize(config.Root{}, generatedWithLocations)
}
func strictNormalize(dst any, generated dyn.Value) (dyn.Value, diag.Diagnostics) {

View File

@ -6,7 +6,6 @@ import (
"os"
"os/exec"
"path/filepath"
"reflect"
"runtime"
"testing"
@ -47,6 +46,7 @@ func TestPythonMutator_load(t *testing.T) {
pydabs:
enabled: true
venv_path: .venv
load_locations: true
resources:
jobs:
job0:
@ -65,7 +65,8 @@ func TestPythonMutator_load(t *testing.T) {
"experimental": {
"pydabs": {
"enabled": true,
"venv_path": ".venv"
"venv_path": ".venv",
"load_locations": true
}
},
"resources": {
@ -80,6 +81,8 @@ func TestPythonMutator_load(t *testing.T) {
}
}`,
`{"severity": "warning", "summary": "job doesn't have any tasks", "location": {"file": "src/examples/file.py", "line": 10, "column": 5}}`,
`{"path": "resources.jobs.job0", "file": "src/examples/job0.py", "line": 3, "column": 5}
{"path": "resources.jobs.job1", "file": "src/examples/job1.py", "line": 5, "column": 7}`,
)
mutator := PythonMutator(PythonMutatorPhaseLoad)
@ -97,6 +100,25 @@ func TestPythonMutator_load(t *testing.T) {
assert.Equal(t, "job_1", job1.Name)
}
// output of locations.json should be applied to underlying dyn.Value
err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
name1, err := dyn.GetByPath(v, dyn.MustPathFromString("resources.jobs.job1.name"))
if err != nil {
return dyn.InvalidValue, err
}
assert.Equal(t, []dyn.Location{
{
File: "src/examples/job1.py",
Line: 5,
Column: 7,
},
}, name1.Locations())
return v, nil
})
assert.NoError(t, err)
assert.Equal(t, 1, len(diags))
assert.Equal(t, "job doesn't have any tasks", diags[0].Summary)
assert.Equal(t, []dyn.Location{
@ -106,7 +128,6 @@ func TestPythonMutator_load(t *testing.T) {
Column: 5,
},
}, diags[0].Locations)
}
func TestPythonMutator_load_disallowed(t *testing.T) {
@ -146,7 +167,7 @@ func TestPythonMutator_load_disallowed(t *testing.T) {
}
}
}
}`, "")
}`, "", "")
mutator := PythonMutator(PythonMutatorPhaseLoad)
diag := bundle.Apply(ctx, b, mutator)
@ -191,7 +212,7 @@ func TestPythonMutator_init(t *testing.T) {
}
}
}
}`, "")
}`, "", "")
mutator := PythonMutator(PythonMutatorPhaseInit)
diag := bundle.Apply(ctx, b, mutator)
@ -252,7 +273,7 @@ func TestPythonMutator_badOutput(t *testing.T) {
}
}
}
}`, "")
}`, "", "")
mutator := PythonMutator(PythonMutatorPhaseLoad)
diag := bundle.Apply(ctx, b, mutator)
@ -588,7 +609,7 @@ or activate the environment before running CLI commands:
assert.Equal(t, expected, out)
}
func withProcessStub(t *testing.T, args []string, output string, diagnostics string) context.Context {
func withProcessStub(t *testing.T, args []string, output string, diagnostics string, locations string) context.Context {
ctx := context.Background()
ctx, stub := process.WithStub(ctx)
@ -600,32 +621,51 @@ func withProcessStub(t *testing.T, args []string, output string, diagnostics str
inputPath := filepath.Join(cacheDir, "input.json")
outputPath := filepath.Join(cacheDir, "output.json")
locationsPath := filepath.Join(cacheDir, "locations.json")
diagnosticsPath := filepath.Join(cacheDir, "diagnostics.json")
args = append(args, "--input", inputPath)
args = append(args, "--output", outputPath)
args = append(args, "--diagnostics", diagnosticsPath)
stub.WithCallback(func(actual *exec.Cmd) error {
_, err := os.Stat(inputPath)
assert.NoError(t, err)
if reflect.DeepEqual(actual.Args, args) {
err := os.WriteFile(outputPath, []byte(output), 0600)
actualInputPath := getArg(actual.Args, "--input")
actualOutputPath := getArg(actual.Args, "--output")
actualDiagnosticsPath := getArg(actual.Args, "--diagnostics")
actualLocationsPath := getArg(actual.Args, "--locations")
require.Equal(t, inputPath, actualInputPath)
require.Equal(t, outputPath, actualOutputPath)
require.Equal(t, diagnosticsPath, actualDiagnosticsPath)
// locations is an optional argument
if locations != "" {
require.Equal(t, locationsPath, actualLocationsPath)
err = os.WriteFile(locationsPath, []byte(locations), 0600)
require.NoError(t, err)
}
err = os.WriteFile(outputPath, []byte(output), 0600)
require.NoError(t, err)
err = os.WriteFile(diagnosticsPath, []byte(diagnostics), 0600)
require.NoError(t, err)
return nil
} else {
return fmt.Errorf("unexpected command: %v", actual.Args)
}
})
return ctx
}
func getArg(args []string, name string) string {
for i := 0; i < len(args); i++ {
if args[i] == name {
return args[i+1]
}
}
return ""
}
func loadYaml(name string, content string) *bundle.Bundle {
v, diag := config.LoadFromBytes(name, []byte(content))

View File

@ -0,0 +1,137 @@
package validate
import (
"context"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
)
// Validates that any single node clusters defined in the bundle are correctly configured.
func SingleNodeCluster() bundle.ReadOnlyMutator {
return &singleNodeCluster{}
}
type singleNodeCluster struct{}
func (m *singleNodeCluster) Name() string {
return "validate:SingleNodeCluster"
}
const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:
spark_conf:
spark.databricks.cluster.profile: singleNode
spark.master: local[*]
custom_tags:
ResourceClass: SingleNode
`
const singleNodeWarningSummary = `Single node cluster is not correctly configured`
func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
// Check if the user has explicitly set the num_workers to 0. Skip the warning
// if that's not the case.
numWorkers, ok := v.Get("num_workers").AsInt()
if !ok || numWorkers > 0 {
return false
}
// Convenient type that contains the common fields from compute.ClusterSpec and
// pipelines.PipelineCluster that we are interested in.
type ClusterConf struct {
SparkConf map[string]string `json:"spark_conf"`
CustomTags map[string]string `json:"custom_tags"`
PolicyId string `json:"policy_id"`
}
conf := &ClusterConf{}
err := convert.ToTyped(conf, v)
if err != nil {
return false
}
// If the policy id is set, we don't want to show the warning. This is because
// the user might have configured `spark_conf` and `custom_tags` correctly
// in their cluster policy.
if conf.PolicyId != "" {
return false
}
profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
if !ok {
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
return true
}
if profile != "singleNode" {
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
return true
}
master, ok := conf.SparkConf["spark.master"]
if !ok {
log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
return true
}
if !strings.HasPrefix(master, "local") {
log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
return true
}
resourceClass, ok := conf.CustomTags["ResourceClass"]
if !ok {
log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
return true
}
if resourceClass != "SingleNode" {
log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
return true
}
return false
}
func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
diags := diag.Diagnostics{}
patterns := []dyn.Pattern{
// Interactive clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
// Job clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
// Job task clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
// Job for each task clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")),
// Pipeline clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
}
for _, p := range patterns {
_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
warning := diag.Diagnostic{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: v.Locations(),
Paths: []dyn.Path{p},
}
if showSingleNodeClusterWarning(ctx, v) {
diags = append(diags, warning)
}
return v, nil
})
if err != nil {
log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
}
}
return diags
}

View File

@ -0,0 +1,566 @@
package validate
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
)
func failCases() []struct {
name string
sparkConf map[string]string
customTags map[string]string
} {
return []struct {
name string
sparkConf map[string]string
customTags map[string]string
}{
{
name: "no tags or conf",
},
{
name: "no tags",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
},
{
name: "no conf",
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid spark cluster profile",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "invalid",
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid spark.master",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "invalid",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid tags",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "invalid"},
},
{
name: "missing ResourceClass tag",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{"what": "ever"},
},
{
name: "missing spark.master",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "missing spark.databricks.cluster.profile",
sparkConf: map[string]string{
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
}
}
func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Clusters: map[string]*resources.Cluster{
"foo": {
ClusterSpec: &compute.ClusterSpec{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
JobClusters: []jobs.JobCluster{
{
NewCluster: compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"foo": {
PipelineSpec: &pipelines.PipelineSpec{
Clusters: []pipelines.PipelineCluster{
{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")},
},
}, diags)
})
}
}
func passCases() []struct {
name string
numWorkers *int
sparkConf map[string]string
customTags map[string]string
policyId string
} {
zero := 0
one := 1
return []struct {
name string
numWorkers *int
sparkConf map[string]string
customTags map[string]string
policyId string
}{
{
name: "single node cluster",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{
"ResourceClass": "SingleNode",
},
numWorkers: &zero,
},
{
name: "num workers is not zero",
numWorkers: &one,
},
{
name: "num workers is not set",
},
{
name: "policy id is not empty",
policyId: "policy-abc",
numWorkers: &zero,
},
}
}
func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Clusters: map[string]*resources.Cluster{
"foo": {
ClusterSpec: &compute.ClusterSpec{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
JobClusters: []jobs.JobCluster{
{
NewCluster: compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"foo": {
PipelineSpec: &pipelines.PipelineSpec{
Clusters: []pipelines.PipelineCluster{
{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}

View File

@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
ValidateSyncPatterns(),
JobTaskClusterSpec(),
ValidateFolderPermissions(),
SingleNodeCluster(),
))
}