Added validate mutator to surface additional bundle warnings (#1352)

## Changes
All these validators will return warnings as part of a `bundle validate`
run

Added 2 mutators: 
1. To check that if tasks use job_cluster_key it is actually defined
2. To check if there are any files to sync as part of deployment

Also added `bundle.Parallel` to run them in parallel

To make sure mutators under bundle.Parallel do not mutate config,
introduced new `ReadOnlyMutator`, `ReadOnlyBundle` and `ReadOnlyConfig`.

Example 

```
databricks bundle validate -p deco-staging
Warning: unknown field: new_cluster
  at resources.jobs.my_job
  in bundle.yml:24:7

Warning: job_cluster_key high_cpu_workload_job_cluster is not defined
  at resources.jobs.my_job.tasks[0].job_cluster_key
  in bundle.yml:35:28

Warning: There are no files to sync, please check your .gitignore and sync.exclude configuration
  at sync.exclude
  in bundle.yml:18:5

Name: test
Target: default
Workspace:
  Host: https://acme.databricks.com
  User: andrew.nester@databricks.com
  Path: /Users/andrew.nester@databricks.com/.bundle/test/default

Found 3 warnings
```

## Tests
Added unit tests
This commit is contained in:
Andrew Nester 2024-04-18 17:13:16 +02:00 committed by GitHub
parent eb9665d2ee
commit 27f51c760f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 624 additions and 21 deletions

View File

@ -0,0 +1,36 @@
package bundle
import (
"context"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/databricks-sdk-go"
)
// ReadOnlyBundle wraps a Bundle and exposes only read access: configuration,
// paths, the workspace client, and sync helpers. It is the value passed to
// ReadOnlyMutator implementations so they cannot mutate the bundle.
type ReadOnlyBundle struct {
	b *Bundle
}

// ReadOnly wraps the given bundle in a ReadOnlyBundle.
func ReadOnly(b *Bundle) ReadOnlyBundle {
	return ReadOnlyBundle{b: b}
}

// Config returns the bundle configuration root by value (a shallow copy of
// the struct; nested reference types still alias the original).
func (r ReadOnlyBundle) Config() config.Root {
	return r.b.Config
}

// RootPath returns the filesystem path of the bundle root.
func (r ReadOnlyBundle) RootPath() string {
	return r.b.RootPath
}

// WorkspaceClient returns the workspace client of the underlying bundle.
func (r ReadOnlyBundle) WorkspaceClient() *databricks.WorkspaceClient {
	return r.b.WorkspaceClient()
}

// CacheDir delegates to the underlying bundle's CacheDir.
func (r ReadOnlyBundle) CacheDir(ctx context.Context, paths ...string) (string, error) {
	return r.b.CacheDir(ctx, paths...)
}

// GetSyncIncludePatterns delegates to the underlying bundle's
// GetSyncIncludePatterns.
func (r ReadOnlyBundle) GetSyncIncludePatterns(ctx context.Context) ([]string, error) {
	return r.b.GetSyncIncludePatterns(ctx)
}

View File

@ -452,7 +452,7 @@ func validateVariableOverrides(root, target dyn.Value) (err error) {
// Best effort to get the location of configuration value at the specified path.
// This function is useful to annotate error messages with the location, because
// we don't want to fail with a different error message if we cannot retrieve the location.
func (r *Root) GetLocation(path string) dyn.Location {
func (r Root) GetLocation(path string) dyn.Location {
v, err := dyn.Get(r.value, path)
if err != nil {
return dyn.Location{}

View File

@ -0,0 +1,54 @@
package validate
import (
"context"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/deploy/files"
"github.com/databricks/cli/libs/diag"
)
// FilesToSync returns a read-only mutator that warns when a deployment of
// this bundle would sync no files at all.
func FilesToSync() bundle.ReadOnlyMutator {
	return &filesToSync{}
}

// filesToSync is the stateless implementation behind FilesToSync.
type filesToSync struct {
}

// Name implements bundle.ReadOnlyMutator.
func (v *filesToSync) Name() string {
	return "validate:files_to_sync"
}
// Apply implements bundle.ReadOnlyMutator. It computes the sync file list and
// emits a warning if it is empty; when a sync.exclude section is configured,
// the warning is annotated with that section's location so the user can check
// whether their exclude patterns are too aggressive.
func (v *filesToSync) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
	s, err := files.GetSync(ctx, rb)
	if err != nil {
		return diag.FromErr(err)
	}

	fileList, err := s.GetFileList(ctx)
	if err != nil {
		return diag.FromErr(err)
	}

	// Nothing to report if there is at least one file to sync.
	if len(fileList) > 0 {
		return nil
	}

	d := diag.Diagnostic{
		Severity: diag.Warning,
		Summary:  "There are no files to sync, please check your .gitignore",
	}

	// With an explicit exclude configuration, point at it specifically.
	if len(rb.Config().Sync.Exclude) != 0 {
		loc := location{path: "sync.exclude", rb: rb}
		d.Summary = "There are no files to sync, please check your .gitignore and sync.exclude configuration"
		d.Location = loc.Location()
		d.Path = loc.Path()
	}

	return diag.Diagnostics{d}
}

View File

@ -0,0 +1,53 @@
package validate
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
)
// JobClusterKeyDefined returns a read-only mutator that warns about tasks
// referencing a job_cluster_key that is not declared on the same job.
func JobClusterKeyDefined() bundle.ReadOnlyMutator {
	return &jobClusterKeyDefined{}
}

// jobClusterKeyDefined is the stateless implementation behind
// JobClusterKeyDefined.
type jobClusterKeyDefined struct {
}

// Name implements bundle.ReadOnlyMutator.
func (v *jobClusterKeyDefined) Name() string {
	return "validate:job_cluster_key_defined"
}
// Apply implements bundle.ReadOnlyMutator. For every job, it collects the set
// of declared job cluster keys and emits a warning for each task whose
// job_cluster_key is not in that set. Keys are only matched within the same
// job; a key declared on a different job does not count.
func (v *jobClusterKeyDefined) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
	diags := diag.Diagnostics{}
	for jobName, job := range rb.Config().Resources.Jobs {
		// Set of cluster keys declared under this job's job_clusters.
		declared := map[string]struct{}{}
		for _, jc := range job.JobClusters {
			if jc.JobClusterKey != "" {
				declared[jc.JobClusterKey] = struct{}{}
			}
		}

		for i, task := range job.Tasks {
			key := task.JobClusterKey
			if key == "" {
				// Task does not reference a job cluster; nothing to check.
				continue
			}
			if _, ok := declared[key]; ok {
				continue
			}
			loc := location{
				path: fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", jobName, i),
				rb:   rb,
			}
			diags = diags.Append(diag.Diagnostic{
				Severity: diag.Warning,
				Summary:  fmt.Sprintf("job_cluster_key %s is not defined", key),
				Location: loc.Location(),
				Path:     loc.Path(),
			})
		}
	}
	return diags
}

View File

@ -0,0 +1,97 @@
package validate
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)
func TestJobClusterKeyDefined(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
Name: "job1",
JobClusters: []jobs.JobCluster{
{JobClusterKey: "do-not-exist"},
},
Tasks: []jobs.Task{
{JobClusterKey: "do-not-exist"},
},
},
},
},
},
},
}
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined())
require.Len(t, diags, 0)
require.NoError(t, diags.Error())
}
// TestJobClusterKeyNotDefined verifies that a task referencing a cluster key
// with no matching job_clusters entry yields exactly one warning.
func TestJobClusterKeyNotDefined(t *testing.T) {
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"job1": {
						JobSettings: &jobs.JobSettings{
							Name: "job1",
							Tasks: []jobs.Task{
								{JobClusterKey: "do-not-exist"},
							},
						},
					},
				},
			},
		},
	}
	diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined())
	require.Len(t, diags, 1)
	require.NoError(t, diags.Error())
	// require.Equal takes (t, expected, actual); keeping that order makes
	// failure messages label the values correctly.
	require.Equal(t, diag.Warning, diags[0].Severity)
	require.Equal(t, "job_cluster_key do-not-exist is not defined", diags[0].Summary)
}
// TestJobClusterKeyDefinedInDifferentJob verifies that a cluster key declared
// on a DIFFERENT job does not satisfy a task's reference: keys are scoped to
// their own job, so one warning is still expected.
func TestJobClusterKeyDefinedInDifferentJob(t *testing.T) {
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"job1": {
						JobSettings: &jobs.JobSettings{
							Name: "job1",
							Tasks: []jobs.Task{
								{JobClusterKey: "do-not-exist"},
							},
						},
					},
					"job2": {
						JobSettings: &jobs.JobSettings{
							Name: "job2",
							JobClusters: []jobs.JobCluster{
								{JobClusterKey: "do-not-exist"},
							},
						},
					},
				},
			},
		},
	}
	diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined())
	require.Len(t, diags, 1)
	require.NoError(t, diags.Error())
	// require.Equal takes (t, expected, actual); keeping that order makes
	// failure messages label the values correctly.
	require.Equal(t, diag.Warning, diags[0].Severity)
	require.Equal(t, "job_cluster_key do-not-exist is not defined", diags[0].Summary)
}

View File

@ -0,0 +1,43 @@
package validate
import (
"context"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
)
// validate is the mutator returned by Validate; it fans out to the individual
// read-only validators.
type validate struct {
}

// location couples a string path into the bundle configuration with the
// bundle it belongs to, so a diagnostic can be annotated with both the
// dyn.Path and the source-file location of the configured value.
type location struct {
	path string
	rb   bundle.ReadOnlyBundle
}

// Location resolves l.path to a file/line/column in the configuration.
// Resolution is best effort: the zero location is returned on failure.
func (l location) Location() dyn.Location {
	return l.rb.Config().GetLocation(l.path)
}

// Path parses l.path into a dyn.Path. It panics on a malformed path, so
// callers must only pass syntactically valid paths.
func (l location) Path() dyn.Path {
	return dyn.MustPathFromString(l.path)
}
// Apply implements bundle.Mutator. It runs all validators in parallel against
// a read-only view of the bundle; the validators only produce diagnostics and
// do not mutate configuration, which is what makes parallel execution safe.
func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
	return bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), bundle.Parallel(
		JobClusterKeyDefined(),
		FilesToSync(),
		ValidateSyncPatterns(),
	))
}

// Name implements bundle.Mutator.
func (v *validate) Name() string {
	return "validate"
}

// Validate returns the composite validation mutator run by `bundle validate`.
func Validate() bundle.Mutator {
	return &validate{}
}

View File

@ -0,0 +1,79 @@
package validate
import (
"context"
"fmt"
"sync"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/fileset"
"golang.org/x/sync/errgroup"
)
// ValidateSyncPatterns returns a read-only mutator that warns about sync
// include/exclude patterns that match no files.
func ValidateSyncPatterns() bundle.ReadOnlyMutator {
	return &validateSyncPatterns{}
}

// validateSyncPatterns is the stateless implementation behind
// ValidateSyncPatterns.
type validateSyncPatterns struct {
}

// Name implements bundle.ReadOnlyMutator.
func (v *validateSyncPatterns) Name() string {
	return "validate:validate_sync_patterns"
}
// Apply implements bundle.ReadOnlyMutator. It checks every configured sync
// exclude and include pattern and returns a warning for each pattern that
// matches no files under the bundle root.
func (v *validateSyncPatterns) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
	conf := rb.Config().Sync
	if len(conf.Exclude)+len(conf.Include) == 0 {
		// No patterns configured; nothing to validate.
		return nil
	}

	excludeDiags, err := checkPatterns(conf.Exclude, "sync.exclude", rb)
	if err != nil {
		return diag.FromErr(err)
	}

	includeDiags, err := checkPatterns(conf.Include, "sync.include", rb)
	if err != nil {
		return diag.FromErr(err)
	}

	return excludeDiags.Extend(includeDiags)
}
// checkPatterns evaluates each glob pattern concurrently and returns a
// warning diagnostic for every pattern that matches no files. path is the
// configuration path prefix (e.g. "sync.exclude") used to annotate warnings
// with their location; the first error from any pattern aborts the result.
func checkPatterns(patterns []string, path string, rb bundle.ReadOnlyBundle) (diag.Diagnostics, error) {
	var mu sync.Mutex
	var errs errgroup.Group
	var diags diag.Diagnostics

	for i, pattern := range patterns {
		// Capture per-iteration copies for the goroutine (required for
		// correctness under pre-1.22 loop variable semantics).
		index := i
		p := pattern
		errs.Go(func() error {
			fs, err := fileset.NewGlobSet(rb.RootPath(), []string{p})
			if err != nil {
				return err
			}

			all, err := fs.All()
			if err != nil {
				return err
			}

			if len(all) == 0 {
				loc := location{path: fmt.Sprintf("%s[%d]", path, index), rb: rb}
				mu.Lock()
				diags = diags.Append(diag.Diagnostic{
					Severity: diag.Warning,
					Summary:  fmt.Sprintf("Pattern %s does not match any files", p),
					Location: loc.Location(),
					Path:     loc.Path(),
				})
				mu.Unlock()
			}
			return nil
		})
	}

	// Wait for all goroutines BEFORE reading diags. In the one-liner
	// `return diags, errs.Wait()`, Go evaluates operands left to right, so
	// diags would be read before the goroutines finish — a data race with
	// the appends above that could silently drop warnings.
	err := errs.Wait()
	return diags, err
}

View File

@ -46,7 +46,7 @@ func (m *delete) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
}
// Clean up sync snapshot file
sync, err := GetSync(ctx, b)
sync, err := GetSync(ctx, bundle.ReadOnly(b))
if err != nil {
return diag.FromErr(err)
}

View File

@ -8,40 +8,40 @@ import (
"github.com/databricks/cli/libs/sync"
)
func GetSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) {
opts, err := GetSyncOptions(ctx, b)
func GetSync(ctx context.Context, rb bundle.ReadOnlyBundle) (*sync.Sync, error) {
opts, err := GetSyncOptions(ctx, rb)
if err != nil {
return nil, fmt.Errorf("cannot get sync options: %w", err)
}
return sync.New(ctx, *opts)
}
func GetSyncOptions(ctx context.Context, b *bundle.Bundle) (*sync.SyncOptions, error) {
cacheDir, err := b.CacheDir(ctx)
func GetSyncOptions(ctx context.Context, rb bundle.ReadOnlyBundle) (*sync.SyncOptions, error) {
cacheDir, err := rb.CacheDir(ctx)
if err != nil {
return nil, fmt.Errorf("cannot get bundle cache directory: %w", err)
}
includes, err := b.GetSyncIncludePatterns(ctx)
includes, err := rb.GetSyncIncludePatterns(ctx)
if err != nil {
return nil, fmt.Errorf("cannot get list of sync includes: %w", err)
}
opts := &sync.SyncOptions{
LocalPath: b.RootPath,
RemotePath: b.Config.Workspace.FilePath,
LocalPath: rb.RootPath(),
RemotePath: rb.Config().Workspace.FilePath,
Include: includes,
Exclude: b.Config.Sync.Exclude,
Host: b.WorkspaceClient().Config.Host,
Exclude: rb.Config().Sync.Exclude,
Host: rb.WorkspaceClient().Config.Host,
Full: false,
SnapshotBasePath: cacheDir,
WorkspaceClient: b.WorkspaceClient(),
WorkspaceClient: rb.WorkspaceClient(),
}
if b.Config.Workspace.CurrentUser != nil {
opts.CurrentUser = b.Config.Workspace.CurrentUser.User
if rb.Config().Workspace.CurrentUser != nil {
opts.CurrentUser = rb.Config().Workspace.CurrentUser.User
}
return opts, nil

View File

@ -18,7 +18,7 @@ func (m *upload) Name() string {
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath))
sync, err := GetSync(ctx, b)
sync, err := GetSync(ctx, bundle.ReadOnly(b))
if err != nil {
return diag.FromErr(err)
}

View File

@ -79,7 +79,7 @@ func (s *statePull) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostic
}
// Create a new snapshot based on the deployment state file.
opts, err := files.GetSyncOptions(ctx, b)
opts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b))
if err != nil {
return diag.FromErr(err)
}

View File

@ -85,7 +85,7 @@ func testStatePull(t *testing.T, opts statePullOpts) {
}
if opts.withExistingSnapshot {
opts, err := files.GetSyncOptions(ctx, b)
opts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b))
require.NoError(t, err)
snapshotPath, err := sync.SnapshotPath(opts)
@ -127,7 +127,7 @@ func testStatePull(t *testing.T, opts statePullOpts) {
}
if opts.expects.snapshotState != nil {
syncOpts, err := files.GetSyncOptions(ctx, b)
syncOpts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b))
require.NoError(t, err)
snapshotPath, err := sync.SnapshotPath(syncOpts)

View File

@ -39,7 +39,7 @@ func (s *stateUpdate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
state.Version = DeploymentStateVersion
// Get the current file list.
sync, err := files.GetSync(ctx, b)
sync, err := files.GetSync(ctx, bundle.ReadOnly(b))
if err != nil {
return diag.FromErr(err)
}

View File

@ -0,0 +1,29 @@
package bundle
import (
"context"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/log"
)
// ReadOnlyMutator is the interface type that allows access to bundle
// configuration but does not allow any mutations.
type ReadOnlyMutator interface {
	// Name returns the mutator's name.
	Name() string

	// Apply accesses the specified read-only bundle object and returns any
	// diagnostics it produced.
	Apply(context.Context, ReadOnlyBundle) diag.Diagnostics
}
// ApplyReadOnly runs a single read-only mutator against the bundle. The
// logger in the context is tagged with the mutator's name, and any
// error-level diagnostic is logged before the diagnostics are returned
// unchanged to the caller.
func ApplyReadOnly(ctx context.Context, rb ReadOnlyBundle, m ReadOnlyMutator) diag.Diagnostics {
	ctx = log.NewContext(ctx, log.GetLogger(ctx).With("mutator (read-only)", m.Name()))

	log.Debugf(ctx, "ApplyReadOnly")
	diags := m.Apply(ctx, rb)
	if err := diags.Error(); err != nil {
		log.Errorf(ctx, "Error: %s", err)
	}

	return diags
}

43
bundle/parallel.go Normal file
View File

@ -0,0 +1,43 @@
package bundle
import (
"context"
"sync"
"github.com/databricks/cli/libs/diag"
)
// parallel is a ReadOnlyMutator that runs a set of read-only mutators
// concurrently and merges their diagnostics.
type parallel struct {
	mutators []ReadOnlyMutator
}

// Name implements ReadOnlyMutator.
func (m *parallel) Name() string {
	return "parallel"
}

// Apply implements ReadOnlyMutator. It starts one goroutine per wrapped
// mutator, waits for all of them to finish, and returns the combined
// diagnostics. The order of the combined diagnostics is not deterministic.
func (m *parallel) Apply(ctx context.Context, rb ReadOnlyBundle) diag.Diagnostics {
	var wg sync.WaitGroup
	var mu sync.Mutex
	var diags diag.Diagnostics

	wg.Add(len(m.mutators))
	for _, mutator := range m.mutators {
		// The loop variable is passed as an argument so each goroutine gets
		// its own copy (required under pre-1.22 loop variable semantics).
		go func(mutator ReadOnlyMutator) {
			defer wg.Done()
			d := ApplyReadOnly(ctx, rb, mutator)

			// mu guards diags: goroutines append to it concurrently.
			mu.Lock()
			diags = diags.Extend(d)
			mu.Unlock()
		}(mutator)
	}
	wg.Wait()
	return diags
}

// Parallel runs the given mutators in parallel. Because they all run
// concurrently against the same read-only bundle, the mutators must not
// mutate configuration.
func Parallel(mutators ...ReadOnlyMutator) ReadOnlyMutator {
	return &parallel{
		mutators: mutators,
	}
}

73
bundle/parallel_test.go Normal file
View File

@ -0,0 +1,73 @@
package bundle
import (
	"context"
	"sync"
	"testing"

	"github.com/databricks/cli/bundle/config"
	"github.com/databricks/cli/libs/diag"
	"github.com/stretchr/testify/require"
)
// addToContainer is a test mutator that appends its value to a container
// shared between multiple mutators. Parallel runs mutators on concurrent
// goroutines, so all access to the container goes through the shared mutex;
// without it, the appends race on the shared slice (caught by `go test
// -race`) and updates can be lost.
type addToContainer struct {
	container *[]int
	value     int
	err       bool
	mu        *sync.Mutex // shared across mutators; guards *container
}

// Apply implements ReadOnlyMutator. It either fails with a fixed error or
// appends m.value to the shared container under the shared lock.
func (m *addToContainer) Apply(ctx context.Context, b ReadOnlyBundle) diag.Diagnostics {
	if m.err {
		return diag.Errorf("error")
	}

	m.mu.Lock()
	defer m.mu.Unlock()
	*m.container = append(*m.container, m.value)
	return nil
}

// Name implements ReadOnlyMutator.
func (m *addToContainer) Name() string {
	return "addToContainer"
}

// TestParallelMutatorWork verifies that Parallel runs all mutators and
// collects every side effect, regardless of execution order.
func TestParallelMutatorWork(t *testing.T) {
	b := &Bundle{
		Config: config.Root{},
	}

	container := []int{}
	var mu sync.Mutex
	m1 := &addToContainer{container: &container, value: 1, mu: &mu}
	m2 := &addToContainer{container: &container, value: 2, mu: &mu}
	m3 := &addToContainer{container: &container, value: 3, mu: &mu}

	m := Parallel(m1, m2, m3)

	// Apply the mutator
	diags := ApplyReadOnly(context.Background(), ReadOnly(b), m)
	require.Empty(t, diags)

	require.Len(t, container, 3)
	require.Contains(t, container, 1)
	require.Contains(t, container, 2)
	require.Contains(t, container, 3)
}

// TestParallelMutatorWorkWithErrors verifies that one failing mutator does
// not prevent the others from running and that its error is surfaced.
func TestParallelMutatorWorkWithErrors(t *testing.T) {
	b := &Bundle{
		Config: config.Root{},
	}

	container := []int{}
	var mu sync.Mutex
	m1 := &addToContainer{container: &container, value: 1, mu: &mu}
	m2 := &addToContainer{container: &container, err: true, value: 2, mu: &mu}
	m3 := &addToContainer{container: &container, value: 3, mu: &mu}

	m := Parallel(m1, m2, m3)

	// Apply the mutator
	diags := ApplyReadOnly(context.Background(), ReadOnly(b), m)
	require.Len(t, diags, 1)
	require.Equal(t, "error", diags[0].Summary)

	require.Len(t, container, 2)
	require.Contains(t, container, 1)
	require.Contains(t, container, 3)
}

View File

@ -0,0 +1,27 @@
bundle:
  name: job_cluster_key

workspace:
  host: https://acme.cloud.databricks.com/

targets:
  # In the default target the task references job cluster "key" without
  # declaring it under job_clusters, so validation is expected to warn.
  default:
    resources:
      jobs:
        foo:
          name: job
          tasks:
            - task_key: test
              job_cluster_key: key
  # In the development target the "key" cluster IS declared, so the same
  # task reference is valid and no warning is expected.
  development:
    resources:
      jobs:
        foo:
          job_clusters:
            - job_cluster_key: key
              new_cluster:
                node_type_id: i3.xlarge
                num_workers: 1
          tasks:
            - task_key: test
              job_cluster_key: key

View File

@ -0,0 +1,28 @@
package config_tests
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/validate"
"github.com/databricks/cli/libs/diag"
"github.com/stretchr/testify/require"
)
// TestJobClusterKeyNotDefinedTest verifies that the "default" target of the
// job_cluster_key fixture, where a task references cluster "key" without
// declaring it, yields exactly one warning.
func TestJobClusterKeyNotDefinedTest(t *testing.T) {
	b := loadTarget(t, "./job_cluster_key", "default")

	diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.JobClusterKeyDefined())
	require.Len(t, diags, 1)
	require.NoError(t, diags.Error())
	// require.Equal takes (t, expected, actual); keeping that order makes
	// failure messages label the values correctly.
	require.Equal(t, diag.Warning, diags[0].Severity)
	require.Equal(t, "job_cluster_key key is not defined", diags[0].Summary)
}
// TestJobClusterKeyDefinedTest verifies that the "development" target of the
// job_cluster_key fixture, where the referenced cluster is declared, yields
// no diagnostics at all.
func TestJobClusterKeyDefinedTest(t *testing.T) {
	b := loadTarget(t, "./job_cluster_key", "development")

	diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.JobClusterKeyDefined())
	require.Empty(t, diags)
	// Also assert no error-level diagnostics, mirroring the negative test.
	require.NoError(t, diags.Error())
}

View File

@ -0,0 +1,39 @@
package config_tests
import (
"context"
"fmt"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/validate"
"github.com/databricks/cli/libs/diag"
"github.com/stretchr/testify/require"
)
// TestSyncIncludeExcludeNoMatchesTest verifies that every sync include and
// exclude pattern matching no files produces a warning, and that the exclude
// warning carries the expected source location and config path.
func TestSyncIncludeExcludeNoMatchesTest(t *testing.T) {
	b := loadTarget(t, "./override_sync", "development")

	diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.ValidateSyncPatterns())
	require.Len(t, diags, 3)
	require.NoError(t, diags.Error())

	// require.Equal takes (t, expected, actual); keeping that order makes
	// failure messages label the values correctly.
	require.Equal(t, diag.Warning, diags[0].Severity)
	require.Equal(t, "Pattern dist does not match any files", diags[0].Summary)
	require.Equal(t, filepath.Join("override_sync", "databricks.yml"), diags[0].Location.File)
	require.Equal(t, 17, diags[0].Location.Line)
	require.Equal(t, 11, diags[0].Location.Column)
	require.Equal(t, "sync.exclude[0]", diags[0].Path.String())

	// The include-pattern warnings are produced by concurrent goroutines, so
	// their relative order is not deterministic; accept either ordering.
	summaries := []string{
		fmt.Sprintf("Pattern %s does not match any files", filepath.Join("src", "*")),
		fmt.Sprintf("Pattern %s does not match any files", filepath.Join("tests", "*")),
	}
	require.Equal(t, diag.Warning, diags[1].Severity)
	require.Contains(t, summaries, diags[1].Summary)
	require.Equal(t, diag.Warning, diags[2].Severity)
	require.Contains(t, summaries, diags[2].Summary)
}

View File

@ -21,7 +21,7 @@ type syncFlags struct {
}
func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOptions, error) {
opts, err := files.GetSyncOptions(cmd.Context(), b)
opts, err := files.GetSyncOptions(cmd.Context(), bundle.ReadOnly(b))
if err != nil {
return nil, fmt.Errorf("cannot get sync options: %w", err)
}

View File

@ -8,6 +8,7 @@ import (
"text/template"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/validate"
"github.com/databricks/cli/bundle/phases"
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/cmd/root"
@ -140,6 +141,7 @@ func newValidateCommand() *cobra.Command {
}
diags = diags.Extend(bundle.Apply(ctx, b, phases.Initialize()))
diags = diags.Extend(bundle.Apply(ctx, b, validate.Validate()))
if err := diags.Error(); err != nil {
return err
}

View File

@ -30,7 +30,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b *
return nil, fmt.Errorf("SRC and DST are not configurable in the context of a bundle")
}
opts, err := files.GetSyncOptions(cmd.Context(), b)
opts, err := files.GetSyncOptions(cmd.Context(), bundle.ReadOnly(b))
if err != nil {
return nil, fmt.Errorf("cannot get sync options: %w", err)
}