Compare commits

...

4 Commits

Author SHA1 Message Date
shreyas-goenka 3e91d3c8cb
Merge be2d802d13 into b323703c1b 2024-11-22 21:35:29 +05:30
shreyas-goenka b323703c1b
Add validation for single node clusters (#1909)
## Changes
This PR adds a warning validating that the configuration for a single
node cluster is valid for interactive, job, job-task, and pipeline
clusters.

Note: We skip the validation if a cluster policy is configured because
the policy is likely to configure `spark_conf` / `custom_tags` itself.

Note: Terrform originally only had validation for interactive, job, and
job-task clusters. This PR adding the validation for pipeline clusters
as well is new.

This PR follows the same logic as we used to have in Terraform. The
validation was removed from Terraform because we had no way to demote
the error to a warning:
https://github.com/databricks/terraform-provider-databricks/pull/4222

### Background
Single-node clusters require `spark_conf` and `custom_tags` to be
correctly set in the cluster definition for them to function optimally.
The cluster will be created even if incorrectly configured, but its
performance will not be great.

For example, if both `spark_conf` and `custom_tags` are not set and
`num_workers` is 0, then only the driver process will be launched on the
cluster compute instance thus leading to sub-optimal utilization of
available compute resources and no parallelization across worker
processes when processing a spark query.

### Issue

This PR addresses some issues reported in
https://github.com/databricks/cli/issues/1546

## Tests
Unit tests and manually.

Example output of the warning:
```
➜  bundle-playground git:(master) ✗ cli bundle validate
Warning: Single node cluster is not correctly configured
  at resources.pipelines.bar.clusters[0]
  in databricks.yml:29:11

num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:

  spark_conf:
    spark.databricks.cluster.profile: singleNode
    spark.master: local[*]

  custom_tags:
    ResourceClass: SingleNode
  

Name: foobar
Target: default
Workspace:
  User: shreyas.goenka@databricks.com
  Path: /Workspace/Users/shreyas.goenka@databricks.com/.bundle/foobar/default

Found 1 warning
```
2024-11-22 15:48:09 +00:00
Ilya Kuznetsov 490dd058aa
Extended message for warning when source-linked mode is used outside of the workspace (#1929)
## Changes

Added path and locations to the warning which displayed when
source-linked mode is used outside of the workspace
2024-11-22 14:44:33 +00:00
Shreyas Goenka be2d802d13
Skip prefixes for schema names when catalog is already namespaced to current user 2024-10-14 16:43:58 +02:00
6 changed files with 777 additions and 2 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/databricks/cli/libs/dbr" "github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn" "github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/textutil" "github.com/databricks/cli/libs/textutil"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
@ -189,6 +190,14 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
diags = diags.Extend(diag.Errorf("schema %s is not defined", key)) diags = diags.Extend(diag.Errorf("schema %s is not defined", key))
continue continue
} }
// If the catalog is already namespaced to the current user, we don't need
// to prefix the schema name since it already falls under the user's namespace.
if containsUserIdentity(s.CatalogName, b.Config.Workspace.CurrentUser) {
log.Debugf(ctx, "Skipping schema %s since catalog %s already contains the user's identity", s.Name, s.CatalogName)
continue
}
s.Name = normalizePrefix(prefix) + s.Name s.Name = normalizePrefix(prefix) + s.Name
// HTTP API for schemas doesn't yet support tags. It's only supported in // HTTP API for schemas doesn't yet support tags. It's only supported in
// the Databricks UI and via the SQL API. // the Databricks UI and via the SQL API.
@ -225,9 +234,21 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
if config.IsExplicitlyEnabled((b.Config.Presets.SourceLinkedDeployment)) { if config.IsExplicitlyEnabled((b.Config.Presets.SourceLinkedDeployment)) {
isDatabricksWorkspace := dbr.RunsOnRuntime(ctx) && strings.HasPrefix(b.SyncRootPath, "/Workspace/") isDatabricksWorkspace := dbr.RunsOnRuntime(ctx) && strings.HasPrefix(b.SyncRootPath, "/Workspace/")
if !isDatabricksWorkspace { if !isDatabricksWorkspace {
target := b.Config.Bundle.Target
path := dyn.NewPath(dyn.Key("targets"), dyn.Key(target), dyn.Key("presets"), dyn.Key("source_linked_deployment"))
diags = diags.Append(
diag.Diagnostic{
Severity: diag.Warning,
Summary: "source-linked deployment is available only in the Databricks Workspace",
Paths: []dyn.Path{
path,
},
Locations: b.Config.GetLocations(path[2:].String()),
},
)
disabled := false disabled := false
b.Config.Presets.SourceLinkedDeployment = &disabled b.Config.Presets.SourceLinkedDeployment = &disabled
diags = diags.Extend(diag.Warningf("source-linked deployment is available only in the Databricks Workspace"))
} }
} }

View File

@ -9,8 +9,11 @@ import (
"github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator" "github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/dbr" "github.com/databricks/cli/libs/dbr"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -98,12 +101,53 @@ func TestApplyPresetsPrefixForUcSchema(t *testing.T) {
}, },
want: "schema1", want: "schema1",
}, },
{
name: "skip prefix because catalog contains short name",
prefix: "[prefix]",
schema: &resources.Schema{
CreateSchema: &catalog.CreateSchema{
Name: "schema1",
CatalogName: "dev_john_smith_test_catalog",
},
},
want: "schema1",
},
{
name: "skip prefix because catalog contains email",
prefix: "[prefix]",
schema: &resources.Schema{
CreateSchema: &catalog.CreateSchema{
Name: "schema1",
CatalogName: "dev_john.smith@databricks.com_test_catalog",
},
},
want: "schema1",
},
{
name: "add prefix because catalog is not namespaced to user",
prefix: "[prefix]",
schema: &resources.Schema{
CreateSchema: &catalog.CreateSchema{
Name: "schema1",
CatalogName: "test_catalog",
},
},
want: "prefix_schema1",
},
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
b := &bundle.Bundle{ b := &bundle.Bundle{
Config: config.Root{ Config: config.Root{
Workspace: config.Workspace{
CurrentUser: &config.User{
ShortName: "john_smith",
User: &iam.User{
UserName: "john.smith@databricks.com",
},
},
},
Resources: config.Resources{ Resources: config.Resources{
Schemas: map[string]*resources.Schema{ Schemas: map[string]*resources.Schema{
"schema1": tt.schema, "schema1": tt.schema,
@ -435,6 +479,7 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
}, },
} }
bundletest.SetLocation(b, "presets.source_linked_deployment", []dyn.Location{{File: "databricks.yml"}})
diags := bundle.Apply(tt.ctx, b, mutator.ApplyPresets()) diags := bundle.Apply(tt.ctx, b, mutator.ApplyPresets())
if diags.HasError() { if diags.HasError() {
t.Fatalf("unexpected error: %v", diags) t.Fatalf("unexpected error: %v", diags)
@ -442,6 +487,7 @@ func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
if tt.expectedWarning != "" { if tt.expectedWarning != "" {
require.Equal(t, tt.expectedWarning, diags[0].Summary) require.Equal(t, tt.expectedWarning, diags[0].Summary)
require.NotEmpty(t, diags[0].Locations)
} }
require.Equal(t, tt.expectedValue, b.Config.Presets.SourceLinkedDeployment) require.Equal(t, tt.expectedValue, b.Config.Presets.SourceLinkedDeployment)

View File

@ -72,6 +72,10 @@ func transformDevelopmentMode(ctx context.Context, b *bundle.Bundle) {
} }
} }
func containsUserIdentity(s string, u *config.User) bool {
return strings.Contains(s, u.ShortName) || strings.Contains(s, u.UserName)
}
func validateDevelopmentMode(b *bundle.Bundle) diag.Diagnostics { func validateDevelopmentMode(b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics var diags diag.Diagnostics
p := b.Config.Presets p := b.Config.Presets
@ -101,7 +105,7 @@ func validateDevelopmentMode(b *bundle.Bundle) diag.Diagnostics {
diags = diags.Extend(diag.Errorf("%s must start with '~/' or contain the current username to ensure uniqueness when using 'mode: development'", path)) diags = diags.Extend(diag.Errorf("%s must start with '~/' or contain the current username to ensure uniqueness when using 'mode: development'", path))
} }
} }
if p.NamePrefix != "" && !strings.Contains(p.NamePrefix, u.ShortName) && !strings.Contains(p.NamePrefix, u.UserName) { if p.NamePrefix != "" && !containsUserIdentity(p.NamePrefix, u) {
// Resources such as pipelines require a unique name, e.g. '[dev steve] my_pipeline'. // Resources such as pipelines require a unique name, e.g. '[dev steve] my_pipeline'.
// For this reason we require the name prefix to contain the current username; // For this reason we require the name prefix to contain the current username;
// it's a pitfall for users if they don't include it and later find out that // it's a pitfall for users if they don't include it and later find out that

View File

@ -0,0 +1,137 @@
package validate
import (
"context"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
)
// Validates that any single node clusters defined in the bundle are correctly configured.
func SingleNodeCluster() bundle.ReadOnlyMutator {
return &singleNodeCluster{}
}
type singleNodeCluster struct{}
func (m *singleNodeCluster) Name() string {
return "validate:SingleNodeCluster"
}
const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:
spark_conf:
spark.databricks.cluster.profile: singleNode
spark.master: local[*]
custom_tags:
ResourceClass: SingleNode
`
const singleNodeWarningSummary = `Single node cluster is not correctly configured`
func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
// Check if the user has explicitly set the num_workers to 0. Skip the warning
// if that's not the case.
numWorkers, ok := v.Get("num_workers").AsInt()
if !ok || numWorkers > 0 {
return false
}
// Convenient type that contains the common fields from compute.ClusterSpec and
// pipelines.PipelineCluster that we are interested in.
type ClusterConf struct {
SparkConf map[string]string `json:"spark_conf"`
CustomTags map[string]string `json:"custom_tags"`
PolicyId string `json:"policy_id"`
}
conf := &ClusterConf{}
err := convert.ToTyped(conf, v)
if err != nil {
return false
}
// If the policy id is set, we don't want to show the warning. This is because
// the user might have configured `spark_conf` and `custom_tags` correctly
// in their cluster policy.
if conf.PolicyId != "" {
return false
}
profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
if !ok {
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
return true
}
if profile != "singleNode" {
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
return true
}
master, ok := conf.SparkConf["spark.master"]
if !ok {
log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
return true
}
if !strings.HasPrefix(master, "local") {
log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
return true
}
resourceClass, ok := conf.CustomTags["ResourceClass"]
if !ok {
log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
return true
}
if resourceClass != "SingleNode" {
log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
return true
}
return false
}
func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
diags := diag.Diagnostics{}
patterns := []dyn.Pattern{
// Interactive clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
// Job clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
// Job task clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
// Job for each task clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")),
// Pipeline clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
}
for _, p := range patterns {
_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
warning := diag.Diagnostic{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: v.Locations(),
Paths: []dyn.Path{p},
}
if showSingleNodeClusterWarning(ctx, v) {
diags = append(diags, warning)
}
return v, nil
})
if err != nil {
log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
}
}
return diags
}

View File

@ -0,0 +1,566 @@
package validate
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
)
func failCases() []struct {
name string
sparkConf map[string]string
customTags map[string]string
} {
return []struct {
name string
sparkConf map[string]string
customTags map[string]string
}{
{
name: "no tags or conf",
},
{
name: "no tags",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
},
{
name: "no conf",
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid spark cluster profile",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "invalid",
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid spark.master",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "invalid",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "invalid tags",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "invalid"},
},
{
name: "missing ResourceClass tag",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{"what": "ever"},
},
{
name: "missing spark.master",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
{
name: "missing spark.databricks.cluster.profile",
sparkConf: map[string]string{
"spark.master": "local[*]",
},
customTags: map[string]string{"ResourceClass": "SingleNode"},
},
}
}
func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Clusters: map[string]*resources.Cluster{
"foo": {
ClusterSpec: &compute.ClusterSpec{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
JobClusters: []jobs.JobCluster{
{
NewCluster: compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"foo": {
PipelineSpec: &pipelines.PipelineSpec{
Clusters: []pipelines.PipelineCluster{
{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")},
},
}, diags)
})
}
}
func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
ctx := context.Background()
for _, tc := range failCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
},
},
},
},
},
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster", []dyn.Location{{File: "e.yml", Line: 1, Column: 1}})
// We can't set num_workers to 0 explicitly in the typed configuration.
// Do it on the dyn.Value directly.
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
})
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Equal(t, diag.Diagnostics{
{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: []dyn.Location{{File: "e.yml", Line: 1, Column: 1}},
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].for_each_task.task.new_cluster")},
},
}, diags)
})
}
}
func passCases() []struct {
name string
numWorkers *int
sparkConf map[string]string
customTags map[string]string
policyId string
} {
zero := 0
one := 1
return []struct {
name string
numWorkers *int
sparkConf map[string]string
customTags map[string]string
policyId string
}{
{
name: "single node cluster",
sparkConf: map[string]string{
"spark.databricks.cluster.profile": "singleNode",
"spark.master": "local[*]",
},
customTags: map[string]string{
"ResourceClass": "SingleNode",
},
numWorkers: &zero,
},
{
name: "num workers is not zero",
numWorkers: &one,
},
{
name: "num workers is not set",
},
{
name: "policy id is not empty",
policyId: "policy-abc",
numWorkers: &zero,
},
}
}
func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Clusters: map[string]*resources.Cluster{
"foo": {
ClusterSpec: &compute.ClusterSpec{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
JobClusters: []jobs.JobCluster{
{
NewCluster: compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"foo": {
PipelineSpec: &pipelines.PipelineSpec{
Clusters: []pipelines.PipelineCluster{
{
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}
func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
ctx := context.Background()
for _, tc := range passCases() {
t.Run(tc.name, func(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"foo": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
NewCluster: &compute.ClusterSpec{
ClusterName: "my_cluster",
SparkConf: tc.sparkConf,
CustomTags: tc.customTags,
PolicyId: tc.policyId,
},
},
},
},
},
},
},
},
},
},
}
if tc.numWorkers != nil {
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(*tc.numWorkers))
})
}
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
assert.Empty(t, diags)
})
}
}

View File

@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
ValidateSyncPatterns(), ValidateSyncPatterns(),
JobTaskClusterSpec(), JobTaskClusterSpec(),
ValidateFolderPermissions(), ValidateFolderPermissions(),
SingleNodeCluster(),
)) ))
} }