mirror of https://github.com/databricks/cli.git
Added support for creating all-purpose clusters
This commit is contained in:
parent
242d4b51ed
commit
c55a2dcdb6
|
@ -155,12 +155,25 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
|||
|
||||
// Schemas: Prefix
|
||||
for i := range r.Schemas {
|
||||
prefix = "dev_" + b.Config.Workspace.CurrentUser.ShortName + "_"
|
||||
r.Schemas[i].Name = prefix + r.Schemas[i].Name
|
||||
schemaPrefix := "dev_" + b.Config.Workspace.CurrentUser.ShortName + "_"
|
||||
r.Schemas[i].Name = schemaPrefix + r.Schemas[i].Name
|
||||
// HTTP API for schemas doesn't yet support tags. It's only supported in
|
||||
// the Databricks UI and via the SQL API.
|
||||
}
|
||||
|
||||
// Clusters: Prefix, Tags
|
||||
for _, c := range r.Clusters {
|
||||
c.ClusterName = prefix + c.ClusterName
|
||||
if c.CustomTags == nil {
|
||||
c.CustomTags = make(map[string]string)
|
||||
}
|
||||
for _, tag := range tags {
|
||||
if c.CustomTags[tag.Key] == "" {
|
||||
c.CustomTags[tag.Key] = tag.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"github.com/databricks/cli/libs/tags"
|
||||
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
||||
"github.com/databricks/databricks-sdk-go/service/catalog"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
"github.com/databricks/databricks-sdk-go/service/iam"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/databricks/databricks-sdk-go/service/ml"
|
||||
|
@ -119,6 +120,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
|
|||
Schemas: map[string]*resources.Schema{
|
||||
"schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}},
|
||||
},
|
||||
Clusters: map[string]*resources.Cluster{
|
||||
"cluster1": {ClusterSpec: &compute.ClusterSpec{ClusterName: "cluster1", SparkVersion: "13.2.x", NumWorkers: 1}},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Use AWS implementation for testing.
|
||||
|
@ -177,6 +181,9 @@ func TestProcessTargetModeDevelopment(t *testing.T) {
|
|||
|
||||
// Schema 1
|
||||
assert.Equal(t, "dev_lennart_schema1", b.Config.Resources.Schemas["schema1"].Name)
|
||||
|
||||
// Clusters
|
||||
assert.Equal(t, "[dev lennart] cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
|
||||
}
|
||||
|
||||
func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) {
|
||||
|
@ -271,6 +278,7 @@ func TestProcessTargetModeDefault(t *testing.T) {
|
|||
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
|
||||
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
|
||||
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
|
||||
assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
|
||||
}
|
||||
|
||||
func TestProcessTargetModeProduction(t *testing.T) {
|
||||
|
@ -302,6 +310,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
|
|||
b.Config.Resources.Experiments["experiment2"].Permissions = permissions
|
||||
b.Config.Resources.Models["model1"].Permissions = permissions
|
||||
b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Permissions = permissions
|
||||
b.Config.Resources.Clusters["cluster1"].Permissions = permissions
|
||||
|
||||
diags = validateProductionMode(context.Background(), b, false)
|
||||
require.NoError(t, diags.Error())
|
||||
|
@ -312,6 +321,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
|
|||
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
|
||||
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
|
||||
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
|
||||
assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
|
||||
}
|
||||
|
||||
func TestProcessTargetModeProductionOkForPrincipal(t *testing.T) {
|
||||
|
|
|
@ -32,6 +32,7 @@ func allResourceTypes(t *testing.T) []string {
|
|||
// the dyn library gives us the correct list of all resources supported. Please
|
||||
// also update this check when adding a new resource
|
||||
require.Equal(t, []string{
|
||||
"clusters",
|
||||
"experiments",
|
||||
"jobs",
|
||||
"model_serving_endpoints",
|
||||
|
@ -133,6 +134,7 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) {
|
|||
// some point in the future. These resources are (implicitly) on the deny list, since
|
||||
// they are not on the allow list below.
|
||||
allowList := []string{
|
||||
"clusters",
|
||||
"jobs",
|
||||
"models",
|
||||
"registered_models",
|
||||
|
|
|
@ -19,6 +19,7 @@ type Resources struct {
|
|||
RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
|
||||
QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"`
|
||||
Schemas map[string]*resources.Schema `json:"schemas,omitempty"`
|
||||
Clusters map[string]*resources.Cluster `json:"clusters,omitempty"`
|
||||
}
|
||||
|
||||
type ConfigResource interface {
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package resources
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/databricks/cli/libs/log"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/marshal"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
)
|
||||
|
||||
type Cluster struct {
|
||||
ID string `json:"id,omitempty" bundle:"readonly"`
|
||||
Permissions []Permission `json:"permissions,omitempty"`
|
||||
ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
|
||||
|
||||
*compute.ClusterSpec
|
||||
}
|
||||
|
||||
func (s *Cluster) UnmarshalJSON(b []byte) error {
|
||||
return marshal.Unmarshal(b, s)
|
||||
}
|
||||
|
||||
func (s Cluster) MarshalJSON() ([]byte, error) {
|
||||
return marshal.Marshal(s)
|
||||
}
|
||||
|
||||
func (s *Cluster) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
|
||||
_, err := w.Clusters.GetByClusterId(ctx, id)
|
||||
if err != nil {
|
||||
log.Debugf(ctx, "cluster %s does not exist", id)
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (s *Cluster) TerraformResourceName() string {
|
||||
return "databricks_cluster"
|
||||
}
|
|
@ -231,6 +231,13 @@ func BundleToTerraform(config *config.Root) *schema.Root {
|
|||
tfroot.Resource.QualityMonitor[k] = &dst
|
||||
}
|
||||
|
||||
for k, src := range config.Resources.Clusters {
|
||||
noResources = false
|
||||
var dst schema.ResourceCluster
|
||||
conv(src, &dst)
|
||||
tfroot.Resource.Cluster[k] = &dst
|
||||
}
|
||||
|
||||
// We explicitly set "resource" to nil to omit it from a JSON encoding.
|
||||
// This is required because the terraform CLI requires >= 1 resources defined
|
||||
// if the "resource" property is used in a .tf.json file.
|
||||
|
@ -394,6 +401,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
|
|||
}
|
||||
cur.ID = instance.Attributes.ID
|
||||
config.Resources.Schemas[resource.Name] = cur
|
||||
case "databricks_cluster":
|
||||
if config.Resources.Clusters == nil {
|
||||
config.Resources.Clusters = make(map[string]*resources.Cluster)
|
||||
}
|
||||
cur := config.Resources.Clusters[resource.Name]
|
||||
if cur == nil {
|
||||
cur = &resources.Cluster{ModifiedStatus: resources.ModifiedStatusDeleted}
|
||||
}
|
||||
cur.ID = instance.Attributes.ID
|
||||
config.Resources.Clusters[resource.Name] = cur
|
||||
case "databricks_permissions":
|
||||
case "databricks_grants":
|
||||
// Ignore; no need to pull these back into the configuration.
|
||||
|
@ -443,6 +460,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
|
|||
src.ModifiedStatus = resources.ModifiedStatusCreated
|
||||
}
|
||||
}
|
||||
for _, src := range config.Resources.Clusters {
|
||||
if src.ModifiedStatus == "" && src.ID == "" {
|
||||
src.ModifiedStatus = resources.ModifiedStatusCreated
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -663,6 +663,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
|
|||
{Attributes: stateInstanceAttributes{ID: "1"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "databricks_cluster",
|
||||
Mode: "managed",
|
||||
Name: "test_cluster",
|
||||
Instances: []stateResourceInstance{
|
||||
{Attributes: stateInstanceAttributes{ID: "1"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := TerraformToBundle(&tfState, &config)
|
||||
|
@ -692,6 +700,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
|
|||
assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
|
||||
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus)
|
||||
|
||||
assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
|
||||
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster"].ModifiedStatus)
|
||||
|
||||
AssertFullResourceCoverage(t, &config)
|
||||
}
|
||||
|
||||
|
@ -754,6 +765,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*resources.Cluster{
|
||||
"test_cluster": {
|
||||
ClusterSpec: &compute.ClusterSpec{
|
||||
ClusterName: "test_cluster",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
var tfState = resourcesState{
|
||||
|
@ -786,6 +804,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
|
|||
assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID)
|
||||
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus)
|
||||
|
||||
assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ID)
|
||||
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster"].ModifiedStatus)
|
||||
|
||||
AssertFullResourceCoverage(t, &config)
|
||||
}
|
||||
|
||||
|
@ -888,6 +909,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*resources.Cluster{
|
||||
"test_cluster": {
|
||||
ClusterSpec: &compute.ClusterSpec{
|
||||
ClusterName: "test_cluster",
|
||||
},
|
||||
},
|
||||
"test_cluster_new": {
|
||||
ClusterSpec: &compute.ClusterSpec{
|
||||
ClusterName: "test_cluster_new",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
var tfState = resourcesState{
|
||||
|
@ -1020,6 +1053,22 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
|
|||
{Attributes: stateInstanceAttributes{ID: "2"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "databricks_cluster",
|
||||
Mode: "managed",
|
||||
Name: "test_cluster",
|
||||
Instances: []stateResourceInstance{
|
||||
{Attributes: stateInstanceAttributes{ID: "1"}},
|
||||
},
|
||||
},
|
||||
{
|
||||
Type: "databricks_cluster",
|
||||
Mode: "managed",
|
||||
Name: "test_cluster_old",
|
||||
Instances: []stateResourceInstance{
|
||||
{Attributes: stateInstanceAttributes{ID: "2"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err := TerraformToBundle(&tfState, &config)
|
||||
|
@ -1081,6 +1130,13 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
|
|||
assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID)
|
||||
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus)
|
||||
|
||||
assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
|
||||
assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ModifiedStatus)
|
||||
assert.Equal(t, "2", config.Resources.Clusters["test_cluster_old"].ID)
|
||||
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster_old"].ModifiedStatus)
|
||||
assert.Equal(t, "", config.Resources.Clusters["test_cluster_new"].ID)
|
||||
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster_new"].ModifiedStatus)
|
||||
|
||||
AssertFullResourceCoverage(t, &config)
|
||||
}
|
||||
|
||||
|
|
|
@ -58,6 +58,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
|
|||
path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...)
|
||||
case dyn.Key("schemas"):
|
||||
path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...)
|
||||
case dyn.Key("clusters"):
|
||||
path = dyn.NewPath(dyn.Key("databricks_cluster")).Append(path[2:]...)
|
||||
default:
|
||||
// Trigger "key not found" for unknown resource types.
|
||||
return dyn.GetByPath(root, path)
|
||||
|
|
|
@ -31,6 +31,7 @@ func TestInterpolate(t *testing.T) {
|
|||
"other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}",
|
||||
"other_registered_model": "${resources.registered_models.other_registered_model.id}",
|
||||
"other_schema": "${resources.schemas.other_schema.id}",
|
||||
"other_cluster": "${resources.clusters.other_cluster.id}",
|
||||
},
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
|
@ -67,6 +68,7 @@ func TestInterpolate(t *testing.T) {
|
|||
assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"])
|
||||
assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"])
|
||||
assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"])
|
||||
assert.Equal(t, "${databricks_cluster.other_cluster.id}", j.Tags["other_cluster"])
|
||||
|
||||
m := b.Config.Resources.Models["my_model"]
|
||||
assert.Equal(t, "my_model", m.Model.Name)
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
package tfdyn
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/databricks/cli/bundle/internal/tf/schema"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/dyn/convert"
|
||||
"github.com/databricks/cli/libs/log"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
)
|
||||
|
||||
func convertClusterResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
|
||||
// Normalize the output value to the target schema.
|
||||
vout, diags := convert.Normalize(compute.ClusterSpec{}, vin)
|
||||
for _, diag := range diags {
|
||||
log.Debugf(ctx, "cluster normalization diagnostic: %s", diag.Summary)
|
||||
}
|
||||
|
||||
return vout, nil
|
||||
}
|
||||
|
||||
type clusterConverter struct{}
|
||||
|
||||
// Convert translates the bundle cluster definition in vin into its Terraform
// representation and stores the result in out under key.
//
// The converted cluster is written to out.Cluster[key]. When the definition
// carries permissions, a permissions resource is written to out.Permissions
// under "cluster_"+key and bound to the cluster through an interpolation
// reference to the cluster's Terraform ID.
func (clusterConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
	vout, err := convertClusterResource(ctx, vin)
	if err != nil {
		return err
	}

	// Add the converted resource to the output.
	out.Cluster[key] = vout.AsAny()

	// Configure permissions for this resource. The reference points at a
	// databricks_cluster, so it must be set on ClusterId; setting it on JobId
	// would make the permissions resource address a job instead of the cluster.
	if permissions := convertPermissionsResource(ctx, vin); permissions != nil {
		permissions.ClusterId = fmt.Sprintf("${databricks_cluster.%s.id}", key)
		out.Permissions["cluster_"+key] = permissions
	}

	return nil
}

// init registers the cluster converter under the "clusters" resource key.
func init() {
	registerConverter("clusters", clusterConverter{})
}
|
|
@ -0,0 +1,96 @@
|
|||
package tfdyn

import (
	"context"
	"testing"

	"github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/cli/bundle/internal/tf/schema"
	"github.com/databricks/cli/libs/dyn"
	"github.com/databricks/cli/libs/dyn/convert"
	"github.com/databricks/databricks-sdk-go/service/compute"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

// TestConvertCluster converts a fully populated cluster resource and checks
// both the resulting Terraform cluster attributes and the permissions
// resource, which must reference the cluster through ClusterId.
func TestConvertCluster(t *testing.T) {
	var src = resources.Cluster{
		ClusterSpec: &compute.ClusterSpec{
			NumWorkers:   3,
			SparkVersion: "13.3.x-scala2.12",
			ClusterName:  "cluster",
			SparkConf: map[string]string{
				"spark.executor.memory": "2g",
			},
			AwsAttributes: &compute.AwsAttributes{
				Availability: "ON_DEMAND",
			},
			AzureAttributes: &compute.AzureAttributes{
				Availability: "SPOT",
			},
			DataSecurityMode: "USER_ISOLATION",
			NodeTypeId:       "m5.xlarge",
			Autoscale: &compute.AutoScale{
				MinWorkers: 1,
				MaxWorkers: 10,
			},
		},

		Permissions: []resources.Permission{
			{
				Level:    "CAN_RUN",
				UserName: "jack@gmail.com",
			},
			{
				Level:                "CAN_MANAGE",
				ServicePrincipalName: "sp",
			},
		},
	}

	vin, err := convert.FromTyped(src, dyn.NilValue)
	require.NoError(t, err)

	ctx := context.Background()
	out := schema.NewResources()
	err = clusterConverter{}.Convert(ctx, "my_cluster", vin, out)
	require.NoError(t, err)

	cluster := out.Cluster["my_cluster"]
	assert.Equal(t, map[string]any{
		"num_workers":   int64(3),
		"spark_version": "13.3.x-scala2.12",
		"cluster_name":  "cluster",
		"spark_conf": map[string]any{
			"spark.executor.memory": "2g",
		},
		"aws_attributes": map[string]any{
			"availability": "ON_DEMAND",
		},
		"azure_attributes": map[string]any{
			"availability": "SPOT",
		},
		"data_security_mode": "USER_ISOLATION",
		"node_type_id":       "m5.xlarge",
		"autoscale": map[string]any{
			"min_workers": int64(1),
			"max_workers": int64(10),
		},
	}, cluster)

	// Assert equality on the permissions
	assert.Equal(t, &schema.ResourcePermissions{
		ClusterId: "${databricks_cluster.my_cluster.id}",
		AccessControl: []schema.ResourcePermissionsAccessControl{
			{
				PermissionLevel: "CAN_RUN",
				UserName:        "jack@gmail.com",
			},
			{
				PermissionLevel:      "CAN_MANAGE",
				ServicePrincipalName: "sp",
			},
		},
	}, out.Permissions["cluster_my_cluster"])
}
|
|
@ -0,0 +1,36 @@
|
|||
bundle:
|
||||
name: clusters
|
||||
|
||||
workspace:
|
||||
host: https://acme.cloud.databricks.com/
|
||||
|
||||
resources:
|
||||
clusters:
|
||||
foo:
|
||||
cluster_name: foo
|
||||
num_workers: 2
|
||||
node_type_id: "i3.xlarge"
|
||||
autoscale:
|
||||
min_workers: 2
|
||||
max_workers: 7
|
||||
spark_version: "13.3.x-scala2.12"
|
||||
spark_conf:
|
||||
"spark.executor.memory": "2g"
|
||||
|
||||
targets:
|
||||
default:
|
||||
|
||||
development:
|
||||
resources:
|
||||
clusters:
|
||||
foo:
|
||||
cluster_name: foo-override
|
||||
num_workers: 3
|
||||
node_type_id: "m5.xlarge"
|
||||
autoscale:
|
||||
min_workers: 1
|
||||
max_workers: 3
|
||||
spark_version: "15.2.x-scala2.12"
|
||||
spark_conf:
|
||||
"spark.executor.memory": "4g"
|
||||
"spark.executor.memory2": "4g"
|
|
@ -0,0 +1,36 @@
|
|||
package config_tests
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestClusters(t *testing.T) {
|
||||
b := load(t, "./clusters")
|
||||
assert.Equal(t, "clusters", b.Config.Bundle.Name)
|
||||
|
||||
cluster := b.Config.Resources.Clusters["foo"]
|
||||
assert.Equal(t, "foo", cluster.ClusterName)
|
||||
assert.Equal(t, "13.3.x-scala2.12", cluster.SparkVersion)
|
||||
assert.Equal(t, "i3.xlarge", cluster.NodeTypeId)
|
||||
assert.Equal(t, 2, cluster.NumWorkers)
|
||||
assert.Equal(t, "2g", cluster.SparkConf["spark.executor.memory"])
|
||||
assert.Equal(t, 2, cluster.Autoscale.MinWorkers)
|
||||
assert.Equal(t, 7, cluster.Autoscale.MaxWorkers)
|
||||
}
|
||||
|
||||
func TestClustersOverride(t *testing.T) {
|
||||
b := loadTarget(t, "./clusters", "development")
|
||||
assert.Equal(t, "clusters", b.Config.Bundle.Name)
|
||||
|
||||
cluster := b.Config.Resources.Clusters["foo"]
|
||||
assert.Equal(t, "foo-override", cluster.ClusterName)
|
||||
assert.Equal(t, "15.2.x-scala2.12", cluster.SparkVersion)
|
||||
assert.Equal(t, "m5.xlarge", cluster.NodeTypeId)
|
||||
assert.Equal(t, 3, cluster.NumWorkers)
|
||||
assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory"])
|
||||
assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory2"])
|
||||
assert.Equal(t, 1, cluster.Autoscale.MinWorkers)
|
||||
assert.Equal(t, 3, cluster.Autoscale.MaxWorkers)
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"properties": {
|
||||
"unique_id": {
|
||||
"type": "string",
|
||||
"description": "Unique ID for job name"
|
||||
},
|
||||
"spark_version": {
|
||||
"type": "string",
|
||||
"description": "Spark version used for job cluster"
|
||||
},
|
||||
"node_type_id": {
|
||||
"type": "string",
|
||||
"description": "Node type id for job cluster"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
bundle:
|
||||
name: basic
|
||||
|
||||
workspace:
|
||||
root_path: "~/.bundle/{{.unique_id}}"
|
||||
|
||||
resources:
|
||||
jobs:
|
||||
foo:
|
||||
name: test-job-basic-{{.unique_id}}
|
||||
tasks:
|
||||
- task_key: my_notebook_task
|
||||
new_cluster:
|
||||
num_workers: 1
|
||||
spark_version: "{{.spark_version}}"
|
||||
node_type_id: "{{.node_type_id}}"
|
||||
spark_python_task:
|
||||
python_file: ./hello_world.py
|
|
@ -0,0 +1 @@
|
|||
print("Hello World!")
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"properties": {
|
||||
"unique_id": {
|
||||
"type": "string",
|
||||
"description": "Unique ID for job name"
|
||||
},
|
||||
"spark_version": {
|
||||
"type": "string",
|
||||
"description": "Spark version used for job cluster"
|
||||
},
|
||||
"node_type_id": {
|
||||
"type": "string",
|
||||
"description": "Node type id for job cluster"
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,27 @@
|
|||
bundle:
|
||||
name: basic
|
||||
|
||||
workspace:
|
||||
root_path: "~/.bundle/{{.unique_id}}"
|
||||
|
||||
resources:
|
||||
clusters:
|
||||
test_cluster:
|
||||
cluster_name: "test-cluster-{{.unique_id}}"
|
||||
spark_version: "{{.spark_version}}"
|
||||
node_type_id: "{{.node_type_id}}"
|
||||
num_workers: 2
|
||||
autoscale:
|
||||
min_workers: 2
|
||||
max_workers: 6
|
||||
spark_conf:
|
||||
"spark.executor.memory": "2g"
|
||||
|
||||
jobs:
|
||||
foo:
|
||||
name: test-job-with-cluster-{{.unique_id}}
|
||||
tasks:
|
||||
- task_key: my_notebook_task
|
||||
existing_cluster_id: "${resources.clusters.test_cluster.cluster_id}"
|
||||
spark_python_task:
|
||||
python_file: ./hello_world.py
|
|
@ -0,0 +1 @@
|
|||
print("Hello World!")
|
|
@ -0,0 +1,51 @@
|
|||
package bundle
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/internal"
|
||||
"github.com/databricks/cli/internal/acc"
|
||||
"github.com/databricks/cli/libs/env"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAccDeployBundleWithCluster(t *testing.T) {
|
||||
ctx, wt := acc.WorkspaceTest(t)
|
||||
|
||||
nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
|
||||
uniqueId := uuid.New().String()
|
||||
root, err := initTestTemplate(t, ctx, "clusters", map[string]any{
|
||||
"unique_id": uniqueId,
|
||||
"node_type_id": nodeTypeId,
|
||||
"spark_version": defaultSparkVersion,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
err = destroyBundle(t, ctx, root)
|
||||
require.NoError(t, err)
|
||||
|
||||
cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
|
||||
if err != nil {
|
||||
require.ErrorContains(t, err, "does not exist")
|
||||
} else {
|
||||
require.Contains(t, []compute.State{compute.StateTerminated, compute.StateTerminating}, cluster.State)
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
err = deployBundle(t, ctx, root)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Cluster should exists after bundle deployment
|
||||
cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, cluster)
|
||||
|
||||
out, err := runResource(t, ctx, root, "foo")
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, out, "Hello World!")
|
||||
}
|
Loading…
Reference in New Issue