mirror of https://github.com/databricks/cli.git
Remove bundle.{Parallel,ReadOnlyBundle} (#2414)
## Changes - Remove bundle.Parallel & bundle.ReadOnlyBundle. - Add bundle.ApplyParallel, as a helper to migrate from bundle.Parallel. - Keep ReadOnlyMutator as a separate type but it's now a subtype of Mutator so it works on regular *Bundle. Having it as a separate type prevents non-readonly mutators being passed to ApplyParallel - validate.Validate becomes a function (was Mutator). ## Why This a follow up to #2390 where we removed most of the tools to construct chains of mutators. Same motivation applies here. When it comes to read-only bundles, it's a leaky abstraction -- since it's a shallow copy, it does not actually guarantee or enforce readonly access to bundle. A better approach would be to run parallel operations on independent narrowly-focused deep-copied structs, with just enough information to carry out the task (this is not implemented here, but the eventual goal). Now that we can just write regular code in phases and not limited to mutator interface, we can switch to that approach. ## Tests Existing tests. --------- Co-authored-by: shreyas-goenka <88374338+shreyas-goenka@users.noreply.github.com>
This commit is contained in:
parent
2c5b61538d
commit
e4cd782852
|
@ -1,15 +0,0 @@
|
|||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly)
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel mutator (read-only)=validate:SingleNodeCluster
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel mutator (read-only)=validate:artifact_paths
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel mutator (read-only)=validate:job_cluster_key_defined
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=fast_validate(readonly) mutator (read-only)=parallel mutator (read-only)=validate:job_task_cluster_spec
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:folder_permissions
|
||||
10:07:59 Debug: ApplyReadOnly pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:validate_sync_patterns
|
||||
10:07:59 Debug: Path /Workspace/Users/[USERNAME]/.bundle/debug/default/files has type directory (ID: 0) pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync
|
||||
10:07:59 Debug: non-retriable error: Workspace path not found pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync sdk=true
|
||||
< HTTP/0.0 000 OK pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync sdk=true
|
||||
< } pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync sdk=true
|
||||
< } pid=12345 mutator=validate mutator (read-only)=parallel mutator (read-only)=validate:files_to_sync sdk=true
|
|
@ -72,18 +72,30 @@
|
|||
10:07:59 Debug: Environment variables for Terraform: ...redacted... pid=12345 mutator=terraform.Initialize
|
||||
10:07:59 Debug: Apply pid=12345 mutator=scripts.postinit
|
||||
10:07:59 Debug: No script defined for postinit, skipping pid=12345 mutator=scripts.postinit
|
||||
10:07:59 Debug: Apply pid=12345 mutator=validate
|
||||
10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly)
|
||||
10:07:59 Debug: ApplyParallel pid=12345 mutator=validate:files_to_sync
|
||||
10:07:59 Debug: ApplyParallel pid=12345 mutator=validate:folder_permissions
|
||||
10:07:59 Debug: ApplyParallel pid=12345 mutator=validate:validate_sync_patterns
|
||||
10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) mutator=validate:job_cluster_key_defined
|
||||
10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) mutator=validate:job_task_cluster_spec
|
||||
10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) mutator=validate:SingleNodeCluster
|
||||
10:07:59 Debug: ApplyParallel pid=12345 mutator=fast_validate(readonly) mutator=validate:artifact_paths
|
||||
10:07:59 Debug: GET /api/2.0/workspace/get-status?path=/Workspace/Users/[USERNAME]/.bundle/debug/default/files
|
||||
< HTTP/1.1 404 Not Found
|
||||
< {
|
||||
< "message": "Workspace path not found"
|
||||
< } pid=12345 mutator=validate:files_to_sync sdk=true
|
||||
10:07:59 Debug: non-retriable error: Workspace path not found pid=12345 mutator=validate:files_to_sync sdk=true
|
||||
10:07:59 Debug: POST /api/2.0/workspace/mkdirs
|
||||
> {
|
||||
> "path": "/Workspace/Users/[USERNAME]/.bundle/debug/default/files"
|
||||
> }
|
||||
< HTTP/1.1 200 OK pid=12345 mutator=validate:files_to_sync sdk=true
|
||||
10:07:59 Debug: GET /api/2.0/workspace/get-status?path=/Workspace/Users/[USERNAME]/.bundle/debug/default/files
|
||||
< HTTP/1.1 200 OK
|
||||
< {
|
||||
< "object_type": "DIRECTORY",
|
||||
< "path": "/Workspace/Users/[USERNAME]/.bundle/debug/default/files"
|
||||
< } pid=12345 mutator=validate:files_to_sync sdk=true
|
||||
10:07:59 Debug: Path /Workspace/Users/[USERNAME]/.bundle/debug/default/files has type directory (ID: 0) pid=12345 mutator=validate:files_to_sync
|
||||
10:07:59 Info: completed execution pid=12345 exit_code=0
|
||||
|
|
|
@ -1,4 +1 @@
|
|||
$CLI bundle validate --debug 2> full.stderr.txt
|
||||
grep -vw parallel full.stderr.txt > out.stderr.txt
|
||||
grep -w parallel full.stderr.txt | sed 's/[0-9]/0/g' | sort_lines.py > out.stderr.parallel.txt
|
||||
rm full.stderr.txt
|
||||
$CLI bundle validate --debug 2> out.stderr.txt
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
package bundle
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/libs/vfs"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
)
|
||||
|
||||
type ReadOnlyBundle struct {
|
||||
b *Bundle
|
||||
}
|
||||
|
||||
func ReadOnly(b *Bundle) ReadOnlyBundle {
|
||||
return ReadOnlyBundle{b: b}
|
||||
}
|
||||
|
||||
func (r ReadOnlyBundle) Config() config.Root {
|
||||
return r.b.Config
|
||||
}
|
||||
|
||||
func (r ReadOnlyBundle) RootPath() string {
|
||||
return r.b.BundleRootPath
|
||||
}
|
||||
|
||||
func (r ReadOnlyBundle) BundleRoot() vfs.Path {
|
||||
return r.b.BundleRoot
|
||||
}
|
||||
|
||||
func (r ReadOnlyBundle) SyncRoot() vfs.Path {
|
||||
return r.b.SyncRoot
|
||||
}
|
||||
|
||||
func (r ReadOnlyBundle) WorktreeRoot() vfs.Path {
|
||||
return r.b.WorktreeRoot
|
||||
}
|
||||
|
||||
func (r ReadOnlyBundle) WorkspaceClient() *databricks.WorkspaceClient {
|
||||
return r.b.WorkspaceClient()
|
||||
}
|
||||
|
||||
func (r ReadOnlyBundle) CacheDir(ctx context.Context, paths ...string) (string, error) {
|
||||
return r.b.CacheDir(ctx, paths...)
|
||||
}
|
||||
|
||||
func (r ReadOnlyBundle) GetSyncIncludePatterns(ctx context.Context) ([]string, error) {
|
||||
return r.b.GetSyncIncludePatterns(ctx)
|
||||
}
|
|
@ -14,18 +14,18 @@ import (
|
|||
// 2. The validation is blocking for bundle deployments.
|
||||
//
|
||||
// The full suite of validation mutators is available in the [Validate] mutator.
|
||||
type fastValidateReadonly struct{}
|
||||
type fastValidate struct{ bundle.RO }
|
||||
|
||||
func FastValidateReadonly() bundle.ReadOnlyMutator {
|
||||
return &fastValidateReadonly{}
|
||||
func FastValidate() bundle.ReadOnlyMutator {
|
||||
return &fastValidate{}
|
||||
}
|
||||
|
||||
func (f *fastValidateReadonly) Name() string {
|
||||
func (f *fastValidate) Name() string {
|
||||
return "fast_validate(readonly)"
|
||||
}
|
||||
|
||||
func (f *fastValidateReadonly) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
return bundle.ApplyReadOnly(ctx, rb, bundle.Parallel(
|
||||
func (f *fastValidate) Apply(ctx context.Context, rb *bundle.Bundle) diag.Diagnostics {
|
||||
return bundle.ApplyParallel(ctx, rb,
|
||||
// Fast mutators with only in-memory checks
|
||||
JobClusterKeyDefined(),
|
||||
JobTaskClusterSpec(),
|
||||
|
@ -33,19 +33,5 @@ func (f *fastValidateReadonly) Apply(ctx context.Context, rb bundle.ReadOnlyBund
|
|||
|
||||
// Blocking mutators. Deployments will fail if these checks fail.
|
||||
ValidateArtifactPath(),
|
||||
))
|
||||
}
|
||||
|
||||
type fastValidate struct{}
|
||||
|
||||
func FastValidate() bundle.Mutator {
|
||||
return &fastValidate{}
|
||||
}
|
||||
|
||||
func (f *fastValidate) Name() string {
|
||||
return "fast_validate"
|
||||
}
|
||||
|
||||
func (f *fastValidate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
return bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), FastValidateReadonly())
|
||||
)
|
||||
}
|
||||
|
|
|
@ -13,20 +13,20 @@ func FilesToSync() bundle.ReadOnlyMutator {
|
|||
return &filesToSync{}
|
||||
}
|
||||
|
||||
type filesToSync struct{}
|
||||
type filesToSync struct{ bundle.RO }
|
||||
|
||||
func (v *filesToSync) Name() string {
|
||||
return "validate:files_to_sync"
|
||||
}
|
||||
|
||||
func (v *filesToSync) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
func (v *filesToSync) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
// The user may be intentional about not synchronizing any files.
|
||||
// In this case, we should not show any warnings.
|
||||
if len(rb.Config().Sync.Paths) == 0 {
|
||||
if len(b.Config.Sync.Paths) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
sync, err := files.GetSync(ctx, rb)
|
||||
sync, err := files.GetSync(ctx, b)
|
||||
if err != nil {
|
||||
return diag.FromErr(err)
|
||||
}
|
||||
|
@ -42,20 +42,20 @@ func (v *filesToSync) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.
|
|||
}
|
||||
|
||||
diags := diag.Diagnostics{}
|
||||
if len(rb.Config().Sync.Exclude) == 0 {
|
||||
if len(b.Config.Sync.Exclude) == 0 {
|
||||
diags = diags.Append(diag.Diagnostic{
|
||||
Severity: diag.Warning,
|
||||
Summary: "There are no files to sync, please check your .gitignore",
|
||||
})
|
||||
} else {
|
||||
loc := location{path: "sync.exclude", rb: rb}
|
||||
path := "sync.exclude"
|
||||
diags = diags.Append(diag.Diagnostic{
|
||||
Severity: diag.Warning,
|
||||
Summary: "There are no files to sync, please check your .gitignore and sync.exclude configuration",
|
||||
// Show all locations where sync.exclude is defined, since merging
|
||||
// sync.exclude is additive.
|
||||
Locations: loc.Locations(),
|
||||
Paths: []dyn.Path{loc.Path()},
|
||||
Locations: b.Config.GetLocations(path),
|
||||
Paths: []dyn.Path{dyn.MustPathFromString(path)},
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -29,8 +29,7 @@ func TestFilesToSync_NoPaths(t *testing.T) {
|
|||
}
|
||||
|
||||
ctx := context.Background()
|
||||
rb := bundle.ReadOnly(b)
|
||||
diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync())
|
||||
diags := FilesToSync().Apply(ctx, b)
|
||||
assert.Empty(t, diags)
|
||||
}
|
||||
|
||||
|
@ -85,8 +84,7 @@ func TestFilesToSync_EverythingIgnored(t *testing.T) {
|
|||
testutil.WriteFile(t, filepath.Join(b.BundleRootPath, ".gitignore"), "*\n.*\n")
|
||||
|
||||
ctx := context.Background()
|
||||
rb := bundle.ReadOnly(b)
|
||||
diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync())
|
||||
diags := FilesToSync().Apply(ctx, b)
|
||||
require.Len(t, diags, 1)
|
||||
assert.Equal(t, diag.Warning, diags[0].Severity)
|
||||
assert.Equal(t, "There are no files to sync, please check your .gitignore", diags[0].Summary)
|
||||
|
@ -99,8 +97,7 @@ func TestFilesToSync_EverythingExcluded(t *testing.T) {
|
|||
b.Config.Sync.Exclude = []string{"*"}
|
||||
|
||||
ctx := context.Background()
|
||||
rb := bundle.ReadOnly(b)
|
||||
diags := bundle.ApplyReadOnly(ctx, rb, FilesToSync())
|
||||
diags := FilesToSync().Apply(ctx, b)
|
||||
require.Len(t, diags, 1)
|
||||
assert.Equal(t, diag.Warning, diags[0].Severity)
|
||||
assert.Equal(t, "There are no files to sync, please check your .gitignore and sync.exclude configuration", diags[0].Summary)
|
||||
|
|
|
@ -16,15 +16,14 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type folderPermissions struct{}
|
||||
type folderPermissions struct{ bundle.RO }
|
||||
|
||||
// Apply implements bundle.ReadOnlyMutator.
|
||||
func (f *folderPermissions) Apply(ctx context.Context, b bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
if len(b.Config().Permissions) == 0 {
|
||||
func (f *folderPermissions) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
if len(b.Config.Permissions) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
bundlePaths := paths.CollectUniqueWorkspacePathPrefixes(b.Config().Workspace)
|
||||
bundlePaths := paths.CollectUniqueWorkspacePathPrefixes(b.Config.Workspace)
|
||||
|
||||
var diags diag.Diagnostics
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
|
@ -48,7 +47,7 @@ func (f *folderPermissions) Apply(ctx context.Context, b bundle.ReadOnlyBundle)
|
|||
return diags
|
||||
}
|
||||
|
||||
func checkFolderPermission(ctx context.Context, b bundle.ReadOnlyBundle, folderPath string) diag.Diagnostics {
|
||||
func checkFolderPermission(ctx context.Context, b *bundle.Bundle, folderPath string) diag.Diagnostics {
|
||||
// If the folder is shared, then we don't need to check permissions as it was already checked in the other mutator before.
|
||||
if libraries.IsWorkspaceSharedPath(folderPath) {
|
||||
return nil
|
||||
|
@ -69,7 +68,7 @@ func checkFolderPermission(ctx context.Context, b bundle.ReadOnlyBundle, folderP
|
|||
}
|
||||
|
||||
p := permissions.ObjectAclToResourcePermissions(folderPath, objPermissions.AccessControlList)
|
||||
return p.Compare(b.Config().Permissions)
|
||||
return p.Compare(b.Config.Permissions)
|
||||
}
|
||||
|
||||
func getClosestExistingObject(ctx context.Context, w workspace.WorkspaceInterface, folderPath string) (*workspace.ObjectInfo, error) {
|
||||
|
|
|
@ -69,9 +69,7 @@ func TestFolderPermissionsInheritedWhenRootPathDoesNotExist(t *testing.T) {
|
|||
}, nil)
|
||||
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
rb := bundle.ReadOnly(b)
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), rb, ValidateFolderPermissions())
|
||||
diags := ValidateFolderPermissions().Apply(context.Background(), b)
|
||||
require.Empty(t, diags)
|
||||
}
|
||||
|
||||
|
@ -118,9 +116,7 @@ func TestValidateFolderPermissionsFailsOnMissingBundlePermission(t *testing.T) {
|
|||
}, nil)
|
||||
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
rb := bundle.ReadOnly(b)
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), rb, ValidateFolderPermissions())
|
||||
diags := ValidateFolderPermissions().Apply(context.Background(), b)
|
||||
require.Len(t, diags, 1)
|
||||
require.Equal(t, "untracked permissions apply to target workspace path", diags[0].Summary)
|
||||
require.Equal(t, diag.Warning, diags[0].Severity)
|
||||
|
@ -164,9 +160,7 @@ func TestValidateFolderPermissionsFailsOnPermissionMismatch(t *testing.T) {
|
|||
}, nil)
|
||||
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
rb := bundle.ReadOnly(b)
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), rb, ValidateFolderPermissions())
|
||||
diags := ValidateFolderPermissions().Apply(context.Background(), b)
|
||||
require.Len(t, diags, 1)
|
||||
require.Equal(t, "untracked permissions apply to target workspace path", diags[0].Summary)
|
||||
require.Equal(t, diag.Warning, diags[0].Severity)
|
||||
|
@ -199,9 +193,7 @@ func TestValidateFolderPermissionsFailsOnNoRootFolder(t *testing.T) {
|
|||
})
|
||||
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
rb := bundle.ReadOnly(b)
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), rb, ValidateFolderPermissions())
|
||||
diags := ValidateFolderPermissions().Apply(context.Background(), b)
|
||||
require.Len(t, diags, 1)
|
||||
require.Equal(t, "folder / and its parent folders do not exist", diags[0].Summary)
|
||||
require.Equal(t, diag.Error, diags[0].Severity)
|
||||
|
|
|
@ -13,16 +13,16 @@ func JobClusterKeyDefined() bundle.ReadOnlyMutator {
|
|||
return &jobClusterKeyDefined{}
|
||||
}
|
||||
|
||||
type jobClusterKeyDefined struct{}
|
||||
type jobClusterKeyDefined struct{ bundle.RO }
|
||||
|
||||
func (v *jobClusterKeyDefined) Name() string {
|
||||
return "validate:job_cluster_key_defined"
|
||||
}
|
||||
|
||||
func (v *jobClusterKeyDefined) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
func (v *jobClusterKeyDefined) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
diags := diag.Diagnostics{}
|
||||
|
||||
for k, job := range rb.Config().Resources.Jobs {
|
||||
for k, job := range b.Config.Resources.Jobs {
|
||||
jobClusterKeys := make(map[string]bool)
|
||||
for _, cluster := range job.JobClusters {
|
||||
if cluster.JobClusterKey != "" {
|
||||
|
@ -33,10 +33,7 @@ func (v *jobClusterKeyDefined) Apply(ctx context.Context, rb bundle.ReadOnlyBund
|
|||
for index, task := range job.Tasks {
|
||||
if task.JobClusterKey != "" {
|
||||
if _, ok := jobClusterKeys[task.JobClusterKey]; !ok {
|
||||
loc := location{
|
||||
path: fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index),
|
||||
rb: rb,
|
||||
}
|
||||
path := fmt.Sprintf("resources.jobs.%s.tasks[%d].job_cluster_key", k, index)
|
||||
|
||||
diags = diags.Append(diag.Diagnostic{
|
||||
Severity: diag.Warning,
|
||||
|
@ -44,8 +41,8 @@ func (v *jobClusterKeyDefined) Apply(ctx context.Context, rb bundle.ReadOnlyBund
|
|||
// Show only the location where the job_cluster_key is defined.
|
||||
// Other associated locations are not relevant since they are
|
||||
// overridden during merging.
|
||||
Locations: []dyn.Location{loc.Location()},
|
||||
Paths: []dyn.Path{loc.Path()},
|
||||
Locations: b.Config.GetLocations(path),
|
||||
Paths: []dyn.Path{dyn.MustPathFromString(path)},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ func TestJobClusterKeyDefined(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined())
|
||||
diags := JobClusterKeyDefined().Apply(context.Background(), b)
|
||||
require.Empty(t, diags)
|
||||
require.NoError(t, diags.Error())
|
||||
}
|
||||
|
@ -56,7 +56,7 @@ func TestJobClusterKeyNotDefined(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined())
|
||||
diags := JobClusterKeyDefined().Apply(context.Background(), b)
|
||||
require.Len(t, diags, 1)
|
||||
require.NoError(t, diags.Error())
|
||||
require.Equal(t, diag.Warning, diags[0].Severity)
|
||||
|
@ -89,7 +89,7 @@ func TestJobClusterKeyDefinedInDifferentJob(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobClusterKeyDefined())
|
||||
diags := JobClusterKeyDefined().Apply(context.Background(), b)
|
||||
require.Len(t, diags, 1)
|
||||
require.NoError(t, diags.Error())
|
||||
require.Equal(t, diag.Warning, diags[0].Severity)
|
||||
|
|
|
@ -17,31 +17,31 @@ func JobTaskClusterSpec() bundle.ReadOnlyMutator {
|
|||
return &jobTaskClusterSpec{}
|
||||
}
|
||||
|
||||
type jobTaskClusterSpec struct{}
|
||||
type jobTaskClusterSpec struct{ bundle.RO }
|
||||
|
||||
func (v *jobTaskClusterSpec) Name() string {
|
||||
return "validate:job_task_cluster_spec"
|
||||
}
|
||||
|
||||
func (v *jobTaskClusterSpec) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
func (v *jobTaskClusterSpec) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
diags := diag.Diagnostics{}
|
||||
|
||||
jobsPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("jobs"))
|
||||
|
||||
for resourceName, job := range rb.Config().Resources.Jobs {
|
||||
for resourceName, job := range b.Config.Resources.Jobs {
|
||||
resourcePath := jobsPath.Append(dyn.Key(resourceName))
|
||||
|
||||
for taskIndex, task := range job.Tasks {
|
||||
taskPath := resourcePath.Append(dyn.Key("tasks"), dyn.Index(taskIndex))
|
||||
|
||||
diags = diags.Extend(validateJobTask(rb, task, taskPath))
|
||||
diags = diags.Extend(validateJobTask(b, task, taskPath))
|
||||
}
|
||||
}
|
||||
|
||||
return diags
|
||||
}
|
||||
|
||||
func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics {
|
||||
func validateJobTask(b *bundle.Bundle, task jobs.Task, taskPath dyn.Path) diag.Diagnostics {
|
||||
diags := diag.Diagnostics{}
|
||||
|
||||
var specified []string
|
||||
|
@ -74,7 +74,7 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path
|
|||
if task.ForEachTask != nil {
|
||||
forEachTaskPath := taskPath.Append(dyn.Key("for_each_task"), dyn.Key("task"))
|
||||
|
||||
diags = diags.Extend(validateJobTask(rb, task.ForEachTask.Task, forEachTaskPath))
|
||||
diags = diags.Extend(validateJobTask(b, task.ForEachTask.Task, forEachTaskPath))
|
||||
}
|
||||
|
||||
if isComputeTask(task) && len(specified) == 0 {
|
||||
|
@ -92,7 +92,7 @@ func validateJobTask(rb bundle.ReadOnlyBundle, task jobs.Task, taskPath dyn.Path
|
|||
Severity: diag.Error,
|
||||
Summary: "Missing required cluster or environment settings",
|
||||
Detail: detail,
|
||||
Locations: rb.Config().GetLocations(taskPath.String()),
|
||||
Locations: b.Config.GetLocations(taskPath.String()),
|
||||
Paths: []dyn.Path{taskPath},
|
||||
})
|
||||
}
|
||||
|
|
|
@ -174,7 +174,7 @@ Specify one of the following fields: job_cluster_key, environment_key, existing_
|
|||
}
|
||||
|
||||
b := createBundle(map[string]*resources.Job{"job1": job})
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), JobTaskClusterSpec())
|
||||
diags := JobTaskClusterSpec().Apply(context.Background(), b)
|
||||
|
||||
if tc.errorPath != "" || tc.errorDetail != "" || tc.errorSummary != "" {
|
||||
assert.Len(t, diags, 1)
|
||||
|
|
|
@ -16,7 +16,7 @@ func SingleNodeCluster() bundle.ReadOnlyMutator {
|
|||
return &singleNodeCluster{}
|
||||
}
|
||||
|
||||
type singleNodeCluster struct{}
|
||||
type singleNodeCluster struct{ bundle.RO }
|
||||
|
||||
func (m *singleNodeCluster) Name() string {
|
||||
return "validate:SingleNodeCluster"
|
||||
|
@ -98,7 +98,7 @@ func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
func (m *singleNodeCluster) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
diags := diag.Diagnostics{}
|
||||
|
||||
patterns := []dyn.Pattern{
|
||||
|
@ -115,7 +115,7 @@ func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle)
|
|||
}
|
||||
|
||||
for _, p := range patterns {
|
||||
_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||
_, err := dyn.MapByPattern(b.Config.Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||
warning := diag.Diagnostic{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
|
|
|
@ -116,7 +116,7 @@ func TestValidateSingleNodeClusterFailForInteractiveClusters(t *testing.T) {
|
|||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
|
||||
})
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := SingleNodeCluster().Apply(ctx, b)
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
|
@ -165,7 +165,7 @@ func TestValidateSingleNodeClusterFailForJobClusters(t *testing.T) {
|
|||
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := SingleNodeCluster().Apply(ctx, b)
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
|
@ -214,7 +214,7 @@ func TestValidateSingleNodeClusterFailForJobTaskClusters(t *testing.T) {
|
|||
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := bundle.Apply(ctx, b, SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
|
@ -260,7 +260,7 @@ func TestValidateSingleNodeClusterFailForPipelineClusters(t *testing.T) {
|
|||
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := bundle.Apply(ctx, b, SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
|
@ -313,7 +313,7 @@ func TestValidateSingleNodeClusterFailForJobForEachTaskCluster(t *testing.T) {
|
|||
return dyn.Set(v, "resources.jobs.foo.tasks[0].for_each_task.task.new_cluster.num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := bundle.Apply(ctx, b, SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
|
@ -397,7 +397,7 @@ func TestValidateSingleNodeClusterPassInteractiveClusters(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := bundle.Apply(ctx, b, SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
|
@ -437,7 +437,7 @@ func TestValidateSingleNodeClusterPassJobClusters(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := bundle.Apply(ctx, b, SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
|
@ -477,7 +477,7 @@ func TestValidateSingleNodeClusterPassJobTaskClusters(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := bundle.Apply(ctx, b, SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
|
@ -514,7 +514,7 @@ func TestValidateSingleNodeClusterPassPipelineClusters(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := bundle.Apply(ctx, b, SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
|
@ -558,7 +558,7 @@ func TestValidateSingleNodeClusterPassJobForEachTaskCluster(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
diags := bundle.Apply(ctx, b, SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -5,46 +5,16 @@ import (
|
|||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
)
|
||||
|
||||
type validate struct{}
|
||||
|
||||
type location struct {
|
||||
path string
|
||||
rb bundle.ReadOnlyBundle
|
||||
}
|
||||
|
||||
func (l location) Location() dyn.Location {
|
||||
return l.rb.Config().GetLocation(l.path)
|
||||
}
|
||||
|
||||
func (l location) Locations() []dyn.Location {
|
||||
return l.rb.Config().GetLocations(l.path)
|
||||
}
|
||||
|
||||
func (l location) Path() dyn.Path {
|
||||
return dyn.MustPathFromString(l.path)
|
||||
}
|
||||
|
||||
// Apply implements bundle.Mutator.
|
||||
func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
return bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), bundle.Parallel(
|
||||
FastValidateReadonly(),
|
||||
func Validate(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
return bundle.ApplyParallel(ctx, b,
|
||||
FastValidate(),
|
||||
|
||||
// Slow mutators that require network or file i/o. These are only
|
||||
// run in the `bundle validate` command.
|
||||
FilesToSync(),
|
||||
ValidateFolderPermissions(),
|
||||
ValidateSyncPatterns(),
|
||||
))
|
||||
}
|
||||
|
||||
// Name implements bundle.Mutator.
|
||||
func (v *validate) Name() string {
|
||||
return "validate"
|
||||
}
|
||||
|
||||
func Validate() bundle.Mutator {
|
||||
return &validate{}
|
||||
)
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ import (
|
|||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
)
|
||||
|
||||
type validateArtifactPath struct{}
|
||||
type validateArtifactPath struct{ bundle.RO }
|
||||
|
||||
func ValidateArtifactPath() bundle.ReadOnlyMutator {
|
||||
return &validateArtifactPath{}
|
||||
|
@ -74,9 +74,9 @@ func findVolumeInBundle(r config.Root, catalogName, schemaName, volumeName strin
|
|||
return nil, nil, false
|
||||
}
|
||||
|
||||
func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
func (v *validateArtifactPath) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
// We only validate UC Volumes paths right now.
|
||||
if !libraries.IsVolumesPath(rb.Config().Workspace.ArtifactPath) {
|
||||
if !libraries.IsVolumesPath(b.Config.Workspace.ArtifactPath) {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -85,25 +85,25 @@ func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBund
|
|||
{
|
||||
Summary: s,
|
||||
Severity: diag.Error,
|
||||
Locations: rb.Config().GetLocations("workspace.artifact_path"),
|
||||
Locations: b.Config.GetLocations("workspace.artifact_path"),
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("workspace.artifact_path")},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
catalogName, schemaName, volumeName, err := extractVolumeFromPath(rb.Config().Workspace.ArtifactPath)
|
||||
catalogName, schemaName, volumeName, err := extractVolumeFromPath(b.Config.Workspace.ArtifactPath)
|
||||
if err != nil {
|
||||
return wrapErrorMsg(err.Error())
|
||||
}
|
||||
volumeFullName := fmt.Sprintf("%s.%s.%s", catalogName, schemaName, volumeName)
|
||||
w := rb.WorkspaceClient()
|
||||
w := b.WorkspaceClient()
|
||||
_, err = w.Volumes.ReadByName(ctx, volumeFullName)
|
||||
|
||||
if errors.Is(err, apierr.ErrPermissionDenied) {
|
||||
return wrapErrorMsg(fmt.Sprintf("cannot access volume %s: %s", volumeFullName, err))
|
||||
}
|
||||
if errors.Is(err, apierr.ErrNotFound) {
|
||||
path, locations, ok := findVolumeInBundle(rb.Config(), catalogName, schemaName, volumeName)
|
||||
path, locations, ok := findVolumeInBundle(b.Config, catalogName, schemaName, volumeName)
|
||||
if !ok {
|
||||
return wrapErrorMsg(fmt.Sprintf("volume %s does not exist", volumeFullName))
|
||||
}
|
||||
|
@ -117,7 +117,7 @@ func (v *validateArtifactPath) Apply(ctx context.Context, rb bundle.ReadOnlyBund
|
|||
this bundle but which has not been deployed yet. Please first deploy
|
||||
the volume using 'bundle deploy' and then switch over to using it in
|
||||
the artifact_path.`,
|
||||
Locations: slices.Concat(rb.Config().GetLocations("workspace.artifact_path"), locations),
|
||||
Locations: slices.Concat(b.Config.GetLocations("workspace.artifact_path"), locations),
|
||||
Paths: append([]dyn.Path{dyn.MustPathFromString("workspace.artifact_path")}, path),
|
||||
}}
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ func TestValidateArtifactPathWithVolumeInBundle(t *testing.T) {
|
|||
})
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), ValidateArtifactPath())
|
||||
diags := ValidateArtifactPath().Apply(ctx, b)
|
||||
assert.Equal(t, diag.Diagnostics{{
|
||||
Severity: diag.Error,
|
||||
Summary: "volume catalogN.schemaN.volumeN does not exist",
|
||||
|
@ -88,7 +88,6 @@ func TestValidateArtifactPath(t *testing.T) {
|
|||
}}, diags)
|
||||
}
|
||||
|
||||
rb := bundle.ReadOnly(b)
|
||||
ctx := context.Background()
|
||||
|
||||
tcases := []struct {
|
||||
|
@ -123,7 +122,7 @@ func TestValidateArtifactPath(t *testing.T) {
|
|||
api.EXPECT().ReadByName(mock.Anything, "catalogN.schemaN.volumeN").Return(nil, tc.err)
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, rb, ValidateArtifactPath())
|
||||
diags := ValidateArtifactPath().Apply(ctx, b)
|
||||
assertDiags(t, diags, tc.expectedSummary)
|
||||
}
|
||||
}
|
||||
|
@ -167,7 +166,7 @@ func TestValidateArtifactPathWithInvalidPaths(t *testing.T) {
|
|||
|
||||
bundletest.SetLocation(b, "workspace.artifact_path", []dyn.Location{{File: "config.yml", Line: 1, Column: 2}})
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), ValidateArtifactPath())
|
||||
diags := ValidateArtifactPath().Apply(context.Background(), b)
|
||||
require.Equal(t, diag.Diagnostics{{
|
||||
Severity: diag.Error,
|
||||
Summary: "expected UC volume path to be in the format /Volumes/<catalog>/<schema>/<volume>/..., got " + p,
|
||||
|
|
|
@ -17,24 +17,24 @@ func ValidateSyncPatterns() bundle.ReadOnlyMutator {
|
|||
return &validateSyncPatterns{}
|
||||
}
|
||||
|
||||
type validateSyncPatterns struct{}
|
||||
type validateSyncPatterns struct{ bundle.RO }
|
||||
|
||||
func (v *validateSyncPatterns) Name() string {
|
||||
return "validate:validate_sync_patterns"
|
||||
}
|
||||
|
||||
func (v *validateSyncPatterns) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
s := rb.Config().Sync
|
||||
func (v *validateSyncPatterns) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
s := b.Config.Sync
|
||||
if len(s.Exclude) == 0 && len(s.Include) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
diags, err := checkPatterns(s.Exclude, "sync.exclude", rb)
|
||||
diags, err := checkPatterns(s.Exclude, "sync.exclude", b)
|
||||
if err != nil {
|
||||
return diag.FromErr(err)
|
||||
}
|
||||
|
||||
includeDiags, err := checkPatterns(s.Include, "sync.include", rb)
|
||||
includeDiags, err := checkPatterns(s.Include, "sync.include", b)
|
||||
if err != nil {
|
||||
return diag.FromErr(err)
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ func (v *validateSyncPatterns) Apply(ctx context.Context, rb bundle.ReadOnlyBund
|
|||
return diags.Extend(includeDiags)
|
||||
}
|
||||
|
||||
func checkPatterns(patterns []string, path string, rb bundle.ReadOnlyBundle) (diag.Diagnostics, error) {
|
||||
func checkPatterns(patterns []string, path string, b *bundle.Bundle) (diag.Diagnostics, error) {
|
||||
var mu sync.Mutex
|
||||
var errs errgroup.Group
|
||||
var diags diag.Diagnostics
|
||||
|
@ -55,7 +55,7 @@ func checkPatterns(patterns []string, path string, rb bundle.ReadOnlyBundle) (di
|
|||
// it means: do not include these files into result set
|
||||
p := strings.TrimPrefix(pattern, "!")
|
||||
errs.Go(func() error {
|
||||
fs, err := fileset.NewGlobSet(rb.BundleRoot(), []string{p})
|
||||
fs, err := fileset.NewGlobSet(b.BundleRoot, []string{p})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -66,13 +66,13 @@ func checkPatterns(patterns []string, path string, rb bundle.ReadOnlyBundle) (di
|
|||
}
|
||||
|
||||
if len(all) == 0 {
|
||||
loc := location{path: fmt.Sprintf("%s[%d]", path, index), rb: rb}
|
||||
path := fmt.Sprintf("%s[%d]", path, index)
|
||||
mu.Lock()
|
||||
diags = diags.Append(diag.Diagnostic{
|
||||
Severity: diag.Warning,
|
||||
Summary: fmt.Sprintf("Pattern %s does not match any files", pattern),
|
||||
Locations: []dyn.Location{loc.Location()},
|
||||
Paths: []dyn.Path{loc.Path()},
|
||||
Locations: b.Config.GetLocations(path),
|
||||
Paths: []dyn.Path{dyn.MustPathFromString(path)},
|
||||
})
|
||||
mu.Unlock()
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ func (m *delete) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
|||
}
|
||||
|
||||
func deleteSnapshotFile(ctx context.Context, b *bundle.Bundle) error {
|
||||
opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b))
|
||||
opts, err := GetSyncOptions(ctx, b)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot get sync options: %w", err)
|
||||
}
|
||||
|
|
|
@ -8,43 +8,43 @@ import (
|
|||
"github.com/databricks/cli/libs/sync"
|
||||
)
|
||||
|
||||
func GetSync(ctx context.Context, rb bundle.ReadOnlyBundle) (*sync.Sync, error) {
|
||||
opts, err := GetSyncOptions(ctx, rb)
|
||||
func GetSync(ctx context.Context, b *bundle.Bundle) (*sync.Sync, error) {
|
||||
opts, err := GetSyncOptions(ctx, b)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get sync options: %w", err)
|
||||
}
|
||||
return sync.New(ctx, *opts)
|
||||
}
|
||||
|
||||
func GetSyncOptions(ctx context.Context, rb bundle.ReadOnlyBundle) (*sync.SyncOptions, error) {
|
||||
cacheDir, err := rb.CacheDir(ctx)
|
||||
func GetSyncOptions(ctx context.Context, b *bundle.Bundle) (*sync.SyncOptions, error) {
|
||||
cacheDir, err := b.CacheDir(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get bundle cache directory: %w", err)
|
||||
}
|
||||
|
||||
includes, err := rb.GetSyncIncludePatterns(ctx)
|
||||
includes, err := b.GetSyncIncludePatterns(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get list of sync includes: %w", err)
|
||||
}
|
||||
|
||||
opts := &sync.SyncOptions{
|
||||
WorktreeRoot: rb.WorktreeRoot(),
|
||||
LocalRoot: rb.SyncRoot(),
|
||||
Paths: rb.Config().Sync.Paths,
|
||||
WorktreeRoot: b.WorktreeRoot,
|
||||
LocalRoot: b.SyncRoot,
|
||||
Paths: b.Config.Sync.Paths,
|
||||
Include: includes,
|
||||
Exclude: rb.Config().Sync.Exclude,
|
||||
Exclude: b.Config.Sync.Exclude,
|
||||
|
||||
RemotePath: rb.Config().Workspace.FilePath,
|
||||
Host: rb.WorkspaceClient().Config.Host,
|
||||
RemotePath: b.Config.Workspace.FilePath,
|
||||
Host: b.WorkspaceClient().Config.Host,
|
||||
|
||||
Full: false,
|
||||
|
||||
SnapshotBasePath: cacheDir,
|
||||
WorkspaceClient: rb.WorkspaceClient(),
|
||||
WorkspaceClient: b.WorkspaceClient(),
|
||||
}
|
||||
|
||||
if rb.Config().Workspace.CurrentUser != nil {
|
||||
opts.CurrentUser = rb.Config().Workspace.CurrentUser.User
|
||||
if b.Config.Workspace.CurrentUser != nil {
|
||||
opts.CurrentUser = b.Config.Workspace.CurrentUser.User
|
||||
}
|
||||
|
||||
return opts, nil
|
||||
|
|
|
@ -30,7 +30,7 @@ func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
|||
}
|
||||
|
||||
cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath))
|
||||
opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b))
|
||||
opts, err := GetSyncOptions(ctx, b)
|
||||
if err != nil {
|
||||
return diag.FromErr(err)
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ func (s *statePull) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostic
|
|||
}
|
||||
|
||||
// Create a new snapshot based on the deployment state file.
|
||||
opts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b))
|
||||
opts, err := files.GetSyncOptions(ctx, b)
|
||||
if err != nil {
|
||||
return diag.FromErr(err)
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ func testStatePull(t *testing.T, opts statePullOpts) {
|
|||
}
|
||||
|
||||
if opts.withExistingSnapshot {
|
||||
opts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b))
|
||||
opts, err := files.GetSyncOptions(ctx, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
snapshotPath, err := sync.SnapshotPath(opts)
|
||||
|
@ -134,7 +134,7 @@ func testStatePull(t *testing.T, opts statePullOpts) {
|
|||
}
|
||||
|
||||
if opts.expects.snapshotState != nil {
|
||||
syncOpts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b))
|
||||
syncOpts, err := files.GetSyncOptions(ctx, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
snapshotPath, err := sync.SnapshotPath(syncOpts)
|
||||
|
|
|
@ -2,28 +2,59 @@ package bundle
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/log"
|
||||
)
|
||||
|
||||
// ReadOnlyMutator is the interface type that allows access to bundle configuration but does not allow any mutations.
|
||||
type ReadOnlyMutator interface {
|
||||
// Name returns the mutators name.
|
||||
Name() string
|
||||
Mutator
|
||||
|
||||
// Apply access the specified read-only bundle object.
|
||||
Apply(context.Context, ReadOnlyBundle) diag.Diagnostics
|
||||
// This is just tag, to differentiate this interface from bundle.Mutator
|
||||
// This prevents non-readonly mutators being passed to ApplyParallel().
|
||||
IsRO()
|
||||
}
|
||||
|
||||
func ApplyReadOnly(ctx context.Context, rb ReadOnlyBundle, m ReadOnlyMutator) diag.Diagnostics {
|
||||
ctx = log.NewContext(ctx, log.GetLogger(ctx).With("mutator (read-only)", m.Name()))
|
||||
// Helper to mark the mutator as "read-only"
|
||||
type RO struct{}
|
||||
|
||||
log.Debugf(ctx, "ApplyReadOnly")
|
||||
diags := m.Apply(ctx, rb)
|
||||
if err := diags.Error(); err != nil {
|
||||
log.Debugf(ctx, "Error: %s", err)
|
||||
func (*RO) IsRO() {}
|
||||
|
||||
// Run mutators in parallel. Unlike Apply and ApplySeq, this does not perform sync between
|
||||
// dynamic and static configuration.
|
||||
// Warning: none of the mutators involved must modify bundle directly or indirectly. In particular,
|
||||
// they must not call bundle.Apply or bundle.ApplySeq because those include writes to config even if mutator does not.
|
||||
// Deprecated: do not use for new use cases. Refactor your parallel task not to depend on bundle at all.
|
||||
func ApplyParallel(ctx context.Context, b *Bundle, mutators ...ReadOnlyMutator) diag.Diagnostics {
|
||||
var allDiags diag.Diagnostics
|
||||
resultsChan := make(chan diag.Diagnostics, len(mutators))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
contexts := make([]context.Context, len(mutators))
|
||||
|
||||
for ind, m := range mutators {
|
||||
contexts[ind] = log.NewContext(ctx, log.GetLogger(ctx).With("mutator", m.Name()))
|
||||
// log right away to have deterministic order of log messages
|
||||
log.Debug(contexts[ind], "ApplyParallel")
|
||||
}
|
||||
|
||||
return diags
|
||||
for ind, m := range mutators {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// We're not using bundle.Apply here because we don't do copy between typed and dynamic values
|
||||
resultsChan <- m.Apply(contexts[ind], b)
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(resultsChan)
|
||||
|
||||
// Collect results into a single slice
|
||||
for diags := range resultsChan {
|
||||
allDiags = append(allDiags, diags...)
|
||||
}
|
||||
|
||||
return allDiags
|
||||
}
|
||||
|
|
|
@ -1,43 +0,0 @@
|
|||
package bundle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
)
|
||||
|
||||
type parallel struct {
|
||||
mutators []ReadOnlyMutator
|
||||
}
|
||||
|
||||
func (m *parallel) Name() string {
|
||||
return "parallel"
|
||||
}
|
||||
|
||||
func (m *parallel) Apply(ctx context.Context, rb ReadOnlyBundle) diag.Diagnostics {
|
||||
var wg sync.WaitGroup
|
||||
var mu sync.Mutex
|
||||
var diags diag.Diagnostics
|
||||
|
||||
wg.Add(len(m.mutators))
|
||||
for _, mutator := range m.mutators {
|
||||
go func(mutator ReadOnlyMutator) {
|
||||
defer wg.Done()
|
||||
d := ApplyReadOnly(ctx, rb, mutator)
|
||||
|
||||
mu.Lock()
|
||||
diags = diags.Extend(d)
|
||||
mu.Unlock()
|
||||
}(mutator)
|
||||
}
|
||||
wg.Wait()
|
||||
return diags
|
||||
}
|
||||
|
||||
// Parallel runs the given mutators in parallel.
|
||||
func Parallel(mutators ...ReadOnlyMutator) ReadOnlyMutator {
|
||||
return ¶llel{
|
||||
mutators: mutators,
|
||||
}
|
||||
}
|
|
@ -1,82 +0,0 @@
|
|||
package bundle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type addToContainer struct {
|
||||
t *testing.T
|
||||
container *[]int
|
||||
value int
|
||||
err bool
|
||||
|
||||
// mu is a mutex that protects container. It is used to ensure that the
|
||||
// container slice is only modified by one goroutine at a time.
|
||||
mu *sync.Mutex
|
||||
}
|
||||
|
||||
func (m *addToContainer) Apply(ctx context.Context, b ReadOnlyBundle) diag.Diagnostics {
|
||||
if m.err {
|
||||
return diag.Errorf("error")
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
*m.container = append(*m.container, m.value)
|
||||
m.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *addToContainer) Name() string {
|
||||
return "addToContainer"
|
||||
}
|
||||
|
||||
func TestParallelMutatorWork(t *testing.T) {
|
||||
b := &Bundle{
|
||||
Config: config.Root{},
|
||||
}
|
||||
|
||||
container := []int{}
|
||||
var mu sync.Mutex
|
||||
m1 := &addToContainer{t: t, container: &container, value: 1, mu: &mu}
|
||||
m2 := &addToContainer{t: t, container: &container, value: 2, mu: &mu}
|
||||
m3 := &addToContainer{t: t, container: &container, value: 3, mu: &mu}
|
||||
|
||||
m := Parallel(m1, m2, m3)
|
||||
|
||||
// Apply the mutator
|
||||
diags := ApplyReadOnly(context.Background(), ReadOnly(b), m)
|
||||
require.Empty(t, diags)
|
||||
require.Len(t, container, 3)
|
||||
require.Contains(t, container, 1)
|
||||
require.Contains(t, container, 2)
|
||||
require.Contains(t, container, 3)
|
||||
}
|
||||
|
||||
func TestParallelMutatorWorkWithErrors(t *testing.T) {
|
||||
b := &Bundle{
|
||||
Config: config.Root{},
|
||||
}
|
||||
|
||||
container := []int{}
|
||||
var mu sync.Mutex
|
||||
m1 := &addToContainer{container: &container, value: 1, mu: &mu}
|
||||
m2 := &addToContainer{container: &container, err: true, value: 2, mu: &mu}
|
||||
m3 := &addToContainer{container: &container, value: 3, mu: &mu}
|
||||
|
||||
m := Parallel(m1, m2, m3)
|
||||
|
||||
// Apply the mutator
|
||||
diags := ApplyReadOnly(context.Background(), ReadOnly(b), m)
|
||||
require.Len(t, diags, 1)
|
||||
require.Equal(t, "error", diags[0].Summary)
|
||||
require.Len(t, container, 2)
|
||||
require.Contains(t, container, 1)
|
||||
require.Contains(t, container, 3)
|
||||
}
|
|
@ -13,7 +13,7 @@ import (
|
|||
func TestJobClusterKeyNotDefinedTest(t *testing.T) {
|
||||
b := loadTarget(t, "./job_cluster_key", "default")
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.JobClusterKeyDefined())
|
||||
diags := bundle.Apply(context.Background(), b, validate.JobClusterKeyDefined())
|
||||
require.Len(t, diags, 1)
|
||||
require.NoError(t, diags.Error())
|
||||
require.Equal(t, diag.Warning, diags[0].Severity)
|
||||
|
@ -23,6 +23,6 @@ func TestJobClusterKeyNotDefinedTest(t *testing.T) {
|
|||
func TestJobClusterKeyDefinedTest(t *testing.T) {
|
||||
b := loadTarget(t, "./job_cluster_key", "development")
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.JobClusterKeyDefined())
|
||||
diags := bundle.Apply(context.Background(), b, validate.JobClusterKeyDefined())
|
||||
require.Empty(t, diags)
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ import (
|
|||
func TestSyncIncludeExcludeNoMatchesTest(t *testing.T) {
|
||||
b := loadTarget(t, "./sync/override", "development")
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.ValidateSyncPatterns())
|
||||
diags := bundle.Apply(context.Background(), b, validate.ValidateSyncPatterns())
|
||||
require.Len(t, diags, 3)
|
||||
require.NoError(t, diags.Error())
|
||||
|
||||
|
@ -46,7 +46,7 @@ func TestSyncIncludeExcludeNoMatchesTest(t *testing.T) {
|
|||
func TestSyncIncludeWithNegate(t *testing.T) {
|
||||
b := loadTarget(t, "./sync/negate", "default")
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.ValidateSyncPatterns())
|
||||
diags := bundle.Apply(context.Background(), b, validate.ValidateSyncPatterns())
|
||||
require.Empty(t, diags)
|
||||
require.NoError(t, diags.Error())
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ func TestSyncIncludeWithNegate(t *testing.T) {
|
|||
func TestSyncIncludeWithNegateNoMatches(t *testing.T) {
|
||||
b := loadTarget(t, "./sync/negate", "dev")
|
||||
|
||||
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), validate.ValidateSyncPatterns())
|
||||
diags := bundle.Apply(context.Background(), b, validate.ValidateSyncPatterns())
|
||||
require.Len(t, diags, 1)
|
||||
require.NoError(t, diags.Error())
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ type syncFlags struct {
|
|||
}
|
||||
|
||||
func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOptions, error) {
|
||||
opts, err := files.GetSyncOptions(cmd.Context(), bundle.ReadOnly(b))
|
||||
opts, err := files.GetSyncOptions(cmd.Context(), b)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get sync options: %w", err)
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ func newValidateCommand() *cobra.Command {
|
|||
}
|
||||
|
||||
if !diags.HasError() {
|
||||
diags = diags.Extend(bundle.Apply(ctx, b, validate.Validate()))
|
||||
diags = diags.Extend(validate.Validate(ctx, b))
|
||||
}
|
||||
|
||||
switch root.OutputType(cmd) {
|
||||
|
|
|
@ -33,7 +33,7 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b *
|
|||
return nil, errors.New("SRC and DST are not configurable in the context of a bundle")
|
||||
}
|
||||
|
||||
opts, err := files.GetSyncOptions(cmd.Context(), bundle.ReadOnly(b))
|
||||
opts, err := files.GetSyncOptions(cmd.Context(), b)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get sync options: %w", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue