diff --git a/bundle/config/mutator/apply_presets.go b/bundle/config/mutator/apply_presets.go index 33b2ea42..57b765a8 100644 --- a/bundle/config/mutator/apply_presets.go +++ b/bundle/config/mutator/apply_presets.go @@ -238,9 +238,21 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos if config.IsExplicitlyEnabled((b.Config.Presets.SourceLinkedDeployment)) { isDatabricksWorkspace := dbr.RunsOnRuntime(ctx) && strings.HasPrefix(b.SyncRootPath, "/Workspace/") if !isDatabricksWorkspace { + target := b.Config.Bundle.Target + path := dyn.NewPath(dyn.Key("targets"), dyn.Key(target), dyn.Key("presets"), dyn.Key("source_linked_deployment")) + diags = diags.Append( + diag.Diagnostic{ + Severity: diag.Warning, + Summary: "source-linked deployment is available only in the Databricks Workspace", + Paths: []dyn.Path{ + path, + }, + Locations: b.Config.GetLocations(path[2:].String()), + }, + ) + disabled := false b.Config.Presets.SourceLinkedDeployment = &disabled - diags = diags.Extend(diag.Warningf("source-linked deployment is available only in the Databricks Workspace")) } } diff --git a/bundle/config/mutator/apply_presets_test.go b/bundle/config/mutator/apply_presets_test.go index 2f0e9c9c..9d5eb550 100644 --- a/bundle/config/mutator/apply_presets_test.go +++ b/bundle/config/mutator/apply_presets_test.go @@ -9,7 +9,9 @@ import ( "github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config/mutator" "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/bundletest" "github.com/databricks/cli/libs/dbr" + "github.com/databricks/cli/libs/dyn" "github.com/databricks/databricks-sdk-go/service/apps" "github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/jobs" @@ -436,6 +438,7 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) { }, } + bundletest.SetLocation(b, "presets.source_linked_deployment", []dyn.Location{{File: "databricks.yml"}}) diags := bundle.Apply(tt.ctx, b, mutator.ApplyPresets()) if diags.HasError() { t.Fatalf("unexpected error: %v", diags) @@ -443,6 +446,7 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) { if tt.expectedWarning != "" { require.Equal(t, tt.expectedWarning, diags[0].Summary) + require.NotEmpty(t, diags[0].Locations) } require.Equal(t, tt.expectedValue, b.Config.Presets.SourceLinkedDeployment) diff --git a/bundle/config/validate/files_to_sync.go b/bundle/config/validate/files_to_sync.go index 7cdad772..a1427848 100644 --- a/bundle/config/validate/files_to_sync.go +++ b/bundle/config/validate/files_to_sync.go @@ -21,6 +21,12 @@ func (v *filesToSync) Name() string { } func (v *filesToSync) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { + // The user may be intentional about not synchronizing any files. + // In this case, we should not show any warnings. + if len(rb.Config().Sync.Paths) == 0 { + return nil + } + sync, err := files.GetSync(ctx, rb) if err != nil { return diag.FromErr(err) @@ -31,6 +37,7 @@ func (v *filesToSync) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag. return diag.FromErr(err) } + // If there are files to sync, we don't need to show any warnings. if len(fl) != 0 { return nil } diff --git a/bundle/config/validate/files_to_sync_test.go b/bundle/config/validate/files_to_sync_test.go new file mode 100644 index 00000000..2a598fa7 --- /dev/null +++ b/bundle/config/validate/files_to_sync_test.go @@ -0,0 +1,105 @@ +package validate + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/internal/testutil" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/vfs" + sdkconfig "github.com/databricks/databricks-sdk-go/config" + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/iam" + "github.com/databricks/databricks-sdk-go/service/workspace" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestFilesToSync_NoPaths(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Sync: config.Sync{ + Paths: []string{}, + }, + }, + } + + ctx := context.Background() + rb := bundle.ReadOnly(b) + diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync()) + assert.Empty(t, diags) +} + +func setupBundleForFilesToSyncTest(t *testing.T) *bundle.Bundle { + dir := t.TempDir() + + testutil.Touch(t, dir, "file1") + testutil.Touch(t, dir, "file2") + + b := &bundle.Bundle{ + BundleRootPath: dir, + BundleRoot: vfs.MustNew(dir), + SyncRootPath: dir, + SyncRoot: vfs.MustNew(dir), + Config: config.Root{ + Bundle: config.Bundle{ + Target: "default", + }, + Workspace: config.Workspace{ + FilePath: "/this/doesnt/matter", + CurrentUser: &config.User{ + User: &iam.User{}, + }, + }, + Sync: config.Sync{ + // Paths are relative to [SyncRootPath]. + Paths: []string{"."}, + }, + }, + } + + m := mocks.NewMockWorkspaceClient(t) + m.WorkspaceClient.Config = &sdkconfig.Config{ + Host: "https://foo.com", + } + + // The initialization logic in [sync.New] performs a check on the destination path. + // Removing this check at initialization time is tbd... + m.GetMockWorkspaceAPI().EXPECT().GetStatusByPath(mock.Anything, "/this/doesnt/matter").Return(&workspace.ObjectInfo{ + ObjectType: workspace.ObjectTypeDirectory, + }, nil) + + b.SetWorkpaceClient(m.WorkspaceClient) + return b +} + +func TestFilesToSync_EverythingIgnored(t *testing.T) { + b := setupBundleForFilesToSyncTest(t) + + // Ignore all files. + testutil.WriteFile(t, "*\n.*\n", b.BundleRootPath, ".gitignore") + + ctx := context.Background() + rb := bundle.ReadOnly(b) + diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync()) + require.Equal(t, 1, len(diags)) + assert.Equal(t, diag.Warning, diags[0].Severity) + assert.Equal(t, "There are no files to sync, please check your .gitignore", diags[0].Summary) +} + +func TestFilesToSync_EverythingExcluded(t *testing.T) { + b := setupBundleForFilesToSyncTest(t) + + // Exclude all files. + b.Config.Sync.Exclude = []string{"*"} + + ctx := context.Background() + rb := bundle.ReadOnly(b) + diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync()) + require.Equal(t, 1, len(diags)) + assert.Equal(t, diag.Warning, diags[0].Severity) + assert.Equal(t, "There are no files to sync, please check your .gitignore and sync.exclude configuration", diags[0].Summary) +} diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go new file mode 100644 index 00000000..7c159f61 --- /dev/null +++ b/bundle/config/validate/single_node_cluster.go @@ -0,0 +1,137 @@ +package validate + +import ( + "context" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/cli/libs/dyn/convert" + "github.com/databricks/cli/libs/log" +) + +// Validates that any single node clusters defined in the bundle are correctly configured. +func SingleNodeCluster() bundle.ReadOnlyMutator { + return &singleNodeCluster{} +} + +type singleNodeCluster struct{} + +func (m *singleNodeCluster) Name() string { + return "validate:SingleNodeCluster" +} + +const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a +valid single node cluster please ensure that the following properties +are correctly set in the cluster specification: + + spark_conf: + spark.databricks.cluster.profile: singleNode + spark.master: local[*] + + custom_tags: + ResourceClass: SingleNode + ` + +const singleNodeWarningSummary = `Single node cluster is not correctly configured` + +func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { + // Check if the user has explicitly set the num_workers to 0. Skip the warning + // if that's not the case. + numWorkers, ok := v.Get("num_workers").AsInt() + if !ok || numWorkers > 0 { + return false + } + + // Convenient type that contains the common fields from compute.ClusterSpec and + // pipelines.PipelineCluster that we are interested in. + type ClusterConf struct { + SparkConf map[string]string `json:"spark_conf"` + CustomTags map[string]string `json:"custom_tags"` + PolicyId string `json:"policy_id"` + } + + conf := &ClusterConf{} + err := convert.ToTyped(conf, v) + if err != nil { + return false + } + + // If the policy id is set, we don't want to show the warning. This is because + // the user might have configured `spark_conf` and `custom_tags` correctly + // in their cluster policy. + if conf.PolicyId != "" { + return false + } + + profile, ok := conf.SparkConf["spark.databricks.cluster.profile"] + if !ok { + log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec") + return true + } + if profile != "singleNode" { + log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile) + return true + } + + master, ok := conf.SparkConf["spark.master"] + if !ok { + log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec") + return true + } + if !strings.HasPrefix(master, "local") { + log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master) + return true + } + + resourceClass, ok := conf.CustomTags["ResourceClass"] + if !ok { + log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec") + return true + } + if resourceClass != "SingleNode" { + log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass) + return true + } + + return false +} + +func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { + diags := diag.Diagnostics{} + + patterns := []dyn.Pattern{ + // Interactive clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()), + // Job clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Job task clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")), + // Job for each task clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")), + // Pipeline clusters + dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()), + } + + for _, p := range patterns { + _, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { + warning := diag.Diagnostic{ + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: v.Locations(), + Paths: []dyn.Path{p}, + } + + if showSingleNodeClusterWarning(ctx, v) { + diags = append(diags, warning) + } + return v, nil + }) + if err != nil { + log.Debugf(ctx, "Error while applying single node cluster validation: %s", err) + } + } + return diags +} diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go new file mode 100644 index 00000000..18771cc0 --- /dev/null +++ b/bundle/config/validate/single_node_cluster_test.go @@ -0,0 +1,566 @@ +package validate + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/bundle/internal/bundletest" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/cli/libs/dyn" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/databricks/databricks-sdk-go/service/pipelines" + "github.com/stretchr/testify/assert" +) + +func failCases() []struct { + name string + sparkConf map[string]string + customTags map[string]string +} { + return []struct { + name string + sparkConf map[string]string + customTags map[string]string + }{ + { + name: "no tags or conf", + }, + { + name: "no tags", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + }, + { + name: "no conf", + customTags: map[string]string{"ResourceClass": "SingleNode"}, + }, + { + name: "invalid spark cluster profile", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "invalid", + "spark.master": "local[*]", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, + }, + { + name: "invalid spark.master", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "invalid", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, + }, + { + name: "invalid tags", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + customTags: map[string]string{"ResourceClass": "invalid"}, + }, + { + name: "missing ResourceClass tag", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + customTags: map[string]string{"what": "ever"}, + }, + { + name: "missing spark.master", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, + }, + { + name: "missing spark.databricks.cluster.profile", + sparkConf: map[string]string{ + "spark.master": "local[*]", + }, + customTags: map[string]string{"ResourceClass": "SingleNode"}, + }, + } +} + +func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Clusters: map[string]*resources.Cluster{ + "foo": { + ClusterSpec: &compute.ClusterSpec{ + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0)) + }) + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0)) + }) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")}, + }, + }, diags) + + }) + } +} + +func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0)) + }) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + { + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0)) + }) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")}, + }, + }, diags) + }) + } +} + +func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) { + ctx := context.Background() + + for _, tc := range failCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}}) + + // We can't set num_workers to 0 explicitly in the typed configuration. + // Do it on the dyn.Value directly. + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0)) + }) + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Equal(t, diag.Diagnostics{ + { + Severity: diag.Warning, + Summary: singleNodeWarningSummary, + Detail: singleNodeWarningDetail, + Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}}, + Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")}, + }, + }, diags) + }) + } +} + +func passCases() []struct { + name string + numWorkers *int + sparkConf map[string]string + customTags map[string]string + policyId string +} { + zero := 0 + one := 1 + + return []struct { + name string + numWorkers *int + sparkConf map[string]string + customTags map[string]string + policyId string + }{ + { + name: "single node cluster", + sparkConf: map[string]string{ + "spark.databricks.cluster.profile": "singleNode", + "spark.master": "local[*]", + }, + customTags: map[string]string{ + "ResourceClass": "SingleNode", + }, + numWorkers: &zero, + }, + { + name: "num workers is not zero", + numWorkers: &one, + }, + { + name: "num workers is not set", + }, + { + name: "policy id is not empty", + policyId: "policy-abc", + numWorkers: &zero, + }, + } +} + +func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Clusters: map[string]*resources.Cluster{ + "foo": { + ClusterSpec: &compute.ClusterSpec{ + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} + +func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + JobClusters: []jobs.JobCluster{ + { + NewCluster: compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} + +func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} + +func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Pipelines: map[string]*resources.Pipeline{ + "foo": { + PipelineSpec: &pipelines.PipelineSpec{ + Clusters: []pipelines.PipelineCluster{ + { + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} + +func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) { + ctx := context.Background() + + for _, tc := range passCases() { + t.Run(tc.name, func(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "foo": { + JobSettings: &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + ForEachTask: &jobs.ForEachTask{ + Task: jobs.Task{ + NewCluster: &compute.ClusterSpec{ + ClusterName: "my_cluster", + SparkConf: tc.sparkConf, + CustomTags: tc.customTags, + PolicyId: tc.policyId, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if tc.numWorkers != nil { + bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { + return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers)) + }) + } + + diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + assert.Empty(t, diags) + }) + } +} diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go index 440477e6..eb4c3c3c 100644 --- a/bundle/config/validate/validate.go +++ b/bundle/config/validate/validate.go @@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics ValidateSyncPatterns(), JobTaskClusterSpec(), ValidateFolderPermissions(), + SingleNodeCluster(), )) } diff --git a/bundle/config/variable/lookup.go b/bundle/config/variable/lookup.go index f8cb6719..37e380f1 100755 --- a/bundle/config/variable/lookup.go +++ b/bundle/config/variable/lookup.go @@ -22,6 +22,8 @@ type Lookup struct { Metastore string `json:"metastore,omitempty"` + NotificationDestination string `json:"notification_destination,omitempty"` + Pipeline string `json:"pipeline,omitempty"` Query string `json:"query,omitempty"` @@ -63,6 +65,9 @@ func (l *Lookup) constructResolver() (resolver, error) { if l.Metastore != "" { resolvers = append(resolvers, resolveMetastore{name: l.Metastore}) } + if l.NotificationDestination != "" { + resolvers = append(resolvers, resolveNotificationDestination{name: l.NotificationDestination}) + } if l.Pipeline != "" { resolvers = append(resolvers, resolvePipeline{name: l.Pipeline}) } diff --git a/bundle/config/variable/resolve_notification_destination.go b/bundle/config/variable/resolve_notification_destination.go new file mode 100644 index 00000000..4c4cd892 --- /dev/null +++ b/bundle/config/variable/resolve_notification_destination.go @@ -0,0 +1,46 @@ +package variable + +import ( + "context" + "fmt" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/settings" +) + +type resolveNotificationDestination struct { + name string +} + +func (l resolveNotificationDestination) Resolve(ctx context.Context, w *databricks.WorkspaceClient) (string, error) { + result, err := w.NotificationDestinations.ListAll(ctx, settings.ListNotificationDestinationsRequest{ + // The default page size for this API is 20. + // We use a higher value to make fewer API calls. + PageSize: 200, + }) + if err != nil { + return "", err + } + + // Collect all notification destinations with the given name. + var entities []settings.ListNotificationDestinationsResult + for _, entity := range result { + if entity.DisplayName == l.name { + entities = append(entities, entity) + } + } + + // Return the ID of the first matching notification destination. + switch len(entities) { + case 0: + return "", fmt.Errorf("notification destination named %q does not exist", l.name) + case 1: + return entities[0].Id, nil + default: + return "", fmt.Errorf("there are %d instances of clusters named %q", len(entities), l.name) + } +} + +func (l resolveNotificationDestination) String() string { + return fmt.Sprintf("notification-destination: %s", l.name) +} diff --git a/bundle/config/variable/resolve_notification_destination_test.go b/bundle/config/variable/resolve_notification_destination_test.go new file mode 100644 index 00000000..2b8201d1 --- /dev/null +++ b/bundle/config/variable/resolve_notification_destination_test.go @@ -0,0 +1,82 @@ +package variable + +import ( + "context" + "fmt" + "testing" + + "github.com/databricks/databricks-sdk-go/experimental/mocks" + "github.com/databricks/databricks-sdk-go/service/settings" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestResolveNotificationDestination_ResolveSuccess(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + + api := m.GetMockNotificationDestinationsAPI() + api.EXPECT(). + ListAll(mock.Anything, mock.Anything). + Return([]settings.ListNotificationDestinationsResult{ + {Id: "1234", DisplayName: "destination"}, + }, nil) + + ctx := context.Background() + l := resolveNotificationDestination{name: "destination"} + result, err := l.Resolve(ctx, m.WorkspaceClient) + require.NoError(t, err) + assert.Equal(t, "1234", result) +} + +func TestResolveNotificationDestination_ResolveError(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + + api := m.GetMockNotificationDestinationsAPI() + api.EXPECT(). + ListAll(mock.Anything, mock.Anything). + Return(nil, fmt.Errorf("bad")) + + ctx := context.Background() + l := resolveNotificationDestination{name: "destination"} + _, err := l.Resolve(ctx, m.WorkspaceClient) + assert.ErrorContains(t, err, "bad") +} + +func TestResolveNotificationDestination_ResolveNotFound(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + + api := m.GetMockNotificationDestinationsAPI() + api.EXPECT(). + ListAll(mock.Anything, mock.Anything). + Return([]settings.ListNotificationDestinationsResult{}, nil) + + ctx := context.Background() + l := resolveNotificationDestination{name: "destination"} + _, err := l.Resolve(ctx, m.WorkspaceClient) + require.Error(t, err) + assert.ErrorContains(t, err, `notification destination named "destination" does not exist`) +} + +func TestResolveNotificationDestination_ResolveMultiple(t *testing.T) { + m := mocks.NewMockWorkspaceClient(t) + + api := m.GetMockNotificationDestinationsAPI() + api.EXPECT(). + ListAll(mock.Anything, mock.Anything). + Return([]settings.ListNotificationDestinationsResult{ + {Id: "1234", DisplayName: "destination"}, + {Id: "5678", DisplayName: "destination"}, + }, nil) + + ctx := context.Background() + l := resolveNotificationDestination{name: "destination"} + _, err := l.Resolve(ctx, m.WorkspaceClient) + require.Error(t, err) + assert.ErrorContains(t, err, `there are 2 instances of clusters named "destination"`) +} + +func TestResolveNotificationDestination_String(t *testing.T) { + l := resolveNotificationDestination{name: "name"} + assert.Equal(t, "notification-destination: name", l.String()) +} diff --git a/go.mod b/go.mod index 9ae5fde0..73b9984d 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( golang.org/x/mod v0.22.0 golang.org/x/oauth2 v0.24.0 golang.org/x/sync v0.9.0 - golang.org/x/term v0.25.0 + golang.org/x/term v0.26.0 golang.org/x/text v0.20.0 gopkg.in/ini.v1 v1.67.0 // Apache 2.0 gopkg.in/yaml.v3 v3.0.1 @@ -64,7 +64,7 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/crypto v0.24.0 // indirect golang.org/x/net v0.26.0 // indirect - golang.org/x/sys v0.26.0 // indirect + golang.org/x/sys v0.27.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/api v0.182.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect diff --git a/go.sum b/go.sum index 2bfcfb2f..928827d9 100644 --- a/go.sum +++ b/go.sum @@ -212,10 +212,10 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= -golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU= +golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=