From e4cd7828525ae53547581896c9dc3baecc118d0b Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Mon, 3 Mar 2025 14:35:36 +0100 Subject: [PATCH] Remove bundle.{Parallel,ReadOnlyBundle} (#2414) ## Changes - Remove bundle.Parallel & bundle.ReadOnlyBundle. - Add bundle.ApplyParallel, as a helper to migrate from bundle.Parallel. - Keep ReadOnlyMutator as a separate type but it's now a subtype of Mutator so it works on regular *Bundle. Having it as a separate type prevents non-readonly mutators being passed to ApplyParallel - validate.Validate becomes a function (was Mutator). ## Why This a follow up to #2390 where we removed most of the tools to construct chains of mutators. Same motivation applies here. When it comes to read-only bundles, it's a leaky abstraction -- since it's a shallow copy, it does not actually guarantee or enforce readonly access to bundle. A better approach would be to run parallel operations on independent narrowly-focused deep-copied structs, with just enough information to carry out the task (this is not implemented here, but the eventual goal). Now that we can just write regular code in phases and not limited to mutator interface, we can switch to that approach. ## Tests Existing tests. --------- Co-authored-by: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com> --- .../bundle/debug/out.stderr.parallel.txt | 15 ---- acceptance/bundle/debug/out.stderr.txt | 14 +++- acceptance/bundle/debug/script | 5 +- bundle/bundle_read_only.go | 49 ----------- bundle/config/validate/fast_validate.go | 28 ++----- bundle/config/validate/files_to_sync.go | 16 ++-- bundle/config/validate/files_to_sync_test.go | 9 +- bundle/config/validate/folder_permissions.go | 13 ++- .../validate/folder_permissions_test.go | 16 +--- .../validate/job_cluster_key_defined.go | 15 ++-- .../validate/job_cluster_key_defined_test.go | 6 +- .../config/validate/job_task_cluster_spec.go | 14 ++-- .../validate/job_task_cluster_spec_test.go | 2 +- bundle/config/validate/single_node_cluster.go | 6 +- .../validate/single_node_cluster_test.go | 20 ++--- bundle/config/validate/validate.go | 38 +-------- .../config/validate/validate_artifact_path.go | 16 ++-- .../validate/validate_artifact_path_test.go | 7 +- .../config/validate/validate_sync_patterns.go | 20 ++--- bundle/deploy/files/delete.go | 2 +- bundle/deploy/files/sync.go | 28 +++---- bundle/deploy/files/upload.go | 2 +- bundle/deploy/state_pull.go | 2 +- bundle/deploy/state_pull_test.go | 4 +- bundle/mutator_read_only.go | 55 ++++++++++--- bundle/parallel.go | 43 ---------- bundle/parallel_test.go | 82 ------------------- bundle/tests/job_cluster_key_test.go | 4 +- .../sync_include_exclude_no_matches_test.go | 6 +- cmd/bundle/sync.go | 2 +- cmd/bundle/validate.go | 2 +- cmd/sync/sync.go | 2 +- 32 files changed, 167 insertions(+), 376 deletions(-) delete mode 100644 acceptance/bundle/debug/out.stderr.parallel.txt delete mode 100644 bundle/bundle_read_only.go delete mode 100644 bundle/parallel.go delete mode 100644 bundle/parallel_test.go diff --git a/acceptance/bundle/debug/out.stderr.parallel.txt b/acceptance/bundle/debug/out.stderr.parallel.txt deleted file mode 100644 index 13c81c511..000000000 --- a/acceptance/bundle/debug/out.stderr.parallel.txt +++ /dev/null @@ -1,15 +0,0 @@ -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel mutator (read-only)=validate:SingleNodeCluster -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel mutator (read-only)=validate:artifact_paths -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel mutator (read-only)=validate:job_cluster_key_defined -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel mutator (read-only)=validate:job_task_cluster_spec -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:folder_permissions -10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:validate_sync_patterns -10:07:59 Debug: Path /Workspace/Users/[USERNAME]/.bundle/debug/default/files has type directory (ID: 0) pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync -10:07:59 Debug: non-retriable error: Workspace path not found pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync sdk=true -< HTTP/0.0 000 OK pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync sdk=true -< } pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync sdk=true -< } pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync sdk=true diff --git a/acceptance/bundle/debug/out.stderr.txt b/acceptance/bundle/debug/out.stderr.txt index 147b33a45..b9ac5c4d9 100644 --- a/acceptance/bundle/debug/out.stderr.txt +++ b/acceptance/bundle/debug/out.stderr.txt @@ -72,18 +72,30 @@ 10:07:59 Debug: Environment variables for Terraform: ...redacted... pid=12345 mutator=terraform.Initialize 10:07:59 Debug: Apply pid=12345 mutator=scripts.postinit 10:07:59 Debug: No script defined for postinit, skipping pid=12345 mutator=scripts.postinit -10:07:59 Debug: Apply pid=12345 mutator=validate +10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) +10:07:59 Debug: ApplyParallel pid=12345 mutator=validate:files_to_sync +10:07:59 Debug: ApplyParallel pid=12345 mutator=validate:folder_permissions +10:07:59 Debug: ApplyParallel pid=12345 mutator=validate:validate_sync_patterns +10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) mutator=validate:job_cluster_key_defined +10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) mutator=validate:job_task_cluster_spec +10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) mutator=validate:SingleNodeCluster +10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) mutator=validate:artifact_paths 10:07:59 Debug: GET /api/2.0/workspace/get-status?path=/Workspace/Users/[USERNAME]/.bundle/debug/default/files < HTTP/1.1 404 Not Found < { < "message": "Workspace path not found" +< } pid=12345 mutator=validate:files_to_sync sdk=true +10:07:59 Debug: non-retriable error: Workspace path not found pid=12345 mutator=validate:files_to_sync sdk=true 10:07:59 Debug: POST /api/2.0/workspace/mkdirs > { > "path": "/Workspace/Users/[USERNAME]/.bundle/debug/default/files" > } +< HTTP/1.1 200 OK pid=12345 mutator=validate:files_to_sync sdk=true 10:07:59 Debug: GET /api/2.0/workspace/get-status?path=/Workspace/Users/[USERNAME]/.bundle/debug/default/files < HTTP/1.1 200 OK < { < "object_type": "DIRECTORY", < "path": "/Workspace/Users/[USERNAME]/.bundle/debug/default/files" +< } pid=12345 mutator=validate:files_to_sync sdk=true +10:07:59 Debug: Path /Workspace/Users/[USERNAME]/.bundle/debug/default/files has type directory (ID: 0) pid=12345 mutator=validate:files_to_sync 10:07:59 Info: completed execution pid=12345 exit_code=0 diff --git a/acceptance/bundle/debug/script b/acceptance/bundle/debug/script index 005a1a341..913f07e41 100644 --- a/acceptance/bundle/debug/script +++ b/acceptance/bundle/debug/script @@ -1,4 +1 @@ -$CLI bundle validate --debug 2> full.stderr.txt -grep -vw parallel full.stderr.txt > out.stderr.txt -grep -w parallel full.stderr.txt | sed 's/[0-9]/0/g' | sort_lines.py > out.stderr.parallel.txt -rm full.stderr.txt +$CLI bundle validate --debug 2> out.stderr.txt diff --git a/bundle/bundle_read_only.go b/bundle/bundle_read_only.go deleted file mode 100644 index 4bdd94e59..000000000 --- a/bundle/bundle_read_only.go +++ /dev/null @@ -1,49 +0,0 @@ -package bundle - -import ( - "context" - - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/libs/vfs" - "github.com/databricks/databricks-sdk-go" -) - -type ReadOnlyBundle struct { - b *Bundle -} - -func ReadOnly(b *Bundle) ReadOnlyBundle { - return ReadOnlyBundle{b: b} -} - -func (r ReadOnlyBundle) Config() config.Root { - return r.b.Config -} - -func (r ReadOnlyBundle) RootPath() string { - return r.b.BundleRootPath -} - -func (r ReadOnlyBundle) BundleRoot() vfs.Path { - return r.b.BundleRoot -} - -func (r ReadOnlyBundle) SyncRoot() vfs.Path { - return r.b.SyncRoot -} - -func (r ReadOnlyBundle) WorktreeRoot() vfs.Path { - return r.b.WorktreeRoot -} - -func (r ReadOnlyBundle) WorkspaceClient() *databricks.WorkspaceClient { - return r.b.WorkspaceClient() -} - -func (r ReadOnlyBundle) CacheDir(ctx context.Context, paths ...string) (string, error) { - return r.b.CacheDir(ctx, paths...) -} - -func (r ReadOnlyBundle) GetSyncIncludePatterns(ctx context.Context) ([]string, error) { - return r.b.GetSyncIncludePatterns(ctx) -} diff --git a/bundle/config/validate/fast_validate.go b/bundle/config/validate/fast_validate.go index 47d83036d..e116034ad 100644 --- a/bundle/config/validate/fast_validate.go +++ b/bundle/config/validate/fast_validate.go @@ -14,18 +14,18 @@ import ( // 2. The validation is blocking for bundle deployments. // // The full suite of validation mutators is available in the [Validate] mutator. -type fastValidateReadonly struct{} +type fastValidate struct{ bundle.RO } -func FastValidateReadonly() bundle.ReadOnlyMutator { - return &fastValidateReadonly{} +func FastValidate() bundle.ReadOnlyMutator { + return &fastValidate{} } -func (f *fastValidateReadonly) Name() string { +func (f *fastValidate) Name() string { return "fast_validate(readonly)" } -func (f *fastValidateReadonly) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { - return bundle.ApplyReadOnly(ctx, rb, bundle.Parallel( +func (f *fastValidate) Apply(ctx context.Context, rb *bundle.Bundle) diag.Diagnostics { + return bundle.ApplyParallel(ctx, rb, // Fast mutators with only in-memory checks JobClusterKeyDefined(), JobTaskClusterSpec(), @@ -33,19 +33,5 @@ func (f *fastValidateReadonly) Apply(ctx context.Context, rb bundle.ReadOnlyBund // Blocking mutators. Deployments will fail if these checks fail. ValidateArtifactPath(), - )) -} - -type fastValidate struct{} - -func FastValidate() bundle.Mutator { - return &fastValidate{} -} - -func (f *fastValidate) Name() string { - return "fast_validate" -} - -func (f *fastValidate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - return bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), FastValidateReadonly()) + ) } diff --git a/bundle/config/validate/files_to_sync.go b/bundle/config/validate/files_to_sync.go index b4de06773..aea78f710 100644 --- a/bundle/config/validate/files_to_sync.go +++ b/bundle/config/validate/files_to_sync.go @@ -13,20 +13,20 @@ func FilesToSync() bundle.ReadOnlyMutator { return &filesToSync{} } -type filesToSync struct{} +type filesToSync struct{ bundle.RO } func (v *filesToSync) Name() string { return "validate:files_to_sync" } -func (v *filesToSync) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { +func (v *filesToSync) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { // The user may be intentional about not synchronizing any files. // In this case, we should not show any warnings. - if len(rb.Config().Sync.Paths) == 0 { + if len(b.Config.Sync.Paths) == 0 { return nil } - sync, err := files.GetSync(ctx, rb) + sync, err := files.GetSync(ctx, b) if err != nil { return diag.FromErr(err) } @@ -42,20 +42,20 @@ func (v *filesToSync) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag. } diags := diag.Diagnostics{} - if len(rb.Config().Sync.Exclude) == 0 { + if len(b.Config.Sync.Exclude) == 0 { diags = diags.Append(diag.Diagnostic{ Severity: diag.Warning, Summary: "There are no files to sync, please check your .gitignore", }) } else { - loc := location{path: "sync.exclude", rb: rb} + path := "sync.exclude" diags = diags.Append(diag.Diagnostic{ Severity: diag.Warning, Summary: "There are no files to sync, please check your .gitignore and sync.exclude configuration", // Show all locations where sync.exclude is defined, since merging // sync.exclude is additive. - Locations: loc.Locations(), - Paths: []dyn.Path{loc.Path()}, + Locations: b.Config.GetLocations(path), + Paths: []dyn.Path{dyn.MustPathFromString(path)}, }) } diff --git a/bundle/config/validate/files_to_sync_test.go b/bundle/config/validate/files_to_sync_test.go index dd40295c3..0a5f69727 100644 --- a/bundle/config/validate/files_to_sync_test.go +++ b/bundle/config/validate/files_to_sync_test.go @@ -29,8 +29,7 @@ func TestFilesToSync_NoPaths(t *testing.T) { } ctx := context.Background() - rb := bundle.ReadOnly(b) - diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync()) + diags := FilesToSync().Apply(ctx, b) assert.Empty(t, diags) } @@ -85,8 +84,7 @@ func TestFilesToSync_EverythingIgnored(t *testing.T) { testutil.WriteFile(t, filepath.Join(b.BundleRootPath, ".gitignore"), "*\n.*\n") ctx := context.Background() - rb := bundle.ReadOnly(b) - diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync()) + diags := FilesToSync().Apply(ctx, b) require.Len(t, diags, 1) assert.Equal(t, diag.Warning, diags[0].Severity) assert.Equal(t, "There are no files to sync, please check your .gitignore", diags[0].Summary) @@ -99,8 +97,7 @@ func TestFilesToSync_EverythingExcluded(t *testing.T) { b.Config.Sync.Exclude = []string{"*"} ctx := context.Background() - rb := bundle.ReadOnly(b) - diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync()) + diags := FilesToSync().Apply(ctx, b) require.Len(t, diags, 1) assert.Equal(t, diag.Warning, diags[0].Severity) assert.Equal(t, "There are no files to sync, please check your .gitignore and sync.exclude configuration", diags[0].Summary) diff --git a/bundle/config/validate/folder_permissions.go b/bundle/config/validate/folder_permissions.go index 7b12b4b16..575702c34 100644 --- a/bundle/config/validate/folder_permissions.go +++ b/bundle/config/validate/folder_permissions.go @@ -16,15 +16,14 @@ import ( "golang.org/x/sync/errgroup" ) -type folderPermissions struct{} +type folderPermissions struct{ bundle.RO } -// Apply implements bundle.ReadOnlyMutator. -func (f *folderPermissions) Apply(ctx context.Context, b bundle.ReadOnlyBundle) diag.Diagnostics { - if len(b.Config().Permissions) == 0 { +func (f *folderPermissions) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + if len(b.Config.Permissions) == 0 { return nil } - bundlePaths := paths.CollectUniqueWorkspacePathPrefixes(b.Config().Workspace) + bundlePaths := paths.CollectUniqueWorkspacePathPrefixes(b.Config.Workspace) var diags diag.Diagnostics g, ctx := errgroup.WithContext(ctx) @@ -48,7 +47,7 @@ func (f *folderPermissions) Apply(ctx context.Context, b bundle.ReadOnlyBundle) return diags } -func checkFolderPermission(ctx context.Context, b bundle.ReadOnlyBundle, folderPath string) diag.Diagnostics { +func checkFolderPermission(ctx context.Context, b *bundle.Bundle, folderPath string) diag.Diagnostics { // If the folder is shared, then we don't need to check permissions as it was already checked in the other mutator before. if libraries.IsWorkspaceSharedPath(folderPath) { return nil @@ -69,7 +68,7 @@ func checkFolderPermission(ctx context.Context, b bundle.ReadOnlyBundle, folderP } p := permissions.ObjectAclToResourcePermissions(folderPath, objPermissions.AccessControlList) - return p.Compare(b.Config().Permissions) + return p.Compare(b.Config.Permissions) } func getClosestExistingObject(ctx context.Context, w workspace.WorkspaceInterface, folderPath string) (*workspace.ObjectInfo, error) { diff --git a/bundle/config/validate/folder_permissions_test.go b/bundle/config/validate/folder_permissions_test.go index 8e68c9fbf..40906ce6f 100644 --- a/bundle/config/validate/folder_permissions_test.go +++ b/bundle/config/validate/folder_permissions_test.go @@ -69,9 +69,7 @@ func TestFolderPermissionsInheritedWhenRootPathDoesNotExist(t *testing.T) { }, nil) b.SetWorkpaceClient(m.WorkspaceClient) - rb := bundle.ReadOnly(b) - - diags := bundle.ApplyReadOnly(context.Background(), rb, ValidateFolderPermissions()) + diags := ValidateFolderPermissions().Apply(context.Background(), b) require.Empty(t, diags) } @@ -118,9 +116,7 @@ func TestValidateFolderPermissionsFailsOnMissingBundlePermission(t *testing.T) { }, nil) b.SetWorkpaceClient(m.WorkspaceClient) - rb := bundle.ReadOnly(b) - - diags := bundle.ApplyReadOnly(context.Background(), rb, ValidateFolderPermissions()) + diags := ValidateFolderPermissions().Apply(context.Background(), b) require.Len(t, diags, 1) require.Equal(t, "untracked permissions apply to target workspace path", diags[0].Summary) require.Equal(t, diag.Warning, diags[0].Severity) @@ -164,9 +160,7 @@ func TestValidateFolderPermissionsFailsOnPermissionMismatch(t *testing.T) { }, nil) b.SetWorkpaceClient(m.WorkspaceClient) - rb := bundle.ReadOnly(b) - - diags := bundle.ApplyReadOnly(context.Background(), rb, ValidateFolderPermissions()) + diags := ValidateFolderPermissions().Apply(context.Background(), b) require.Len(t, diags, 1) require.Equal(t, "untracked permissions apply to target workspace path", diags[0].Summary) require.Equal(t, diag.Warning, diags[0].Severity) @@ -199,9 +193,7 @@ func TestValidateFolderPermissionsFailsOnNoRootFolder(t *testing.T) { }) b.SetWorkpaceClient(m.WorkspaceClient) - rb := bundle.ReadOnly(b) - - diags := bundle.ApplyReadOnly(context.Background(), rb, ValidateFolderPermissions()) + diags := ValidateFolderPermissions().Apply(context.Background(), b) require.Len(t, diags, 1) require.Equal(t, "folder / and its parent folders do not exist", diags[0].Summary) require.Equal(t, diag.Error, diags[0].Severity) diff --git a/bundle/config/validate/job_cluster_key_defined.go b/bundle/config/validate/job_cluster_key_defined.go index c3a1ab3df..5ae2f5437 100644 --- a/bundle/config/validate/job_cluster_key_defined.go +++ b/bundle/config/validate/job_cluster_key_defined.go @@ -13,16 +13,16 @@ func JobClusterKeyDefined() bundle.ReadOnlyMutator { return &jobClusterKeyDefined{} } -type jobClusterKeyDefined struct{} +type jobClusterKeyDefined struct{ bundle.RO } func (v *jobClusterKeyDefined) Name() string { return "validate:job_cluster_key_defined" } -func (v *jobClusterKeyDefined) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { +func (v *jobClusterKeyDefined) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { diags := diag.Diagnostics{} - for k, job := range rb.Config().Resources.Jobs { + for k, job := range b.Config.Resources.Jobs { jobClusterKeys := make(map[string]bool) for _, cluster := range job.JobClusters { if cluster.JobClusterKey != "" { @@ -33,10 +33,7 @@ func (v *jobClusterKeyDefined) Apply(ctx context.Context, rb bundle.ReadOnlyBund for index, task := range job.Tasks { if task.JobClusterKey != "" { if _, ok := jobClusterKeys[task.JobClusterKey]; !ok { - loc := location{ - path: fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index), - rb: rb, - } + path := fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index) diags = diags.Append(diag.Diagnostic{ Severity: diag.Warning, @@ -44,8 +41,8 @@ func (v *jobClusterKeyDefined) Apply(ctx context.Context, rb bundle.ReadOnlyBund // Show only the location where the job_cluster_key is defined. // Other associated locations are not relevant since they are // overridden during merging. - Locations: []dyn.Location{loc.Location()}, - Paths: []dyn.Path{loc.Path()}, + Locations: b.Config.GetLocations(path), + Paths: []dyn.Path{dyn.MustPathFromString(path)}, }) } } diff --git a/bundle/config/validate/job_cluster_key_defined_test.go b/bundle/config/validate/job_cluster_key_defined_test.go index 2cbdb7c6a..559bd1c46 100644 --- a/bundle/config/validate/job_cluster_key_defined_test.go +++ b/bundle/config/validate/job_cluster_key_defined_test.go @@ -33,7 +33,7 @@ func TestJobClusterKeyDefined(t *testing.T) { }, } - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined()) + diags := JobClusterKeyDefined().Apply(context.Background(), b) require.Empty(t, diags) require.NoError(t, diags.Error()) } @@ -56,7 +56,7 @@ func TestJobClusterKeyNotDefined(t *testing.T) { }, } - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined()) + diags := JobClusterKeyDefined().Apply(context.Background(), b) require.Len(t, diags, 1) require.NoError(t, diags.Error()) require.Equal(t, diag.Warning, diags[0].Severity) @@ -89,7 +89,7 @@ func TestJobClusterKeyDefinedInDifferentJob(t *testing.T) { }, } - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined()) + diags := JobClusterKeyDefined().Apply(context.Background(), b) require.Len(t, diags, 1) require.NoError(t, diags.Error()) require.Equal(t, diag.Warning, diags[0].Severity) diff --git a/bundle/config/validate/job_task_cluster_spec.go b/bundle/config/validate/job_task_cluster_spec.go index 5f532acfe..79672be63 100644 --- a/bundle/config/validate/job_task_cluster_spec.go +++ b/bundle/config/validate/job_task_cluster_spec.go @@ -17,31 +17,31 @@ func JobTaskClusterSpec() bundle.ReadOnlyMutator { return &jobTaskClusterSpec{} } -type jobTaskClusterSpec struct{} +type jobTaskClusterSpec struct{ bundle.RO } func (v *jobTaskClusterSpec) Name() string { return "validate:job_task_cluster_spec" } -func (v *jobTaskClusterSpec) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { +func (v *jobTaskClusterSpec) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { diags := diag.Diagnostics{} jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs")) - for resourceName, job := range rb.Config().Resources.Jobs { + for resourceName, job := range b.Config.Resources.Jobs { resourcePath := jobsPath.Append(dyn.Key(resourceName)) for taskIndex, task := range job.Tasks { taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex)) - diags = diags.Extend(validateJobTask(rb, task, taskPath)) + diags = diags.Extend(validateJobTask(b, task, taskPath)) } } return diags } -func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics { +func validateJobTask(b *bundle.Bundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics { diags := diag.Diagnostics{} var specified []string @@ -74,7 +74,7 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path if task.ForEachTask != nil { forEachTaskPath := taskPath.Append(dyn.Key("for_each_task"), dyn.Key("task")) - diags = diags.Extend(validateJobTask(rb, task.ForEachTask.Task, forEachTaskPath)) + diags = diags.Extend(validateJobTask(b, task.ForEachTask.Task, forEachTaskPath)) } if isComputeTask(task) && len(specified) == 0 { @@ -92,7 +92,7 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path Severity: diag.Error, Summary: "Missing required cluster or environment settings", Detail: detail, - Locations: rb.Config().GetLocations(taskPath.String()), + Locations: b.Config.GetLocations(taskPath.String()), Paths: []dyn.Path{taskPath}, }) } diff --git a/bundle/config/validate/job_task_cluster_spec_test.go b/bundle/config/validate/job_task_cluster_spec_test.go index a3a7ccf25..fd316d61f 100644 --- a/bundle/config/validate/job_task_cluster_spec_test.go +++ b/bundle/config/validate/job_task_cluster_spec_test.go @@ -174,7 +174,7 @@ Specify one of the following fields: job_cluster_key, environment_key, existing_ } b := createBundle(map[string]*resources.Job{"job1": job}) - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobTaskClusterSpec()) + diags := JobTaskClusterSpec().Apply(context.Background(), b) if tc.errorPath != "" || tc.errorDetail != "" || tc.errorSummary != "" { assert.Len(t, diags, 1) diff --git a/bundle/config/validate/single_node_cluster.go b/bundle/config/validate/single_node_cluster.go index 7c159f61a..acc5f444d 100644 --- a/bundle/config/validate/single_node_cluster.go +++ b/bundle/config/validate/single_node_cluster.go @@ -16,7 +16,7 @@ func SingleNodeCluster() bundle.ReadOnlyMutator { return &singleNodeCluster{} } -type singleNodeCluster struct{} +type singleNodeCluster struct{ bundle.RO } func (m *singleNodeCluster) Name() string { return "validate:SingleNodeCluster" @@ -98,7 +98,7 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool { return false } -func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { +func (m *singleNodeCluster) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { diags := diag.Diagnostics{} patterns := []dyn.Pattern{ @@ -115,7 +115,7 @@ func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) } for _, p := range patterns { - _, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { + _, err := dyn.MapByPattern(b.Config.Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) { warning := diag.Diagnostic{ Severity: diag.Warning, Summary: singleNodeWarningSummary, diff --git a/bundle/config/validate/single_node_cluster_test.go b/bundle/config/validate/single_node_cluster_test.go index be93420c6..c6d5f8ca9 100644 --- a/bundle/config/validate/single_node_cluster_test.go +++ b/bundle/config/validate/single_node_cluster_test.go @@ -116,7 +116,7 @@ func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) { bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) { return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0)) }) - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := SingleNodeCluster().Apply(ctx, b) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, @@ -165,7 +165,7 @@ func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) { return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0)) }) - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := SingleNodeCluster().Apply(ctx, b) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, @@ -214,7 +214,7 @@ func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) { return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0)) }) - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := bundle.Apply(ctx, b, SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, @@ -260,7 +260,7 @@ func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) { return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0)) }) - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := bundle.Apply(ctx, b, SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, @@ -313,7 +313,7 @@ func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) { return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0)) }) - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := bundle.Apply(ctx, b, SingleNodeCluster()) assert.Equal(t, diag.Diagnostics{ { Severity: diag.Warning, @@ -397,7 +397,7 @@ func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) { }) } - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := bundle.Apply(ctx, b, SingleNodeCluster()) assert.Empty(t, diags) }) } @@ -437,7 +437,7 @@ func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) { }) } - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := bundle.Apply(ctx, b, SingleNodeCluster()) assert.Empty(t, diags) }) } @@ -477,7 +477,7 @@ func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) { }) } - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := bundle.Apply(ctx, b, SingleNodeCluster()) assert.Empty(t, diags) }) } @@ -514,7 +514,7 @@ func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) { }) } - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := bundle.Apply(ctx, b, SingleNodeCluster()) assert.Empty(t, diags) }) } @@ -558,7 +558,7 @@ func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) { }) } - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster()) + diags := bundle.Apply(ctx, b, SingleNodeCluster()) assert.Empty(t, diags) }) } diff --git a/bundle/config/validate/validate.go b/bundle/config/validate/validate.go index 8fdd704ab..83d17337d 100644 --- a/bundle/config/validate/validate.go +++ b/bundle/config/validate/validate.go @@ -5,46 +5,16 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/dyn" ) -type validate struct{} - -type location struct { - path string - rb bundle.ReadOnlyBundle -} - -func (l location) Location() dyn.Location { - return l.rb.Config().GetLocation(l.path) -} - -func (l location) Locations() []dyn.Location { - return l.rb.Config().GetLocations(l.path) -} - -func (l location) Path() dyn.Path { - return dyn.MustPathFromString(l.path) -} - -// Apply implements bundle.Mutator. -func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - return bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), bundle.Parallel( - FastValidateReadonly(), +func Validate(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + return bundle.ApplyParallel(ctx, b, + FastValidate(), // Slow mutators that require network or file i/o. These are only // run in the `bundle validate` command. FilesToSync(), ValidateFolderPermissions(), ValidateSyncPatterns(), - )) -} - -// Name implements bundle.Mutator. -func (v *validate) Name() string { - return "validate" -} - -func Validate() bundle.Mutator { - return &validate{} + ) } diff --git a/bundle/config/validate/validate_artifact_path.go b/bundle/config/validate/validate_artifact_path.go index aa4492670..78536d4bd 100644 --- a/bundle/config/validate/validate_artifact_path.go +++ b/bundle/config/validate/validate_artifact_path.go @@ -16,7 +16,7 @@ import ( "github.com/databricks/databricks-sdk-go/apierr" ) -type validateArtifactPath struct{} +type validateArtifactPath struct{ bundle.RO } func ValidateArtifactPath() bundle.ReadOnlyMutator { return &validateArtifactPath{} @@ -74,9 +74,9 @@ func findVolumeInBundle(r config.Root, catalogName, schemaName, volumeName strin return nil, nil, false } -func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { +func (v *validateArtifactPath) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { // We only validate UC Volumes paths right now. - if !libraries.IsVolumesPath(rb.Config().Workspace.ArtifactPath) { + if !libraries.IsVolumesPath(b.Config.Workspace.ArtifactPath) { return nil } @@ -85,25 +85,25 @@ func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBund { Summary: s, Severity: diag.Error, - Locations: rb.Config().GetLocations("workspace.artifact_path"), + Locations: b.Config.GetLocations("workspace.artifact_path"), Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, }, } } - catalogName, schemaName, volumeName, err := extractVolumeFromPath(rb.Config().Workspace.ArtifactPath) + catalogName, schemaName, volumeName, err := extractVolumeFromPath(b.Config.Workspace.ArtifactPath) if err != nil { return wrapErrorMsg(err.Error()) } volumeFullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName) - w := rb.WorkspaceClient() + w := b.WorkspaceClient() _, err = w.Volumes.ReadByName(ctx, volumeFullName) if errors.Is(err, apierr.ErrPermissionDenied) { return wrapErrorMsg(fmt.Sprintf("cannot access volume %s: %s", volumeFullName, err)) } if errors.Is(err, apierr.ErrNotFound) { - path, locations, ok := findVolumeInBundle(rb.Config(), catalogName, schemaName, volumeName) + path, locations, ok := findVolumeInBundle(b.Config, catalogName, schemaName, volumeName) if !ok { return wrapErrorMsg(fmt.Sprintf("volume %s does not exist", volumeFullName)) } @@ -117,7 +117,7 @@ func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBund this bundle but which has not been deployed yet. Please first deploy the volume using 'bundle deploy' and then switch over to using it in the artifact_path.`, - Locations: slices.Concat(rb.Config().GetLocations("workspace.artifact_path"), locations), + Locations: slices.Concat(b.Config.GetLocations("workspace.artifact_path"), locations), Paths: append([]dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, path), }} diff --git a/bundle/config/validate/validate_artifact_path_test.go b/bundle/config/validate/validate_artifact_path_test.go index e1ae6af34..3f998567b 100644 --- a/bundle/config/validate/validate_artifact_path_test.go +++ b/bundle/config/validate/validate_artifact_path_test.go @@ -49,7 +49,7 @@ func TestValidateArtifactPathWithVolumeInBundle(t *testing.T) { }) b.SetWorkpaceClient(m.WorkspaceClient) - diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), ValidateArtifactPath()) + diags := ValidateArtifactPath().Apply(ctx, b) assert.Equal(t, diag.Diagnostics{{ Severity: diag.Error, Summary: "volume catalogN.schemaN.volumeN does not exist", @@ -88,7 +88,6 @@ func TestValidateArtifactPath(t *testing.T) { }}, diags) } - rb := bundle.ReadOnly(b) ctx := context.Background() tcases := []struct { @@ -123,7 +122,7 @@ func TestValidateArtifactPath(t *testing.T) { api.EXPECT().ReadByName(mock.Anything, "catalogN.schemaN.volumeN").Return(nil, tc.err) b.SetWorkpaceClient(m.WorkspaceClient) - diags := bundle.ApplyReadOnly(ctx, rb, ValidateArtifactPath()) + diags := ValidateArtifactPath().Apply(ctx, b) assertDiags(t, diags, tc.expectedSummary) } } @@ -167,7 +166,7 @@ func TestValidateArtifactPathWithInvalidPaths(t *testing.T) { bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}}) - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), ValidateArtifactPath()) + diags := ValidateArtifactPath().Apply(context.Background(), b) require.Equal(t, diag.Diagnostics{{ Severity: diag.Error, Summary: "expected UC volume path to be in the format /Volumes////..., got " + p, diff --git a/bundle/config/validate/validate_sync_patterns.go b/bundle/config/validate/validate_sync_patterns.go index 04acd28ab..04df36e4f 100644 --- a/bundle/config/validate/validate_sync_patterns.go +++ b/bundle/config/validate/validate_sync_patterns.go @@ -17,24 +17,24 @@ func ValidateSyncPatterns() bundle.ReadOnlyMutator { return &validateSyncPatterns{} } -type validateSyncPatterns struct{} +type validateSyncPatterns struct{ bundle.RO } func (v *validateSyncPatterns) Name() string { return "validate:validate_sync_patterns" } -func (v *validateSyncPatterns) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics { - s := rb.Config().Sync +func (v *validateSyncPatterns) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + s := b.Config.Sync if len(s.Exclude) == 0 && len(s.Include) == 0 { return nil } - diags, err := checkPatterns(s.Exclude, "sync.exclude", rb) + diags, err := checkPatterns(s.Exclude, "sync.exclude", b) if err != nil { return diag.FromErr(err) } - includeDiags, err := checkPatterns(s.Include, "sync.include", rb) + includeDiags, err := checkPatterns(s.Include, "sync.include", b) if err != nil { return diag.FromErr(err) } @@ -42,7 +42,7 @@ func (v *validateSyncPatterns) Apply(ctx context.Context, rb bundle.ReadOnlyBund return diags.Extend(includeDiags) } -func checkPatterns(patterns []string, path string, rb bundle.ReadOnlyBundle) (diag.Diagnostics, error) { +func checkPatterns(patterns []string, path string, b *bundle.Bundle) (diag.Diagnostics, error) { var mu sync.Mutex var errs errgroup.Group var diags diag.Diagnostics @@ -55,7 +55,7 @@ func checkPatterns(patterns []string, path string, rb bundle.ReadOnlyBundle) (di // it means: do not include these files into result set p := strings.TrimPrefix(pattern, "!") errs.Go(func() error { - fs, err := fileset.NewGlobSet(rb.BundleRoot(), []string{p}) + fs, err := fileset.NewGlobSet(b.BundleRoot, []string{p}) if err != nil { return err } @@ -66,13 +66,13 @@ func checkPatterns(patterns []string, path string, rb bundle.ReadOnlyBundle) (di } if len(all) == 0 { - loc := location{path: fmt.Sprintf("%s[%d]", path, index), rb: rb} + path := fmt.Sprintf("%s[%d]", path, index) mu.Lock() diags = diags.Append(diag.Diagnostic{ Severity: diag.Warning, Summary: fmt.Sprintf("Pattern %s does not match any files", pattern), - Locations: []dyn.Location{loc.Location()}, - Paths: []dyn.Path{loc.Path()}, + Locations: b.Config.GetLocations(path), + Paths: []dyn.Path{dyn.MustPathFromString(path)}, }) mu.Unlock() } diff --git a/bundle/deploy/files/delete.go b/bundle/deploy/files/delete.go index bb28c2722..971186d5b 100644 --- a/bundle/deploy/files/delete.go +++ b/bundle/deploy/files/delete.go @@ -40,7 +40,7 @@ func (m *delete) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { } func deleteSnapshotFile(ctx context.Context, b *bundle.Bundle) error { - opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b)) + opts, err := GetSyncOptions(ctx, b) if err != nil { return fmt.Errorf("cannot get sync options: %w", err) } diff --git a/bundle/deploy/files/sync.go b/bundle/deploy/files/sync.go index e3abc5fef..a3ead13a4 100644 --- a/bundle/deploy/files/sync.go +++ b/bundle/deploy/files/sync.go @@ -8,43 +8,43 @@ import ( "github.com/databricks/cli/libs/sync" ) -func GetSync(ctx context.Context, rb bundle.ReadOnlyBundle) (*sync.Sync, error) { - opts, err := GetSyncOptions(ctx, rb) +func GetSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) { + opts, err := GetSyncOptions(ctx, b) if err != nil { return nil, fmt.Errorf("cannot get sync options: %w", err) } return sync.New(ctx, *opts) } -func GetSyncOptions(ctx context.Context, rb bundle.ReadOnlyBundle) (*sync.SyncOptions, error) { - cacheDir, err := rb.CacheDir(ctx) +func GetSyncOptions(ctx context.Context, b *bundle.Bundle) (*sync.SyncOptions, error) { + cacheDir, err := b.CacheDir(ctx) if err != nil { return nil, fmt.Errorf("cannot get bundle cache directory: %w", err) } - includes, err := rb.GetSyncIncludePatterns(ctx) + includes, err := b.GetSyncIncludePatterns(ctx) if err != nil { return nil, fmt.Errorf("cannot get list of sync includes: %w", err) } opts := &sync.SyncOptions{ - WorktreeRoot: rb.WorktreeRoot(), - LocalRoot: rb.SyncRoot(), - Paths: rb.Config().Sync.Paths, + WorktreeRoot: b.WorktreeRoot, + LocalRoot: b.SyncRoot, + Paths: b.Config.Sync.Paths, Include: includes, - Exclude: rb.Config().Sync.Exclude, + Exclude: b.Config.Sync.Exclude, - RemotePath: rb.Config().Workspace.FilePath, - Host: rb.WorkspaceClient().Config.Host, + RemotePath: b.Config.Workspace.FilePath, + Host: b.WorkspaceClient().Config.Host, Full: false, SnapshotBasePath: cacheDir, - WorkspaceClient: rb.WorkspaceClient(), + WorkspaceClient: b.WorkspaceClient(), } - if rb.Config().Workspace.CurrentUser != nil { - opts.CurrentUser = rb.Config().Workspace.CurrentUser.User + if b.Config.Workspace.CurrentUser != nil { + opts.CurrentUser = b.Config.Workspace.CurrentUser.User } return opts, nil diff --git a/bundle/deploy/files/upload.go b/bundle/deploy/files/upload.go index 452850dc4..bb46c97c9 100644 --- a/bundle/deploy/files/upload.go +++ b/bundle/deploy/files/upload.go @@ -30,7 +30,7 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { } cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath)) - opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b)) + opts, err := GetSyncOptions(ctx, b) if err != nil { return diag.FromErr(err) } diff --git a/bundle/deploy/state_pull.go b/bundle/deploy/state_pull.go index 8fffca073..844dcb77e 100644 --- a/bundle/deploy/state_pull.go +++ b/bundle/deploy/state_pull.go @@ -85,7 +85,7 @@ func (s *statePull) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostic } // Create a new snapshot based on the deployment state file. - opts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b)) + opts, err := files.GetSyncOptions(ctx, b) if err != nil { return diag.FromErr(err) } diff --git a/bundle/deploy/state_pull_test.go b/bundle/deploy/state_pull_test.go index f38b71f6b..b3d838fce 100644 --- a/bundle/deploy/state_pull_test.go +++ b/bundle/deploy/state_pull_test.go @@ -92,7 +92,7 @@ func testStatePull(t *testing.T, opts statePullOpts) { } if opts.withExistingSnapshot { - opts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b)) + opts, err := files.GetSyncOptions(ctx, b) require.NoError(t, err) snapshotPath, err := sync.SnapshotPath(opts) @@ -134,7 +134,7 @@ func testStatePull(t *testing.T, opts statePullOpts) { } if opts.expects.snapshotState != nil { - syncOpts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b)) + syncOpts, err := files.GetSyncOptions(ctx, b) require.NoError(t, err) snapshotPath, err := sync.SnapshotPath(syncOpts) diff --git a/bundle/mutator_read_only.go b/bundle/mutator_read_only.go index 700a90d8d..87857022d 100644 --- a/bundle/mutator_read_only.go +++ b/bundle/mutator_read_only.go @@ -2,28 +2,59 @@ package bundle import ( "context" + "sync" "github.com/databricks/cli/libs/diag" "github.com/databricks/cli/libs/log" ) -// ReadOnlyMutator is the interface type that allows access to bundle configuration but does not allow any mutations. type ReadOnlyMutator interface { - // Name returns the mutators name. - Name() string + Mutator - // Apply access the specified read-only bundle object. - Apply(context.Context, ReadOnlyBundle) diag.Diagnostics + // This is just tag, to differentiate this interface from bundle.Mutator + // This prevents non-readonly mutators being passed to ApplyParallel(). + IsRO() } -func ApplyReadOnly(ctx context.Context, rb ReadOnlyBundle, m ReadOnlyMutator) diag.Diagnostics { - ctx = log.NewContext(ctx, log.GetLogger(ctx).With("mutator (read-only)", m.Name())) +// Helper to mark the mutator as "read-only" +type RO struct{} - log.Debugf(ctx, "ApplyReadOnly") - diags := m.Apply(ctx, rb) - if err := diags.Error(); err != nil { - log.Debugf(ctx, "Error: %s", err) +func (*RO) IsRO() {} + +// Run mutators in parallel. Unlike Apply and ApplySeq, this does not perform sync between +// dynamic and static configuration. +// Warning: none of the mutators involved must modify bundle directly or indirectly. In particular, +// they must not call bundle.Apply or bundle.ApplySeq because those include writes to config even if mutator does not. +// Deprecated: do not use for new use cases. Refactor your parallel task not to depend on bundle at all. +func ApplyParallel(ctx context.Context, b *Bundle, mutators ...ReadOnlyMutator) diag.Diagnostics { + var allDiags diag.Diagnostics + resultsChan := make(chan diag.Diagnostics, len(mutators)) + var wg sync.WaitGroup + + contexts := make([]context.Context, len(mutators)) + + for ind, m := range mutators { + contexts[ind] = log.NewContext(ctx, log.GetLogger(ctx).With("mutator", m.Name())) + // log right away to have deterministic order of log messages + log.Debug(contexts[ind], "ApplyParallel") } - return diags + for ind, m := range mutators { + wg.Add(1) + go func() { + defer wg.Done() + // We're not using bundle.Apply here because we don't do copy between typed and dynamic values + resultsChan <- m.Apply(contexts[ind], b) + }() + } + + wg.Wait() + close(resultsChan) + + // Collect results into a single slice + for diags := range resultsChan { + allDiags = append(allDiags, diags...) + } + + return allDiags } diff --git a/bundle/parallel.go b/bundle/parallel.go deleted file mode 100644 index ebb91661a..000000000 --- a/bundle/parallel.go +++ /dev/null @@ -1,43 +0,0 @@ -package bundle - -import ( - "context" - "sync" - - "github.com/databricks/cli/libs/diag" -) - -type parallel struct { - mutators []ReadOnlyMutator -} - -func (m *parallel) Name() string { - return "parallel" -} - -func (m *parallel) Apply(ctx context.Context, rb ReadOnlyBundle) diag.Diagnostics { - var wg sync.WaitGroup - var mu sync.Mutex - var diags diag.Diagnostics - - wg.Add(len(m.mutators)) - for _, mutator := range m.mutators { - go func(mutator ReadOnlyMutator) { - defer wg.Done() - d := ApplyReadOnly(ctx, rb, mutator) - - mu.Lock() - diags = diags.Extend(d) - mu.Unlock() - }(mutator) - } - wg.Wait() - return diags -} - -// Parallel runs the given mutators in parallel. -func Parallel(mutators ...ReadOnlyMutator) ReadOnlyMutator { - return ¶llel{ - mutators: mutators, - } -} diff --git a/bundle/parallel_test.go b/bundle/parallel_test.go deleted file mode 100644 index dfc7ddac9..000000000 --- a/bundle/parallel_test.go +++ /dev/null @@ -1,82 +0,0 @@ -package bundle - -import ( - "context" - "sync" - "testing" - - "github.com/databricks/cli/bundle/config" - "github.com/databricks/cli/libs/diag" - "github.com/stretchr/testify/require" -) - -type addToContainer struct { - t *testing.T - container *[]int - value int - err bool - - // mu is a mutex that protects container. It is used to ensure that the - // container slice is only modified by one goroutine at a time. - mu *sync.Mutex -} - -func (m *addToContainer) Apply(ctx context.Context, b ReadOnlyBundle) diag.Diagnostics { - if m.err { - return diag.Errorf("error") - } - - m.mu.Lock() - *m.container = append(*m.container, m.value) - m.mu.Unlock() - - return nil -} - -func (m *addToContainer) Name() string { - return "addToContainer" -} - -func TestParallelMutatorWork(t *testing.T) { - b := &Bundle{ - Config: config.Root{}, - } - - container := []int{} - var mu sync.Mutex - m1 := &addToContainer{t: t, container: &container, value: 1, mu: &mu} - m2 := &addToContainer{t: t, container: &container, value: 2, mu: &mu} - m3 := &addToContainer{t: t, container: &container, value: 3, mu: &mu} - - m := Parallel(m1, m2, m3) - - // Apply the mutator - diags := ApplyReadOnly(context.Background(), ReadOnly(b), m) - require.Empty(t, diags) - require.Len(t, container, 3) - require.Contains(t, container, 1) - require.Contains(t, container, 2) - require.Contains(t, container, 3) -} - -func TestParallelMutatorWorkWithErrors(t *testing.T) { - b := &Bundle{ - Config: config.Root{}, - } - - container := []int{} - var mu sync.Mutex - m1 := &addToContainer{container: &container, value: 1, mu: &mu} - m2 := &addToContainer{container: &container, err: true, value: 2, mu: &mu} - m3 := &addToContainer{container: &container, value: 3, mu: &mu} - - m := Parallel(m1, m2, m3) - - // Apply the mutator - diags := ApplyReadOnly(context.Background(), ReadOnly(b), m) - require.Len(t, diags, 1) - require.Equal(t, "error", diags[0].Summary) - require.Len(t, container, 2) - require.Contains(t, container, 1) - require.Contains(t, container, 3) -} diff --git a/bundle/tests/job_cluster_key_test.go b/bundle/tests/job_cluster_key_test.go index 6a08da89c..0306bf7b5 100644 --- a/bundle/tests/job_cluster_key_test.go +++ b/bundle/tests/job_cluster_key_test.go @@ -13,7 +13,7 @@ import ( func TestJobClusterKeyNotDefinedTest(t *testing.T) { b := loadTarget(t, "./job_cluster_key", "default") - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.JobClusterKeyDefined()) + diags := bundle.Apply(context.Background(), b, validate.JobClusterKeyDefined()) require.Len(t, diags, 1) require.NoError(t, diags.Error()) require.Equal(t, diag.Warning, diags[0].Severity) @@ -23,6 +23,6 @@ func TestJobClusterKeyNotDefinedTest(t *testing.T) { func TestJobClusterKeyDefinedTest(t *testing.T) { b := loadTarget(t, "./job_cluster_key", "development") - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.JobClusterKeyDefined()) + diags := bundle.Apply(context.Background(), b, validate.JobClusterKeyDefined()) require.Empty(t, diags) } diff --git a/bundle/tests/sync_include_exclude_no_matches_test.go b/bundle/tests/sync_include_exclude_no_matches_test.go index c206e7471..ed929518a 100644 --- a/bundle/tests/sync_include_exclude_no_matches_test.go +++ b/bundle/tests/sync_include_exclude_no_matches_test.go @@ -16,7 +16,7 @@ import ( func TestSyncIncludeExcludeNoMatchesTest(t *testing.T) { b := loadTarget(t, "./sync/override", "development") - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.ValidateSyncPatterns()) + diags := bundle.Apply(context.Background(), b, validate.ValidateSyncPatterns()) require.Len(t, diags, 3) require.NoError(t, diags.Error()) @@ -46,7 +46,7 @@ func TestSyncIncludeExcludeNoMatchesTest(t *testing.T) { func TestSyncIncludeWithNegate(t *testing.T) { b := loadTarget(t, "./sync/negate", "default") - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.ValidateSyncPatterns()) + diags := bundle.Apply(context.Background(), b, validate.ValidateSyncPatterns()) require.Empty(t, diags) require.NoError(t, diags.Error()) } @@ -54,7 +54,7 @@ func TestSyncIncludeWithNegate(t *testing.T) { func TestSyncIncludeWithNegateNoMatches(t *testing.T) { b := loadTarget(t, "./sync/negate", "dev") - diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.ValidateSyncPatterns()) + diags := bundle.Apply(context.Background(), b, validate.ValidateSyncPatterns()) require.Len(t, diags, 1) require.NoError(t, diags.Error()) diff --git a/cmd/bundle/sync.go b/cmd/bundle/sync.go index 3ada07b74..25475206d 100644 --- a/cmd/bundle/sync.go +++ b/cmd/bundle/sync.go @@ -25,7 +25,7 @@ type syncFlags struct { } func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOptions, error) { - opts, err := files.GetSyncOptions(cmd.Context(), bundle.ReadOnly(b)) + opts, err := files.GetSyncOptions(cmd.Context(), b) if err != nil { return nil, fmt.Errorf("cannot get sync options: %w", err) } diff --git a/cmd/bundle/validate.go b/cmd/bundle/validate.go index 0ff9c7867..0a902806f 100644 --- a/cmd/bundle/validate.go +++ b/cmd/bundle/validate.go @@ -50,7 +50,7 @@ func newValidateCommand() *cobra.Command { } if !diags.HasError() { - diags = diags.Extend(bundle.Apply(ctx, b, validate.Validate())) + diags = diags.Extend(validate.Validate(ctx, b)) } switch root.OutputType(cmd) { diff --git a/cmd/sync/sync.go b/cmd/sync/sync.go index dea40f96a..0f4a4aacc 100644 --- a/cmd/sync/sync.go +++ b/cmd/sync/sync.go @@ -33,7 +33,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b * return nil, errors.New("SRC and DST are not configurable in the context of a bundle") } - opts, err := files.GetSyncOptions(cmd.Context(), bundle.ReadOnly(b)) + opts, err := files.GetSyncOptions(cmd.Context(), b) if err != nil { return nil, fmt.Errorf("cannot get sync options: %w", err) }