package validate

import (
	"context"
	"strings"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/libs/diag"
	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/cli/libs/dyn/convert"
	"github.com/databricks/cli/libs/log"
)

// SingleNodeCluster validates that any single-node clusters defined in the
// bundle are correctly configured.
func SingleNodeCluster() bundle.ReadOnlyMutator {
	return &singleNodeCluster{}
}

type singleNodeCluster struct{}

func (m *singleNodeCluster) Name() string {
	return "validate:SingleNodeCluster"
}

const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a
valid single-node cluster, please ensure that the following properties
are correctly set in the cluster specification:

  spark_conf:
    spark.databricks.cluster.profile: singleNode
    spark.master: local[*]

  custom_tags:
    ResourceClass: SingleNode
`

const singleNodeWarningSummary = `Single node cluster is not correctly configured`

func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
	// Check if the user has explicitly set num_workers to 0. Skip the warning
	// if that's not the case.
	numWorkers, ok := v.Get("num_workers").AsInt()
	if !ok || numWorkers > 0 {
		return false
	}

	// Convenient type that contains the common fields from compute.ClusterSpec
	// and pipelines.PipelineCluster that we are interested in.
	type ClusterConf struct {
		SparkConf  map[string]string `json:"spark_conf"`
		CustomTags map[string]string `json:"custom_tags"`
		PolicyId   string            `json:"policy_id"`
	}

	conf := &ClusterConf{}
	err := convert.ToTyped(conf, v)
	if err != nil {
		return false
	}

	// If the policy id is set, we don't want to show the warning. This is
	// because the user might have configured `spark_conf` and `custom_tags`
	// correctly in their cluster policy.
	if conf.PolicyId != "" {
		return false
	}

	profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
	if !ok {
		log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
		return true
	}
	if profile != "singleNode" {
		log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
		return true
	}

	master, ok := conf.SparkConf["spark.master"]
	if !ok {
		log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
		return true
	}
	if !strings.HasPrefix(master, "local") {
		log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
		return true
	}

	resourceClass, ok := conf.CustomTags["ResourceClass"]
	if !ok {
		log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
		return true
	}
	if resourceClass != "SingleNode" {
		log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
		return true
	}

	return false
}

func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
	diags := diag.Diagnostics{}

	patterns := []dyn.Pattern{
		// Interactive clusters
		dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
		// Job clusters
		dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
		// Job task clusters
		dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
		// Pipeline clusters
		dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
	}

	for _, p := range patterns {
		_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
			warning := diag.Diagnostic{
				Severity:  diag.Warning,
				Summary:   singleNodeWarningSummary,
				Detail:    singleNodeWarningDetail,
				Locations: v.Locations(),
				Paths:     []dyn.Path{p},
			}

			if showSingleNodeClusterWarning(ctx, v) {
				diags = append(diags, warning)
			}
			return v, nil
		})
		if err != nil {
			log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
		}
	}
	return diags
}
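
// Usage sketch (a minimal example, not part of the validator itself): this
// assumes the bundle package's ReadOnly wrapper and ApplyReadOnly helper,
// which is how read-only mutators are applied elsewhere in this codebase.
// The *bundle.Bundle value `b` is hypothetical.
//
//	diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
//
// For illustration, a hypothetical bundle configuration like the following
// would trigger the warning: num_workers is explicitly 0, no policy_id is
// set, and the single-node spark_conf and custom_tags entries are missing.
//
//	resources:
//	  clusters:
//	    my_cluster:
//	      num_workers: 0
//	      spark_version: 15.4.x-scala2.12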