mirror of https://github.com/databricks/cli.git
138 lines
4.3 KiB
Go
138 lines
4.3 KiB
Go
package validate
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
|
|
"github.com/databricks/cli/bundle"
|
|
"github.com/databricks/cli/libs/diag"
|
|
"github.com/databricks/cli/libs/dyn"
|
|
"github.com/databricks/cli/libs/dyn/convert"
|
|
"github.com/databricks/cli/libs/log"
|
|
)
|
|
|
|
// Validates that any single node clusters defined in the bundle are correctly configured.
|
|
func SingleNodeCluster() bundle.ReadOnlyMutator {
|
|
return &singleNodeCluster{}
|
|
}
|
|
|
|
type singleNodeCluster struct{}
|
|
|
|
func (m *singleNodeCluster) Name() string {
|
|
return "validate:SingleNodeCluster"
|
|
}
|
|
|
|
const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a
|
|
valid single node cluster please ensure that the following properties
|
|
are correctly set in the cluster specification:
|
|
|
|
spark_conf:
|
|
spark.databricks.cluster.profile: singleNode
|
|
spark.master: local[*]
|
|
|
|
custom_tags:
|
|
ResourceClass: SingleNode
|
|
`
|
|
|
|
const singleNodeWarningSummary = `Single node cluster is not correctly configured`
|
|
|
|
func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
|
|
// Check if the user has explicitly set the num_workers to 0. Skip the warning
|
|
// if that's not the case.
|
|
numWorkers, ok := v.Get("num_workers").AsInt()
|
|
if !ok || numWorkers > 0 {
|
|
return false
|
|
}
|
|
|
|
// Convenient type that contains the common fields from compute.ClusterSpec and
|
|
// pipelines.PipelineCluster that we are interested in.
|
|
type ClusterConf struct {
|
|
SparkConf map[string]string `json:"spark_conf"`
|
|
CustomTags map[string]string `json:"custom_tags"`
|
|
PolicyId string `json:"policy_id"`
|
|
}
|
|
|
|
conf := &ClusterConf{}
|
|
err := convert.ToTyped(conf, v)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
// If the policy id is set, we don't want to show the warning. This is because
|
|
// the user might have configured `spark_conf` and `custom_tags` correctly
|
|
// in their cluster policy.
|
|
if conf.PolicyId != "" {
|
|
return false
|
|
}
|
|
|
|
profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
|
|
if !ok {
|
|
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
|
|
return true
|
|
}
|
|
if profile != "singleNode" {
|
|
log.Debugf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
|
|
return true
|
|
}
|
|
|
|
master, ok := conf.SparkConf["spark.master"]
|
|
if !ok {
|
|
log.Debugf(ctx, "spark_conf spark.master not found in single-node cluster spec")
|
|
return true
|
|
}
|
|
if !strings.HasPrefix(master, "local") {
|
|
log.Debugf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
|
|
return true
|
|
}
|
|
|
|
resourceClass, ok := conf.CustomTags["ResourceClass"]
|
|
if !ok {
|
|
log.Debugf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
|
|
return true
|
|
}
|
|
if resourceClass != "SingleNode" {
|
|
log.Debugf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
|
diags := diag.Diagnostics{}
|
|
|
|
patterns := []dyn.Pattern{
|
|
// Interactive clusters
|
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
|
|
// Job clusters
|
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
|
// Job task clusters
|
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
|
// Job for each task clusters
|
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("for_each_task"), dyn.Key("task"), dyn.Key("new_cluster")),
|
|
// Pipeline clusters
|
|
dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
|
|
}
|
|
|
|
for _, p := range patterns {
|
|
_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
|
warning := diag.Diagnostic{
|
|
Severity: diag.Warning,
|
|
Summary: singleNodeWarningSummary,
|
|
Detail: singleNodeWarningDetail,
|
|
Locations: v.Locations(),
|
|
Paths: []dyn.Path{p},
|
|
}
|
|
|
|
if showSingleNodeClusterWarning(ctx, v) {
|
|
diags = append(diags, warning)
|
|
}
|
|
return v, nil
|
|
})
|
|
if err != nil {
|
|
log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
|
|
}
|
|
}
|
|
return diags
|
|
}
|