Compare commits


12 Commits

Author SHA1 Message Date
Richard Nordström 11c37673a6
make tokenCacheMock consistent naming with struct 2024-09-23 21:55:31 +02:00
Richard Nordström 18d3fea34e
Merge branch 'main' into feature/logout 2024-09-23 21:43:07 +02:00
Richard Nordström b7ff019b60
add test for file write 2024-09-23 21:20:56 +02:00
Richard Nordström bb35ca090f
logoutSession not exportable 2024-09-23 20:37:53 +02:00
Richard Nordström d037ec32a1
add new write function to persist to disk 2024-09-23 20:26:43 +02:00
Richard Nordström 89d3b1a4df
remove redundant version specification 2024-09-23 20:23:09 +02:00
Richard Nordström 37067ef933
rename DeleteKey to Delete 2024-09-23 20:21:38 +02:00
shreyas-goenka 0cc35ca056
Assert tokens are redacted in origin URL when username is not specified (#1785)
TSIA
2024-09-23 12:42:30 +00:00
Andrew Nester 56ed9bebf3
Added support for creating all-purpose clusters (#1698)
## Changes
Added support for creating all-purpose clusters

Example configuration:

```
bundle:
  name: clusters

resources:
  clusters:
    test_cluster:
      cluster_name: "Test Cluster"
      num_workers: 2
      node_type_id: "i3.xlarge"
      autoscale:
        min_workers: 2
        max_workers: 7
      spark_version: "13.3.x-scala2.12"
      spark_conf:
        "spark.executor.memory": "2g"

  jobs:
    test_job:
      name: "Test Job"
      tasks:
        - task_key: test_task
          existing_cluster_id: ${resources.clusters.test_cluster.id}
          notebook_task:
            notebook_path: "./src/test.py"

targets:
  development:
    mode: development
    compute_id: ${resources.clusters.test_cluster.id}

```
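
Assuming the bundle above, the cluster is created on deploy and can be exercised end to end like this (illustrative commands; the target and job names come from the example):

```
databricks bundle deploy -t development
databricks bundle run test_job -t development
```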

## Tests
Added unit, config and E2E tests
2024-09-23 10:42:34 +00:00
Ilia Babanov ac80d3dfcb
Add verbose flag to the "bundle deploy" command (#1774)
## Changes
- Extract sync output logic from `cmd/sync` into `lib/sync`
- Add a hidden `verbose` flag to the `bundle deploy` command; it's false
by default and hidden from the `--help` output
- Pass the output handler to the `deploy/files/upload` mutator if the
verbose option is true

There was an idea to use in-place output, overwriting each past file sync
event in the output, but that won't work for the extension, since it
doesn't display deploy logs in the terminal.

Example output:
```
~/tmp/defpy: ~/cli/cli bundle deploy --sync-progress
Building defpy...
Uploading defpy-0.0.1+20240917.112755-py3-none-any.whl...
Uploading bundle files to /Users/ilia.babanov@databricks.com/.bundle/defpy/dev/files...
Action: PUT: requirements-dev.txt, resources/defpy_pipeline.yml, pytest.ini, src/defpy/main.py, src/defpy/__init__.py, src/dlt_pipeline.ipynb, tests/main_test.py, src/notebook.ipynb, setup.py, resources/defpy_job.yml, .vscode/extensions.json, .vscode/settings.json, fixtures/.gitkeep, .vscode/__builtins__.pyi, README.md, .gitignore, databricks.yml
Uploaded tests
Uploaded resources
Uploaded fixtures
Uploaded .vscode
Uploaded src/defpy
Uploaded requirements-dev.txt
Uploaded .gitignore
Uploaded fixtures/.gitkeep
Uploaded src/defpy/__init__.py
Uploaded databricks.yml
Uploaded README.md
Uploaded setup.py
Uploaded .vscode/__builtins__.pyi
Uploaded .vscode/extensions.json
Uploaded src/dlt_pipeline.ipynb
Uploaded .vscode/settings.json
Uploaded resources/defpy_job.yml
Uploaded pytest.ini
Uploaded src/defpy/main.py
Uploaded tests/main_test.py
Uploaded resources/defpy_pipeline.yml
Uploaded src/notebook.ipynb
Initial Sync Complete
Deploying resources...
Updating deployment state...
Deployment complete!
```

Output example in the extension:
<img width="1843" alt="Screenshot 2024-09-19 at 11 07 48"
src="https://github.com/user-attachments/assets/0fafd095-cdc6-44b8-b482-27a38ada0330">


## Tests
Manually for the `sync` and `bundle deploy` commands + vscode extension
sync and deploy flows
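
For reference, the flag is passed like any other boolean flag even though it is hidden from `--help`:

```
databricks bundle deploy --verbose
```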
2024-09-23 10:09:11 +00:00
Lennart Kats (databricks) 7665c639bd
Use Unity Catalog for pipelines in the default-python template (#1766)
## Summary

Enables Unity Catalog for pipelines in the default template. Pipelines
will default to non-Unity Catalog pipelines if a catalog is not
specified.

*Small caveat*: there are cases where admins lock down the default
catalog of a workspace and don't allow the creation of a new schema
there. If that happens, the pipeline would fail at runtime with a clear
error indicating what happened ("PERMISSION_DENIED: User does not have
CREATE SCHEMA on Catalog 'main'."). I've seen this with an internal
Databricks workspace, where creating new non-UC schemas wasn't locked
down, but creation in the `main` catalog was.

## Testing

- Validated on a non-UC + UC workspace. The catalog selection logic here
is the same as applied for the SQL templates.
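
For illustration, a pipeline configured for Unity Catalog in a bundle looks roughly like this (a sketch with placeholder names, not taken from this change; `catalog` and `target` are the relevant fields):

```
resources:
  pipelines:
    my_pipeline:
      name: my_pipeline
      catalog: main        # Unity Catalog pipeline; omit to fall back to non-UC
      target: my_schema
      libraries:
        - notebook:
            path: ../src/dlt_pipeline.ipynb
```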
2024-09-23 09:52:04 +00:00
Lennart Kats (databricks) 6c57683dc6
Reduce time until the prompt is shown for bundle run (#1727)
## Summary

Makes the `databricks bundle run` command use local state before showing
the menu prompt, which makes the prompt appear more quickly. For large/busy
workspaces this means the prompt can appear 2-3 seconds earlier.
2024-09-21 06:36:47 +00:00
44 changed files with 850 additions and 132 deletions

View File

@@ -38,8 +38,11 @@ type Bundle struct {
     // Annotated readonly as this should be set at the target level.
     Mode Mode `json:"mode,omitempty" bundle:"readonly"`

-    // Overrides the compute used for jobs and other supported assets.
-    ComputeID string `json:"compute_id,omitempty"`
+    // DEPRECATED: Overrides the compute used for jobs and other supported assets.
+    ComputeId string `json:"compute_id,omitempty"`
+
+    // Overrides the cluster used for jobs and other supported assets.
+    ClusterId string `json:"cluster_id,omitempty"`

     // Deployment section specifies deployment related configuration for bundle
     Deployment Deployment `json:"deployment,omitempty"`

View File

@@ -160,6 +160,21 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
         // the Databricks UI and via the SQL API.
     }

+    // Clusters: Prefix, Tags
+    for _, c := range r.Clusters {
+        c.ClusterName = prefix + c.ClusterName
+        if c.CustomTags == nil {
+            c.CustomTags = make(map[string]string)
+        }
+        for _, tag := range tags {
+            normalisedKey := b.Tagging.NormalizeKey(tag.Key)
+            normalisedValue := b.Tagging.NormalizeValue(tag.Value)
+            if _, ok := c.CustomTags[normalisedKey]; !ok {
+                c.CustomTags[normalisedKey] = normalisedValue
+            }
+        }
+    }
+
     return nil
 }

View File

@@ -0,0 +1,87 @@
package mutator

import (
    "context"

    "github.com/databricks/cli/bundle"
    "github.com/databricks/cli/libs/diag"
    "github.com/databricks/cli/libs/dyn"
)

type computeIdToClusterId struct{}

func ComputeIdToClusterId() bundle.Mutator {
    return &computeIdToClusterId{}
}

func (m *computeIdToClusterId) Name() string {
    return "ComputeIdToClusterId"
}

func (m *computeIdToClusterId) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
    var diags diag.Diagnostics

    // The "compute_id" key is set; rewrite it to "cluster_id".
    err := b.Config.Mutate(func(v dyn.Value) (dyn.Value, error) {
        v, d := rewriteComputeIdToClusterId(v, dyn.NewPath(dyn.Key("bundle")))
        diags = diags.Extend(d)

        // Check if the "compute_id" key is set in any target overrides.
        return dyn.MapByPattern(v, dyn.NewPattern(dyn.Key("targets"), dyn.AnyKey()), func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
            v, d := rewriteComputeIdToClusterId(v, dyn.Path{})
            diags = diags.Extend(d)
            return v, nil
        })
    })

    diags = diags.Extend(diag.FromErr(err))
    return diags
}

func rewriteComputeIdToClusterId(v dyn.Value, p dyn.Path) (dyn.Value, diag.Diagnostics) {
    var diags diag.Diagnostics
    computeIdPath := p.Append(dyn.Key("compute_id"))
    computeId, err := dyn.GetByPath(v, computeIdPath)

    // If the "compute_id" key is not set, we don't need to do anything.
    if err != nil {
        return v, nil
    }

    if computeId.Kind() == dyn.KindInvalid {
        return v, nil
    }

    diags = diags.Append(diag.Diagnostic{
        Severity: diag.Warning,
        Summary: "compute_id is deprecated, please use cluster_id instead",
        Locations: computeId.Locations(),
        Paths: []dyn.Path{computeIdPath},
    })

    clusterIdPath := p.Append(dyn.Key("cluster_id"))
    nv, err := dyn.SetByPath(v, clusterIdPath, computeId)
    if err != nil {
        return dyn.InvalidValue, diag.FromErr(err)
    }

    // Drop the "compute_id" key.
    vout, err := dyn.Walk(nv, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
        switch len(p) {
        case 0:
            return v, nil
        case 1:
            if p[0] == dyn.Key("compute_id") {
                return v, dyn.ErrDrop
            }
            return v, nil
        case 2:
            if p[1] == dyn.Key("compute_id") {
                return v, dyn.ErrDrop
            }
        }
        return v, dyn.ErrSkip
    })

    diags = diags.Extend(diag.FromErr(err))
    return vout, diags
}
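
In practice the mutator above rewrites the deprecated spelling to the new one and emits the warning shown in the code; a sketch with a placeholder cluster ID:

```
# Before: emits "compute_id is deprecated, please use cluster_id instead"
bundle:
  name: my_bundle
  compute_id: "1234-567890-abcde123"

# After the rewrite, equivalent to writing:
bundle:
  name: my_bundle
  cluster_id: "1234-567890-abcde123"
```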

View File

@@ -0,0 +1,57 @@
package mutator_test

import (
    "context"
    "testing"

    "github.com/databricks/cli/bundle"
    "github.com/databricks/cli/bundle/config"
    "github.com/databricks/cli/bundle/config/mutator"
    "github.com/databricks/cli/libs/diag"
    "github.com/stretchr/testify/assert"
)

func TestComputeIdToClusterId(t *testing.T) {
    b := &bundle.Bundle{
        Config: config.Root{
            Bundle: config.Bundle{
                ComputeId: "compute-id",
            },
        },
    }

    diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
    assert.NoError(t, diags.Error())
    assert.Equal(t, "compute-id", b.Config.Bundle.ClusterId)
    assert.Empty(t, b.Config.Bundle.ComputeId)

    assert.Len(t, diags, 1)
    assert.Equal(t, "compute_id is deprecated, please use cluster_id instead", diags[0].Summary)
    assert.Equal(t, diag.Warning, diags[0].Severity)
}

func TestComputeIdToClusterIdInTargetOverride(t *testing.T) {
    b := &bundle.Bundle{
        Config: config.Root{
            Targets: map[string]*config.Target{
                "dev": {
                    ComputeId: "compute-id-dev",
                },
            },
        },
    }

    diags := bundle.Apply(context.Background(), b, mutator.ComputeIdToClusterId())
    assert.NoError(t, diags.Error())
    assert.Empty(t, b.Config.Targets["dev"].ComputeId)

    diags = diags.Extend(bundle.Apply(context.Background(), b, mutator.SelectTarget("dev")))
    assert.NoError(t, diags.Error())
    assert.Equal(t, "compute-id-dev", b.Config.Bundle.ClusterId)
    assert.Empty(t, b.Config.Bundle.ComputeId)

    assert.Len(t, diags, 1)
    assert.Equal(t, "compute_id is deprecated, please use cluster_id instead", diags[0].Summary)
    assert.Equal(t, diag.Warning, diags[0].Severity)
}

View File

@@ -23,6 +23,7 @@ func DefaultMutators() []bundle.Mutator {
         VerifyCliVersion(),
         EnvironmentsToTargets(),
+        ComputeIdToClusterId(),
         InitializeVariables(),
         DefineDefaultTarget(),
         LoadGitDetails(),
View File

@@ -39,22 +39,22 @@ func overrideJobCompute(j *resources.Job, compute string) {
 func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
     if b.Config.Bundle.Mode != config.Development {
-        if b.Config.Bundle.ComputeID != "" {
+        if b.Config.Bundle.ClusterId != "" {
             return diag.Errorf("cannot override compute for an target that does not use 'mode: development'")
         }
         return nil
     }
     if v := env.Get(ctx, "DATABRICKS_CLUSTER_ID"); v != "" {
-        b.Config.Bundle.ComputeID = v
+        b.Config.Bundle.ClusterId = v
     }
-    if b.Config.Bundle.ComputeID == "" {
+    if b.Config.Bundle.ClusterId == "" {
         return nil
     }
     r := b.Config.Resources
     for i := range r.Jobs {
-        overrideJobCompute(r.Jobs[i], b.Config.Bundle.ComputeID)
+        overrideJobCompute(r.Jobs[i], b.Config.Bundle.ClusterId)
     }
     return nil

View File

@@ -20,7 +20,7 @@ func TestOverrideDevelopment(t *testing.T) {
         Config: config.Root{
             Bundle: config.Bundle{
                 Mode: config.Development,
-                ComputeID: "newClusterID",
+                ClusterId: "newClusterID",
             },
             Resources: config.Resources{
                 Jobs: map[string]*resources.Job{
@@ -144,7 +144,7 @@ func TestOverrideProduction(t *testing.T) {
     b := &bundle.Bundle{
         Config: config.Root{
             Bundle: config.Bundle{
-                ComputeID: "newClusterID",
+                ClusterId: "newClusterID",
             },
             Resources: config.Resources{
                 Jobs: map[string]*resources.Job{

View File

@@ -13,6 +13,7 @@ import (
     "github.com/databricks/cli/libs/tags"
     sdkconfig "github.com/databricks/databricks-sdk-go/config"
     "github.com/databricks/databricks-sdk-go/service/catalog"
+    "github.com/databricks/databricks-sdk-go/service/compute"
     "github.com/databricks/databricks-sdk-go/service/iam"
     "github.com/databricks/databricks-sdk-go/service/jobs"
     "github.com/databricks/databricks-sdk-go/service/ml"
@@ -119,6 +120,9 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
             Schemas: map[string]*resources.Schema{
                 "schema1": {CreateSchema: &catalog.CreateSchema{Name: "schema1"}},
             },
+            Clusters: map[string]*resources.Cluster{
+                "cluster1": {ClusterSpec: &compute.ClusterSpec{ClusterName: "cluster1", SparkVersion: "13.2.x", NumWorkers: 1}},
+            },
         },
     },
     // Use AWS implementation for testing.
@@ -177,6 +181,9 @@ func TestProcessTargetModeDevelopment(t *testing.T) {
     // Schema 1
     assert.Equal(t, "dev_lennart_schema1", b.Config.Resources.Schemas["schema1"].Name)
+
+    // Clusters
+    assert.Equal(t, "[dev lennart] cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
 }

 func TestProcessTargetModeDevelopmentTagNormalizationForAws(t *testing.T) {
@@ -281,6 +288,7 @@ func TestProcessTargetModeDefault(t *testing.T) {
     assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
     assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
     assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
+    assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
 }

 func TestProcessTargetModeProduction(t *testing.T) {
@@ -312,6 +320,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
     b.Config.Resources.Experiments["experiment2"].Permissions = permissions
     b.Config.Resources.Models["model1"].Permissions = permissions
     b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Permissions = permissions
+    b.Config.Resources.Clusters["cluster1"].Permissions = permissions

     diags = validateProductionMode(context.Background(), b, false)
     require.NoError(t, diags.Error())
@@ -322,6 +331,7 @@ func TestProcessTargetModeProduction(t *testing.T) {
     assert.Equal(t, "servingendpoint1", b.Config.Resources.ModelServingEndpoints["servingendpoint1"].Name)
     assert.Equal(t, "registeredmodel1", b.Config.Resources.RegisteredModels["registeredmodel1"].Name)
     assert.Equal(t, "qualityMonitor1", b.Config.Resources.QualityMonitors["qualityMonitor1"].TableName)
+    assert.Equal(t, "cluster1", b.Config.Resources.Clusters["cluster1"].ClusterName)
 }

 func TestProcessTargetModeProductionOkForPrincipal(t *testing.T) {

View File

@@ -32,6 +32,7 @@ func allResourceTypes(t *testing.T) []string {
     // the dyn library gives us the correct list of all resources supported. Please
     // also update this check when adding a new resource
     require.Equal(t, []string{
+        "clusters",
         "experiments",
         "jobs",
         "model_serving_endpoints",
@@ -133,6 +134,7 @@ func TestRunAsErrorForUnsupportedResources(t *testing.T) {
     // some point in the future. These resources are (implicitly) on the deny list, since
     // they are not on the allow list below.
     allowList := []string{
+        "clusters",
         "jobs",
         "models",
         "registered_models",

View File

@@ -19,6 +19,7 @@ type Resources struct {
     RegisteredModels map[string]*resources.RegisteredModel `json:"registered_models,omitempty"`
     QualityMonitors map[string]*resources.QualityMonitor `json:"quality_monitors,omitempty"`
     Schemas map[string]*resources.Schema `json:"schemas,omitempty"`
+    Clusters map[string]*resources.Cluster `json:"clusters,omitempty"`
 }

 type ConfigResource interface {

View File

@@ -0,0 +1,39 @@
package resources

import (
    "context"

    "github.com/databricks/cli/libs/log"
    "github.com/databricks/databricks-sdk-go"
    "github.com/databricks/databricks-sdk-go/marshal"
    "github.com/databricks/databricks-sdk-go/service/compute"
)

type Cluster struct {
    ID string `json:"id,omitempty" bundle:"readonly"`
    Permissions []Permission `json:"permissions,omitempty"`
    ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`

    *compute.ClusterSpec
}

func (s *Cluster) UnmarshalJSON(b []byte) error {
    return marshal.Unmarshal(b, s)
}

func (s Cluster) MarshalJSON() ([]byte, error) {
    return marshal.Marshal(s)
}

func (s *Cluster) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
    _, err := w.Clusters.GetByClusterId(ctx, id)
    if err != nil {
        log.Debugf(ctx, "cluster %s does not exist", id)
        return false, err
    }
    return true, nil
}

func (s *Cluster) TerraformResourceName() string {
    return "databricks_cluster"
}
View File

@@ -366,9 +366,9 @@ func (r *Root) MergeTargetOverrides(name string) error {
         }
     }

-    // Merge `compute_id`. This field must be overwritten if set, not merged.
-    if v := target.Get("compute_id"); v.Kind() != dyn.KindInvalid {
-        root, err = dyn.SetByPath(root, dyn.NewPath(dyn.Key("bundle"), dyn.Key("compute_id")), v)
+    // Merge `cluster_id`. This field must be overwritten if set, not merged.
+    if v := target.Get("cluster_id"); v.Kind() != dyn.KindInvalid {
+        root, err = dyn.SetByPath(root, dyn.NewPath(dyn.Key("bundle"), dyn.Key("cluster_id")), v)
         if err != nil {
             return err
         }

View File

@@ -24,8 +24,11 @@ type Target struct {
     // name prefix of deployed resources.
     Presets Presets `json:"presets,omitempty"`

-    // Overrides the compute used for jobs and other supported assets.
-    ComputeID string `json:"compute_id,omitempty"`
+    // DEPRECATED: Overrides the compute used for jobs and other supported assets.
+    ComputeId string `json:"compute_id,omitempty"`
+
+    // Overrides the cluster used for jobs and other supported assets.
+    ClusterId string `json:"cluster_id,omitempty"`

     Bundle *Bundle `json:"bundle,omitempty"`

View File

@@ -8,9 +8,12 @@ import (
     "github.com/databricks/cli/libs/cmdio"
     "github.com/databricks/cli/libs/diag"
     "github.com/databricks/cli/libs/log"
+    "github.com/databricks/cli/libs/sync"
 )

-type upload struct{}
+type upload struct {
+    outputHandler sync.OutputHandler
+}

 func (m *upload) Name() string {
     return "files.Upload"
@@ -18,11 +21,18 @@ func (m *upload) Name() string {
 func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
     cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath))
-    sync, err := GetSync(ctx, bundle.ReadOnly(b))
+    opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b))
     if err != nil {
         return diag.FromErr(err)
     }
+    opts.OutputHandler = m.outputHandler
+
+    sync, err := sync.New(ctx, *opts)
+    if err != nil {
+        return diag.FromErr(err)
+    }
+    defer sync.Close()

     b.Files, err = sync.RunOnce(ctx)
     if err != nil {
         return diag.FromErr(err)
@@ -32,6 +42,6 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
     return nil
 }

-func Upload() bundle.Mutator {
-    return &upload{}
+func Upload(outputHandler sync.OutputHandler) bundle.Mutator {
+    return &upload{outputHandler}
 }

View File

@@ -231,6 +231,13 @@ func BundleToTerraform(config *config.Root) *schema.Root {
         tfroot.Resource.QualityMonitor[k] = &dst
     }

+    for k, src := range config.Resources.Clusters {
+        noResources = false
+        var dst schema.ResourceCluster
+        conv(src, &dst)
+        tfroot.Resource.Cluster[k] = &dst
+    }
+
     // We explicitly set "resource" to nil to omit it from a JSON encoding.
     // This is required because the terraform CLI requires >= 1 resources defined
     // if the "resource" property is used in a .tf.json file.
@@ -394,6 +401,16 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
             }
             cur.ID = instance.Attributes.ID
             config.Resources.Schemas[resource.Name] = cur
+        case "databricks_cluster":
+            if config.Resources.Clusters == nil {
+                config.Resources.Clusters = make(map[string]*resources.Cluster)
+            }
+            cur := config.Resources.Clusters[resource.Name]
+            if cur == nil {
+                cur = &resources.Cluster{ModifiedStatus: resources.ModifiedStatusDeleted}
+            }
+            cur.ID = instance.Attributes.ID
+            config.Resources.Clusters[resource.Name] = cur
         case "databricks_permissions":
         case "databricks_grants":
             // Ignore; no need to pull these back into the configuration.
@@ -443,6 +460,11 @@ func TerraformToBundle(state *resourcesState, config *config.Root) error {
             src.ModifiedStatus = resources.ModifiedStatusCreated
         }
     }
+    for _, src := range config.Resources.Clusters {
+        if src.ModifiedStatus == "" && src.ID == "" {
+            src.ModifiedStatus = resources.ModifiedStatusCreated
+        }
+    }
     return nil
 }

View File

@@ -663,6 +663,14 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
                 {Attributes: stateInstanceAttributes{ID: "1"}},
             },
         },
+        {
+            Type: "databricks_cluster",
+            Mode: "managed",
+            Name: "test_cluster",
+            Instances: []stateResourceInstance{
+                {Attributes: stateInstanceAttributes{ID: "1"}},
+            },
+        },
     },
 }
 err := TerraformToBundle(&tfState, &config)
@@ -692,6 +700,9 @@ func TestTerraformToBundleEmptyLocalResources(t *testing.T) {
 assert.Equal(t, "1", config.Resources.Schemas["test_schema"].ID)
 assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Schemas["test_schema"].ModifiedStatus)

+assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
+assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster"].ModifiedStatus)
+
 AssertFullResourceCoverage(t, &config)
 }
@@ -754,6 +765,13 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
             },
         },
     },
+    Clusters: map[string]*resources.Cluster{
+        "test_cluster": {
+            ClusterSpec: &compute.ClusterSpec{
+                ClusterName: "test_cluster",
+            },
+        },
+    },
 },
 }
 var tfState = resourcesState{
@@ -786,6 +804,9 @@ func TestTerraformToBundleEmptyRemoteResources(t *testing.T) {
 assert.Equal(t, "", config.Resources.Schemas["test_schema"].ID)
 assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema"].ModifiedStatus)

+assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ID)
+assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster"].ModifiedStatus)
+
 AssertFullResourceCoverage(t, &config)
 }
@@ -888,6 +909,18 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
             },
         },
     },
+    Clusters: map[string]*resources.Cluster{
+        "test_cluster": {
+            ClusterSpec: &compute.ClusterSpec{
+                ClusterName: "test_cluster",
+            },
+        },
+        "test_cluster_new": {
+            ClusterSpec: &compute.ClusterSpec{
+                ClusterName: "test_cluster_new",
+            },
+        },
+    },
 },
 }
 var tfState = resourcesState{
@@ -1020,6 +1053,22 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
                 {Attributes: stateInstanceAttributes{ID: "2"}},
             },
         },
+        {
+            Type: "databricks_cluster",
+            Mode: "managed",
+            Name: "test_cluster",
+            Instances: []stateResourceInstance{
+                {Attributes: stateInstanceAttributes{ID: "1"}},
+            },
+        },
+        {
+            Type: "databricks_cluster",
+            Mode: "managed",
+            Name: "test_cluster_old",
+            Instances: []stateResourceInstance{
+                {Attributes: stateInstanceAttributes{ID: "2"}},
+            },
+        },
     },
 }
 err := TerraformToBundle(&tfState, &config)
@@ -1081,6 +1130,13 @@ func TestTerraformToBundleModifiedResources(t *testing.T) {
 assert.Equal(t, "", config.Resources.Schemas["test_schema_new"].ID)
 assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Schemas["test_schema_new"].ModifiedStatus)

+assert.Equal(t, "1", config.Resources.Clusters["test_cluster"].ID)
+assert.Equal(t, "", config.Resources.Clusters["test_cluster"].ModifiedStatus)
+assert.Equal(t, "2", config.Resources.Clusters["test_cluster_old"].ID)
+assert.Equal(t, resources.ModifiedStatusDeleted, config.Resources.Clusters["test_cluster_old"].ModifiedStatus)
+assert.Equal(t, "", config.Resources.Clusters["test_cluster_new"].ID)
+assert.Equal(t, resources.ModifiedStatusCreated, config.Resources.Clusters["test_cluster_new"].ModifiedStatus)
+
 AssertFullResourceCoverage(t, &config)
 }

View File

@@ -58,6 +58,8 @@ func (m *interpolateMutator) Apply(ctx context.Context, b *bundle.Bundle) diag.D
             path = dyn.NewPath(dyn.Key("databricks_quality_monitor")).Append(path[2:]...)
         case dyn.Key("schemas"):
             path = dyn.NewPath(dyn.Key("databricks_schema")).Append(path[2:]...)
+        case dyn.Key("clusters"):
+            path = dyn.NewPath(dyn.Key("databricks_cluster")).Append(path[2:]...)
         default:
             // Trigger "key not found" for unknown resource types.
             return dyn.GetByPath(root, path)

View File

@@ -31,6 +31,7 @@ func TestInterpolate(t *testing.T) {
                 "other_model_serving": "${resources.model_serving_endpoints.other_model_serving.id}",
                 "other_registered_model": "${resources.registered_models.other_registered_model.id}",
                 "other_schema": "${resources.schemas.other_schema.id}",
+                "other_cluster": "${resources.clusters.other_cluster.id}",
             },
             Tasks: []jobs.Task{
                 {
@@ -67,6 +68,7 @@ func TestInterpolate(t *testing.T) {
     assert.Equal(t, "${databricks_model_serving.other_model_serving.id}", j.Tags["other_model_serving"])
     assert.Equal(t, "${databricks_registered_model.other_registered_model.id}", j.Tags["other_registered_model"])
     assert.Equal(t, "${databricks_schema.other_schema.id}", j.Tags["other_schema"])
+    assert.Equal(t, "${databricks_cluster.other_cluster.id}", j.Tags["other_cluster"])

     m := b.Config.Resources.Models["my_model"]
     assert.Equal(t, "my_model", m.Model.Name)

View File

@@ -0,0 +1,52 @@
package tfdyn

import (
    "context"
    "fmt"

    "github.com/databricks/cli/bundle/internal/tf/schema"
    "github.com/databricks/cli/libs/dyn"
    "github.com/databricks/cli/libs/dyn/convert"
    "github.com/databricks/cli/libs/log"
    "github.com/databricks/databricks-sdk-go/service/compute"
)

func convertClusterResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
    // Normalize the output value to the target schema.
    vout, diags := convert.Normalize(compute.ClusterSpec{}, vin)
    for _, diag := range diags {
        log.Debugf(ctx, "cluster normalization diagnostic: %s", diag.Summary)
    }
    return vout, nil
}

type clusterConverter struct{}

func (clusterConverter) Convert(ctx context.Context, key string, vin dyn.Value, out *schema.Resources) error {
    vout, err := convertClusterResource(ctx, vin)
    if err != nil {
        return err
    }

    // We always set no_wait as it allows DABs not to wait for cluster to be started.
    vout, err = dyn.Set(vout, "no_wait", dyn.V(true))
    if err != nil {
        return err
    }

    // Add the converted resource to the output.
    out.Cluster[key] = vout.AsAny()

    // Configure permissions for this resource.
    if permissions := convertPermissionsResource(ctx, vin); permissions != nil {
        permissions.JobId = fmt.Sprintf("${databricks_cluster.%s.id}", key)
        out.Permissions["cluster_"+key] = permissions
    }

    return nil
}

func init() {
    registerConverter("clusters", clusterConverter{})
}

View File

@@ -0,0 +1,97 @@
package tfdyn

import (
    "context"
    "testing"

    "github.com/databricks/cli/bundle/config/resources"
    "github.com/databricks/cli/bundle/internal/tf/schema"
    "github.com/databricks/cli/libs/dyn"
    "github.com/databricks/cli/libs/dyn/convert"
    "github.com/databricks/databricks-sdk-go/service/compute"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

func TestConvertCluster(t *testing.T) {
    var src = resources.Cluster{
        ClusterSpec: &compute.ClusterSpec{
            NumWorkers: 3,
            SparkVersion: "13.3.x-scala2.12",
            ClusterName: "cluster",
            SparkConf: map[string]string{
                "spark.executor.memory": "2g",
            },
            AwsAttributes: &compute.AwsAttributes{
                Availability: "ON_DEMAND",
            },
            AzureAttributes: &compute.AzureAttributes{
                Availability: "SPOT",
            },
            DataSecurityMode: "USER_ISOLATION",
            NodeTypeId: "m5.xlarge",
            Autoscale: &compute.AutoScale{
                MinWorkers: 1,
                MaxWorkers: 10,
            },
        },
        Permissions: []resources.Permission{
            {
                Level: "CAN_RUN",
                UserName: "jack@gmail.com",
            },
            {
                Level: "CAN_MANAGE",
                ServicePrincipalName: "sp",
            },
        },
    }

    vin, err := convert.FromTyped(src, dyn.NilValue)
    require.NoError(t, err)

    ctx := context.Background()
    out := schema.NewResources()
    err = clusterConverter{}.Convert(ctx, "my_cluster", vin, out)
    require.NoError(t, err)

    cluster := out.Cluster["my_cluster"]
    assert.Equal(t, map[string]any{
        "num_workers": int64(3),
        "spark_version": "13.3.x-scala2.12",
        "cluster_name": "cluster",
        "spark_conf": map[string]any{
            "spark.executor.memory": "2g",
        },
        "aws_attributes": map[string]any{
            "availability": "ON_DEMAND",
        },
        "azure_attributes": map[string]any{
            "availability": "SPOT",
        },
        "data_security_mode": "USER_ISOLATION",
        "no_wait": true,
        "node_type_id": "m5.xlarge",
        "autoscale": map[string]any{
            "min_workers": int64(1),
            "max_workers": int64(10),
        },
    }, cluster)

    // Assert equality on the permissions
    assert.Equal(t, &schema.ResourcePermissions{
        JobId: "${databricks_cluster.my_cluster.id}",
        AccessControl: []schema.ResourcePermissionsAccessControl{
            {
                PermissionLevel: "CAN_RUN",
                UserName: "jack@gmail.com",
            },
            {
                PermissionLevel: "CAN_MANAGE",
                ServicePrincipalName: "sp",
            },
        },
    }, out.Permissions["cluster_my_cluster"])
}

View File

@@ -18,6 +18,7 @@ import (
     "github.com/databricks/cli/bundle/python"
     "github.com/databricks/cli/bundle/scripts"
     "github.com/databricks/cli/libs/cmdio"
+    "github.com/databricks/cli/libs/sync"
     terraformlib "github.com/databricks/cli/libs/terraform"
     tfjson "github.com/hashicorp/terraform-json"
 )
@@ -128,7 +129,7 @@ properties such as the 'catalog' or 'storage' are changed:`
 }

 // The deploy phase deploys artifacts and resources.
-func Deploy() bundle.Mutator {
+func Deploy(outputHandler sync.OutputHandler) bundle.Mutator {
     // Core mutators that CRUD resources and modify deployment state. These
     // mutators need informed consent if they are potentially destructive.
     deployCore := bundle.Defer(
@@ -157,7 +158,7 @@ func Deploy() bundle.Mutator {
         libraries.ExpandGlobReferences(),
         libraries.Upload(),
         python.TransformWheelTask(),
-        files.Upload(),
+        files.Upload(outputHandler),
         deploy.StateUpdate(),
         deploy.StatePush(),
         permissions.ApplyWorkspaceRootPermissions(),

View File

@@ -0,0 +1,36 @@
bundle:
  name: clusters

workspace:
  host: https://acme.cloud.databricks.com/

resources:
  clusters:
    foo:
      cluster_name: foo
      num_workers: 2
      node_type_id: "i3.xlarge"
      autoscale:
        min_workers: 2
        max_workers: 7
      spark_version: "13.3.x-scala2.12"
      spark_conf:
        "spark.executor.memory": "2g"

targets:
  default:

  development:
    resources:
      clusters:
        foo:
          cluster_name: foo-override
          num_workers: 3
          node_type_id: "m5.xlarge"
          autoscale:
            min_workers: 1
            max_workers: 3
          spark_version: "15.2.x-scala2.12"
          spark_conf:
            "spark.executor.memory": "4g"
            "spark.executor.memory2": "4g"

View File

@@ -0,0 +1,36 @@
package config_tests

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

func TestClusters(t *testing.T) {
    b := load(t, "./clusters")
    assert.Equal(t, "clusters", b.Config.Bundle.Name)

    cluster := b.Config.Resources.Clusters["foo"]
    assert.Equal(t, "foo", cluster.ClusterName)
    assert.Equal(t, "13.3.x-scala2.12", cluster.SparkVersion)
    assert.Equal(t, "i3.xlarge", cluster.NodeTypeId)
    assert.Equal(t, 2, cluster.NumWorkers)
    assert.Equal(t, "2g", cluster.SparkConf["spark.executor.memory"])
    assert.Equal(t, 2, cluster.Autoscale.MinWorkers)
    assert.Equal(t, 7, cluster.Autoscale.MaxWorkers)
}

func TestClustersOverride(t *testing.T) {
    b := loadTarget(t, "./clusters", "development")
    assert.Equal(t, "clusters", b.Config.Bundle.Name)

    cluster := b.Config.Resources.Clusters["foo"]
    assert.Equal(t, "foo-override", cluster.ClusterName)
    assert.Equal(t, "15.2.x-scala2.12", cluster.SparkVersion)
    assert.Equal(t, "m5.xlarge", cluster.NodeTypeId)
    assert.Equal(t, 3, cluster.NumWorkers)
    assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory"])
    assert.Equal(t, "4g", cluster.SparkConf["spark.executor.memory2"])
    assert.Equal(t, 1, cluster.Autoscale.MinWorkers)
    assert.Equal(t, 3, cluster.Autoscale.MaxWorkers)
}

View File

@@ -15,43 +15,43 @@ import (
     "github.com/spf13/cobra"
 )

-type LogoutSession struct {
-    Profile string
-    File config.File
-    PersistentAuth *auth.PersistentAuth
+type logoutSession struct {
+    profile string
+    file config.File
+    persistentAuth *auth.PersistentAuth
 }

-func (l *LogoutSession) load(ctx context.Context, profileName string, persistentAuth *auth.PersistentAuth) error {
-    l.Profile = profileName
-    l.PersistentAuth = persistentAuth
+func (l *logoutSession) load(ctx context.Context, profileName string, persistentAuth *auth.PersistentAuth) error {
+    l.profile = profileName
+    l.persistentAuth = persistentAuth
     iniFile, err := profile.DefaultProfiler.Get(ctx)
     if errors.Is(err, fs.ErrNotExist) {
         return err
     } else if err != nil {
         return fmt.Errorf("cannot parse config file: %w", err)
     }
-    l.File = *iniFile
+    l.file = *iniFile
     if err := l.setHostAndAccountIdFromProfile(); err != nil {
         return err
     }
     return nil
 }

-func (l *LogoutSession) setHostAndAccountIdFromProfile() error {
+func (l *logoutSession) setHostAndAccountIdFromProfile() error {
     sectionMap, err := l.getConfigSectionMap()
     if err != nil {
         return err
     }
     if sectionMap["host"] == "" {
-        return fmt.Errorf("no host configured for profile %s", l.Profile)
+        return fmt.Errorf("no host configured for profile %s", l.profile)
     }
-    l.PersistentAuth.Host = sectionMap["host"]
-    l.PersistentAuth.AccountID = sectionMap["account_id"]
+    l.persistentAuth.Host = sectionMap["host"]
+    l.persistentAuth.AccountID = sectionMap["account_id"]
     return nil
 }

-func (l *LogoutSession) getConfigSectionMap() (map[string]string, error) {
-    section, err := l.File.GetSection(l.Profile)
+func (l *logoutSession) getConfigSectionMap() (map[string]string, error) {
+    section, err := l.file.GetSection(l.profile)
     if err != nil {
         return map[string]string{}, fmt.Errorf("profile does not exist in config file: %w", err)
     }
@@ -59,16 +59,16 @@ func (l *LogoutSession) getConfigSectionMap() (map[string]string, error) {
 }

 // clear token from ~/.databricks/token-cache.json
-func (l *LogoutSession) clearTokenCache(ctx context.Context) error {
-    return l.PersistentAuth.ClearToken(ctx)
+func (l *logoutSession) clearTokenCache(ctx context.Context) error {
+    return l.persistentAuth.ClearToken(ctx)
 }

 // Overrewrite profile to .databrickscfg without fields marked as sensitive
 // Other attributes are preserved.
-func (l *LogoutSession) clearConfigFile(ctx context.Context, sectionMap map[string]string) error {
+func (l *logoutSession) clearConfigFile(ctx context.Context, sectionMap map[string]string) error {
     return databrickscfg.SaveToProfile(ctx, &config.Config{
-        ConfigFile: l.File.Path(),
-        Profile: l.Profile,
+        ConfigFile: l.file.Path(),
+        Profile: l.profile,
         Host: sectionMap["host"],
         ClusterID: sectionMap["cluster_id"],
         WarehouseID: sectionMap["warehouse_id"],
@@ -114,7 +114,7 @@ func newLogoutCommand(persistentAuth *auth.PersistentAuth) *cobra.Command {
         }
     }
     defer persistentAuth.Close()
-    logoutSession := &LogoutSession{}
+    logoutSession := &logoutSession{}
     logoutSession.load(ctx, profileName, persistentAuth)
     configSectionMap, err := logoutSession.getConfigSectionMap()
     if err != nil {

View File

@@ -26,11 +26,11 @@ func TestLogout_ClearConfigFile(t *testing.T) {
     require.NoError(t, err)
     iniFile, err := config.LoadFile(path)
     require.NoError(t, err)
-    logout := &LogoutSession{
-        Profile: "abc",
-        File: *iniFile,
+    logout := &logoutSession{
+        profile: "abc",
+        file: *iniFile,
     }
-    section, err := logout.File.GetSection("abc")
+    section, err := logout.file.GetSection("abc")
     assert.NoError(t, err)
     sectionMap := section.KeysHash()
     err = logout.clearConfigFile(ctx, sectionMap)
@@ -62,15 +62,15 @@ func TestLogout_setHostAndAccountIdFromProfile(t *testing.T) {
     require.NoError(t, err)
     iniFile, err := config.LoadFile(path)
     require.NoError(t, err)
-    logout := &LogoutSession{
-        Profile: "abc",
-        File: *iniFile,
-        PersistentAuth: &auth.PersistentAuth{},
+    logout := &logoutSession{
+        profile: "abc",
+        file: *iniFile,
+        persistentAuth: &auth.PersistentAuth{},
     }
     err = logout.setHostAndAccountIdFromProfile()
     assert.NoError(t, err)
-    assert.Equal(t, logout.PersistentAuth.Host, "https://foo")
-    assert.Empty(t, logout.PersistentAuth.AccountID)
+    assert.Equal(t, logout.persistentAuth.Host, "https://foo")
+    assert.Empty(t, logout.persistentAuth.AccountID)
 }

 func TestLogout_getConfigSectionMap(t *testing.T) {
@@ -86,10 +86,10 @@ func TestLogout_getConfigSectionMap(t *testing.T) {
     require.NoError(t, err)
     iniFile, err := config.LoadFile(path)
     require.NoError(t, err)
-    logout := &LogoutSession{
-        Profile: "abc",
-        File: *iniFile,
-        PersistentAuth: &auth.PersistentAuth{},
+    logout := &logoutSession{
+        profile: "abc",
+        file: *iniFile,
+        persistentAuth: &auth.PersistentAuth{},
     }
     configSectionMap, err := logout.getConfigSectionMap()
     assert.NoError(t, err)

View File

@@ -10,6 +10,7 @@ import (
     "github.com/databricks/cli/cmd/bundle/utils"
     "github.com/databricks/cli/cmd/root"
     "github.com/databricks/cli/libs/diag"
+    "github.com/databricks/cli/libs/sync"
     "github.com/spf13/cobra"
 )
@@ -23,13 +24,19 @@ func newDeployCommand() *cobra.Command {
     var force bool
     var forceLock bool
     var failOnActiveRuns bool
-    var computeID string
+    var clusterId string
     var autoApprove bool
+    var verbose bool
     cmd.Flags().BoolVar(&force, "force", false, "Force-override Git branch validation.")
     cmd.Flags().BoolVar(&forceLock, "force-lock", false, "Force acquisition of deployment lock.")
     cmd.Flags().BoolVar(&failOnActiveRuns, "fail-on-active-runs", false, "Fail if there are running jobs or pipelines in the deployment.")
-    cmd.Flags().StringVarP(&computeID, "compute-id", "c", "", "Override compute in the deployment with the given compute ID.")
+    cmd.Flags().StringVar(&clusterId, "compute-id", "", "Override cluster in the deployment with the given compute ID.")
+    cmd.Flags().StringVarP(&clusterId, "cluster-id", "c", "", "Override cluster in the deployment with the given cluster ID.")
     cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "Skip interactive approvals that might be required for deployment.")
+    cmd.Flags().MarkDeprecated("compute-id", "use --cluster-id instead")
+    cmd.Flags().BoolVar(&verbose, "verbose", false, "Enable verbose output.")
+    // Verbose flag currently only affects file sync output, it's used by the vscode extension
+    cmd.Flags().MarkHidden("verbose")

     cmd.RunE = func(cmd *cobra.Command, args []string) error {
         ctx := cmd.Context()
@@ -42,7 +49,10 @@ func newDeployCommand() *cobra.Command {
             b.AutoApprove = autoApprove
             if cmd.Flag("compute-id").Changed {
-                b.Config.Bundle.ComputeID = computeID
+                b.Config.Bundle.ClusterId = clusterId
+            }
+            if cmd.Flag("cluster-id").Changed {
+                b.Config.Bundle.ClusterId = clusterId
             }
             if cmd.Flag("fail-on-active-runs").Changed {
                 b.Config.Bundle.Deployment.FailOnActiveRuns = failOnActiveRuns
@@ -51,11 +61,18 @@ func newDeployCommand() *cobra.Command {
             return nil
         })

+        var outputHandler sync.OutputHandler
+        if verbose {
+            outputHandler = func(ctx context.Context, c <-chan sync.Event) {
+                sync.TextOutput(ctx, c, cmd.OutOrStdout())
+            }
+        }
+
         diags = diags.Extend(
             bundle.Apply(ctx, b, bundle.Seq(
                 phases.Initialize(),
                 phases.Build(),
-                phases.Deploy(),
+                phases.Deploy(outputHandler),
             )),
         )
     }

View File

@@ -55,13 +55,7 @@ task or a Python wheel task, the second example applies.
             return diags.Error()
         }

-        diags = bundle.Apply(ctx, b, bundle.Seq(
-            phases.Initialize(),
-            terraform.Interpolate(),
-            terraform.Write(),
-            terraform.StatePull(),
-            terraform.Load(terraform.ErrorOnEmptyState),
-        ))
+        diags = bundle.Apply(ctx, b, phases.Initialize())
         if err := diags.Error(); err != nil {
             return err
         }
@@ -84,6 +78,16 @@ task or a Python wheel task, the second example applies.
             return fmt.Errorf("expected a KEY of the resource to run")
         }

+        diags = bundle.Apply(ctx, b, bundle.Seq(
+            terraform.Interpolate(),
+            terraform.Write(),
+            terraform.StatePull(),
+            terraform.Load(terraform.ErrorOnEmptyState),
+        ))
+        if err := diags.Error(); err != nil {
+            return err
+        }
+
         runner, err := run.Find(b, args[0])
         if err != nil {
             return err

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
stdsync "sync"
"time" "time"
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
@ -46,6 +45,21 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
return nil, flag.ErrHelp return nil, flag.ErrHelp
} }
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch f.output {
case flags.OutputText:
outputFunc = sync.TextOutput
case flags.OutputJSON:
outputFunc = sync.JsonOutput
}
var outputHandler sync.OutputHandler
if outputFunc != nil {
outputHandler = func(ctx context.Context, events <-chan sync.Event) {
outputFunc(ctx, events, cmd.OutOrStdout())
}
}
opts := sync.SyncOptions{ opts := sync.SyncOptions{
LocalRoot: vfs.MustNew(args[0]), LocalRoot: vfs.MustNew(args[0]),
Paths: []string{"."}, Paths: []string{"."},
@ -62,6 +76,8 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn
// exist and add it to the `.gitignore` file in the root. // exist and add it to the `.gitignore` file in the root.
SnapshotBasePath: filepath.Join(args[0], ".databricks"), SnapshotBasePath: filepath.Join(args[0], ".databricks"),
WorkspaceClient: root.WorkspaceClient(cmd.Context()), WorkspaceClient: root.WorkspaceClient(cmd.Context()),
OutputHandler: outputHandler,
} }
return &opts, nil return &opts, nil
} }
@ -118,23 +134,7 @@ func New() *cobra.Command {
if err != nil { if err != nil {
return err return err
} }
defer s.Close()
var outputFunc func(context.Context, <-chan sync.Event, io.Writer)
switch f.output {
case flags.OutputText:
outputFunc = textOutput
case flags.OutputJSON:
outputFunc = jsonOutput
}
var wg stdsync.WaitGroup
if outputFunc != nil {
wg.Add(1)
go func() {
defer wg.Done()
outputFunc(ctx, s.Events(), cmd.OutOrStdout())
}()
}
if f.watch { if f.watch {
err = s.RunContinuous(ctx) err = s.RunContinuous(ctx)
@ -142,8 +142,6 @@ func New() *cobra.Command {
_, err = s.RunOnce(ctx) _, err = s.RunOnce(ctx)
} }
s.Close()
wg.Wait()
return err return err
} }
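
With output handling now injected through `SyncOptions`, the standalone sync command behaves as before; for example (illustrative paths):

```
databricks sync --watch --output json ./src /Workspace/Users/someone@example.com/src
```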

View File

@@ -0,0 +1,16 @@
{
    "properties": {
        "unique_id": {
            "type": "string",
            "description": "Unique ID for job name"
        },
        "spark_version": {
            "type": "string",
            "description": "Spark version used for job cluster"
        },
        "node_type_id": {
            "type": "string",
            "description": "Node type id for job cluster"
        }
    }
}

View File

@@ -0,0 +1,24 @@
bundle:
  name: basic

workspace:
  root_path: "~/.bundle/{{.unique_id}}"

resources:
  clusters:
    test_cluster:
      cluster_name: "test-cluster-{{.unique_id}}"
      spark_version: "{{.spark_version}}"
      node_type_id: "{{.node_type_id}}"
      num_workers: 2
      spark_conf:
        "spark.executor.memory": "2g"

  jobs:
    foo:
      name: test-job-with-cluster-{{.unique_id}}
      tasks:
        - task_key: my_notebook_task
          existing_cluster_id: "${resources.clusters.test_cluster.cluster_id}"
          spark_python_task:
            python_file: ./hello_world.py

View File

@@ -0,0 +1 @@
print("Hello World!")

View File

@@ -0,0 +1,56 @@
package bundle

import (
    "fmt"
    "testing"

    "github.com/databricks/cli/internal"
    "github.com/databricks/cli/internal/acc"
    "github.com/databricks/cli/internal/testutil"
    "github.com/databricks/cli/libs/env"
    "github.com/databricks/databricks-sdk-go/service/compute"
    "github.com/google/uuid"
    "github.com/stretchr/testify/require"
)

func TestAccDeployBundleWithCluster(t *testing.T) {
    ctx, wt := acc.WorkspaceTest(t)

    if testutil.IsAWSCloud(wt.T) {
        t.Skip("Skipping test for AWS cloud because it is not permitted to create clusters")
    }

    nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
    uniqueId := uuid.New().String()
    root, err := initTestTemplate(t, ctx, "clusters", map[string]any{
        "unique_id": uniqueId,
        "node_type_id": nodeTypeId,
        "spark_version": defaultSparkVersion,
    })
    require.NoError(t, err)

    t.Cleanup(func() {
        err = destroyBundle(t, ctx, root)
        require.NoError(t, err)

        cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
        if err != nil {
            require.ErrorContains(t, err, "does not exist")
        } else {
            require.Contains(t, []compute.State{compute.StateTerminated, compute.StateTerminating}, cluster.State)
        }
    })

    err = deployBundle(t, ctx, root)
    require.NoError(t, err)

    // Cluster should exist after bundle deployment
    cluster, err := wt.W.Clusters.GetByClusterName(ctx, fmt.Sprintf("test-cluster-%s", uniqueId))
    require.NoError(t, err)
    require.NotNil(t, cluster)

    out, err := runResource(t, ctx, root, "foo")
    require.NoError(t, err)
    require.Contains(t, out, "Hello World!")
}

View File

@@ -49,3 +49,7 @@ func GetCloud(t *testing.T) Cloud {
     }
     return -1
 }
+
+func IsAWSCloud(t *testing.T) bool {
+    return GetCloud(t) == AWS
+}

View File

@@ -9,7 +9,7 @@ import (
 type TokenCache interface {
     Store(key string, t *oauth2.Token) error
     Lookup(key string) (*oauth2.Token, error)
-    DeleteKey(key string) error
+    Delete(key string) error
 }

 var tokenCache int

View File

@@ -52,11 +52,7 @@ func (c *FileTokenCache) Store(key string, t *oauth2.Token) error {
         c.Tokens = map[string]*oauth2.Token{}
     }
     c.Tokens[key] = t
-    raw, err := json.MarshalIndent(c, "", " ")
-    if err != nil {
-        return fmt.Errorf("marshal: %w", err)
-    }
-    return os.WriteFile(c.fileLocation, raw, ownerReadWrite)
+    return c.write()
 }

 func (c *FileTokenCache) Lookup(key string) (*oauth2.Token, error) {
@@ -73,14 +69,13 @@ func (c *FileTokenCache) Lookup(key string) (*oauth2.Token, error) {
     return t, nil
 }

-func (c *FileTokenCache) DeleteKey(key string) error {
+func (c *FileTokenCache) Delete(key string) error {
     err := c.load()
     if errors.Is(err, fs.ErrNotExist) {
         return ErrNotConfigured
     } else if err != nil {
         return fmt.Errorf("load: %w", err)
     }
-    c.Version = tokenCacheVersion
     if c.Tokens == nil {
         c.Tokens = map[string]*oauth2.Token{}
     }
@@ -89,11 +84,7 @@ func (c *FileTokenCache) DeleteKey(key string) error {
         return ErrNotConfigured
     }
     delete(c.Tokens, key)
-    raw, err := json.MarshalIndent(c, "", " ")
-    if err != nil {
-        return fmt.Errorf("marshal: %w", err)
-    }
-    return os.WriteFile(c.fileLocation, raw, ownerReadWrite)
+    return c.write()
 }

 func (c *FileTokenCache) location() (string, error) {
@@ -128,4 +119,12 @@ func (c *FileTokenCache) load() error {
     return nil
 }

+func (c *FileTokenCache) write() error {
+    raw, err := json.MarshalIndent(c, "", " ")
+    if err != nil {
+        return fmt.Errorf("marshal: %w", err)
+    }
+    return os.WriteFile(c.fileLocation, raw, ownerReadWrite)
+}
+
 var _ TokenCache = (*FileTokenCache)(nil)

View File

@@ -1,6 +1,7 @@
 package cache

 import (
+    "encoding/json"
     "os"
     "path/filepath"
     "runtime"
@@ -118,7 +119,7 @@ func TestStoreAndDeleteKey(t *testing.T) {
     require.NoError(t, err)

     l := &FileTokenCache{}
-    err = l.DeleteKey("x")
+    err = l.Delete("x")
     require.NoError(t, err)
     assert.Equal(t, 1, len(l.Tokens))
@@ -134,9 +135,33 @@ func TestDeleteKeyNotExist(t *testing.T) {
     c := &FileTokenCache{
         Tokens: map[string]*oauth2.Token{},
     }
-    err := c.DeleteKey("x")
+    err := c.Delete("x")
     assert.Equal(t, ErrNotConfigured, err)
     _, err = c.Lookup("x")
     assert.Equal(t, ErrNotConfigured, err)
 }
+
+func TestWrite(t *testing.T) {
+    tempFile := filepath.Join(t.TempDir(), "token-cache.json")
+    tokenMap := map[string]*oauth2.Token{}
+    token := &oauth2.Token{
+        AccessToken: "some-access-token",
+    }
+    tokenMap["test"] = token
+    cache := &FileTokenCache{
+        fileLocation: tempFile,
+        Tokens: tokenMap,
+    }
+    err := cache.write()
+    assert.NoError(t, err)
+
+    content, err := os.ReadFile(tempFile)
+    require.NoError(t, err)
+    expected, _ := json.MarshalIndent(&cache, "", " ")
+    assert.Equal(t, content, expected)
+}

View File

@@ -23,8 +23,8 @@ func (i *InMemoryTokenCache) Store(key string, t *oauth2.Token) error {
     return nil
 }

-// DeleteKey implements TokenCache.
-func (i *InMemoryTokenCache) DeleteKey(key string) error {
+// Delete implements TokenCache.
+func (i *InMemoryTokenCache) Delete(key string) error {
     _, ok := i.Tokens[key]
     if !ok {
         return ErrNotConfigured
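file.go above ends with a compile-time assertion (`var _ TokenCache = (*FileTokenCache)(nil)`) that would catch an incomplete rename at build time. The same pattern applies to the in-memory cache; a one-line sketch, assuming it is not already declared elsewhere in this file:

```go
package cache

// If InMemoryTokenCache still had DeleteKey instead of Delete, this
// assignment would fail to compile, flagging the missed rename.
var _ TokenCache = (*InMemoryTokenCache)(nil)
```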

View File

@@ -58,7 +58,7 @@ func TestInMemoryDeleteKey(t *testing.T) {
     })
     require.NoError(t, err)

-    err = c.DeleteKey("x")
+    err = c.Delete("x")
     require.NoError(t, err)
     assert.Equal(t, 1, len(c.Tokens))
@@ -74,7 +74,7 @@ func TestInMemoryDeleteKeyNotExist(t *testing.T) {
     c := &InMemoryTokenCache{
         Tokens: map[string]*oauth2.Token{},
     }
-    err := c.DeleteKey("x")
+    err := c.Delete("x")
     assert.Equal(t, ErrNotConfigured, err)

     _, err = c.Lookup("x")

View File

@@ -152,7 +152,7 @@ func (a *PersistentAuth) ClearToken(ctx context.Context) error {
     }
     // lookup token identified by host (and possibly the account id)
     key := a.key()
-    return a.cache.DeleteKey(key)
+    return a.cache.Delete(key)
 }

 func (a *PersistentAuth) init(ctx context.Context) error {
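ClearToken is the entry point a logout flow would call. A minimal sketch, assuming `PersistentAuth` exposes a `Host` field and a `Close` method as elsewhere in this package (neither is shown in this diff, and `logout` is a hypothetical wrapper):

```go
package auth

import "context"

// logout builds the persistent auth for a host and drops its cached token
// via the renamed cache.Delete path. Field names here are assumptions.
func logout(ctx context.Context, host string) error {
	a := &PersistentAuth{Host: host}
	defer a.Close()
	return a.ClearToken(ctx)
}
```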

View File

@@ -53,9 +53,9 @@ func TestOidcForWorkspace(t *testing.T) {
 }

 type tokenCacheMock struct {
     store  func(key string, t *oauth2.Token) error
     lookup func(key string) (*oauth2.Token, error)
-    deleteKey func(key string) error
+    delete func(key string) error
 }

 func (m *tokenCacheMock) Store(key string, t *oauth2.Token) error {
@@ -72,11 +72,11 @@ func (m *tokenCacheMock) Lookup(key string) (*oauth2.Token, error) {
     return m.lookup(key)
 }

-func (m *tokenCacheMock) DeleteKey(key string) error {
-    if m.deleteKey == nil {
+func (m *tokenCacheMock) Delete(key string) error {
+    if m.delete == nil {
         panic("no deleteKey mock")
     }
-    return m.deleteKey(key)
+    return m.delete(key)
 }

 func TestLoad(t *testing.T) {
@@ -246,7 +246,7 @@ func TestClearToken(t *testing.T) {
             assert.Equal(t, "https://abc/oidc/accounts/xyz", key)
             return &oauth2.Token{}, ErrNotConfigured
         },
-        deleteKey: func(key string) error {
+        delete: func(key string) error {
             assert.Equal(t, "https://abc/oidc/accounts/xyz", key)
             return nil
         },
@@ -269,7 +269,7 @@ func TestClearTokenNotExist(t *testing.T) {
             assert.Equal(t, "https://abc/oidc/accounts/xyz", key)
             return &oauth2.Token{}, ErrNotConfigured
         },
-        deleteKey: func(key string) error {
+        delete: func(key string) error {
             assert.Equal(t, "https://abc/oidc/accounts/xyz", key)
             return ErrNotConfigured
         },

View File

@@ -209,7 +209,26 @@ func TestRepositoryGitConfigWhenNotARepo(t *testing.T) {
 }

 func TestRepositoryOriginUrlRemovesUserCreds(t *testing.T) {
-    repo := newTestRepository(t)
-    repo.addOriginUrl("https://username:token@github.com/databricks/foobar.git")
-    repo.assertOriginUrl("https://github.com/databricks/foobar.git")
+    tcases := []struct {
+        url      string
+        expected string
+    }{
+        {
+            url:      "https://username:token@github.com/databricks/foobar.git",
+            expected: "https://github.com/databricks/foobar.git",
+        },
+        {
+            // Note: The token is still considered and parsed as a username here.
+            // However credentials integrations by Git providers like GitHub
+            // allow for setting a PAT token as a username.
+            url:      "https://token@github.com/databricks/foobar.git",
+            expected: "https://github.com/databricks/foobar.git",
+        },
+    }
+
+    for _, tc := range tcases {
+        repo := newTestRepository(t)
+        repo.addOriginUrl(tc.url)
+        repo.assertOriginUrl(tc.expected)
+    }
 }
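The comment in the second case is the crux: in a URL, everything before `@` is userinfo, so a bare PAT parses as a username. A standalone sketch with `net/url` showing why both forms redact to the same origin URL (whether the repository code uses `net/url` this way is not shown in this diff):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	for _, raw := range []string{
		"https://username:token@github.com/databricks/foobar.git",
		"https://token@github.com/databricks/foobar.git", // PAT in the username slot
	} {
		u, err := url.Parse(raw)
		if err != nil {
			panic(err)
		}
		u.User = nil // strip userinfo: username and password alike
		fmt.Println(u.String())
	}
	// Both iterations print: https://github.com/databricks/foobar.git
}
```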

View File

@@ -5,12 +5,10 @@ import (
     "context"
     "encoding/json"
     "io"
-
-    "github.com/databricks/cli/libs/sync"
 )

 // Read synchronization events and write them as JSON to the specified writer (typically stdout).
-func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
+func JsonOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
     enc := json.NewEncoder(w)
     for {
         select {
@@ -31,7 +29,7 @@ func jsonOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
 }

 // Read synchronization events and write them as text to the specified writer (typically stdout).
-func textOutput(ctx context.Context, ch <-chan sync.Event, w io.Writer) {
+func TextOutput(ctx context.Context, ch <-chan Event, w io.Writer) {
     bw := bufio.NewWriter(w)
     for {
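Note the arity mismatch with the `OutputHandler` type introduced in sync.go below: `JsonOutput` and `TextOutput` take a writer as a third argument, so a caller is expected to bind the writer in a closure. A sketch, assuming `os.Stdout` as the destination:

```go
package main

import (
	"context"
	"os"

	"github.com/databricks/cli/libs/sync"
)

// stdoutTextHandler adapts TextOutput's (ctx, ch, w) signature to the
// two-argument OutputHandler shape by capturing the writer in a closure.
func stdoutTextHandler() sync.OutputHandler {
	return func(ctx context.Context, ch <-chan sync.Event) {
		sync.TextOutput(ctx, ch, os.Stdout)
	}
}
```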

View File

@@ -3,6 +3,7 @@ package sync
 import (
     "context"
     "fmt"
+    stdsync "sync"
     "time"

     "github.com/databricks/cli/libs/filer"
@@ -15,6 +16,8 @@ import (
     "github.com/databricks/databricks-sdk-go/service/iam"
 )

+type OutputHandler func(context.Context, <-chan Event)
+
 type SyncOptions struct {
     LocalRoot vfs.Path
     Paths     []string
@@ -34,6 +37,8 @@ type SyncOptions struct {
     CurrentUser *iam.User
     Host        string
+
+    OutputHandler OutputHandler
 }

 type Sync struct {
@@ -49,6 +54,10 @@ type Sync struct {
     // Synchronization progress events are sent to this event notifier.
     notifier EventNotifier
     seq      int
+
+    // WaitGroup is automatically created when an output handler is provided in the SyncOptions.
+    // Close call is required to ensure the output handler goroutine handles all events in time.
+    outputWaitGroup *stdsync.WaitGroup
 }

 // New initializes and returns a new [Sync] instance.
@@ -106,31 +115,41 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
         return nil, err
     }

+    var notifier EventNotifier
+    var outputWaitGroup = &stdsync.WaitGroup{}
+    if opts.OutputHandler != nil {
+        ch := make(chan Event, MaxRequestsInFlight)
+        notifier = &ChannelNotifier{ch}
+        outputWaitGroup.Add(1)
+        go func() {
+            defer outputWaitGroup.Done()
+            opts.OutputHandler(ctx, ch)
+        }()
+    } else {
+        notifier = &NopNotifier{}
+    }
+
     return &Sync{
         SyncOptions: &opts,

         fileSet:        fileSet,
         includeFileSet: includeFileSet,
         excludeFileSet: excludeFileSet,
         snapshot:       snapshot,
         filer:          filer,
-        notifier:       &NopNotifier{},
-        seq:            0,
+        notifier:        notifier,
+        outputWaitGroup: outputWaitGroup,
+        seq:             0,
     }, nil
 }

-func (s *Sync) Events() <-chan Event {
-    ch := make(chan Event, MaxRequestsInFlight)
-    s.notifier = &ChannelNotifier{ch}
-    return ch
-}
-
 func (s *Sync) Close() {
     if s.notifier == nil {
         return
     }
     s.notifier.Close()
     s.notifier = nil
+    s.outputWaitGroup.Wait()
 }

 func (s *Sync) notifyStart(ctx context.Context, d diff) {
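Putting the pieces together, a hedged sketch of how a caller such as `bundle deploy` could opt into verbose sync output. Only `OutputHandler`, `New`, and `Close` are taken from this diff; `RunOnce` and the remaining `SyncOptions` fields are assumptions based on the surrounding package:

```go
package main

import (
	"context"

	"github.com/databricks/cli/libs/sync"
)

func deployWithVerboseSync(ctx context.Context, opts sync.SyncOptions) error {
	// New wires a ChannelNotifier and spawns the handler goroutine only
	// when OutputHandler is non-nil; otherwise it falls back to NopNotifier.
	opts.OutputHandler = stdoutTextHandler() // from the previous sketch

	s, err := sync.New(ctx, opts)
	if err != nil {
		return err
	}
	// Close closes the notifier, then waits on outputWaitGroup so the
	// handler drains every pending event before we return.
	defer s.Close()

	return s.RunOnce(ctx) // assumed entry point for a one-shot sync
}
```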

View File

@@ -3,6 +3,12 @@ resources:
   pipelines:
     {{.project_name}}_pipeline:
       name: {{.project_name}}_pipeline
+      {{- if eq default_catalog ""}}
+      ## Specify the 'catalog' field to configure this pipeline to make use of Unity Catalog:
+      # catalog: catalog_name
+      {{- else}}
+      catalog: {{default_catalog}}
+      {{- end}}
       target: {{.project_name}}_${bundle.environment}
       libraries:
         - notebook:
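The `{{- if eq default_catalog ""}}` branch emits a commented-out `catalog` hint when no default catalog is available, and a concrete `catalog:` line otherwise. A standalone sketch of the mechanics with Go's text/template, modeling `default_catalog` as a niladic template function (in the CLI it is resolved from the workspace; here it is hard-coded for the demo):

```go
package main

import (
	"os"
	"text/template"
)

const snippet = `      name: my_pipeline
{{- if eq default_catalog ""}}
      ## Specify the 'catalog' field to configure this pipeline to make use of Unity Catalog:
      # catalog: catalog_name
{{- else}}
      catalog: {{default_catalog}}
{{- end}}
      target: my_pipeline_dev
`

func main() {
	// A niladic function in an argument position is invoked by the template
	// engine, so `eq default_catalog ""` compares its return value to "".
	funcs := template.FuncMap{"default_catalog": func() string { return "main" }}
	t := template.Must(template.New("pipeline").Funcs(funcs).Parse(snippet))
	_ = t.Execute(os.Stdout, nil)
	// With "main" the catalog line is emitted; with "" only the comment hint.
}
```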