Merge branch 'main' into feature/logout

Richard Nordström 2024-09-23 21:43:07 +02:00 committed by GitHub
commit 18d3fea34e
35 changed files with 764 additions and 70 deletions

View File

@ -38,8 +38,11 @@ type Bundle struct {
// Annotated readonly as this should be set at the target level.
Mode Mode `json:"mode,omitempty" bundle:"readonly"`
- // Overrides the compute used for jobs and other supported assets.
- ComputeID string `json:"compute_id,omitempty"`
+ // DEPRECATED: Overrides the compute used for jobs and other supported assets.
+ ComputeId string `json:"compute_id,omitempty"`
+ // Overrides the cluster used for jobs and other supported assets.
+ ClusterId string `json:"cluster_id,omitempty"`
// Deployment section specifies deployment related configuration for bundle
Deployment Deployment `json:"deployment,omitempty"`

View File

@ -160,6 +160,21 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
// the Databricks UI and via the SQL API.
}
+ // Clusters: Prefix, Tags
+ for _, c := range r.Clusters {
+ c.ClusterName = prefix + c.ClusterName
+ if c.CustomTags == nil {
+ c.CustomTags = make(map[string]string)
+ }
+ for _, tag := range tags {
+ normalisedKey := b.Tagging.NormalizeKey(tag.Key)
+ normalisedValue := b.Tagging.NormalizeValue(tag.Value)
+ if _, ok := c.CustomTags[normalisedKey]; !ok {
+ c.CustomTags[normalisedKey] = normalisedValue
+ }
+ }
+ }
return nil
}

View File

@ -0,0 +1,87 @@
package mutator
import (
"context"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
)
type computeIdToClusterId struct{}
func ComputeIdToClusterId() bundle.Mutator {
return &computeIdToClusterId{}
}
func (m *computeIdToClusterId) Name() string {
return "ComputeIdToClusterId"
}
func (m *computeIdToClusterId) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
var diags diag.Diagnostics
// The "compute_id" key is set; rewrite it to "cluster_id".
err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
v, d := rewriteComputeIdToClusterId(v, dyn.NewPath(dyn.Key("bundle")))
diags = diags.Extend(d)
// Check if the "compute_id" key is set in any target overrides.
return dyn.MapByPattern(v, dyn.NewPattern(dyn.Key("targets"), dyn.AnyKey()), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
v, d := rewriteComputeIdToClusterId(v, dyn.Path{})
diags = diags.Extend(d)
return v, nil
})
})
diags = diags.Extend(diag.FromErr(err))
return diags
}
func rewriteComputeIdToClusterId(v dyn.Value, p dyn.Path) (dyn.Value, diag.Diagnostics) {
var diags diag.Diagnostics
computeIdPath := p.Append(dyn.Key("compute_id"))
computeId, err := dyn.GetByPath(v, computeIdPath)
// If the "compute_id" key is not set, we don't need to do anything.
if err != nil {
return v, nil
}
if computeId.Kind() == dyn.KindInvalid {
return v, nil
}
diags = diags.Append(diag.Diagnostic{
Severity: diag.Warning,
Summary: "compute_id is deprecated, please use cluster_id instead",
Locations: computeId.Locations(),
Paths: []dyn.Path{computeIdPath},
})
clusterIdPath := p.Append(dyn.Key("cluster_id"))
nv, err := dyn.SetByPath(v, clusterIdPath, computeId)
if err != nil {
return dyn.InvalidValue, diag.FromErr(err)
}
// Drop the "compute_id" key.
vout, err := dyn.Walk(nv, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
switch len(p) {
case 0:
return v, nil
case 1:
if p[0] == dyn.Key("compute_id") {
return v, dyn.ErrDrop
}
return v, nil
case 2:
if p[1] == dyn.Key("compute_id") {
return v, dyn.ErrDrop
}
}
return v, dyn.ErrSkip
})
diags = diags.Extend(diag.FromErr(err))
return vout, diags
}

View File

@ -0,0 +1,57 @@
package mutator_test
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/libs/diag"
"github.com/stretchr/testify/assert"
)
func TestComputeIdToClusterId(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
ComputeId: "compute-id",
},
},
}
diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
assert.NoError(t, diags.Error())
assert.Equal(t, "compute-id", b.Config.Bundle.ClusterId)
assert.Empty(t, b.Config.Bundle.ComputeId)
assert.Len(t, diags, 1)
assert.Equal(t, "compute_id is deprecated, please use cluster_id instead", diags[0].Summary)
assert.Equal(t, diag.Warning, diags[0].Severity)
}
func TestComputeIdToClusterIdInTargetOverride(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Targets: map[string]*config.Target{
"dev": {
ComputeId: "compute-id-dev",
},
},
},
}
diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
assert.NoError(t, diags.Error())
assert.Empty(t, b.Config.Targets["dev"].ComputeId)
diags = diags.Extend(bundle.Apply(context.Background(), b, mutator.SelectTarget("dev")))
assert.NoError(t, diags.Error())
assert.Equal(t, "compute-id-dev", b.Config.Bundle.ClusterId)
assert.Empty(t, b.Config.Bundle.ComputeId)
assert.Len(t, diags, 1)
assert.Equal(t, "compute_id is deprecated, please use cluster_id instead", diags[0].Summary)
assert.Equal(t, diag.Warning, diags[0].Severity)
}

View File

@ -23,6 +23,7 @@ func DefaultMutators() []bundle.Mutator {
VerifyCliVersion(),
EnvironmentsToTargets(),
+ ComputeIdToClusterId(),
InitializeVariables(),
DefineDefaultTarget(),
LoadGitDetails(),

View File

@ -39,22 +39,22 @@ func overrideJobCompute(j *resources.Job, compute string) {
func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
if b.Config.Bundle.Mode != config.Development {
- if b.Config.Bundle.ComputeID != "" {
+ if b.Config.Bundle.ClusterId != "" {
return diag.Errorf("cannot override compute for an target that does not use 'mode: development'")
}
return nil
}
if v := env.Get(ctx, "DATABRICKS_CLUSTER_ID"); v != "" {
- b.Config.Bundle.ComputeID = v
+ b.Config.Bundle.ClusterId = v
}
- if b.Config.Bundle.ComputeID == "" {
+ if b.Config.Bundle.ClusterId == "" {
return nil
}
r := b.Config.Resources
for i := range r.Jobs {
- overrideJobCompute(r.Jobs[i], b.Config.Bundle.ComputeID)
+ overrideJobCompute(r.Jobs[i], b.Config.Bundle.ClusterId)
}
return nil

View File

@ -20,7 +20,7 @@ func TestOverrideDevelopment(t *testing.T) {
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Development,
- ComputeID: "newClusterID",
+ ClusterId: "newClusterID",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
@ -144,7 +144,7 @@ func TestOverrideProduction(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
- ComputeID: "newClusterID",
+ ClusterId: "newClusterID",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{

View File

@ -13,6 +13,7 @@ import (
"github.com/databricks/cli/libs/tags" "github.com/databricks/cli/libs/tags"
sdkconfig "github.com/databricks/databricks-sdk-go/config" sdkconfig "github.com/databricks/databricks-sdk-go/config"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml" "github.com/databricks/databricks-sdk-go/service/ml"
@ -119,6 +120,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
Schemas: map[string]*resources.Schema{
"schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}},
},
+ Clusters: map[string]*resources.Cluster{
+ "cluster1": {ClusterSpec: &compute.ClusterSpec{ClusterName: "cluster1", SparkVersion: "13.2.x", NumWorkers: 1}},
+ },
},
},
// Use AWS implementation for testing.
@ -177,6 +181,9 @@ func TestProcessTargetModeDevelopment(t *testing.T) {
// Schema 1
assert.Equal(t, "dev_lennart_schema1", b.Config.Resources.Schemas["schema1"].Name)
+ // Clusters
+ assert.Equal(t, "[dev lennart] cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
}
func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) {
@ -281,6 +288,7 @@ func TestProcessTargetModeDefault(t *testing.T) {
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
} }
func TestProcessTargetModeProduction(t *testing.T) { func TestProcessTargetModeProduction(t *testing.T) {
@ -312,6 +320,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
b.Config.Resources.Experiments["experiment2"].Permissions = permissions b.Config.Resources.Experiments["experiment2"].Permissions = permissions
b.Config.Resources.Models["model1"].Permissions = permissions b.Config.Resources.Models["model1"].Permissions = permissions
b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Permissions = permissions b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Permissions = permissions
b.Config.Resources.Clusters["cluster1"].Permissions = permissions
diags = validateProductionMode(context.Background(), b, false) diags = validateProductionMode(context.Background(), b, false)
require.NoError(t, diags.Error()) require.NoError(t, diags.Error())
@ -322,6 +331,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name) assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name) assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName) assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
} }
func TestProcessTargetModeProductionOkForPrincipal(t *testing.T) { func TestProcessTargetModeProductionOkForPrincipal(t *testing.T) {

View File

@ -32,6 +32,7 @@ func allResourceTypes(t *testing.T) []string {
// the dyn library gives us the correct list of all resources supported. Please
// also update this check when adding a new resource
require.Equal(t, []string{
+ "clusters",
"experiments",
"jobs",
"model_serving_endpoints",
@ -133,6 +134,7 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) {
// some point in the future. These resources are (implicitly) on the deny list, since
// they are not on the allow list below.
allowList := []string{
+ "clusters",
"jobs",
"models",
"registered_models",

View File

@ -19,6 +19,7 @@ type Resources struct {
RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"`
Schemas map[string]*resources.Schema `json:"schemas,omitempty"`
+ Clusters map[string]*resources.Cluster `json:"clusters,omitempty"`
}
type ConfigResource interface {

View File

@ -0,0 +1,39 @@
package resources
import (
"context"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/compute"
)
type Cluster struct {
ID string `json:"id,omitempty" bundle:"readonly"`
Permissions []Permission `json:"permissions,omitempty"`
ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
*compute.ClusterSpec
}
func (s *Cluster) UnmarshalJSON(b []byte) error {
return marshal.Unmarshal(b, s)
}
func (s Cluster) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s)
}
func (s *Cluster) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
_, err := w.Clusters.GetByClusterId(ctx, id)
if err != nil {
log.Debugf(ctx, "cluster %s does not exist", id)
return false, err
}
return true, nil
}
func (s *Cluster) TerraformResourceName() string {
return "databricks_cluster"
}

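For orientation, a minimal usage sketch (not part of this commit) of the new Cluster resource's Exists helper. The cluster ID below is a hypothetical placeholder, and the workspace client is assumed to be authenticated through the usual environment variables or config profile:

package main

import (
	"context"
	"fmt"

	"github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/databricks-sdk-go"
)

func main() {
	ctx := context.Background()
	// Assumes DATABRICKS_HOST / DATABRICKS_TOKEN (or another auth method) are configured.
	w, err := databricks.NewWorkspaceClient()
	if err != nil {
		panic(err)
	}
	// "0123-456789-abcdefgh" is a hypothetical cluster ID.
	c := &resources.Cluster{}
	exists, err := c.Exists(ctx, w, "0123-456789-abcdefgh")
	fmt.Println(exists, err)
}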
View File

@ -366,9 +366,9 @@ func (r *Root) MergeTargetOverrides(name string) error {
}
}
- // Merge `compute_id`. This field must be overwritten if set, not merged.
- if v := target.Get("compute_id"); v.Kind() != dyn.KindInvalid {
- root, err = dyn.SetByPath(root, dyn.NewPath(dyn.Key("bundle"), dyn.Key("compute_id")), v)
+ // Merge `cluster_id`. This field must be overwritten if set, not merged.
+ if v := target.Get("cluster_id"); v.Kind() != dyn.KindInvalid {
+ root, err = dyn.SetByPath(root, dyn.NewPath(dyn.Key("bundle"), dyn.Key("cluster_id")), v)
if err != nil {
return err
}

View File

@ -24,8 +24,11 @@ type Target struct {
// name prefix of deployed resources.
Presets Presets `json:"presets,omitempty"`
- // Overrides the compute used for jobs and other supported assets.
- ComputeID string `json:"compute_id,omitempty"`
+ // DEPRECATED: Overrides the compute used for jobs and other supported assets.
+ ComputeId string `json:"compute_id,omitempty"`
+ // Overrides the cluster used for jobs and other supported assets.
+ ClusterId string `json:"cluster_id,omitempty"`
Bundle *Bundle `json:"bundle,omitempty"`

View File

@ -8,9 +8,12 @@ import (
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/sync"
) )
type upload struct{} type upload struct {
outputHandler sync.OutputHandler
}
func (m *upload) Name() string { func (m *upload) Name() string {
return "files.Upload" return "files.Upload"
@ -18,11 +21,18 @@ func (m *upload) Name() string {
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath))
- sync, err := GetSync(ctx, bundle.ReadOnly(b))
+ opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b))
if err != nil {
return diag.FromErr(err)
}
+ opts.OutputHandler = m.outputHandler
+ sync, err := sync.New(ctx, *opts)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ defer sync.Close()
b.Files, err = sync.RunOnce(ctx)
if err != nil {
return diag.FromErr(err)
@ -32,6 +42,6 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
return nil
}
- func Upload() bundle.Mutator {
- return &upload{}
+ func Upload(outputHandler sync.OutputHandler) bundle.Mutator {
+ return &upload{outputHandler}
}

View File

@ -231,6 +231,13 @@ func BundleToTerraform(config *config.Root) *schema.Root {
tfroot.Resource.QualityMonitor[k] = &dst
}
+ for k, src := range config.Resources.Clusters {
+ noResources = false
+ var dst schema.ResourceCluster
+ conv(src, &dst)
+ tfroot.Resource.Cluster[k] = &dst
+ }
// We explicitly set "resource" to nil to omit it from a JSON encoding.
// This is required because the terraform CLI requires >= 1 resources defined
// if the "resource" property is used in a .tf.json file.
@ -394,6 +401,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
}
cur.ID = instance.Attributes.ID
config.Resources.Schemas[resource.Name] = cur
+ case "databricks_cluster":
+ if config.Resources.Clusters == nil {
+ config.Resources.Clusters = make(map[string]*resources.Cluster)
+ }
+ cur := config.Resources.Clusters[resource.Name]
+ if cur == nil {
+ cur = &resources.Cluster{ModifiedStatus: resources.ModifiedStatusDeleted}
+ }
+ cur.ID = instance.Attributes.ID
+ config.Resources.Clusters[resource.Name] = cur
case "databricks_permissions":
case "databricks_grants":
// Ignore; no need to pull these back into the configuration.
@ -443,6 +460,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
src.ModifiedStatus = resources.ModifiedStatusCreated
}
}
+ for _, src := range config.Resources.Clusters {
+ if src.ModifiedStatus == "" && src.ID == "" {
+ src.ModifiedStatus = resources.ModifiedStatusCreated
+ }
+ }
return nil
}

View File

@ -663,6 +663,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "1"}}, {Attributes: stateInstanceAttributes{ID: "1"}},
}, },
}, },
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
}, },
} }
err := TerraformToBundle(&tfState, &config) err := TerraformToBundle(&tfState, &config)
@ -692,6 +700,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID) assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus)
assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster"].ModifiedStatus)
AssertFullResourceCoverage(t, &config) AssertFullResourceCoverage(t, &config)
} }
@ -754,6 +765,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
},
},
},
+ Clusters: map[string]*resources.Cluster{
+ "test_cluster": {
+ ClusterSpec: &compute.ClusterSpec{
+ ClusterName: "test_cluster",
+ },
+ },
+ },
},
}
var tfState = resourcesState{
@ -786,6 +804,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID) assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster"].ModifiedStatus)
AssertFullResourceCoverage(t, &config) AssertFullResourceCoverage(t, &config)
} }
@ -888,6 +909,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
},
},
},
+ Clusters: map[string]*resources.Cluster{
+ "test_cluster": {
+ ClusterSpec: &compute.ClusterSpec{
+ ClusterName: "test_cluster",
+ },
+ },
+ "test_cluster_new": {
+ ClusterSpec: &compute.ClusterSpec{
+ ClusterName: "test_cluster_new",
+ },
+ },
+ },
},
}
var tfState = resourcesState{
@ -1020,6 +1053,22 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
{Attributes: stateInstanceAttributes{ID: "2"}}, {Attributes: stateInstanceAttributes{ID: "2"}},
}, },
}, },
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "1"}},
},
},
{
Type: "databricks_cluster",
Mode: "managed",
Name: "test_cluster_old",
Instances: []stateResourceInstance{
{Attributes: stateInstanceAttributes{ID: "2"}},
},
},
}, },
} }
err := TerraformToBundle(&tfState, &config) err := TerraformToBundle(&tfState, &config)
@ -1081,6 +1130,13 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID) assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus) assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus)
assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ModifiedStatus)
assert.Equal(t, "2", config.Resources.Clusters["test_cluster_old"].ID)
assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster_old"].ModifiedStatus)
assert.Equal(t, "", config.Resources.Clusters["test_cluster_new"].ID)
assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster_new"].ModifiedStatus)
AssertFullResourceCoverage(t, &config) AssertFullResourceCoverage(t, &config)
} }

View File

@ -58,6 +58,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...)
case dyn.Key("schemas"):
path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...)
+ case dyn.Key("clusters"):
+ path = dyn.NewPath(dyn.Key("databricks_cluster")).Append(path[2:]...)
default:
// Trigger "key not found" for unknown resource types.
return dyn.GetByPath(root, path)

View File

@ -31,6 +31,7 @@ func TestInterpolate(t *testing.T) {
"other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}", "other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}",
"other_registered_model": "${resources.registered_models.other_registered_model.id}", "other_registered_model": "${resources.registered_models.other_registered_model.id}",
"other_schema": "${resources.schemas.other_schema.id}", "other_schema": "${resources.schemas.other_schema.id}",
"other_cluster": "${resources.clusters.other_cluster.id}",
}, },
Tasks: []jobs.Task{ Tasks: []jobs.Task{
{ {
@ -67,6 +68,7 @@ func TestInterpolate(t *testing.T) {
assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"]) assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"])
assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"]) assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"])
assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"]) assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"])
assert.Equal(t, "${databricks_cluster.other_cluster.id}", j.Tags["other_cluster"])
m := b.Config.Resources.Models["my_model"] m := b.Config.Resources.Models["my_model"]
assert.Equal(t, "my_model", m.Model.Name) assert.Equal(t, "my_model", m.Model.Name)

View File

@ -0,0 +1,52 @@
package tfdyn
import (
"context"
"fmt"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/compute"
)
func convertClusterResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Normalize the output value to the target schema.
vout, diags := convert.Normalize(compute.ClusterSpec{}, vin)
for _, diag := range diags {
log.Debugf(ctx, "cluster normalization diagnostic: %s", diag.Summary)
}
return vout, nil
}
type clusterConverter struct{}
func (clusterConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
vout, err := convertClusterResource(ctx, vin)
if err != nil {
return err
}
// We always set no_wait as it allows DABs not to wait for cluster to be started.
vout, err = dyn.Set(vout, "no_wait", dyn.V(true))
if err != nil {
return err
}
// Add the converted resource to the output.
out.Cluster[key] = vout.AsAny()
// Configure permissions for this resource.
if permissions := convertPermissionsResource(ctx, vin); permissions != nil {
permissions.JobId = fmt.Sprintf("${databricks_cluster.%s.id}", key)
out.Permissions["cluster_"+key] = permissions
}
return nil
}
func init() {
registerConverter("clusters", clusterConverter{})
}

View File

@ -0,0 +1,97 @@
package tfdyn
import (
"context"
"testing"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/tf/schema"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/convert"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestConvertCluster(t *testing.T) {
var src = resources.Cluster{
ClusterSpec: &compute.ClusterSpec{
NumWorkers: 3,
SparkVersion: "13.3.x-scala2.12",
ClusterName: "cluster",
SparkConf: map[string]string{
"spark.executor.memory": "2g",
},
AwsAttributes: &compute.AwsAttributes{
Availability: "ON_DEMAND",
},
AzureAttributes: &compute.AzureAttributes{
Availability: "SPOT",
},
DataSecurityMode: "USER_ISOLATION",
NodeTypeId: "m5.xlarge",
Autoscale: &compute.AutoScale{
MinWorkers: 1,
MaxWorkers: 10,
},
},
Permissions: []resources.Permission{
{
Level: "CAN_RUN",
UserName: "jack@gmail.com",
},
{
Level: "CAN_MANAGE",
ServicePrincipalName: "sp",
},
},
}
vin, err := convert.FromTyped(src, dyn.NilValue)
require.NoError(t, err)
ctx := context.Background()
out := schema.NewResources()
err = clusterConverter{}.Convert(ctx, "my_cluster", vin, out)
require.NoError(t, err)
cluster := out.Cluster["my_cluster"]
assert.Equal(t, map[string]any{
"num_workers": int64(3),
"spark_version": "13.3.x-scala2.12",
"cluster_name": "cluster",
"spark_conf": map[string]any{
"spark.executor.memory": "2g",
},
"aws_attributes": map[string]any{
"availability": "ON_DEMAND",
},
"azure_attributes": map[string]any{
"availability": "SPOT",
},
"data_security_mode": "USER_ISOLATION",
"no_wait": true,
"node_type_id": "m5.xlarge",
"autoscale": map[string]any{
"min_workers": int64(1),
"max_workers": int64(10),
},
}, cluster)
// Assert equality on the permissions
assert.Equal(t, &schema.ResourcePermissions{
JobId: "${databricks_cluster.my_cluster.id}",
AccessControl: []schema.ResourcePermissionsAccessControl{
{
PermissionLevel: "CAN_RUN",
UserName: "jack@gmail.com",
},
{
PermissionLevel: "CAN_MANAGE",
ServicePrincipalName: "sp",
},
},
}, out.Permissions["cluster_my_cluster"])
}

View File

@ -18,6 +18,7 @@ import (
"github.com/databricks/cli/bundle/python" "github.com/databricks/cli/bundle/python"
"github.com/databricks/cli/bundle/scripts" "github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/sync"
terraformlib "github.com/databricks/cli/libs/terraform" terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json" tfjson "github.com/hashicorp/terraform-json"
) )
@ -128,7 +129,7 @@ properties such as the 'catalog' or 'storage' are changed:`
}
// The deploy phase deploys artifacts and resources.
- func Deploy() bundle.Mutator {
+ func Deploy(outputHandler sync.OutputHandler) bundle.Mutator {
// Core mutators that CRUD resources and modify deployment state. These
// mutators need informed consent if they are potentially destructive.
deployCore := bundle.Defer(
@ -157,7 +158,7 @@ func Deploy() bundle.Mutator {
libraries.ExpandGlobReferences(),
libraries.Upload(),
python.TransformWheelTask(),
- files.Upload(),
+ files.Upload(outputHandler),
deploy.StateUpdate(),
deploy.StatePush(),
permissions.ApplyWorkspaceRootPermissions(),

View File

@ -0,0 +1,36 @@
bundle:
name: clusters
workspace:
host: https://acme.cloud.databricks.com/
resources:
clusters:
foo:
cluster_name: foo
num_workers: 2
node_type_id: "i3.xlarge"
autoscale:
min_workers: 2
max_workers: 7
spark_version: "13.3.x-scala2.12"
spark_conf:
"spark.executor.memory": "2g"
targets:
default:
development:
resources:
clusters:
foo:
cluster_name: foo-override
num_workers: 3
node_type_id: "m5.xlarge"
autoscale:
min_workers: 1
max_workers: 3
spark_version: "15.2.x-scala2.12"
spark_conf:
"spark.executor.memory": "4g"
"spark.executor.memory2": "4g"

View File

@ -0,0 +1,36 @@
package config_tests
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestClusters(t *testing.T) {
b := load(t, "./clusters")
assert.Equal(t, "clusters", b.Config.Bundle.Name)
cluster := b.Config.Resources.Clusters["foo"]
assert.Equal(t, "foo", cluster.ClusterName)
assert.Equal(t, "13.3.x-scala2.12", cluster.SparkVersion)
assert.Equal(t, "i3.xlarge", cluster.NodeTypeId)
assert.Equal(t, 2, cluster.NumWorkers)
assert.Equal(t, "2g", cluster.SparkConf["spark.executor.memory"])
assert.Equal(t, 2, cluster.Autoscale.MinWorkers)
assert.Equal(t, 7, cluster.Autoscale.MaxWorkers)
}
func TestClustersOverride(t *testing.T) {
b := loadTarget(t, "./clusters", "development")
assert.Equal(t, "clusters", b.Config.Bundle.Name)
cluster := b.Config.Resources.Clusters["foo"]
assert.Equal(t, "foo-override", cluster.ClusterName)
assert.Equal(t, "15.2.x-scala2.12", cluster.SparkVersion)
assert.Equal(t, "m5.xlarge", cluster.NodeTypeId)
assert.Equal(t, 3, cluster.NumWorkers)
assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory"])
assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory2"])
assert.Equal(t, 1, cluster.Autoscale.MinWorkers)
assert.Equal(t, 3, cluster.Autoscale.MaxWorkers)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/databricks/cli/cmd/bundle/utils" "github.com/databricks/cli/cmd/bundle/utils"
"github.com/databricks/cli/cmd/root" "github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/sync"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -23,13 +24,19 @@ func newDeployCommand() *cobra.Command {
var force bool
var forceLock bool
var failOnActiveRuns bool
- var computeID string
+ var clusterId string
var autoApprove bool
+ var verbose bool
cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.")
cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.")
- cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.")
+ cmd.Flags().StringVar(&clusterId, "compute-id", "", "Override cluster in the deployment with the given compute ID.")
+ cmd.Flags().StringVarP(&clusterId, "cluster-id", "c", "", "Override cluster in the deployment with the given cluster ID.")
cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Skip interactive approvals that might be required for deployment.")
+ cmd.Flags().MarkDeprecated("compute-id", "use --cluster-id instead")
+ cmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose output.")
+ // Verbose flag currently only affects file sync output, it's used by the vscode extension
+ cmd.Flags().MarkHidden("verbose")
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
@ -42,7 +49,10 @@ func newDeployCommand() *cobra.Command {
b.AutoApprove = autoApprove
if cmd.Flag("compute-id").Changed {
- b.Config.Bundle.ComputeID = computeID
+ b.Config.Bundle.ClusterId = clusterId
+ }
+ if cmd.Flag("cluster-id").Changed {
+ b.Config.Bundle.ClusterId = clusterId
}
if cmd.Flag("fail-on-active-runs").Changed {
b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns
@ -51,11 +61,18 @@ func newDeployCommand() *cobra.Command {
return nil
})
+ var outputHandler sync.OutputHandler
+ if verbose {
+ outputHandler = func(ctx context.Context, c <-chan sync.Event) {
+ sync.TextOutput(ctx, c, cmd.OutOrStdout())
+ }
+ }
diags = diags.Extend(
bundle.Apply(ctx, b, bundle.Seq(
phases.Initialize(),
phases.Build(),
- phases.Deploy(),
+ phases.Deploy(outputHandler),
)),
)
}

View File

@ -55,13 +55,7 @@ task or a Python wheel task, the second example applies.
return diags.Error()
}
- diags = bundle.Apply(ctx, b, bundle.Seq(
- phases.Initialize(),
- terraform.Interpolate(),
- terraform.Write(),
- terraform.StatePull(),
- terraform.Load(terraform.ErrorOnEmptyState),
- ))
+ diags = bundle.Apply(ctx, b, phases.Initialize())
if err := diags.Error(); err != nil {
return err
}
@ -84,6 +78,16 @@ task or a Python wheel task, the second example applies.
return fmt.Errorf("expected a KEY of the resource to run") return fmt.Errorf("expected a KEY of the resource to run")
} }
diags = bundle.Apply(ctx, b, bundle.Seq(
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
terraform.Load(terraform.ErrorOnEmptyState),
))
if err := diags.Error(); err != nil {
return err
}
runner, err := run.Find(b, args[0]) runner, err := run.Find(b, args[0])
if err != nil { if err != nil {
return err return err

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
stdsync "sync"
"time" "time"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
@ -46,6 +45,21 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
return nil, flag.ErrHelp
}
+ var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
+ switch f.output {
+ case flags.OutputText:
+ outputFunc = sync.TextOutput
+ case flags.OutputJSON:
+ outputFunc = sync.JsonOutput
+ }
+ var outputHandler sync.OutputHandler
+ if outputFunc != nil {
+ outputHandler = func(ctx context.Context, events <-chan sync.Event) {
+ outputFunc(ctx, events, cmd.OutOrStdout())
+ }
+ }
opts := sync.SyncOptions{
LocalRoot: vfs.MustNew(args[0]),
Paths: []string{"."},
@ -62,6 +76,8 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
// exist and add it to the `.gitignore` file in the root.
SnapshotBasePath: filepath.Join(args[0], ".databricks"),
WorkspaceClient: root.WorkspaceClient(cmd.Context()),
+ OutputHandler: outputHandler,
}
return &opts, nil
}
@ -118,23 +134,7 @@ func New() *cobra.Command {
if err != nil {
return err
}
+ defer s.Close()
- var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
- switch f.output {
- case flags.OutputText:
- outputFunc = textOutput
- case flags.OutputJSON:
- outputFunc = jsonOutput
- }
- var wg stdsync.WaitGroup
- if outputFunc != nil {
- wg.Add(1)
- go func() {
- defer wg.Done()
- outputFunc(ctx, s.Events(), cmd.OutOrStdout())
- }()
- }
if f.watch {
err = s.RunContinuous(ctx)
@ -142,8 +142,6 @@ func New() *cobra.Command {
_, err = s.RunOnce(ctx)
}
- s.Close()
- wg.Wait()
return err
}

View File

@ -0,0 +1,16 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for job name"
},
"spark_version": {
"type": "string",
"description": "Spark version used for job cluster"
},
"node_type_id": {
"type": "string",
"description": "Node type id for job cluster"
}
}
}

View File

@ -0,0 +1,24 @@
bundle:
name: basic
workspace:
root_path: "~/.bundle/{{.unique_id}}"
resources:
clusters:
test_cluster:
cluster_name: "test-cluster-{{.unique_id}}"
spark_version: "{{.spark_version}}"
node_type_id: "{{.node_type_id}}"
num_workers: 2
spark_conf:
"spark.executor.memory": "2g"
jobs:
foo:
name: test-job-with-cluster-{{.unique_id}}
tasks:
- task_key: my_notebook_task
existing_cluster_id: "${resources.clusters.test_cluster.cluster_id}"
spark_python_task:
python_file: ./hello_world.py

View File

@ -0,0 +1 @@
print("Hello World!")

View File

@ -0,0 +1,56 @@
package bundle
import (
"fmt"
"testing"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/internal/acc"
"github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/env"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
func TestAccDeployBundleWithCluster(t *testing.T) {
ctx, wt := acc.WorkspaceTest(t)
if testutil.IsAWSCloud(wt.T) {
t.Skip("Skipping test for AWS cloud because it is not permitted to create clusters")
}
nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
uniqueId := uuid.New().String()
root, err := initTestTemplate(t, ctx, "clusters", map[string]any{
"unique_id": uniqueId,
"node_type_id": nodeTypeId,
"spark_version": defaultSparkVersion,
})
require.NoError(t, err)
t.Cleanup(func() {
err = destroyBundle(t, ctx, root)
require.NoError(t, err)
cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
if err != nil {
require.ErrorContains(t, err, "does not exist")
} else {
require.Contains(t, []compute.State{compute.StateTerminated, compute.StateTerminating}, cluster.State)
}
})
err = deployBundle(t, ctx, root)
require.NoError(t, err)
// Cluster should exist after bundle deployment
cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
require.NoError(t, err)
require.NotNil(t, cluster)
out, err := runResource(t, ctx, root, "foo")
require.NoError(t, err)
require.Contains(t, out, "Hello World!")
}

View File

@ -49,3 +49,7 @@ func GetCloud(t *testing.T) Cloud {
}
return -1
}
+ func IsAWSCloud(t *testing.T) bool {
+ return GetCloud(t) == AWS
+ }

View File

@ -209,7 +209,26 @@ func TestRepositoryGitConfigWhenNotARepo(t *testing.T) {
}
func TestRepositoryOriginUrlRemovesUserCreds(t *testing.T) {
+ tcases := []struct {
+ url string
+ expected string
+ }{
+ {
+ url: "https://username:token@github.com/databricks/foobar.git",
+ expected: "https://github.com/databricks/foobar.git",
+ },
+ {
+ // Note: The token is still considered and parsed as a username here.
+ // However credentials integrations by Git providers like GitHub
+ // allow for setting a PAT token as a username.
+ url: "https://token@github.com/databricks/foobar.git",
+ expected: "https://github.com/databricks/foobar.git",
+ },
+ }
+ for _, tc := range tcases {
repo := newTestRepository(t)
- repo.addOriginUrl("https://username:token@github.com/databricks/foobar.git")
- repo.assertOriginUrl("https://github.com/databricks/foobar.git")
+ repo.addOriginUrl(tc.url)
+ repo.assertOriginUrl(tc.expected)
+ }
}

View File

@ -5,12 +5,10 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"io" "io"
"github.com/databricks/cli/libs/sync"
) )
// Read synchronization events and write them as JSON to the specified writer (typically stdout). // Read synchronization events and write them as JSON to the specified writer (typically stdout).
func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) { func JsonOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
enc := json.NewEncoder(w) enc := json.NewEncoder(w)
for { for {
select { select {
@ -31,7 +29,7 @@ func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
}
// Read synchronization events and write them as text to the specified writer (typically stdout).
- func textOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
+ func TextOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
bw := bufio.NewWriter(w)
for {

View File

@ -3,6 +3,7 @@ package sync
import (
"context"
"fmt"
+ stdsync "sync"
"time"
"github.com/databricks/cli/libs/filer"
@ -15,6 +16,8 @@ import (
"github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/iam"
) )
type OutputHandler func(context.Context, <-chan Event)
type SyncOptions struct { type SyncOptions struct {
LocalRoot vfs.Path LocalRoot vfs.Path
Paths []string Paths []string
@ -34,6 +37,8 @@ type SyncOptions struct {
CurrentUser *iam.User
Host string
+ OutputHandler OutputHandler
}
type Sync struct {
@ -49,6 +54,10 @@ type Sync struct {
// Synchronization progress events are sent to this event notifier.
notifier EventNotifier
seq int
+ // WaitGroup is automatically created when an output handler is provided in the SyncOptions.
+ // Close call is required to ensure the output handler goroutine handles all events in time.
+ outputWaitGroup *stdsync.WaitGroup
}
// New initializes and returns a new [Sync] instance.
@ -106,6 +115,20 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
return nil, err
}
+ var notifier EventNotifier
+ var outputWaitGroup = &stdsync.WaitGroup{}
+ if opts.OutputHandler != nil {
+ ch := make(chan Event, MaxRequestsInFlight)
+ notifier = &ChannelNotifier{ch}
+ outputWaitGroup.Add(1)
+ go func() {
+ defer outputWaitGroup.Done()
+ opts.OutputHandler(ctx, ch)
+ }()
+ } else {
+ notifier = &NopNotifier{}
+ }
return &Sync{
SyncOptions: &opts,
@ -114,23 +137,19 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
excludeFileSet: excludeFileSet,
snapshot: snapshot,
filer: filer,
- notifier: &NopNotifier{},
+ notifier: notifier,
+ outputWaitGroup: outputWaitGroup,
seq: 0,
}, nil
}
- func (s *Sync) Events() <-chan Event {
- ch := make(chan Event, MaxRequestsInFlight)
- s.notifier = &ChannelNotifier{ch}
- return ch
- }
func (s *Sync) Close() {
if s.notifier == nil {
return
}
s.notifier.Close()
s.notifier = nil
+ s.outputWaitGroup.Wait()
}
func (s *Sync) notifyStart(ctx context.Context, d diff) {

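To illustrate the resulting API shape, a rough caller-side sketch (not from this commit): instead of consuming s.Events() in a separate goroutine, callers now pass an OutputHandler through SyncOptions and rely on Close() to wait for the handler goroutine to drain. The helper below assumes opts already carries the other required fields (LocalRoot, WorkspaceClient, and so on):

package example

import (
	"context"
	"os"

	"github.com/databricks/cli/libs/sync"
)

// runOnceWithTextOutput wires a text output handler through SyncOptions,
// mirroring how the deploy --verbose flag and the sync command use it.
func runOnceWithTextOutput(ctx context.Context, opts sync.SyncOptions) error {
	opts.OutputHandler = func(ctx context.Context, events <-chan sync.Event) {
		sync.TextOutput(ctx, events, os.Stdout)
	}
	s, err := sync.New(ctx, opts)
	if err != nil {
		return err
	}
	// Close also waits for the output handler goroutine to finish.
	defer s.Close()
	_, err = s.RunOnce(ctx)
	return err
}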
View File

@ -3,6 +3,12 @@ resources:
pipelines:
{{.project_name}}_pipeline:
name: {{.project_name}}_pipeline
+ {{- if eq default_catalog ""}}
+ ## Specify the 'catalog' field to configure this pipeline to make use of Unity Catalog:
+ # catalog: catalog_name
+ {{- else}}
+ catalog: {{default_catalog}}
+ {{- end}}
target: {{.project_name}}_${bundle.environment}
libraries:
- notebook: