databricks-cli/bundle/config/validate/single_node_cluster.go

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

138 lines
4.3 KiB
Go
Raw Normal View History

Add validation for single node clusters (#1909) ## Changes This PR adds a warning validating that the configuration for a single node cluster is valid for interactive, job, job-task, and pipeline clusters. Note: We skip the validation if a cluster policy is configured because the policy is likely to configure `spark_conf` / `custom_tags` itself. Note: Terrform originally only had validation for interactive, job, and job-task clusters. This PR adding the validation for pipeline clusters as well is new. This PR follows the same logic as we used to have in Terraform. The validation was removed from Terraform because we had no way to demote the error to a warning: https://github.com/databricks/terraform-provider-databricks/pull/4222 ### Background Single-node clusters require `spark_conf` and `custom_tags` to be correctly set in the cluster definition for them to function optimally. The cluster will be created even if incorrectly configured, but its performance will not be great. For example, if both `spark_conf` and `custom_tags` are not set and `num_workers` is 0, then only the driver process will be launched on the cluster compute instance thus leading to sub-optimal utilization of available compute resources and no parallelization across worker processes when processing a spark query. ### Issue This PR addresses some issues reported in https://github.com/databricks/cli/issues/1546 ## Tests Unit tests and manually. Example output of the warning: ``` ➜ bundle-playground git:(master) ✗ cli bundle validate Warning: Single node cluster is not correctly configured at resources.pipelines.bar.clusters[0] in databricks.yml:29:11 num_workers should be 0 only for single-node clusters. To create a valid single node cluster please ensure that the following properties are correctly set in the cluster specification: spark_conf: spark.databricks.cluster.profile: singleNode spark.master: local[*] custom_tags: ResourceClass: SingleNode Name: foobar Target: default Workspace: User: shreyas.goenka@databricks.com Path: /Workspace/Users/shreyas.goenka@databricks.com/.bundle/foobar/default Found 1 warning ```
2024-11-22 15:48:09 +00:00
package validate
import (
"context"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
)
// Validates that any single node clusters defined in the bundle are correctly configured.
func SingleNodeCluster() bundle.ReadOnlyMutator {
return &singleNodeCluster{}
}
type singleNodeCluster struct{}
func (m *singleNodeCluster) Name() string {
return "validate:SingleNodeCluster"
}
const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a
valid single node cluster please ensure that the following properties
are correctly set in the cluster specification:
spark_conf:
spark.databricks.cluster.profile: singleNode
spark.master: local[*]
custom_tags:
ResourceClass: SingleNode
`
const singleNodeWarningSummary = `Single node cluster is not correctly configured`
func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
// Check if the user has explicitly set the num_workers to 0. Skip the warning
// if that's not the case.
numWorkers, ok := v.Get("num_workers").AsInt()
if !ok || numWorkers > 0 {
return false
}
// Convenient type that contains the common fields from compute.ClusterSpec and
// pipelines.PipelineCluster that we are interested in.
type ClusterConf struct {
SparkConf map[string]string `json:"spark_conf"`
CustomTags map[string]string `json:"custom_tags"`
PolicyId string `json:"policy_id"`
}
conf := &ClusterConf{}
err := convert.ToTyped(conf, v)
if err != nil {
return false
}
// If the policy id is set, we don't want to show the warning. This is because
// the user might have configured `spark_conf` and `custom_tags` correctly
// in their cluster policy.
if conf.PolicyId != "" {
return false
}
profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
if !ok {
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
return true
}
if profile != "singleNode" {
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
return true
}
master, ok := conf.SparkConf["spark.master"]
if !ok {
log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
return true
}
if !strings.HasPrefix(master, "local") {
log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
return true
}
resourceClass, ok := conf.CustomTags["ResourceClass"]
if !ok {
log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
return true
}
if resourceClass != "SingleNode" {
log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
return true
}
return false
}
func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
diags := diag.Diagnostics{}
patterns := []dyn.Pattern{
// Interactive clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
// Job clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
// Job task clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
// Job for each task clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")),
// Pipeline clusters
dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
}
for _, p := range patterns {
_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
warning := diag.Diagnostic{
Severity: diag.Warning,
Summary: singleNodeWarningSummary,
Detail: singleNodeWarningDetail,
Locations: v.Locations(),
Paths: []dyn.Path{p},
}
if showSingleNodeClusterWarning(ctx, v) {
diags = append(diags, warning)
}
return v, nil
})
if err != nil {
log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
}
}
return diags
}