Compare commits

..

No commits in common. "4e63a4f5af085c84680fccaffcd2945572170dd3" and "941d50e86849be1724d33fff551e7b254658b950" have entirely different histories.

51 changed files with 108 additions and 903 deletions

View File

@ -1,32 +1,5 @@
# Version changelog
## [Release] Release v0.228.1
Bundles:
* Added listing cluster filtering for cluster lookups ([#1754](https://github.com/databricks/cli/pull/1754)).
* Expand library globs relative to the sync root ([#1756](https://github.com/databricks/cli/pull/1756)).
* Fixed generated YAML missing 'default' for empty values ([#1765](https://github.com/databricks/cli/pull/1765)).
* Use periodic triggers in all templates ([#1739](https://github.com/databricks/cli/pull/1739)).
* Use the friendly name of service principals when shortening their name ([#1770](https://github.com/databricks/cli/pull/1770)).
* Fixed detecting full syntax variable override which includes type field ([#1775](https://github.com/databricks/cli/pull/1775)).
Internal:
* Pass copy of `dyn.Path` to callback function ([#1747](https://github.com/databricks/cli/pull/1747)).
* Make bundle JSON schema modular with `` ([#1700](https://github.com/databricks/cli/pull/1700)).
* Alias variables block in the `Target` struct ([#1748](https://github.com/databricks/cli/pull/1748)).
* Add end to end integration tests for bundle JSON schema ([#1726](https://github.com/databricks/cli/pull/1726)).
* Fix artifact upload integration tests ([#1767](https://github.com/databricks/cli/pull/1767)).
API Changes:
* Added `databricks quality-monitors regenerate-dashboard` command.
OpenAPI commit d05898328669a3f8ab0c2ecee37db2673d3ea3f7 (2024-09-04)
Dependency updates:
* Bump golang.org/x/term from 0.23.0 to 0.24.0 ([#1757](https://github.com/databricks/cli/pull/1757)).
* Bump golang.org/x/oauth2 from 0.22.0 to 0.23.0 ([#1761](https://github.com/databricks/cli/pull/1761)).
* Bump golang.org/x/text from 0.17.0 to 0.18.0 ([#1759](https://github.com/databricks/cli/pull/1759)).
* Bump github.com/databricks/databricks-sdk-go from 0.45.0 to 0.46.0 ([#1760](https://github.com/databricks/cli/pull/1760)).
## [Release] Release v0.228.0
CLI:

View File

@ -38,11 +38,8 @@ type Bundle struct {
// Annotated readonly as this should be set at the target level.
Mode Mode `json:"mode,omitempty" bundle:"readonly"`
// DEPRECATED: Overrides the compute used for jobs and other supported assets.
ComputeId string `json:"compute_id,omitempty"`
// Overrides the cluster used for jobs and other supported assets.
ClusterId string `json:"cluster_id,omitempty"`
// Overrides the compute used for jobs and other supported assets.
ComputeID string `json:"compute_id,omitempty"`
// Deployment section specifies deployment related configuration for bundle
Deployment Deployment `json:"deployment,omitempty"`

View File

@ -160,21 +160,6 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
// the Databricks UI and via the SQL API.
}
// Clusters: Prefix, Tags
for _, c := range r.Clusters {
c.ClusterName = prefix + c.ClusterName
if c.CustomTags == nil {
c.CustomTags = make(map[string]string)
}
for _, tag := range tags {
normalisedKey := b.Tagging.NormalizeKey(tag.Key)
normalisedValue := b.Tagging.NormalizeValue(tag.Value)
if _, ok := c.CustomTags[normalisedKey]; !ok {
c.CustomTags[normalisedKey] = normalisedValue
}
}
}
return nil
}

View File

@ -1,87 +0,0 @@
package mutator
import (
"context"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
)
type computeIdToClusterId struct{}
func ComputeIdToClusterId() bundle.Mutator {
return &computeIdToClusterId{}
}
func (m *computeIdToClusterId) Name() string {
return "ComputeIdToClusterId"
}
func (m *computeIdToClusterId) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics
// The "compute_id" key is set; rewrite it to "cluster_id".
err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
v, d := rewriteComputeIdToClusterId(v, dyn.NewPath(dyn.Key("bundle")))
diags = diags.Extend(d)
// Check if the "compute_id" key is set in any target overrides.
return dyn.MapByPattern(v, dyn.NewPattern(dyn.Key("targets"), dyn.AnyKey()), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
v, d := rewriteComputeIdToClusterId(v, dyn.Path{})
diags = diags.Extend(d)
return v, nil
})
})
diags = diags.Extend(diag.FromErr(err))
return diags
}
func rewriteComputeIdToClusterId(v dyn.Value, p dyn.Path) (dyn.Value, diag.Diagnostics) {
var diags diag.Diagnostics
computeIdPath := p.Append(dyn.Key("compute_id"))
computeId, err := dyn.GetByPath(v, computeIdPath)
// If the "compute_id" key is not set, we don't need to do anything.
if err != nil {
return v, nil
}
if computeId.Kind() == dyn.KindInvalid {
return v, nil
}
diags = diags.Append(diag.Diagnostic{
Severity: diag.Warning,
Summary: "compute_id is deprecated, please use cluster_id instead",
Locations: computeId.Locations(),
Paths: []dyn.Path{computeIdPath},
})
clusterIdPath := p.Append(dyn.Key("cluster_id"))
nv, err := dyn.SetByPath(v, clusterIdPath, computeId)
if err != nil {
return dyn.InvalidValue, diag.FromErr(err)
}
// Drop the "compute_id" key.
vout, err := dyn.Walk(nv, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
switch len(p) {
case 0:
return v, nil
case 1:
if p[0] == dyn.Key("compute_id") {
return v, dyn.ErrDrop
}
return v, nil
case 2:
if p[1] == dyn.Key("compute_id") {
return v, dyn.ErrDrop
}
}
return v, dyn.ErrSkip
})
diags = diags.Extend(diag.FromErr(err))
return vout, diags
}

View File

@ -1,57 +0,0 @@
package mutator_test
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/libs/diag"
"github.com/stretchr/testify/assert"
)
func TestComputeIdToClusterId(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
ComputeId: "compute-id",
},
},
}
diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
assert.NoError(t, diags.Error())
assert.Equal(t, "compute-id", b.Config.Bundle.ClusterId)
assert.Empty(t, b.Config.Bundle.ComputeId)
assert.Len(t, diags, 1)
assert.Equal(t, "compute_id is deprecated, please use cluster_id instead", diags[0].Summary)
assert.Equal(t, diag.Warning, diags[0].Severity)
}
func TestComputeIdToClusterIdInTargetOverride(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Targets: map[string]*config.Target{
"dev": {
ComputeId: "compute-id-dev",
},
},
},
}
diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
assert.NoError(t, diags.Error())
assert.Empty(t, b.Config.Targets["dev"].ComputeId)
diags = diags.Extend(bundle.Apply(context.Background(), b, mutator.SelectTarget("dev")))
assert.NoError(t, diags.Error())
assert.Equal(t, "compute-id-dev", b.Config.Bundle.ClusterId)
assert.Empty(t, b.Config.Bundle.ComputeId)
assert.Len(t, diags, 1)
assert.Equal(t, "compute_id is deprecated, please use cluster_id instead", diags[0].Summary)
assert.Equal(t, diag.Warning, diags[0].Severity)
}

View File

@ -23,7 +23,6 @@ func DefaultMutators() []bundle.Mutator {
VerifyCliVersion(),
EnvironmentsToTargets(),
ComputeIdToClusterId(),
InitializeVariables(),
DefineDefaultTarget(),
LoadGitDetails(),

View File

@ -39,22 +39,22 @@ func overrideJobCompute(j *resources.Job, compute string) {
func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
if b.Config.Bundle.Mode != config.Development {
if b.Config.Bundle.ClusterId != "" {
if b.Config.Bundle.ComputeID != "" {
return diag.Errorf("cannot override compute for an target that does not use 'mode: development'")
}
return nil
}
if v := env.Get(ctx, "DATABRICKS_CLUSTER_ID"); v != "" {
b.Config.Bundle.ClusterId = v
b.Config.Bundle.ComputeID = v
}
if b.Config.Bundle.ClusterId == "" {
if b.Config.Bundle.ComputeID == "" {
return nil
}
r := b.Config.Resources
for i := range r.Jobs {
overrideJobCompute(r.Jobs[i], b.Config.Bundle.ClusterId)
overrideJobCompute(r.Jobs[i], b.Config.Bundle.ComputeID)
}
return nil

View File

@ -20,7 +20,7 @@ func TestOverrideDevelopment(t *testing.T) {
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Development,
ClusterId: "newClusterID",
ComputeID: "newClusterID",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
@ -144,7 +144,7 @@ func TestOverrideProduction(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
ClusterId: "newClusterID",
ComputeID: "newClusterID",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{

View File

@ -13,7 +13,6 @@ import (
"github.com/databricks/cli/libs/tags"
sdkconfig "github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
@ -120,9 +119,6 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
Schemas: map[string]*resources.Schema{
"schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}},
},
Clusters: map[string]*resources.Cluster{
"cluster1": {ClusterSpec: &compute.ClusterSpec{ClusterName: "cluster1", SparkVersion: "13.2.x", NumWorkers: 1}},
},
},
},
// Use AWS implementation for testing.
@ -181,9 +177,6 @@ func TestProcessTargetModeDevelopment(t *testing.T) {
// Schema 1
assert.Equal(t, "dev_lennart_schema1", b.Config.Resources.Schemas["schema1"].Name)
// Clusters
assert.Equal(t, "[dev lennart] cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
}
func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) {
@ -288,7 +281,6 @@ func TestProcessTargetModeDefault(t *testing.T) {
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
}
func TestProcessTargetModeProduction(t *testing.T) {
@ -320,7 +312,6 @@ func TestProcessTargetModeProduction(t *testing.T) {
b.Config.Resources.Experiments["experiment2"].Permissions = permissions
b.Config.Resources.Models["model1"].Permissions = permissions
b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Permissions = permissions
b.Config.Resources.Clusters["cluster1"].Permissions = permissions
diags = validateProductionMode(context.Background(), b, false)
require.NoError(t, diags.Error())
@ -331,7 +322,6 @@ func TestProcessTargetModeProduction(t *testing.T) {
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
}
func TestProcessTargetModeProductionOkForPrincipal(t *testing.T) {

View File

@ -32,7 +32,6 @@ func allResourceTypes(t *testing.T) []string {
// the dyn library gives us the correct list of all resources supported. Please
// also update this check when adding a new resource
require.Equal(t, []string{
"clusters",
"experiments",
"jobs",
"model_serving_endpoints",
@ -134,7 +133,6 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) {
// some point in the future. These resources are (implicitly) on the deny list, since
// they are not on the allow list below.
allowList := []string{
"clusters",
"jobs",
"models",
"registered_models",

View File

@ -19,7 +19,6 @@ type Resources struct {
RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"`
Schemas map[string]*resources.Schema `json:"schemas,omitempty"`
Clusters map[string]*resources.Cluster `json:"clusters,omitempty"`
}
type ConfigResource interface {

View File

@ -1,39 +0,0 @@
package resources
import (
"context"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/compute"
)
type Cluster struct {
ID string `json:"id,omitempty" bundle:"readonly"`
Permissions []Permission `json:"permissions,omitempty"`
ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
*compute.ClusterSpec
}
func (s *Cluster) UnmarshalJSON(b []byte) error {
return marshal.Unmarshal(b, s)
}
func (s Cluster) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s)
}
func (s *Cluster) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
_, err := w.Clusters.GetByClusterId(ctx, id)
if err != nil {
log.Debugf(ctx, "cluster %s does not exist", id)
return false, err
}
return true, nil
}
func (s *Cluster) TerraformResourceName() string {
return "databricks_cluster"
}

View File

@ -366,9 +366,9 @@ func (r *Root) MergeTargetOverrides(name string) error {
}
}
// Merge `cluster_id`. This field must be overwritten if set, not merged.
if v := target.Get("cluster_id"); v.Kind() != dyn.KindInvalid {
root, err = dyn.SetByPath(root, dyn.NewPath(dyn.Key("bundle"), dyn.Key("cluster_id")), v)
// Merge `compute_id`. This field must be overwritten if set, not merged.
if v := target.Get("compute_id"); v.Kind() != dyn.KindInvalid {
root, err = dyn.SetByPath(root, dyn.NewPath(dyn.Key("bundle"), dyn.Key("compute_id")), v)
if err != nil {
return err
}
@ -409,33 +409,18 @@ func (r *Root) MergeTargetOverrides(name string) error {
var variableKeywords = []string{"default", "lookup"}
// isFullVariableOverrideDef checks if the given value is a full syntax varaible override.
// A full syntax variable override is a map with either 1 of 2 keys.
// If it's 2 keys, the keys should be "default" and "type".
// If it's 1 key, the key should be one of the following keys: "default", "lookup".
// A full syntax variable override is a map with only one of the following
// keys: "default", "lookup".
func isFullVariableOverrideDef(v dyn.Value) bool {
mv, ok := v.AsMap()
if !ok {
return false
}
// If the map has more than 2 keys, it is not a full variable override.
if mv.Len() > 2 {
if mv.Len() != 1 {
return false
}
// If the map has 2 keys, one of them should be "default" and the other is "type"
if mv.Len() == 2 {
if _, ok := mv.GetByString("type"); !ok {
return false
}
if _, ok := mv.GetByString("default"); !ok {
return false
}
return true
}
for _, keyword := range variableKeywords {
if _, ok := mv.GetByString(keyword); ok {
return true

View File

@ -24,11 +24,8 @@ type Target struct {
// name prefix of deployed resources.
Presets Presets `json:"presets,omitempty"`
// DEPRECATED: Overrides the compute used for jobs and other supported assets.
ComputeId string `json:"compute_id,omitempty"`
// Overrides the cluster used for jobs and other supported assets.
ClusterId string `json:"cluster_id,omitempty"`
// Overrides the compute used for jobs and other supported assets.
ComputeID string `json:"compute_id,omitempty"`
Bundle *Bundle `json:"bundle,omitempty"`

View File

@ -8,12 +8,9 @@ import (
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/sync"
)
type upload struct {
outputHandler sync.OutputHandler
}
type upload struct{}
func (m *upload) Name() string {
return "files.Upload"
@ -21,18 +18,11 @@ func (m *upload) Name() string {
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath))
opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b))
sync, err := GetSync(ctx, bundle.ReadOnly(b))
if err != nil {
return diag.FromErr(err)
}
opts.OutputHandler = m.outputHandler
sync, err := sync.New(ctx, *opts)
if err != nil {
return diag.FromErr(err)
}
defer sync.Close()
b.Files, err = sync.RunOnce(ctx)
if err != nil {
return diag.FromErr(err)
@ -42,6 +32,6 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
return nil
}
func Upload(outputHandler sync.OutputHandler) bundle.Mutator {
return &upload{outputHandler}
func Upload() bundle.Mutator {
return &upload{}
}

View File

@ -231,13 +231,6 @@ func BundleToTerraform(config *config.Root) *schema.Root {
tfroot.Resource.QualityMonitor[k] = &dst
}
for k, src := range config.Resources.Clusters {
noResources = false
var dst schema.ResourceCluster
conv(src, &dst)
tfroot.Resource.Cluster[k] = &dst
}
// We explicitly set "resource" to nil to omit it from a JSON encoding.
// This is required because the terraform CLI requires >= 1 resources defined
// if the "resource" property is used in a .tf.json file.
@ -401,16 +394,6 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
}
cur.ID = instance.Attributes.ID
config.Resources.Schemas[resource.Name] = cur
case "databricks_cluster":
if config.Resources.Clusters == nil {
config.Resources.Clusters = make(map[string]*resources.Cluster)
}
cur := config.Resources.Clusters[resource.Name]
if cur == nil {
cur = &resources.Cluster{ModifiedStatus: resources.ModifiedStatusDeleted}
}
cur.ID = instance.Attributes.ID
config.Resources.Clusters[resource.Name] = cur
case "databricks_permissions":
case "databricks_grants":
// Ignore; no need to pull these back into the configuration.
@ -460,11 +443,6 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
src.ModifiedStatus = resources.ModifiedStatusCreated
}
}
for _, src := range config.Resources.Clusters {
if src.ModifiedStatus == "" && src.ID == "" {
src.ModifiedStatus = resources.ModifiedStatusCreated
}
}
return nil
}

View File

@ -663,14 +663,6 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
},
}
err := TerraformToBundle(&tfState, &config)
@ -700,9 +692,6 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus)
assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster"].ModifiedStatus)
AssertFullResourceCoverage(t, &config)
}
@ -765,13 +754,6 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
},
},
},
Clusters: map[string]*resources.Cluster{
"test_cluster": {
ClusterSpec: &compute.ClusterSpec{
ClusterName: "test_cluster",
},
},
},
},
}
var tfState = resourcesState{
@ -804,9 +786,6 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster"].ModifiedStatus)
AssertFullResourceCoverage(t, &config)
}
@ -909,18 +888,6 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
},
},
},
Clusters: map[string]*resources.Cluster{
"test_cluster": {
ClusterSpec: &compute.ClusterSpec{
ClusterName: "test_cluster",
},
},
"test_cluster_new": {
ClusterSpec: &compute.ClusterSpec{
ClusterName: "test_cluster_new",
},
},
},
},
}
var tfState = resourcesState{
@ -1053,22 +1020,6 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "2"}},
},
},
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster_old",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "2"}},
},
},
},
}
err := TerraformToBundle(&tfState, &config)
@ -1130,13 +1081,6 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus)
assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ModifiedStatus)
assert.Equal(t, "2", config.Resources.Clusters["test_cluster_old"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster_old"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Clusters["test_cluster_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster_new"].ModifiedStatus)
AssertFullResourceCoverage(t, &config)
}

View File

@ -58,8 +58,6 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...)
case dyn.Key("schemas"):
path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...)
case dyn.Key("clusters"):
path = dyn.NewPath(dyn.Key("databricks_cluster")).Append(path[2:]...)
default:
// Trigger "key not found" for unknown resource types.
return dyn.GetByPath(root, path)

View File

@ -31,7 +31,6 @@ func TestInterpolate(t *testing.T) {
"other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}",
"other_registered_model": "${resources.registered_models.other_registered_model.id}",
"other_schema": "${resources.schemas.other_schema.id}",
"other_cluster": "${resources.clusters.other_cluster.id}",
},
Tasks: []jobs.Task{
{
@ -68,7 +67,6 @@ func TestInterpolate(t *testing.T) {
assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"])
assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"])
assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"])
assert.Equal(t, "${databricks_cluster.other_cluster.id}", j.Tags["other_cluster"])
m := b.Config.Resources.Models["my_model"]
assert.Equal(t, "my_model", m.Model.Name)

View File

@ -1,52 +0,0 @@
package tfdyn
import (
"context"
"fmt"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/compute"
)
func convertClusterResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Normalize the output value to the target schema.
vout, diags := convert.Normalize(compute.ClusterSpec{}, vin)
for _, diag := range diags {
log.Debugf(ctx, "cluster normalization diagnostic: %s", diag.Summary)
}
return vout, nil
}
type clusterConverter struct{}
func (clusterConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
vout, err := convertClusterResource(ctx, vin)
if err != nil {
return err
}
// We always set no_wait as it allows DABs not to wait for cluster to be started.
vout, err = dyn.Set(vout, "no_wait", dyn.V(true))
if err != nil {
return err
}
// Add the converted resource to the output.
out.Cluster[key] = vout.AsAny()
// Configure permissions for this resource.
if permissions := convertPermissionsResource(ctx, vin); permissions != nil {
permissions.JobId = fmt.Sprintf("${databricks_cluster.%s.id}", key)
out.Permissions["cluster_"+key] = permissions
}
return nil
}
func init() {
registerConverter("clusters", clusterConverter{})
}

View File

@ -1,97 +0,0 @@
package tfdyn
import (
"context"
"testing"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConvertCluster(t *testing.T) {
var src = resources.Cluster{
ClusterSpec: &compute.ClusterSpec{
NumWorkers: 3,
SparkVersion: "13.3.x-scala2.12",
ClusterName: "cluster",
SparkConf: map[string]string{
"spark.executor.memory": "2g",
},
AwsAttributes: &compute.AwsAttributes{
Availability: "ON_DEMAND",
},
AzureAttributes: &compute.AzureAttributes{
Availability: "SPOT",
},
DataSecurityMode: "USER_ISOLATION",
NodeTypeId: "m5.xlarge",
Autoscale: &compute.AutoScale{
MinWorkers: 1,
MaxWorkers: 10,
},
},
Permissions: []resources.Permission{
{
Level: "CAN_RUN",
UserName: "jack@gmail.com",
},
{
Level: "CAN_MANAGE",
ServicePrincipalName: "sp",
},
},
}
vin, err := convert.FromTyped(src, dyn.NilValue)
require.NoError(t, err)
ctx := context.Background()
out := schema.NewResources()
err = clusterConverter{}.Convert(ctx, "my_cluster", vin, out)
require.NoError(t, err)
cluster := out.Cluster["my_cluster"]
assert.Equal(t, map[string]any{
"num_workers": int64(3),
"spark_version": "13.3.x-scala2.12",
"cluster_name": "cluster",
"spark_conf": map[string]any{
"spark.executor.memory": "2g",
},
"aws_attributes": map[string]any{
"availability": "ON_DEMAND",
},
"azure_attributes": map[string]any{
"availability": "SPOT",
},
"data_security_mode": "USER_ISOLATION",
"no_wait": true,
"node_type_id": "m5.xlarge",
"autoscale": map[string]any{
"min_workers": int64(1),
"max_workers": int64(10),
},
}, cluster)
// Assert equality on the permissions
assert.Equal(t, &schema.ResourcePermissions{
JobId: "${databricks_cluster.my_cluster.id}",
AccessControl: []schema.ResourcePermissionsAccessControl{
{
PermissionLevel: "CAN_RUN",
UserName: "jack@gmail.com",
},
{
PermissionLevel: "CAN_MANAGE",
ServicePrincipalName: "sp",
},
},
}, out.Permissions["cluster_my_cluster"])
}

View File

@ -51,15 +51,9 @@ func (r *root) Generate(path string) error {
}
func Run(ctx context.Context, schema *tfjson.ProviderSchema, path string) error {
// Generate types for resources
// Generate types for resources.
var resources []*namedBlock
for _, k := range sortKeys(schema.ResourceSchemas) {
// Skipping all plugin framework struct generation.
// TODO: This is a temporary fix, generation should be fixed in the future.
if strings.HasSuffix(k, "_pluginframework") {
continue
}
v := schema.ResourceSchemas[k]
b := &namedBlock{
filePattern: "resource_%s.go",
@ -77,12 +71,6 @@ func Run(ctx context.Context, schema *tfjson.ProviderSchema, path string) error
// Generate types for data sources.
var dataSources []*namedBlock
for _, k := range sortKeys(schema.DataSourceSchemas) {
// Skipping all plugin framework struct generation.
// TODO: This is a temporary fix, generation should be fixed in the future.
if strings.HasSuffix(k, "_pluginframework") {
continue
}
v := schema.DataSourceSchemas[k]
b := &namedBlock{
filePattern: "data_source_%s.go",

View File

@ -1,3 +1,3 @@
package schema
const ProviderVersion = "1.52.0"
const ProviderVersion = "1.50.0"

View File

@ -2,16 +2,8 @@
package schema
type DataSourceClustersFilterBy struct {
ClusterSources []string `json:"cluster_sources,omitempty"`
ClusterStates []string `json:"cluster_states,omitempty"`
IsPinned bool `json:"is_pinned,omitempty"`
PolicyId string `json:"policy_id,omitempty"`
}
type DataSourceClusters struct {
ClusterNameContains string `json:"cluster_name_contains,omitempty"`
Id string `json:"id,omitempty"`
Ids []string `json:"ids,omitempty"`
FilterBy *DataSourceClustersFilterBy `json:"filter_by,omitempty"`
ClusterNameContains string `json:"cluster_name_contains,omitempty"`
Id string `json:"id,omitempty"`
Ids []string `json:"ids,omitempty"`
}

View File

@ -19,7 +19,6 @@ type DataSourceExternalLocationExternalLocationInfo struct {
CreatedBy string `json:"created_by,omitempty"`
CredentialId string `json:"credential_id,omitempty"`
CredentialName string `json:"credential_name,omitempty"`
Fallback bool `json:"fallback,omitempty"`
IsolationMode string `json:"isolation_mode,omitempty"`
MetastoreId string `json:"metastore_id,omitempty"`
Name string `json:"name,omitempty"`

View File

@ -18,14 +18,12 @@ type DataSourceShareObject struct {
AddedBy string `json:"added_by,omitempty"`
CdfEnabled bool `json:"cdf_enabled,omitempty"`
Comment string `json:"comment,omitempty"`
Content string `json:"content,omitempty"`
DataObjectType string `json:"data_object_type"`
HistoryDataSharingStatus string `json:"history_data_sharing_status,omitempty"`
Name string `json:"name"`
SharedAs string `json:"shared_as,omitempty"`
StartVersion int `json:"start_version,omitempty"`
Status string `json:"status,omitempty"`
StringSharedAs string `json:"string_shared_as,omitempty"`
Partition []DataSourceShareObjectPartition `json:"partition,omitempty"`
}

View File

@ -2,14 +2,20 @@
package schema
type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceEnablementDetails struct {
ForcedForComplianceMode bool `json:"forced_for_compliance_mode,omitempty"`
UnavailableForDisabledEntitlement bool `json:"unavailable_for_disabled_entitlement,omitempty"`
UnavailableForNonEnterpriseTier bool `json:"unavailable_for_non_enterprise_tier,omitempty"`
}
type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceMaintenanceWindowWeekDayBasedScheduleWindowStartTime struct {
Hours int `json:"hours"`
Minutes int `json:"minutes"`
Hours int `json:"hours,omitempty"`
Minutes int `json:"minutes,omitempty"`
}
type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceMaintenanceWindowWeekDayBasedSchedule struct {
DayOfWeek string `json:"day_of_week"`
Frequency string `json:"frequency"`
DayOfWeek string `json:"day_of_week,omitempty"`
Frequency string `json:"frequency,omitempty"`
WindowStartTime *ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceMaintenanceWindowWeekDayBasedScheduleWindowStartTime `json:"window_start_time,omitempty"`
}
@ -19,9 +25,9 @@ type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspa
type ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspace struct {
CanToggle bool `json:"can_toggle,omitempty"`
Enabled bool `json:"enabled"`
EnablementDetails []any `json:"enablement_details,omitempty"`
Enabled bool `json:"enabled,omitempty"`
RestartEvenIfNoUpdatesAvailable bool `json:"restart_even_if_no_updates_available,omitempty"`
EnablementDetails *ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceEnablementDetails `json:"enablement_details,omitempty"`
MaintenanceWindow *ResourceAutomaticClusterUpdateWorkspaceSettingAutomaticClusterUpdateWorkspaceMaintenanceWindow `json:"maintenance_window,omitempty"`
}

View File

@ -176,7 +176,6 @@ type ResourceCluster struct {
IdempotencyToken string `json:"idempotency_token,omitempty"`
InstancePoolId string `json:"instance_pool_id,omitempty"`
IsPinned bool `json:"is_pinned,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
NodeTypeId string `json:"node_type_id,omitempty"`
NumWorkers int `json:"num_workers,omitempty"`
PolicyId string `json:"policy_id,omitempty"`

View File

@ -3,8 +3,8 @@
package schema
type ResourceComplianceSecurityProfileWorkspaceSettingComplianceSecurityProfileWorkspace struct {
ComplianceStandards []string `json:"compliance_standards"`
IsEnabled bool `json:"is_enabled"`
ComplianceStandards []string `json:"compliance_standards,omitempty"`
IsEnabled bool `json:"is_enabled,omitempty"`
}
type ResourceComplianceSecurityProfileWorkspaceSetting struct {

View File

@ -3,7 +3,7 @@
package schema
type ResourceEnhancedSecurityMonitoringWorkspaceSettingEnhancedSecurityMonitoringWorkspace struct {
IsEnabled bool `json:"is_enabled"`
IsEnabled bool `json:"is_enabled,omitempty"`
}
type ResourceEnhancedSecurityMonitoringWorkspaceSetting struct {

View File

@ -95,16 +95,14 @@ type ResourceModelServingConfigServedEntities struct {
}
type ResourceModelServingConfigServedModels struct {
EnvironmentVars map[string]string `json:"environment_vars,omitempty"`
InstanceProfileArn string `json:"instance_profile_arn,omitempty"`
MaxProvisionedThroughput int `json:"max_provisioned_throughput,omitempty"`
MinProvisionedThroughput int `json:"min_provisioned_throughput,omitempty"`
ModelName string `json:"model_name"`
ModelVersion string `json:"model_version"`
Name string `json:"name,omitempty"`
ScaleToZeroEnabled bool `json:"scale_to_zero_enabled,omitempty"`
WorkloadSize string `json:"workload_size,omitempty"`
WorkloadType string `json:"workload_type,omitempty"`
EnvironmentVars map[string]string `json:"environment_vars,omitempty"`
InstanceProfileArn string `json:"instance_profile_arn,omitempty"`
ModelName string `json:"model_name"`
ModelVersion string `json:"model_version"`
Name string `json:"name,omitempty"`
ScaleToZeroEnabled bool `json:"scale_to_zero_enabled,omitempty"`
WorkloadSize string `json:"workload_size"`
WorkloadType string `json:"workload_type,omitempty"`
}
type ResourceModelServingConfigTrafficConfigRoutes struct {

View File

@ -18,27 +18,20 @@ type ResourceShareObject struct {
AddedBy string `json:"added_by,omitempty"`
CdfEnabled bool `json:"cdf_enabled,omitempty"`
Comment string `json:"comment,omitempty"`
Content string `json:"content,omitempty"`
DataObjectType string `json:"data_object_type"`
HistoryDataSharingStatus string `json:"history_data_sharing_status,omitempty"`
Name string `json:"name"`
SharedAs string `json:"shared_as,omitempty"`
StartVersion int `json:"start_version,omitempty"`
Status string `json:"status,omitempty"`
StringSharedAs string `json:"string_shared_as,omitempty"`
Partition []ResourceShareObjectPartition `json:"partition,omitempty"`
}
type ResourceShare struct {
Comment string `json:"comment,omitempty"`
CreatedAt int `json:"created_at,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
Id string `json:"id,omitempty"`
Name string `json:"name"`
Owner string `json:"owner,omitempty"`
StorageLocation string `json:"storage_location,omitempty"`
StorageRoot string `json:"storage_root,omitempty"`
UpdatedAt int `json:"updated_at,omitempty"`
UpdatedBy string `json:"updated_by,omitempty"`
Object []ResourceShareObject `json:"object,omitempty"`
CreatedAt int `json:"created_at,omitempty"`
CreatedBy string `json:"created_by,omitempty"`
Id string `json:"id,omitempty"`
Name string `json:"name"`
Owner string `json:"owner,omitempty"`
Object []ResourceShareObject `json:"object,omitempty"`
}

View File

@ -15,7 +15,6 @@ type ResourceSqlTable struct {
ClusterKeys []string `json:"cluster_keys,omitempty"`
Comment string `json:"comment,omitempty"`
DataSourceFormat string `json:"data_source_format,omitempty"`
EffectiveProperties map[string]string `json:"effective_properties,omitempty"`
Id string `json:"id,omitempty"`
Name string `json:"name"`
Options map[string]string `json:"options,omitempty"`

View File

@ -21,7 +21,7 @@ type Root struct {
const ProviderHost = "registry.terraform.io"
const ProviderSource = "databricks/databricks"
const ProviderVersion = "1.52.0"
const ProviderVersion = "1.50.0"
func NewRoot() *Root {
return &Root{

View File

@ -18,7 +18,6 @@ import (
"github.com/databricks/cli/bundle/python"
"github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/sync"
terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
)
@ -129,7 +128,7 @@ properties such as the 'catalog' or 'storage' are changed:`
}
// The deploy phase deploys artifacts and resources.
func Deploy(outputHandler sync.OutputHandler) bundle.Mutator {
func Deploy() bundle.Mutator {
// Core mutators that CRUD resources and modify deployment state. These
// mutators need informed consent if they are potentially destructive.
deployCore := bundle.Defer(
@ -158,7 +157,7 @@ func Deploy(outputHandler sync.OutputHandler) bundle.Mutator {
libraries.ExpandGlobReferences(),
libraries.Upload(),
python.TransformWheelTask(),
files.Upload(outputHandler),
files.Upload(),
deploy.StateUpdate(),
deploy.StatePush(),
permissions.ApplyWorkspaceRootPermissions(),

View File

@ -1,36 +0,0 @@
bundle:
name: clusters
workspace:
host: https://acme.cloud.databricks.com/
resources:
clusters:
foo:
cluster_name: foo
num_workers: 2
node_type_id: "i3.xlarge"
autoscale:
min_workers: 2
max_workers: 7
spark_version: "13.3.x-scala2.12"
spark_conf:
"spark.executor.memory": "2g"
targets:
default:
development:
resources:
clusters:
foo:
cluster_name: foo-override
num_workers: 3
node_type_id: "m5.xlarge"
autoscale:
min_workers: 1
max_workers: 3
spark_version: "15.2.x-scala2.12"
spark_conf:
"spark.executor.memory": "4g"
"spark.executor.memory2": "4g"

View File

@ -1,36 +0,0 @@
package config_tests
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestClusters(t *testing.T) {
b := load(t, "./clusters")
assert.Equal(t, "clusters", b.Config.Bundle.Name)
cluster := b.Config.Resources.Clusters["foo"]
assert.Equal(t, "foo", cluster.ClusterName)
assert.Equal(t, "13.3.x-scala2.12", cluster.SparkVersion)
assert.Equal(t, "i3.xlarge", cluster.NodeTypeId)
assert.Equal(t, 2, cluster.NumWorkers)
assert.Equal(t, "2g", cluster.SparkConf["spark.executor.memory"])
assert.Equal(t, 2, cluster.Autoscale.MinWorkers)
assert.Equal(t, 7, cluster.Autoscale.MaxWorkers)
}
func TestClustersOverride(t *testing.T) {
b := loadTarget(t, "./clusters", "development")
assert.Equal(t, "clusters", b.Config.Bundle.Name)
cluster := b.Config.Resources.Clusters["foo"]
assert.Equal(t, "foo-override", cluster.ClusterName)
assert.Equal(t, "15.2.x-scala2.12", cluster.SparkVersion)
assert.Equal(t, "m5.xlarge", cluster.NodeTypeId)
assert.Equal(t, 3, cluster.NumWorkers)
assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory"])
assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory2"])
assert.Equal(t, 1, cluster.Autoscale.MinWorkers)
assert.Equal(t, 3, cluster.Autoscale.MaxWorkers)
}

View File

@ -88,21 +88,3 @@ func TestComplexVariablesOverrideWithMultipleFiles(t *testing.T) {
require.Equalf(t, "false", cluster.NewCluster.SparkConf["spark.speculation"], "cluster: %v", cluster.JobClusterKey)
}
}
func TestComplexVariablesOverrideWithFullSyntax(t *testing.T) {
b, diags := loadTargetWithDiags("variables/complex", "dev")
require.Empty(t, diags)
diags = bundle.Apply(context.Background(), b, bundle.Seq(
mutator.SetVariables(),
mutator.ResolveVariableReferencesInComplexVariables(),
mutator.ResolveVariableReferences(
"variables",
),
))
require.NoError(t, diags.Error())
require.Empty(t, diags)
complexvar := b.Config.Variables["complexvar"].Value
require.Equal(t, map[string]interface{}{"key1": "1", "key2": "2", "key3": "3"}, complexvar)
}

View File

@ -35,13 +35,6 @@ variables:
- jar: "/path/to/jar"
- egg: "/path/to/egg"
- whl: "/path/to/whl"
complexvar:
type: complex
description: "A complex variable"
default:
key1: "value1"
key2: "value2"
key3: "value3"
targets:
@ -56,9 +49,3 @@ targets:
spark_conf:
spark.speculation: false
spark.databricks.delta.retentionDurationCheck.enabled: false
complexvar:
type: complex
default:
key1: "1"
key2: "2"
key3: "3"

View File

@ -10,7 +10,6 @@ import (
"github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/sync"
"github.com/spf13/cobra"
)
@ -24,19 +23,13 @@ func newDeployCommand() *cobra.Command {
var force bool
var forceLock bool
var failOnActiveRuns bool
var clusterId string
var computeID string
var autoApprove bool
var verbose bool
cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.")
cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.")
cmd.Flags().StringVar(&clusterId, "compute-id", "", "Override cluster in the deployment with the given compute ID.")
cmd.Flags().StringVarP(&clusterId, "cluster-id", "c", "", "Override cluster in the deployment with the given cluster ID.")
cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.")
cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Skip interactive approvals that might be required for deployment.")
cmd.Flags().MarkDeprecated("compute-id", "use --cluster-id instead")
cmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose output.")
// Verbose flag currently only affects file sync output, it's used by the vscode extension
cmd.Flags().MarkHidden("verbose")
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
@ -49,10 +42,7 @@ func newDeployCommand() *cobra.Command {
b.AutoApprove = autoApprove
if cmd.Flag("compute-id").Changed {
b.Config.Bundle.ClusterId = clusterId
}
if cmd.Flag("cluster-id").Changed {
b.Config.Bundle.ClusterId = clusterId
b.Config.Bundle.ComputeID = computeID
}
if cmd.Flag("fail-on-active-runs").Changed {
b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns
@ -61,18 +51,11 @@ func newDeployCommand() *cobra.Command {
return nil
})
var outputHandler sync.OutputHandler
if verbose {
outputHandler = func(ctx context.Context, c <-chan sync.Event) {
sync.TextOutput(ctx, c, cmd.OutOrStdout())
}
}
diags = diags.Extend(
bundle.Apply(ctx, b, bundle.Seq(
phases.Initialize(),
phases.Build(),
phases.Deploy(outputHandler),
phases.Deploy(),
)),
)
}

View File

@ -55,7 +55,13 @@ task or a Python wheel task, the second example applies.
return diags.Error()
}
diags = bundle.Apply(ctx, b, phases.Initialize())
diags = bundle.Apply(ctx, b, bundle.Seq(
phases.Initialize(),
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
terraform.Load(terraform.ErrorOnEmptyState),
))
if err := diags.Error(); err != nil {
return err
}
@ -78,16 +84,6 @@ task or a Python wheel task, the second example applies.
return fmt.Errorf("expected a KEY of the resource to run")
}
diags = bundle.Apply(ctx, b, bundle.Seq(
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
terraform.Load(terraform.ErrorOnEmptyState),
))
if err := diags.Error(); err != nil {
return err
}
runner, err := run.Find(b, args[0])
if err != nil {
return err

View File

@ -5,10 +5,12 @@ import (
"context"
"encoding/json"
"io"
"github.com/databricks/cli/libs/sync"
)
// Read synchronization events and write them as JSON to the specified writer (typically stdout).
func JsonOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
enc := json.NewEncoder(w)
for {
select {
@ -29,7 +31,7 @@ func JsonOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
}
// Read synchronization events and write them as text to the specified writer (typically stdout).
func TextOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
func textOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
bw := bufio.NewWriter(w)
for {

View File

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"path/filepath"
stdsync "sync"
"time"
"github.com/databricks/cli/bundle"
@ -45,21 +46,6 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
return nil, flag.ErrHelp
}
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch f.output {
case flags.OutputText:
outputFunc = sync.TextOutput
case flags.OutputJSON:
outputFunc = sync.JsonOutput
}
var outputHandler sync.OutputHandler
if outputFunc != nil {
outputHandler = func(ctx context.Context, events <-chan sync.Event) {
outputFunc(ctx, events, cmd.OutOrStdout())
}
}
opts := sync.SyncOptions{
LocalRoot: vfs.MustNew(args[0]),
Paths: []string{"."},
@ -76,8 +62,6 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
// exist and add it to the `.gitignore` file in the root.
SnapshotBasePath: filepath.Join(args[0], ".databricks"),
WorkspaceClient: root.WorkspaceClient(cmd.Context()),
OutputHandler: outputHandler,
}
return &opts, nil
}
@ -134,7 +118,23 @@ func New() *cobra.Command {
if err != nil {
return err
}
defer s.Close()
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch f.output {
case flags.OutputText:
outputFunc = textOutput
case flags.OutputJSON:
outputFunc = jsonOutput
}
var wg stdsync.WaitGroup
if outputFunc != nil {
wg.Add(1)
go func() {
defer wg.Done()
outputFunc(ctx, s.Events(), cmd.OutOrStdout())
}()
}
if f.watch {
err = s.RunContinuous(ctx)
@ -142,6 +142,8 @@ func New() *cobra.Command {
_, err = s.RunOnce(ctx)
}
s.Close()
wg.Wait()
return err
}

View File

@ -1,16 +0,0 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for job name"
},
"spark_version": {
"type": "string",
"description": "Spark version used for job cluster"
},
"node_type_id": {
"type": "string",
"description": "Node type id for job cluster"
}
}
}

View File

@ -1,24 +0,0 @@
bundle:
name: basic
workspace:
root_path: "~/.bundle/{{.unique_id}}"
resources:
clusters:
test_cluster:
cluster_name: "test-cluster-{{.unique_id}}"
spark_version: "{{.spark_version}}"
node_type_id: "{{.node_type_id}}"
num_workers: 2
spark_conf:
"spark.executor.memory": "2g"
jobs:
foo:
name: test-job-with-cluster-{{.unique_id}}
tasks:
- task_key: my_notebook_task
existing_cluster_id: "${resources.clusters.test_cluster.cluster_id}"
spark_python_task:
python_file: ./hello_world.py

View File

@ -1 +0,0 @@
print("Hello World!")

View File

@ -1,56 +0,0 @@
package bundle
import (
"fmt"
"testing"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/internal/acc"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/env"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
func TestAccDeployBundleWithCluster(t *testing.T) {
ctx, wt := acc.WorkspaceTest(t)
if testutil.IsAWSCloud(wt.T) {
t.Skip("Skipping test for AWS cloud because it is not permitted to create clusters")
}
nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
uniqueId := uuid.New().String()
root, err := initTestTemplate(t, ctx, "clusters", map[string]any{
"unique_id": uniqueId,
"node_type_id": nodeTypeId,
"spark_version": defaultSparkVersion,
})
require.NoError(t, err)
t.Cleanup(func() {
err = destroyBundle(t, ctx, root)
require.NoError(t, err)
cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
if err != nil {
require.ErrorContains(t, err, "does not exist")
} else {
require.Contains(t, []compute.State{compute.StateTerminated, compute.StateTerminating}, cluster.State)
}
})
err = deployBundle(t, ctx, root)
require.NoError(t, err)
// Cluster should exists after bundle deployment
cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
require.NoError(t, err)
require.NotNil(t, cluster)
out, err := runResource(t, ctx, root, "foo")
require.NoError(t, err)
require.Contains(t, out, "Hello World!")
}

View File

@ -49,7 +49,3 @@ func GetCloud(t *testing.T) Cloud {
}
return -1
}
func IsAWSCloud(t *testing.T) bool {
return GetCloud(t) == AWS
}

View File

@ -209,26 +209,7 @@ func TestRepositoryGitConfigWhenNotARepo(t *testing.T) {
}
func TestRepositoryOriginUrlRemovesUserCreds(t *testing.T) {
tcases := []struct {
url string
expected string
}{
{
url: "https://username:token@github.com/databricks/foobar.git",
expected: "https://github.com/databricks/foobar.git",
},
{
// Note: The token is still considered and parsed as a username here.
// However credentials integrations by Git providers like GitHub
// allow for setting a PAT token as a username.
url: "https://token@github.com/databricks/foobar.git",
expected: "https://github.com/databricks/foobar.git",
},
}
for _, tc := range tcases {
repo := newTestRepository(t)
repo.addOriginUrl(tc.url)
repo.assertOriginUrl(tc.expected)
}
repo := newTestRepository(t)
repo.addOriginUrl("https://username:token@github.com/databricks/foobar.git")
repo.assertOriginUrl("https://github.com/databricks/foobar.git")
}

View File

@ -3,7 +3,6 @@ package sync
import (
"context"
"fmt"
stdsync "sync"
"time"
"github.com/databricks/cli/libs/filer"
@ -16,8 +15,6 @@ import (
"github.com/databricks/databricks-sdk-go/service/iam"
)
type OutputHandler func(context.Context, <-chan Event)
type SyncOptions struct {
LocalRoot vfs.Path
Paths []string
@ -37,8 +34,6 @@ type SyncOptions struct {
CurrentUser *iam.User
Host string
OutputHandler OutputHandler
}
type Sync struct {
@ -54,10 +49,6 @@ type Sync struct {
// Synchronization progress events are sent to this event notifier.
notifier EventNotifier
seq int
// WaitGroup is automatically created when an output handler is provided in the SyncOptions.
// Close call is required to ensure the output handler goroutine handles all events in time.
outputWaitGroup *stdsync.WaitGroup
}
// New initializes and returns a new [Sync] instance.
@ -115,41 +106,31 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
return nil, err
}
var notifier EventNotifier
var outputWaitGroup = &stdsync.WaitGroup{}
if opts.OutputHandler != nil {
ch := make(chan Event, MaxRequestsInFlight)
notifier = &ChannelNotifier{ch}
outputWaitGroup.Add(1)
go func() {
defer outputWaitGroup.Done()
opts.OutputHandler(ctx, ch)
}()
} else {
notifier = &NopNotifier{}
}
return &Sync{
SyncOptions: &opts,
fileSet: fileSet,
includeFileSet: includeFileSet,
excludeFileSet: excludeFileSet,
snapshot: snapshot,
filer: filer,
notifier: notifier,
outputWaitGroup: outputWaitGroup,
seq: 0,
fileSet: fileSet,
includeFileSet: includeFileSet,
excludeFileSet: excludeFileSet,
snapshot: snapshot,
filer: filer,
notifier: &NopNotifier{},
seq: 0,
}, nil
}
func (s *Sync) Events() <-chan Event {
ch := make(chan Event, MaxRequestsInFlight)
s.notifier = &ChannelNotifier{ch}
return ch
}
func (s *Sync) Close() {
if s.notifier == nil {
return
}
s.notifier.Close()
s.notifier = nil
s.outputWaitGroup.Wait()
}
func (s *Sync) notifyStart(ctx context.Context, d diff) {

View File

@ -3,12 +3,6 @@ resources:
pipelines:
{{.project_name}}_pipeline:
name: {{.project_name}}_pipeline
{{- if eq default_catalog ""}}
## Specify the 'catalog' field to configure this pipeline to make use of Unity Catalog:
# catalog: catalog_name
{{- else}}
catalog: {{default_catalog}}
{{- end}}
target: {{.project_name}}_${bundle.environment}
libraries:
- notebook: