Merge remote-tracking branch 'origin' into template-format

Shreyas Goenka 2024-09-23 17:03:26 +02:00
commit 4e63a4f5af
51 changed files with 904 additions and 109 deletions

View File

@ -1,5 +1,32 @@
# Version changelog
## [Release] Release v0.228.1
Bundles:
* Added listing cluster filtering for cluster lookups ([#1754](https://github.com/databricks/cli/pull/1754)).
* Expand library globs relative to the sync root ([#1756](https://github.com/databricks/cli/pull/1756)).
* Fixed generated YAML missing 'default' for empty values ([#1765](https://github.com/databricks/cli/pull/1765)).
* Use periodic triggers in all templates ([#1739](https://github.com/databricks/cli/pull/1739)).
* Use the friendly name of service principals when shortening their name ([#1770](https://github.com/databricks/cli/pull/1770)).
* Fixed detecting full syntax variable override which includes type field ([#1775](https://github.com/databricks/cli/pull/1775)).
Internal:
* Pass copy of `dyn.Path` to callback function ([#1747](https://github.com/databricks/cli/pull/1747)).
* Make bundle JSON schema modular with `` ([#1700](https://github.com/databricks/cli/pull/1700)).
* Alias variables block in the `Target` struct ([#1748](https://github.com/databricks/cli/pull/1748)).
* Add end to end integration tests for bundle JSON schema ([#1726](https://github.com/databricks/cli/pull/1726)).
* Fix artifact upload integration tests ([#1767](https://github.com/databricks/cli/pull/1767)).
API Changes:
* Added `databricks quality-monitors regenerate-dashboard` command.
OpenAPI commit d05898328669a3f8ab0c2ecee37db2673d3ea3f7 (2024-09-04)
Dependency updates:
* Bump golang.org/x/term from 0.23.0 to 0.24.0 ([#1757](https://github.com/databricks/cli/pull/1757)).
* Bump golang.org/x/oauth2 from 0.22.0 to 0.23.0 ([#1761](https://github.com/databricks/cli/pull/1761)).
* Bump golang.org/x/text from 0.17.0 to 0.18.0 ([#1759](https://github.com/databricks/cli/pull/1759)).
* Bump github.com/databricks/databricks-sdk-go from 0.45.0 to 0.46.0 ([#1760](https://github.com/databricks/cli/pull/1760)).
## [Release] Release v0.228.0
CLI:

View File

@ -38,8 +38,11 @@ type Bundle struct {
// Annotated readonly as this should be set at the target level.
Mode Mode `json:"mode,omitempty" bundle:"readonly"`
-// Overrides the compute used for jobs and other supported assets.
-ComputeID string `json:"compute_id,omitempty"`
+// DEPRECATED: Overrides the compute used for jobs and other supported assets.
+ComputeId string `json:"compute_id,omitempty"`
// Overrides the cluster used for jobs and other supported assets.
ClusterId string `json:"cluster_id,omitempty"`
// Deployment section specifies deployment related configuration for bundle
Deployment Deployment `json:"deployment,omitempty"`

View File

@ -160,6 +160,21 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
// the Databricks UI and via the SQL API.
}
// Clusters: Prefix, Tags
for _, c := range r.Clusters {
c.ClusterName = prefix + c.ClusterName
if c.CustomTags == nil {
c.CustomTags = make(map[string]string)
}
for _, tag := range tags {
normalisedKey := b.Tagging.NormalizeKey(tag.Key)
normalisedValue := b.Tagging.NormalizeValue(tag.Value)
if _, ok := c.CustomTags[normalisedKey]; !ok {
c.CustomTags[normalisedKey] = normalisedValue
}
}
}
return nil
}

View File

@ -0,0 +1,87 @@
package mutator
import (
"context"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
)
type computeIdToClusterId struct{}
func ComputeIdToClusterId() bundle.Mutator {
return &computeIdToClusterId{}
}
func (m *computeIdToClusterId) Name() string {
return "ComputeIdToClusterId"
}
func (m *computeIdToClusterId) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics
// The "compute_id" key is set; rewrite it to "cluster_id".
err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
v, d := rewriteComputeIdToClusterId(v, dyn.NewPath(dyn.Key("bundle")))
diags = diags.Extend(d)
// Check if the "compute_id" key is set in any target overrides.
return dyn.MapByPattern(v, dyn.NewPattern(dyn.Key("targets"), dyn.AnyKey()), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
v, d := rewriteComputeIdToClusterId(v, dyn.Path{})
diags = diags.Extend(d)
return v, nil
})
})
diags = diags.Extend(diag.FromErr(err))
return diags
}
func rewriteComputeIdToClusterId(v dyn.Value, p dyn.Path) (dyn.Value, diag.Diagnostics) {
var diags diag.Diagnostics
computeIdPath := p.Append(dyn.Key("compute_id"))
computeId, err := dyn.GetByPath(v, computeIdPath)
// If the "compute_id" key is not set, we don't need to do anything.
if err != nil {
return v, nil
}
if computeId.Kind() == dyn.KindInvalid {
return v, nil
}
diags = diags.Append(diag.Diagnostic{
Severity: diag.Warning,
Summary: "compute_id is deprecated, please use cluster_id instead",
Locations: computeId.Locations(),
Paths: []dyn.Path{computeIdPath},
})
clusterIdPath := p.Append(dyn.Key("cluster_id"))
nv, err := dyn.SetByPath(v, clusterIdPath, computeId)
if err != nil {
return dyn.InvalidValue, diag.FromErr(err)
}
// Drop the "compute_id" key.
vout, err := dyn.Walk(nv, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
switch len(p) {
case 0:
return v, nil
case 1:
if p[0] == dyn.Key("compute_id") {
return v, dyn.ErrDrop
}
return v, nil
case 2:
if p[1] == dyn.Key("compute_id") {
return v, dyn.ErrDrop
}
}
return v, dyn.ErrSkip
})
diags = diags.Extend(diag.FromErr(err))
return vout, diags
}
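To make the rewrite above concrete, here is a minimal sketch of a hypothetical databricks.yml before and after this mutator runs (the bundle name and cluster ID are made up; the warning text matches the diagnostic emitted above):

# Before: loading this configuration emits "compute_id is deprecated, please use cluster_id instead"
bundle:
  name: my_bundle
  compute_id: "1234-567890-abcde123"

# After: the value is carried over to cluster_id and the compute_id key is dropped;
# the same rewrite is applied when compute_id appears under a targets override.
bundle:
  name: my_bundle
  cluster_id: "1234-567890-abcde123"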

View File

@ -0,0 +1,57 @@
package mutator_test
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/libs/diag"
"github.com/stretchr/testify/assert"
)
func TestComputeIdToClusterId(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
ComputeId: "compute-id",
},
},
}
diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
assert.NoError(t, diags.Error())
assert.Equal(t, "compute-id", b.Config.Bundle.ClusterId)
assert.Empty(t, b.Config.Bundle.ComputeId)
assert.Len(t, diags, 1)
assert.Equal(t, "compute_id is deprecated, please use cluster_id instead", diags[0].Summary)
assert.Equal(t, diag.Warning, diags[0].Severity)
}
func TestComputeIdToClusterIdInTargetOverride(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Targets: map[string]*config.Target{
"dev": {
ComputeId: "compute-id-dev",
},
},
},
}
diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
assert.NoError(t, diags.Error())
assert.Empty(t, b.Config.Targets["dev"].ComputeId)
diags = diags.Extend(bundle.Apply(context.Background(), b, mutator.SelectTarget("dev")))
assert.NoError(t, diags.Error())
assert.Equal(t, "compute-id-dev", b.Config.Bundle.ClusterId)
assert.Empty(t, b.Config.Bundle.ComputeId)
assert.Len(t, diags, 1)
assert.Equal(t, "compute_id is deprecated, please use cluster_id instead", diags[0].Summary)
assert.Equal(t, diag.Warning, diags[0].Severity)
}

View File

@ -23,6 +23,7 @@ func DefaultMutators() []bundle.Mutator {
VerifyCliVersion(),
EnvironmentsToTargets(),
ComputeIdToClusterId(),
InitializeVariables(),
DefineDefaultTarget(),
LoadGitDetails(),

View File

@ -39,22 +39,22 @@ func overrideJobCompute(j *resources.Job, compute string) {
func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
if b.Config.Bundle.Mode != config.Development {
-if b.Config.Bundle.ComputeID != "" {
+if b.Config.Bundle.ClusterId != "" {
return diag.Errorf("cannot override compute for an target that does not use 'mode: development'")
}
return nil
}
if v := env.Get(ctx, "DATABRICKS_CLUSTER_ID"); v != "" {
-b.Config.Bundle.ComputeID = v
+b.Config.Bundle.ClusterId = v
}
-if b.Config.Bundle.ComputeID == "" {
+if b.Config.Bundle.ClusterId == "" {
return nil
}
r := b.Config.Resources
for i := range r.Jobs {
-overrideJobCompute(r.Jobs[i], b.Config.Bundle.ComputeID)
+overrideJobCompute(r.Jobs[i], b.Config.Bundle.ClusterId)
}
return nil

View File

@ -20,7 +20,7 @@ func TestOverrideDevelopment(t *testing.T) {
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Development,
-ComputeID: "newClusterID",
+ClusterId: "newClusterID",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
@ -144,7 +144,7 @@ func TestOverrideProduction(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
-ComputeID: "newClusterID",
+ClusterId: "newClusterID",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{

View File

@ -13,6 +13,7 @@ import (
"github.com/databricks/cli/libs/tags" "github.com/databricks/cli/libs/tags"
sdkconfig "github.com/databricks/databricks-sdk-go/config" sdkconfig "github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml" "github.com/databricks/databricks-sdk-go/service/ml"
@ -119,6 +120,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
Schemas: map[string]*resources.Schema{
"schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}},
},
Clusters: map[string]*resources.Cluster{
"cluster1": {ClusterSpec: &compute.ClusterSpec{ClusterName: "cluster1", SparkVersion: "13.2.x", NumWorkers: 1}},
},
},
},
// Use AWS implementation for testing.
@ -177,6 +181,9 @@ func TestProcessTargetModeDevelopment(t *testing.T) {
// Schema 1
assert.Equal(t, "dev_lennart_schema1", b.Config.Resources.Schemas["schema1"].Name)
// Clusters
assert.Equal(t, "[dev lennart] cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
}
func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) {
@ -281,6 +288,7 @@ func TestProcessTargetModeDefault(t *testing.T) {
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
} }
func TestProcessTargetModeProduction(t *testing.T) { func TestProcessTargetModeProduction(t *testing.T) {
@ -312,6 +320,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
b.Config.Resources.Experiments["experiment2"].Permissions = permissions
b.Config.Resources.Models["model1"].Permissions = permissions
b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Permissions = permissions
b.Config.Resources.Clusters["cluster1"].Permissions = permissions
diags = validateProductionMode(context.Background(), b, false)
require.NoError(t, diags.Error())
@ -322,6 +331,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
} }
func TestProcessTargetModeProductionOkForPrincipal(t *testing.T) { func TestProcessTargetModeProductionOkForPrincipal(t *testing.T) {

View File

@ -32,6 +32,7 @@ func allResourceTypes(t *testing.T) []string {
// the dyn library gives us the correct list of all resources supported. Please
// also update this check when adding a new resource
require.Equal(t, []string{
"clusters",
"experiments",
"jobs",
"model_serving_endpoints",
@ -133,6 +134,7 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) {
// some point in the future. These resources are (implicitly) on the deny list, since
// they are not on the allow list below.
allowList := []string{
"clusters",
"jobs",
"models",
"registered_models",

View File

@ -19,6 +19,7 @@ type Resources struct {
RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"`
Schemas map[string]*resources.Schema `json:"schemas,omitempty"`
Clusters map[string]*resources.Cluster `json:"clusters,omitempty"`
}
type ConfigResource interface {

View File

@ -0,0 +1,39 @@
package resources
import (
"context"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/compute"
)
type Cluster struct {
ID string `json:"id,omitempty" bundle:"readonly"`
Permissions []Permission `json:"permissions,omitempty"`
ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
*compute.ClusterSpec
}
func (s *Cluster) UnmarshalJSON(b []byte) error {
return marshal.Unmarshal(b, s)
}
func (s Cluster) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s)
}
func (s *Cluster) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
_, err := w.Clusters.GetByClusterId(ctx, id)
if err != nil {
log.Debugf(ctx, "cluster %s does not exist", id)
return false, err
}
return true, nil
}
func (s *Cluster) TerraformResourceName() string {
return "databricks_cluster"
}

View File

@ -366,9 +366,9 @@ func (r *Root) MergeTargetOverrides(name string) error {
}
}
-// Merge `compute_id`. This field must be overwritten if set, not merged.
-if v := target.Get("compute_id"); v.Kind() != dyn.KindInvalid {
-root, err = dyn.SetByPath(root, dyn.NewPath(dyn.Key("bundle"), dyn.Key("compute_id")), v)
+// Merge `cluster_id`. This field must be overwritten if set, not merged.
+if v := target.Get("cluster_id"); v.Kind() != dyn.KindInvalid {
+root, err = dyn.SetByPath(root, dyn.NewPath(dyn.Key("bundle"), dyn.Key("cluster_id")), v)
if err != nil {
return err
}
@ -409,18 +409,33 @@ func (r *Root) MergeTargetOverrides(name string) error {
var variableKeywords = []string{"default", "lookup"}
// isFullVariableOverrideDef checks if the given value is a full syntax varaible override.
-// A full syntax variable override is a map with only one of the following
-// keys: "default", "lookup".
+// A full syntax variable override is a map with either 1 of 2 keys.
+// If it's 2 keys, the keys should be "default" and "type".
+// If it's 1 key, the key should be one of the following keys: "default", "lookup".
func isFullVariableOverrideDef(v dyn.Value) bool {
mv, ok := v.AsMap()
if !ok {
return false
}
-if mv.Len() != 1 {
+// If the map has more than 2 keys, it is not a full variable override.
+if mv.Len() > 2 {
return false
}
// If the map has 2 keys, one of them should be "default" and the other is "type"
if mv.Len() == 2 {
if _, ok := mv.GetByString("type"); !ok {
return false
}
if _, ok := mv.GetByString("default"); !ok {
return false
}
return true
}
for _, keyword := range variableKeywords {
if _, ok := mv.GetByString(keyword); ok {
return true
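For reference, a short sketch of the target-level variable override shapes this function now recognizes (variable names and values are made up; the complexvar shape mirrors the test fixture later in this diff):

targets:
  dev:
    variables:
      # short syntax: a plain value
      node_type: "i3.xlarge"
      # full syntax with a single key (either "default" or "lookup")
      warehouse_size:
        default: "small"
      # full syntax with two keys: "default" plus "type"
      complexvar:
        type: complex
        default:
          key1: "1"
          key2: "2"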

View File

@ -24,8 +24,11 @@ type Target struct {
// name prefix of deployed resources.
Presets Presets `json:"presets,omitempty"`
-// Overrides the compute used for jobs and other supported assets.
-ComputeID string `json:"compute_id,omitempty"`
+// DEPRECATED: Overrides the compute used for jobs and other supported assets.
+ComputeId string `json:"compute_id,omitempty"`
// Overrides the cluster used for jobs and other supported assets.
ClusterId string `json:"cluster_id,omitempty"`
Bundle *Bundle `json:"bundle,omitempty"`

View File

@ -8,9 +8,12 @@ import (
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/sync"
) )
type upload struct{} type upload struct {
outputHandler sync.OutputHandler
}
func (m *upload) Name() string {
return "files.Upload"
@ -18,11 +21,18 @@ func (m *upload) Name() string {
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath))
-sync, err := GetSync(ctx, bundle.ReadOnly(b))
+opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b))
if err != nil {
return diag.FromErr(err)
}
opts.OutputHandler = m.outputHandler
sync, err := sync.New(ctx, *opts)
if err != nil {
return diag.FromErr(err)
}
defer sync.Close()
b.Files, err = sync.RunOnce(ctx)
if err != nil {
return diag.FromErr(err)
@ -32,6 +42,6 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
return nil
}
-func Upload() bundle.Mutator {
-return &upload{}
+func Upload(outputHandler sync.OutputHandler) bundle.Mutator {
+return &upload{outputHandler}
}

View File

@ -231,6 +231,13 @@ func BundleToTerraform(config *config.Root) *schema.Root {
tfroot.Resource.QualityMonitor[k] = &dst
}
for k, src := range config.Resources.Clusters {
noResources = false
var dst schema.ResourceCluster
conv(src, &dst)
tfroot.Resource.Cluster[k] = &dst
}
// We explicitly set "resource" to nil to omit it from a JSON encoding.
// This is required because the terraform CLI requires >= 1 resources defined
// if the "resource" property is used in a .tf.json file.
@ -394,6 +401,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
}
cur.ID = instance.Attributes.ID
config.Resources.Schemas[resource.Name] = cur
case "databricks_cluster":
if config.Resources.Clusters == nil {
config.Resources.Clusters = make(map[string]*resources.Cluster)
}
cur := config.Resources.Clusters[resource.Name]
if cur == nil {
cur = &resources.Cluster{ModifiedStatus: resources.ModifiedStatusDeleted}
}
cur.ID = instance.Attributes.ID
config.Resources.Clusters[resource.Name] = cur
case "databricks_permissions": case "databricks_permissions":
case "databricks_grants": case "databricks_grants":
// Ignore; no need to pull these back into the configuration. // Ignore; no need to pull these back into the configuration.
@ -443,6 +460,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
src.ModifiedStatus = resources.ModifiedStatusCreated
}
}
for _, src := range config.Resources.Clusters {
if src.ModifiedStatus == "" && src.ID == "" {
src.ModifiedStatus = resources.ModifiedStatusCreated
}
}
return nil
}

View File

@ -663,6 +663,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "1"}}, {Attributes: stateInstanceAttributes{ID: "1"}},
}, },
}, },
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
},
}
err := TerraformToBundle(&tfState, &config)
@ -692,6 +700,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID) assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus)
assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster"].ModifiedStatus)
AssertFullResourceCoverage(t, &config)
}
@ -754,6 +765,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
},
},
},
Clusters: map[string]*resources.Cluster{
"test_cluster": {
ClusterSpec: &compute.ClusterSpec{
ClusterName: "test_cluster",
},
},
},
},
}
var tfState = resourcesState{
@ -786,6 +804,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID) assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster"].ModifiedStatus)
AssertFullResourceCoverage(t, &config)
}
@ -888,6 +909,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
},
},
},
Clusters: map[string]*resources.Cluster{
"test_cluster": {
ClusterSpec: &compute.ClusterSpec{
ClusterName: "test_cluster",
},
},
"test_cluster_new": {
ClusterSpec: &compute.ClusterSpec{
ClusterName: "test_cluster_new",
},
},
},
},
}
var tfState = resourcesState{
@ -1020,6 +1053,22 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "2"}}, {Attributes: stateInstanceAttributes{ID: "2"}},
}, },
}, },
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster_old",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "2"}},
},
},
},
}
err := TerraformToBundle(&tfState, &config)
@ -1081,6 +1130,13 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID) assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus)
assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ModifiedStatus)
assert.Equal(t, "2", config.Resources.Clusters["test_cluster_old"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster_old"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Clusters["test_cluster_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster_new"].ModifiedStatus)
AssertFullResourceCoverage(t, &config)
}

View File

@ -58,6 +58,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...) path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...)
case dyn.Key("schemas"): case dyn.Key("schemas"):
path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...) path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...)
case dyn.Key("clusters"):
path = dyn.NewPath(dyn.Key("databricks_cluster")).Append(path[2:]...)
default:
// Trigger "key not found" for unknown resource types.
return dyn.GetByPath(root, path)

View File

@ -31,6 +31,7 @@ func TestInterpolate(t *testing.T) {
"other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}", "other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}",
"other_registered_model": "${resources.registered_models.other_registered_model.id}", "other_registered_model": "${resources.registered_models.other_registered_model.id}",
"other_schema": "${resources.schemas.other_schema.id}", "other_schema": "${resources.schemas.other_schema.id}",
"other_cluster": "${resources.clusters.other_cluster.id}",
},
Tasks: []jobs.Task{
{
@ -67,6 +68,7 @@ func TestInterpolate(t *testing.T) {
assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"]) assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"])
assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"]) assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"])
assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"]) assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"])
assert.Equal(t, "${databricks_cluster.other_cluster.id}", j.Tags["other_cluster"])
m := b.Config.Resources.Models["my_model"]
assert.Equal(t, "my_model", m.Model.Name)

View File

@ -0,0 +1,52 @@
package tfdyn
import (
"context"
"fmt"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/compute"
)
func convertClusterResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Normalize the output value to the target schema.
vout, diags := convert.Normalize(compute.ClusterSpec{}, vin)
for _, diag := range diags {
log.Debugf(ctx, "cluster normalization diagnostic: %s", diag.Summary)
}
return vout, nil
}
type clusterConverter struct{}
func (clusterConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
vout, err := convertClusterResource(ctx, vin)
if err != nil {
return err
}
// We always set no_wait as it allows DABs not to wait for cluster to be started.
vout, err = dyn.Set(vout, "no_wait", dyn.V(true))
if err != nil {
return err
}
// Add the converted resource to the output.
out.Cluster[key] = vout.AsAny()
// Configure permissions for this resource.
if permissions := convertPermissionsResource(ctx, vin); permissions != nil {
permissions.JobId = fmt.Sprintf("${databricks_cluster.%s.id}", key)
out.Permissions["cluster_"+key] = permissions
}
return nil
}
func init() {
registerConverter("clusters", clusterConverter{})
}

View File

@ -0,0 +1,97 @@
package tfdyn
import (
"context"
"testing"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConvertCluster(t *testing.T) {
var src = resources.Cluster{
ClusterSpec: &compute.ClusterSpec{
NumWorkers: 3,
SparkVersion: "13.3.x-scala2.12",
ClusterName: "cluster",
SparkConf: map[string]string{
"spark.executor.memory": "2g",
},
AwsAttributes: &compute.AwsAttributes{
Availability: "ON_DEMAND",
},
AzureAttributes: &compute.AzureAttributes{
Availability: "SPOT",
},
DataSecurityMode: "USER_ISOLATION",
NodeTypeId: "m5.xlarge",
Autoscale: &compute.AutoScale{
MinWorkers: 1,
MaxWorkers: 10,
},
},
Permissions: []resources.Permission{
{
Level: "CAN_RUN",
UserName: "jack@gmail.com",
},
{
Level: "CAN_MANAGE",
ServicePrincipalName: "sp",
},
},
}
vin, err := convert.FromTyped(src, dyn.NilValue)
require.NoError(t, err)
ctx := context.Background()
out := schema.NewResources()
err = clusterConverter{}.Convert(ctx, "my_cluster", vin, out)
require.NoError(t, err)
cluster := out.Cluster["my_cluster"]
assert.Equal(t, map[string]any{
"num_workers": int64(3),
"spark_version": "13.3.x-scala2.12",
"cluster_name": "cluster",
"spark_conf": map[string]any{
"spark.executor.memory": "2g",
},
"aws_attributes": map[string]any{
"availability": "ON_DEMAND",
},
"azure_attributes": map[string]any{
"availability": "SPOT",
},
"data_security_mode": "USER_ISOLATION",
"no_wait": true,
"node_type_id": "m5.xlarge",
"autoscale": map[string]any{
"min_workers": int64(1),
"max_workers": int64(10),
},
}, cluster)
// Assert equality on the permissions
assert.Equal(t, &schema.ResourcePermissions{
JobId: "${databricks_cluster.my_cluster.id}",
AccessControl: []schema.ResourcePermissionsAccessControl{
{
PermissionLevel: "CAN_RUN",
UserName: "jack@gmail.com",
},
{
PermissionLevel: "CAN_MANAGE",
ServicePrincipalName: "sp",
},
},
}, out.Permissions["cluster_my_cluster"])
}

View File

@ -51,9 +51,15 @@ func (r *root) Generate(path string) error {
}
func Run(ctx context.Context, schema *tfjson.ProviderSchema, path string) error {
-// Generate types for resources.
+// Generate types for resources
var resources []*namedBlock
for _, k := range sortKeys(schema.ResourceSchemas) {
// Skipping all plugin framework struct generation.
// TODO: This is a temporary fix, generation should be fixed in the future.
if strings.HasSuffix(k, "_pluginframework") {
continue
}
v := schema.ResourceSchemas[k]
b := &namedBlock{
filePattern: "resource_%s.go",
@ -71,6 +77,12 @@ func Run(ctx context.Context, schema *tfjson.ProviderSchema, path string) error
// Generate types for data sources.
var dataSources []*namedBlock
for _, k := range sortKeys(schema.DataSourceSchemas) {
// Skipping all plugin framework struct generation.
// TODO: This is a temporary fix, generation should be fixed in the future.
if strings.HasSuffix(k, "_pluginframework") {
continue
}
v := schema.DataSourceSchemas[k]
b := &namedBlock{
filePattern: "data_source_%s.go",

View File

@ -1,3 +1,3 @@
package schema
-const ProviderVersion = "1.50.0"
+const ProviderVersion = "1.52.0"

View File

@ -2,8 +2,16 @@
package schema
type DataSourceClustersFilterBy struct {
ClusterSources []string `json:"cluster_sources,omitempty"`
ClusterStates []string `json:"cluster_states,omitempty"`
IsPinned bool `json:"is_pinned,omitempty"`
PolicyId string `json:"policy_id,omitempty"`
}
type DataSourceClusters struct {
ClusterNameContains string `json:"cluster_name_contains,omitempty"`
Id string `json:"id,omitempty"`
Ids []string `json:"ids,omitempty"`
FilterBy *DataSourceClustersFilterBy `json:"filter_by,omitempty"`
}

View File

@ -19,6 +19,7 @@ type DataSourceExternalLocationExternalLocationInfo struct {
CreatedBy string `json:"created_by,omitempty"`
CredentialId string `json:"credential_id,omitempty"`
CredentialName string `json:"credential_name,omitempty"`
Fallback bool `json:"fallback,omitempty"`
IsolationMode string `json:"isolation_mode,omitempty"`
MetastoreId string `json:"metastore_id,omitempty"`
Name string `json:"name,omitempty"`

View File

@ -18,12 +18,14 @@ type DataSourceShareObject struct {
AddedBy string `json:"added_by,omitempty"`
CdfEnabled bool `json:"cdf_enabled,omitempty"`
Comment string `json:"comment,omitempty"`
Content string `json:"content,omitempty"`
DataObjectType string `json:"data_object_type"`
HistoryDataSharingStatus string `json:"history_data_sharing_status,omitempty"`
Name string `json:"name"`
SharedAs string `json:"shared_as,omitempty"`
StartVersion int `json:"start_version,omitempty"`
Status string `json:"status,omitempty"`
StringSharedAs string `json:"string_shared_as,omitempty"`
Partition []DataSourceShareObjectPartition `json:"partition,omitempty"`
}

View File

@ -2,20 +2,14 @@
package schema
type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceEnablementDetails struct {
ForcedForComplianceMode bool `json:"forced_for_compliance_mode,omitempty"`
UnavailableForDisabledEntitlement bool `json:"unavailable_for_disabled_entitlement,omitempty"`
UnavailableForNonEnterpriseTier bool `json:"unavailable_for_non_enterprise_tier,omitempty"`
}
type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceMaintenanceWindowWeekDayBasedScheduleWindowStartTime struct {
-Hours int `json:"hours,omitempty"`
-Minutes int `json:"minutes,omitempty"`
+Hours int `json:"hours"`
+Minutes int `json:"minutes"`
}
type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceMaintenanceWindowWeekDayBasedSchedule struct {
-DayOfWeek string `json:"day_of_week,omitempty"`
-Frequency string `json:"frequency,omitempty"`
+DayOfWeek string `json:"day_of_week"`
+Frequency string `json:"frequency"`
WindowStartTime *ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceMaintenanceWindowWeekDayBasedScheduleWindowStartTime `json:"window_start_time,omitempty"`
}
@ -25,9 +19,9 @@ type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspa
type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspace struct {
CanToggle bool `json:"can_toggle,omitempty"`
-Enabled bool `json:"enabled,omitempty"`
+Enabled bool `json:"enabled"`
EnablementDetails []any `json:"enablement_details,omitempty"`
RestartEvenIfNoUpdatesAvailable bool `json:"restart_even_if_no_updates_available,omitempty"` RestartEvenIfNoUpdatesAvailable bool `json:"restart_even_if_no_updates_available,omitempty"`
EnablementDetails *ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceEnablementDetails `json:"enablement_details,omitempty"`
MaintenanceWindow *ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceMaintenanceWindow `json:"maintenance_window,omitempty"`
}

View File

@ -176,6 +176,7 @@ type ResourceCluster struct {
IdempotencyToken string `json:"idempotency_token,omitempty"`
InstancePoolId string `json:"instance_pool_id,omitempty"`
IsPinned bool `json:"is_pinned,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
NodeTypeId string `json:"node_type_id,omitempty"`
NumWorkers int `json:"num_workers,omitempty"`
PolicyId string `json:"policy_id,omitempty"`

View File

@ -3,8 +3,8 @@
package schema
type ResourceComplianceSecurityProfileWorkspaceSettingComplianceSecurityProfileWorkspace struct {
-ComplianceStandards []string `json:"compliance_standards,omitempty"`
-IsEnabled bool `json:"is_enabled,omitempty"`
+ComplianceStandards []string `json:"compliance_standards"`
+IsEnabled bool `json:"is_enabled"`
}
type ResourceComplianceSecurityProfileWorkspaceSetting struct {

View File

@ -3,7 +3,7 @@
package schema
type ResourceEnhancedSecurityMonitoringWorkspaceSettingEnhancedSecurityMonitoringWorkspace struct {
-IsEnabled bool `json:"is_enabled,omitempty"`
+IsEnabled bool `json:"is_enabled"`
}
type ResourceEnhancedSecurityMonitoringWorkspaceSetting struct {

View File

@ -97,11 +97,13 @@ type ResourceModelServingConfigServedEntities struct {
type ResourceModelServingConfigServedModels struct {
EnvironmentVars map[string]string `json:"environment_vars,omitempty"`
InstanceProfileArn string `json:"instance_profile_arn,omitempty"`
MaxProvisionedThroughput int `json:"max_provisioned_throughput,omitempty"`
MinProvisionedThroughput int `json:"min_provisioned_throughput,omitempty"`
ModelName string `json:"model_name"`
ModelVersion string `json:"model_version"`
Name string `json:"name,omitempty"`
ScaleToZeroEnabled bool `json:"scale_to_zero_enabled,omitempty"`
-WorkloadSize string `json:"workload_size"`
+WorkloadSize string `json:"workload_size,omitempty"`
WorkloadType string `json:"workload_type,omitempty"`
}

View File

@ -18,20 +18,27 @@ type ResourceShareObject struct {
AddedBy string `json:"added_by,omitempty"`
CdfEnabled bool `json:"cdf_enabled,omitempty"`
Comment string `json:"comment,omitempty"`
Content string `json:"content,omitempty"`
DataObjectType string `json:"data_object_type"`
HistoryDataSharingStatus string `json:"history_data_sharing_status,omitempty"`
Name string `json:"name"`
SharedAs string `json:"shared_as,omitempty"`
StartVersion int `json:"start_version,omitempty"`
Status string `json:"status,omitempty"`
StringSharedAs string `json:"string_shared_as,omitempty"`
Partition []ResourceShareObjectPartition `json:"partition,omitempty"`
}
type ResourceShare struct {
Comment string `json:"comment,omitempty"`
CreatedAt int `json:"created_at,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
Id string `json:"id,omitempty"`
Name string `json:"name"`
Owner string `json:"owner,omitempty"`
StorageLocation string `json:"storage_location,omitempty"`
StorageRoot string `json:"storage_root,omitempty"`
UpdatedAt int `json:"updated_at,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
Object []ResourceShareObject `json:"object,omitempty"`
}

View File

@ -15,6 +15,7 @@ type ResourceSqlTable struct {
ClusterKeys []string `json:"cluster_keys,omitempty"`
Comment string `json:"comment,omitempty"`
DataSourceFormat string `json:"data_source_format,omitempty"`
EffectiveProperties map[string]string `json:"effective_properties,omitempty"`
Id string `json:"id,omitempty"`
Name string `json:"name"`
Options map[string]string `json:"options,omitempty"`

View File

@ -21,7 +21,7 @@ type Root struct {
const ProviderHost = "registry.terraform.io"
const ProviderSource = "databricks/databricks"
-const ProviderVersion = "1.50.0"
+const ProviderVersion = "1.52.0"
func NewRoot() *Root {
return &Root{
View File

@ -18,6 +18,7 @@ import (
"github.com/databricks/cli/bundle/python" "github.com/databricks/cli/bundle/python"
"github.com/databricks/cli/bundle/scripts" "github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/sync"
terraformlib "github.com/databricks/cli/libs/terraform" terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json" tfjson "github.com/hashicorp/terraform-json"
) )
@ -128,7 +129,7 @@ properties such as the 'catalog' or 'storage' are changed:`
}
// The deploy phase deploys artifacts and resources.
-func Deploy() bundle.Mutator {
+func Deploy(outputHandler sync.OutputHandler) bundle.Mutator {
// Core mutators that CRUD resources and modify deployment state. These
// mutators need informed consent if they are potentially destructive.
deployCore := bundle.Defer(
@ -157,7 +158,7 @@ func Deploy() bundle.Mutator {
libraries.ExpandGlobReferences(),
libraries.Upload(),
python.TransformWheelTask(),
-files.Upload(),
+files.Upload(outputHandler),
deploy.StateUpdate(),
deploy.StatePush(),
permissions.ApplyWorkspaceRootPermissions(),

View File

@ -0,0 +1,36 @@
bundle:
  name: clusters
workspace:
  host: https://acme.cloud.databricks.com/
resources:
  clusters:
    foo:
      cluster_name: foo
      num_workers: 2
      node_type_id: "i3.xlarge"
      autoscale:
        min_workers: 2
        max_workers: 7
      spark_version: "13.3.x-scala2.12"
      spark_conf:
        "spark.executor.memory": "2g"
targets:
  default:
  development:
    resources:
      clusters:
        foo:
          cluster_name: foo-override
          num_workers: 3
          node_type_id: "m5.xlarge"
          autoscale:
            min_workers: 1
            max_workers: 3
          spark_version: "15.2.x-scala2.12"
          spark_conf:
            "spark.executor.memory": "4g"
            "spark.executor.memory2": "4g"

View File

@ -0,0 +1,36 @@
package config_tests
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestClusters(t *testing.T) {
b := load(t, "./clusters")
assert.Equal(t, "clusters", b.Config.Bundle.Name)
cluster := b.Config.Resources.Clusters["foo"]
assert.Equal(t, "foo", cluster.ClusterName)
assert.Equal(t, "13.3.x-scala2.12", cluster.SparkVersion)
assert.Equal(t, "i3.xlarge", cluster.NodeTypeId)
assert.Equal(t, 2, cluster.NumWorkers)
assert.Equal(t, "2g", cluster.SparkConf["spark.executor.memory"])
assert.Equal(t, 2, cluster.Autoscale.MinWorkers)
assert.Equal(t, 7, cluster.Autoscale.MaxWorkers)
}
func TestClustersOverride(t *testing.T) {
b := loadTarget(t, "./clusters", "development")
assert.Equal(t, "clusters", b.Config.Bundle.Name)
cluster := b.Config.Resources.Clusters["foo"]
assert.Equal(t, "foo-override", cluster.ClusterName)
assert.Equal(t, "15.2.x-scala2.12", cluster.SparkVersion)
assert.Equal(t, "m5.xlarge", cluster.NodeTypeId)
assert.Equal(t, 3, cluster.NumWorkers)
assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory"])
assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory2"])
assert.Equal(t, 1, cluster.Autoscale.MinWorkers)
assert.Equal(t, 3, cluster.Autoscale.MaxWorkers)
}

View File

@ -88,3 +88,21 @@ func TestComplexVariablesOverrideWithMultipleFiles(t *testing.T) {
require.Equalf(t, "false", cluster.NewCluster.SparkConf["spark.speculation"], "cluster: %v", cluster.JobClusterKey) require.Equalf(t, "false", cluster.NewCluster.SparkConf["spark.speculation"], "cluster: %v", cluster.JobClusterKey)
} }
} }
func TestComplexVariablesOverrideWithFullSyntax(t *testing.T) {
b, diags := loadTargetWithDiags("variables/complex", "dev")
require.Empty(t, diags)
diags = bundle.Apply(context.Background(), b, bundle.Seq(
mutator.SetVariables(),
mutator.ResolveVariableReferencesInComplexVariables(),
mutator.ResolveVariableReferences(
"variables",
),
))
require.NoError(t, diags.Error())
require.Empty(t, diags)
complexvar := b.Config.Variables["complexvar"].Value
require.Equal(t, map[string]interface{}{"key1": "1", "key2": "2", "key3": "3"}, complexvar)
}

View File

@ -35,6 +35,13 @@ variables:
- jar: "/path/to/jar" - jar: "/path/to/jar"
- egg: "/path/to/egg" - egg: "/path/to/egg"
- whl: "/path/to/whl" - whl: "/path/to/whl"
complexvar:
type: complex
description: "A complex variable"
default:
key1: "value1"
key2: "value2"
key3: "value3"
targets:
@ -49,3 +56,9 @@ targets:
spark_conf:
spark.speculation: false
spark.databricks.delta.retentionDurationCheck.enabled: false
complexvar:
type: complex
default:
key1: "1"
key2: "2"
key3: "3"

View File

@ -10,6 +10,7 @@ import (
"github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/cmd/root" "github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/sync"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -23,13 +24,19 @@ func newDeployCommand() *cobra.Command {
var force bool
var forceLock bool
var failOnActiveRuns bool
-var computeID string
+var clusterId string
var autoApprove bool
var verbose bool
cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.")
cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.")
-cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.")
+cmd.Flags().StringVar(&clusterId, "compute-id", "", "Override cluster in the deployment with the given compute ID.")
cmd.Flags().StringVarP(&clusterId, "cluster-id", "c", "", "Override cluster in the deployment with the given cluster ID.")
cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Skip interactive approvals that might be required for deployment.")
cmd.Flags().MarkDeprecated("compute-id", "use --cluster-id instead")
cmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose output.")
// Verbose flag currently only affects file sync output, it's used by the vscode extension
cmd.Flags().MarkHidden("verbose")
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
@ -42,7 +49,10 @@ func newDeployCommand() *cobra.Command {
b.AutoApprove = autoApprove
if cmd.Flag("compute-id").Changed {
-b.Config.Bundle.ComputeID = computeID
+b.Config.Bundle.ClusterId = clusterId
}
if cmd.Flag("cluster-id").Changed {
b.Config.Bundle.ClusterId = clusterId
}
if cmd.Flag("fail-on-active-runs").Changed {
b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns
@ -51,11 +61,18 @@ func newDeployCommand() *cobra.Command {
return nil
})
var outputHandler sync.OutputHandler
if verbose {
outputHandler = func(ctx context.Context, c <-chan sync.Event) {
sync.TextOutput(ctx, c, cmd.OutOrStdout())
}
}
diags = diags.Extend(
bundle.Apply(ctx, b, bundle.Seq(
phases.Initialize(),
phases.Build(),
-phases.Deploy(),
+phases.Deploy(outputHandler),
)),
)
}

View File

@ -55,13 +55,7 @@ task or a Python wheel task, the second example applies.
return diags.Error()
}
-diags = bundle.Apply(ctx, b, bundle.Seq(
-phases.Initialize(),
-terraform.Interpolate(),
-terraform.Write(),
-terraform.StatePull(),
-terraform.Load(terraform.ErrorOnEmptyState),
-))
+diags = bundle.Apply(ctx, b, phases.Initialize())
if err := diags.Error(); err != nil {
return err
}
@ -84,6 +78,16 @@ task or a Python wheel task, the second example applies.
return fmt.Errorf("expected a KEY of the resource to run") return fmt.Errorf("expected a KEY of the resource to run")
} }
diags = bundle.Apply(ctx, b, bundle.Seq(
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
terraform.Load(terraform.ErrorOnEmptyState),
))
if err := diags.Error(); err != nil {
return err
}
runner, err := run.Find(b, args[0])
if err != nil {
return err

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
stdsync "sync"
"time" "time"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
@ -46,6 +45,21 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
return nil, flag.ErrHelp
}
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch f.output {
case flags.OutputText:
outputFunc = sync.TextOutput
case flags.OutputJSON:
outputFunc = sync.JsonOutput
}
var outputHandler sync.OutputHandler
if outputFunc != nil {
outputHandler = func(ctx context.Context, events <-chan sync.Event) {
outputFunc(ctx, events, cmd.OutOrStdout())
}
}
opts := sync.SyncOptions{
LocalRoot: vfs.MustNew(args[0]),
Paths: []string{"."},
@ -62,6 +76,8 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
// exist and add it to the `.gitignore` file in the root.
SnapshotBasePath: filepath.Join(args[0], ".databricks"),
WorkspaceClient: root.WorkspaceClient(cmd.Context()),
OutputHandler: outputHandler,
}
return &opts, nil
}
@ -118,23 +134,7 @@ func New() *cobra.Command {
if err != nil {
return err
}
defer s.Close()
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch f.output {
case flags.OutputText:
outputFunc = textOutput
case flags.OutputJSON:
outputFunc = jsonOutput
}
var wg stdsync.WaitGroup
if outputFunc != nil {
wg.Add(1)
go func() {
defer wg.Done()
outputFunc(ctx, s.Events(), cmd.OutOrStdout())
}()
}
if f.watch { if f.watch {
err = s.RunContinuous(ctx) err = s.RunContinuous(ctx)
@ -142,8 +142,6 @@ func New() *cobra.Command {
_, err = s.RunOnce(ctx) _, err = s.RunOnce(ctx)
} }
s.Close()
wg.Wait()
return err return err
} }

View File

@ -0,0 +1,16 @@
{
  "properties": {
    "unique_id": {
      "type": "string",
      "description": "Unique ID for job name"
    },
    "spark_version": {
      "type": "string",
      "description": "Spark version used for job cluster"
    },
    "node_type_id": {
      "type": "string",
      "description": "Node type id for job cluster"
    }
  }
}

View File

@ -0,0 +1,24 @@
bundle:
  name: basic

workspace:
  root_path: "~/.bundle/{{.unique_id}}"

resources:
  clusters:
    test_cluster:
      cluster_name: "test-cluster-{{.unique_id}}"
      spark_version: "{{.spark_version}}"
      node_type_id: "{{.node_type_id}}"
      num_workers: 2
      spark_conf:
        "spark.executor.memory": "2g"

  jobs:
    foo:
      name: test-job-with-cluster-{{.unique_id}}
      tasks:
        - task_key: my_notebook_task
          existing_cluster_id: "${resources.clusters.test_cluster.cluster_id}"
          spark_python_task:
            python_file: ./hello_world.py

View File

@ -0,0 +1 @@
print("Hello World!")

View File

@ -0,0 +1,56 @@
package bundle

import (
	"fmt"
	"testing"

	"github.com/databricks/cli/internal"
	"github.com/databricks/cli/internal/acc"
	"github.com/databricks/cli/internal/testutil"
	"github.com/databricks/cli/libs/env"
	"github.com/databricks/databricks-sdk-go/service/compute"
	"github.com/google/uuid"
	"github.com/stretchr/testify/require"
)

func TestAccDeployBundleWithCluster(t *testing.T) {
	ctx, wt := acc.WorkspaceTest(t)

	if testutil.IsAWSCloud(wt.T) {
		t.Skip("Skipping test for AWS cloud because it is not permitted to create clusters")
	}

	nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
	uniqueId := uuid.New().String()
	root, err := initTestTemplate(t, ctx, "clusters", map[string]any{
		"unique_id":     uniqueId,
		"node_type_id":  nodeTypeId,
		"spark_version": defaultSparkVersion,
	})
	require.NoError(t, err)

	t.Cleanup(func() {
		err = destroyBundle(t, ctx, root)
		require.NoError(t, err)

		cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
		if err != nil {
			require.ErrorContains(t, err, "does not exist")
		} else {
			require.Contains(t, []compute.State{compute.StateTerminated, compute.StateTerminating}, cluster.State)
		}
	})

	err = deployBundle(t, ctx, root)
	require.NoError(t, err)

	// Cluster should exist after bundle deployment
	cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
	require.NoError(t, err)
	require.NotNil(t, cluster)

	out, err := runResource(t, ctx, root, "foo")
	require.NoError(t, err)
	require.Contains(t, out, "Hello World!")
}

View File

@ -49,3 +49,7 @@ func GetCloud(t *testing.T) Cloud {
	}
	return -1
}
+
+func IsAWSCloud(t *testing.T) bool {
+	return GetCloud(t) == AWS
+}

View File

@ -209,7 +209,26 @@ func TestRepositoryGitConfigWhenNotARepo(t *testing.T) {
}

func TestRepositoryOriginUrlRemovesUserCreds(t *testing.T) {
-	repo := newTestRepository(t)
-	repo.addOriginUrl("https://username:token@github.com/databricks/foobar.git")
-	repo.assertOriginUrl("https://github.com/databricks/foobar.git")
+	tcases := []struct {
+		url      string
+		expected string
+	}{
+		{
+			url:      "https://username:token@github.com/databricks/foobar.git",
+			expected: "https://github.com/databricks/foobar.git",
+		},
+		{
+			// Note: The token is still considered and parsed as a username here.
+			// However, credential integrations by Git providers like GitHub
+			// allow for setting a PAT token as a username.
+			url:      "https://token@github.com/databricks/foobar.git",
+			expected: "https://github.com/databricks/foobar.git",
+		},
+	}
+
+	for _, tc := range tcases {
+		repo := newTestRepository(t)
+		repo.addOriginUrl(tc.url)
+		repo.assertOriginUrl(tc.expected)
+	}
}
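The table above covers both the user:token and the bare-token form of an origin URL. As a rough illustration of the underlying technique (not necessarily the repository's implementation), credentials can be stripped with Go's net/url by clearing the userinfo component:

```go
package example

import "net/url"

// stripCredentials removes any "user:password@" or bare "token@" prefix from an
// HTTP(S) remote URL. Unparseable inputs are returned unchanged.
func stripCredentials(origin string) string {
	u, err := url.Parse(origin)
	if err != nil {
		return origin
	}
	u.User = nil
	return u.String()
}

// stripCredentials("https://username:token@github.com/databricks/foobar.git")
// returns "https://github.com/databricks/foobar.git".
```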

View File

@ -5,12 +5,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"io" "io"
"github.com/databricks/cli/libs/sync"
) )
// Read synchronization events and write them as JSON to the specified writer (typically stdout). // Read synchronization events and write them as JSON to the specified writer (typically stdout).
func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) { func JsonOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
for { for {
select { select {
@ -31,7 +29,7 @@ func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
} }
// Read synchronization events and write them as text to the specified writer (typically stdout). // Read synchronization events and write them as text to the specified writer (typically stdout).
func textOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) { func TextOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
bw := bufio.NewWriter(w) bw := bufio.NewWriter(w)
for { for {

View File

@ -3,6 +3,7 @@ package sync
import (
	"context"
	"fmt"
+	stdsync "sync"
	"time"

	"github.com/databricks/cli/libs/filer"

@ -15,6 +16,8 @@ import (
	"github.com/databricks/databricks-sdk-go/service/iam"
)

+type OutputHandler func(context.Context, <-chan Event)
+
type SyncOptions struct {
	LocalRoot vfs.Path
	Paths     []string

@ -34,6 +37,8 @@ type SyncOptions struct {
	CurrentUser *iam.User

	Host string
+
+	OutputHandler OutputHandler
}

type Sync struct {

@ -49,6 +54,10 @@ type Sync struct {
	// Synchronization progress events are sent to this event notifier.
	notifier EventNotifier
	seq      int
+
+	// WaitGroup is automatically created when an output handler is provided in the SyncOptions.
+	// A call to Close is required to ensure the output handler goroutine has handled all events.
+	outputWaitGroup *stdsync.WaitGroup
}

// New initializes and returns a new [Sync] instance.

@ -106,6 +115,20 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
		return nil, err
	}

+	var notifier EventNotifier
+	var outputWaitGroup = &stdsync.WaitGroup{}
+	if opts.OutputHandler != nil {
+		ch := make(chan Event, MaxRequestsInFlight)
+		notifier = &ChannelNotifier{ch}
+		outputWaitGroup.Add(1)
+		go func() {
+			defer outputWaitGroup.Done()
+			opts.OutputHandler(ctx, ch)
+		}()
+	} else {
+		notifier = &NopNotifier{}
+	}
+
	return &Sync{
		SyncOptions: &opts,

@ -114,23 +137,19 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
		excludeFileSet: excludeFileSet,
		snapshot:       snapshot,
		filer:          filer,
-		notifier:       &NopNotifier{},
+		notifier:        notifier,
+		outputWaitGroup: outputWaitGroup,
		seq:            0,
	}, nil
}

-func (s *Sync) Events() <-chan Event {
-	ch := make(chan Event, MaxRequestsInFlight)
-	s.notifier = &ChannelNotifier{ch}
-	return ch
-}
-
func (s *Sync) Close() {
	if s.notifier == nil {
		return
	}
	s.notifier.Close()
	s.notifier = nil
+	s.outputWaitGroup.Wait()
}

func (s *Sync) notifyStart(ctx context.Context, d diff) {
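With this change, event consumption is owned by the Sync lifecycle: the caller supplies an OutputHandler in SyncOptions, New starts the handler goroutine, and Close waits for it to drain the channel. A minimal usage sketch under those assumptions; the helper name is hypothetical and all other SyncOptions fields are elided and must be supplied by the caller:

```go
package example

import (
	"context"
	"os"

	"github.com/databricks/cli/libs/sync"
)

// runOnceWithTextOutput is a hypothetical helper showing how a caller wires an
// output handler; opts is assumed to already carry the remaining required
// fields (LocalRoot, Paths, WorkspaceClient, and so on).
func runOnceWithTextOutput(ctx context.Context, opts sync.SyncOptions) error {
	opts.OutputHandler = func(ctx context.Context, events <-chan sync.Event) {
		sync.TextOutput(ctx, events, os.Stdout)
	}

	s, err := sync.New(ctx, opts)
	if err != nil {
		return err
	}
	// Close closes the notifier and blocks until the handler goroutine has
	// processed every event, so no output is lost on shutdown.
	defer s.Close()

	_, err = s.RunOnce(ctx)
	return err
}
```

This removes the need for each command to manage its own WaitGroup around the output goroutine, as the earlier cmd/sync hunk shows.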

View File

@ -3,6 +3,12 @@ resources:
  pipelines:
    {{.project_name}}_pipeline:
      name: {{.project_name}}_pipeline
+      {{- if eq default_catalog ""}}
+      ## Specify the 'catalog' field to configure this pipeline to make use of Unity Catalog:
+      # catalog: catalog_name
+      {{- else}}
+      catalog: {{default_catalog}}
+      {{- end}}
      target: {{.project_name}}_${bundle.environment}
      libraries:
        - notebook: