mirror of https://github.com/databricks/cli.git
Compare commits
10 Commits
a4305ed568
...
28893e8929
Author | SHA1 | Date |
---|---|---|
shreyas-goenka | 28893e8929 | |
shreyas-goenka | 984c38e03e | |
Pieter Noordhuis | ade95d9649 | |
Andrew Nester | 592e1111b7 | |
Andrew Nester | fab3e8f168 | |
Ilya Kuznetsov | 756e55fabc | |
Pieter Noordhuis | 886e14910c | |
Shreyas Goenka | 4233a7c292 | |
Shreyas Goenka | 96a0a3ec27 | |
Shreyas Goenka | df0a98066a |
23
CHANGELOG.md
23
CHANGELOG.md
|
@ -1,5 +1,28 @@
|
|||
# Version changelog
|
||||
|
||||
## [Release] Release v0.235.0
|
||||
|
||||
**Note:** the `bundle generate` command now uses the `.<resource-type>.yml`
|
||||
sub-extension for the configuration files it writes. Existing configuration
|
||||
files that do not use this sub-extension are renamed to include it.
|
||||
|
||||
Bundles:
|
||||
* Make `TableName` field part of quality monitor schema ([#1903](https://github.com/databricks/cli/pull/1903)).
|
||||
* Do not prepend paths starting with ~ or variable reference ([#1905](https://github.com/databricks/cli/pull/1905)).
|
||||
* Fix workspace extensions filer accidentally reading notebooks ([#1891](https://github.com/databricks/cli/pull/1891)).
|
||||
* Fix template initialization when running on Databricks ([#1912](https://github.com/databricks/cli/pull/1912)).
|
||||
* Source-linked deployments for bundles in the workspace ([#1884](https://github.com/databricks/cli/pull/1884)).
|
||||
* Added integration test to deploy bundle to /Shared root path ([#1914](https://github.com/databricks/cli/pull/1914)).
|
||||
* Update filenames used by bundle generate to use `.<resource-type>.yml` ([#1901](https://github.com/databricks/cli/pull/1901)).
|
||||
|
||||
Internal:
|
||||
* Extract functionality to detect if the CLI is running on DBR ([#1889](https://github.com/databricks/cli/pull/1889)).
|
||||
* Consolidate test helpers for `io/fs` ([#1906](https://github.com/databricks/cli/pull/1906)).
|
||||
* Use `fs.FS` interface to read template ([#1910](https://github.com/databricks/cli/pull/1910)).
|
||||
* Use `filer.Filer` to write template instantiation ([#1911](https://github.com/databricks/cli/pull/1911)).
|
||||
|
||||
|
||||
|
||||
## [Release] Release v0.234.0
|
||||
|
||||
Bundles:
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/libs/dbr"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/textutil"
|
||||
|
@ -221,6 +222,15 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
|
|||
dashboard.DisplayName = prefix + dashboard.DisplayName
|
||||
}
|
||||
|
||||
if config.IsExplicitlyEnabled((b.Config.Presets.SourceLinkedDeployment)) {
|
||||
isDatabricksWorkspace := dbr.RunsOnRuntime(ctx) && strings.HasPrefix(b.SyncRootPath, "/Workspace/")
|
||||
if !isDatabricksWorkspace {
|
||||
disabled := false
|
||||
b.Config.Presets.SourceLinkedDeployment = &disabled
|
||||
diags = diags.Extend(diag.Warningf("source-linked deployment is available only in the Databricks Workspace"))
|
||||
}
|
||||
}
|
||||
|
||||
return diags
|
||||
}
|
||||
|
||||
|
|
|
@ -2,12 +2,14 @@ package mutator_test
|
|||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/config/mutator"
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/cli/libs/dbr"
|
||||
"github.com/databricks/databricks-sdk-go/service/catalog"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -364,3 +366,86 @@ func TestApplyPresetsResourceNotDefined(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestApplyPresetsSourceLinkedDeployment(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("this test is not applicable on Windows because source-linked mode works only in the Databricks Workspace")
|
||||
}
|
||||
|
||||
testContext := context.Background()
|
||||
enabled := true
|
||||
disabled := false
|
||||
workspacePath := "/Workspace/user.name@company.com"
|
||||
|
||||
tests := []struct {
|
||||
bundlePath string
|
||||
ctx context.Context
|
||||
name string
|
||||
initialValue *bool
|
||||
expectedValue *bool
|
||||
expectedWarning string
|
||||
}{
|
||||
{
|
||||
name: "preset enabled, bundle in Workspace, databricks runtime",
|
||||
bundlePath: workspacePath,
|
||||
ctx: dbr.MockRuntime(testContext, true),
|
||||
initialValue: &enabled,
|
||||
expectedValue: &enabled,
|
||||
},
|
||||
{
|
||||
name: "preset enabled, bundle not in Workspace, databricks runtime",
|
||||
bundlePath: "/Users/user.name@company.com",
|
||||
ctx: dbr.MockRuntime(testContext, true),
|
||||
initialValue: &enabled,
|
||||
expectedValue: &disabled,
|
||||
expectedWarning: "source-linked deployment is available only in the Databricks Workspace",
|
||||
},
|
||||
{
|
||||
name: "preset enabled, bundle in Workspace, not databricks runtime",
|
||||
bundlePath: workspacePath,
|
||||
ctx: dbr.MockRuntime(testContext, false),
|
||||
initialValue: &enabled,
|
||||
expectedValue: &disabled,
|
||||
expectedWarning: "source-linked deployment is available only in the Databricks Workspace",
|
||||
},
|
||||
{
|
||||
name: "preset disabled, bundle in Workspace, databricks runtime",
|
||||
bundlePath: workspacePath,
|
||||
ctx: dbr.MockRuntime(testContext, true),
|
||||
initialValue: &disabled,
|
||||
expectedValue: &disabled,
|
||||
},
|
||||
{
|
||||
name: "preset nil, bundle in Workspace, databricks runtime",
|
||||
bundlePath: workspacePath,
|
||||
ctx: dbr.MockRuntime(testContext, true),
|
||||
initialValue: nil,
|
||||
expectedValue: nil,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
SyncRootPath: tt.bundlePath,
|
||||
Config: config.Root{
|
||||
Presets: config.Presets{
|
||||
SourceLinkedDeployment: tt.initialValue,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
diags := bundle.Apply(tt.ctx, b, mutator.ApplyPresets())
|
||||
if diags.HasError() {
|
||||
t.Fatalf("unexpected error: %v", diags)
|
||||
}
|
||||
|
||||
if tt.expectedWarning != "" {
|
||||
require.Equal(t, tt.expectedWarning, diags[0].Summary)
|
||||
}
|
||||
|
||||
require.Equal(t, tt.expectedValue, b.Config.Presets.SourceLinkedDeployment)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/libs/dbr"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/iamutil"
|
||||
|
@ -57,6 +58,14 @@ func transformDevelopmentMode(ctx context.Context, b *bundle.Bundle) {
|
|||
t.TriggerPauseStatus = config.Paused
|
||||
}
|
||||
|
||||
if !config.IsExplicitlyDisabled(t.SourceLinkedDeployment) {
|
||||
isInWorkspace := strings.HasPrefix(b.SyncRootPath, "/Workspace/")
|
||||
if isInWorkspace && dbr.RunsOnRuntime(ctx) {
|
||||
enabled := true
|
||||
t.SourceLinkedDeployment = &enabled
|
||||
}
|
||||
}
|
||||
|
||||
if !config.IsExplicitlyDisabled(t.PipelinesDevelopment) {
|
||||
enabled := true
|
||||
t.PipelinesDevelopment = &enabled
|
||||
|
|
|
@ -3,14 +3,17 @@ package mutator
|
|||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/cli/libs/dbr"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/tags"
|
||||
"github.com/databricks/cli/libs/vfs"
|
||||
sdkconfig "github.com/databricks/databricks-sdk-go/config"
|
||||
"github.com/databricks/databricks-sdk-go/service/catalog"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
|
@ -140,6 +143,7 @@ func mockBundle(mode config.Mode) *bundle.Bundle {
|
|||
},
|
||||
},
|
||||
},
|
||||
SyncRoot: vfs.MustNew("/Users/lennart.kats@databricks.com"),
|
||||
// Use AWS implementation for testing.
|
||||
Tagging: tags.ForCloud(&sdkconfig.Config{
|
||||
Host: "https://company.cloud.databricks.com",
|
||||
|
@ -522,3 +526,32 @@ func TestPipelinesDevelopmentDisabled(t *testing.T) {
|
|||
|
||||
assert.False(t, b.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development)
|
||||
}
|
||||
|
||||
func TestSourceLinkedDeploymentEnabled(t *testing.T) {
|
||||
b, diags := processSourceLinkedBundle(t, true)
|
||||
require.NoError(t, diags.Error())
|
||||
assert.True(t, *b.Config.Presets.SourceLinkedDeployment)
|
||||
}
|
||||
|
||||
func TestSourceLinkedDeploymentDisabled(t *testing.T) {
|
||||
b, diags := processSourceLinkedBundle(t, false)
|
||||
require.NoError(t, diags.Error())
|
||||
assert.False(t, *b.Config.Presets.SourceLinkedDeployment)
|
||||
}
|
||||
|
||||
func processSourceLinkedBundle(t *testing.T, presetEnabled bool) (*bundle.Bundle, diag.Diagnostics) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("this test is not applicable on Windows because source-linked mode works only in the Databricks Workspace")
|
||||
}
|
||||
|
||||
b := mockBundle(config.Development)
|
||||
|
||||
workspacePath := "/Workspace/lennart@company.com/"
|
||||
b.SyncRootPath = workspacePath
|
||||
b.Config.Presets.SourceLinkedDeployment = &presetEnabled
|
||||
|
||||
ctx := dbr.MockRuntime(context.Background(), true)
|
||||
m := bundle.Seq(ProcessTargetMode(), ApplyPresets())
|
||||
diags := bundle.Apply(ctx, b, m)
|
||||
return b, diags
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/notebook"
|
||||
|
@ -103,8 +104,13 @@ func (t *translateContext) rewritePath(
|
|||
return fmt.Errorf("path %s is not contained in sync root path", localPath)
|
||||
}
|
||||
|
||||
// Prefix remote path with its remote root path.
|
||||
remotePath := path.Join(t.b.Config.Workspace.FilePath, filepath.ToSlash(localRelPath))
|
||||
var workspacePath string
|
||||
if config.IsExplicitlyEnabled(t.b.Config.Presets.SourceLinkedDeployment) {
|
||||
workspacePath = t.b.SyncRootPath
|
||||
} else {
|
||||
workspacePath = t.b.Config.Workspace.FilePath
|
||||
}
|
||||
remotePath := path.Join(workspacePath, filepath.ToSlash(localRelPath))
|
||||
|
||||
// Convert local path into workspace path via specified function.
|
||||
interp, err := fn(*p, localPath, localRelPath, remotePath)
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
|
@ -787,3 +788,163 @@ func TestTranslatePathWithComplexVariables(t *testing.T) {
|
|||
b.Config.Resources.Jobs["job"].Tasks[0].Libraries[0].Whl,
|
||||
)
|
||||
}
|
||||
|
||||
func TestTranslatePathsWithSourceLinkedDeployment(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("this test is not applicable on Windows because source-linked mode works only in the Databricks Workspace")
|
||||
}
|
||||
|
||||
dir := t.TempDir()
|
||||
touchNotebookFile(t, filepath.Join(dir, "my_job_notebook.py"))
|
||||
touchNotebookFile(t, filepath.Join(dir, "my_pipeline_notebook.py"))
|
||||
touchEmptyFile(t, filepath.Join(dir, "my_python_file.py"))
|
||||
touchEmptyFile(t, filepath.Join(dir, "dist", "task.jar"))
|
||||
touchEmptyFile(t, filepath.Join(dir, "requirements.txt"))
|
||||
|
||||
enabled := true
|
||||
b := &bundle.Bundle{
|
||||
SyncRootPath: dir,
|
||||
SyncRoot: vfs.MustNew(dir),
|
||||
Config: config.Root{
|
||||
Workspace: config.Workspace{
|
||||
FilePath: "/bundle",
|
||||
},
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"job": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
NotebookTask: &jobs.NotebookTask{
|
||||
NotebookPath: "my_job_notebook.py",
|
||||
},
|
||||
Libraries: []compute.Library{
|
||||
{Whl: "./dist/task.whl"},
|
||||
},
|
||||
},
|
||||
{
|
||||
NotebookTask: &jobs.NotebookTask{
|
||||
NotebookPath: "/Users/jane.doe@databricks.com/absolute_remote.py",
|
||||
},
|
||||
},
|
||||
{
|
||||
NotebookTask: &jobs.NotebookTask{
|
||||
NotebookPath: "my_job_notebook.py",
|
||||
},
|
||||
Libraries: []compute.Library{
|
||||
{Requirements: "requirements.txt"},
|
||||
},
|
||||
},
|
||||
{
|
||||
SparkPythonTask: &jobs.SparkPythonTask{
|
||||
PythonFile: "my_python_file.py",
|
||||
},
|
||||
},
|
||||
{
|
||||
SparkJarTask: &jobs.SparkJarTask{
|
||||
MainClassName: "HelloWorld",
|
||||
},
|
||||
Libraries: []compute.Library{
|
||||
{Jar: "./dist/task.jar"},
|
||||
},
|
||||
},
|
||||
{
|
||||
SparkJarTask: &jobs.SparkJarTask{
|
||||
MainClassName: "HelloWorldRemote",
|
||||
},
|
||||
Libraries: []compute.Library{
|
||||
{Jar: "dbfs:/bundle/dist/task_remote.jar"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Pipelines: map[string]*resources.Pipeline{
|
||||
"pipeline": {
|
||||
PipelineSpec: &pipelines.PipelineSpec{
|
||||
Libraries: []pipelines.PipelineLibrary{
|
||||
{
|
||||
Notebook: &pipelines.NotebookLibrary{
|
||||
Path: "my_pipeline_notebook.py",
|
||||
},
|
||||
},
|
||||
{
|
||||
Notebook: &pipelines.NotebookLibrary{
|
||||
Path: "/Users/jane.doe@databricks.com/absolute_remote.py",
|
||||
},
|
||||
},
|
||||
{
|
||||
File: &pipelines.FileLibrary{
|
||||
Path: "my_python_file.py",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Presets: config.Presets{
|
||||
SourceLinkedDeployment: &enabled,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, ".", []dyn.Location{{File: filepath.Join(dir, "resource.yml")}})
|
||||
diags := bundle.Apply(context.Background(), b, mutator.TranslatePaths())
|
||||
require.NoError(t, diags.Error())
|
||||
|
||||
// updated to source path
|
||||
assert.Equal(
|
||||
t,
|
||||
filepath.Join(dir, "my_job_notebook"),
|
||||
b.Config.Resources.Jobs["job"].Tasks[0].NotebookTask.NotebookPath,
|
||||
)
|
||||
assert.Equal(
|
||||
t,
|
||||
filepath.Join(dir, "requirements.txt"),
|
||||
b.Config.Resources.Jobs["job"].Tasks[2].Libraries[0].Requirements,
|
||||
)
|
||||
assert.Equal(
|
||||
t,
|
||||
filepath.Join(dir, "my_python_file.py"),
|
||||
b.Config.Resources.Jobs["job"].Tasks[3].SparkPythonTask.PythonFile,
|
||||
)
|
||||
assert.Equal(
|
||||
t,
|
||||
filepath.Join(dir, "my_pipeline_notebook"),
|
||||
b.Config.Resources.Pipelines["pipeline"].Libraries[0].Notebook.Path,
|
||||
)
|
||||
assert.Equal(
|
||||
t,
|
||||
filepath.Join(dir, "my_python_file.py"),
|
||||
b.Config.Resources.Pipelines["pipeline"].Libraries[2].File.Path,
|
||||
)
|
||||
|
||||
// left as is
|
||||
assert.Equal(
|
||||
t,
|
||||
filepath.Join("dist", "task.whl"),
|
||||
b.Config.Resources.Jobs["job"].Tasks[0].Libraries[0].Whl,
|
||||
)
|
||||
assert.Equal(
|
||||
t,
|
||||
"/Users/jane.doe@databricks.com/absolute_remote.py",
|
||||
b.Config.Resources.Jobs["job"].Tasks[1].NotebookTask.NotebookPath,
|
||||
)
|
||||
assert.Equal(
|
||||
t,
|
||||
filepath.Join("dist", "task.jar"),
|
||||
b.Config.Resources.Jobs["job"].Tasks[4].Libraries[0].Jar,
|
||||
)
|
||||
assert.Equal(
|
||||
t,
|
||||
"dbfs:/bundle/dist/task_remote.jar",
|
||||
b.Config.Resources.Jobs["job"].Tasks[5].Libraries[0].Jar,
|
||||
)
|
||||
assert.Equal(
|
||||
t,
|
||||
"/Users/jane.doe@databricks.com/absolute_remote.py",
|
||||
b.Config.Resources.Pipelines["pipeline"].Libraries[1].Notebook.Path,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,11 @@ type Presets struct {
|
|||
// JobsMaxConcurrentRuns is the default value for the max concurrent runs of jobs.
|
||||
JobsMaxConcurrentRuns int `json:"jobs_max_concurrent_runs,omitempty"`
|
||||
|
||||
// SourceLinkedDeployment indicates whether source-linked deployment is enabled. Works only in Databricks Workspace
|
||||
// When set to true, resources created during deployment will point to source files in the workspace instead of their workspace copies.
|
||||
// File synchronization to ${workspace.file_path} is skipped.
|
||||
SourceLinkedDeployment *bool `json:"source_linked_deployment,omitempty"`
|
||||
|
||||
// Tags to add to all resources.
|
||||
Tags map[string]string `json:"tags,omitempty"`
|
||||
}
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
package validate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/cli/libs/dyn/convert"
|
||||
"github.com/databricks/cli/libs/log"
|
||||
)
|
||||
|
||||
// Validates that any single node clusters defined in the bundle are correctly configured.
|
||||
func SingleNodeCluster() bundle.ReadOnlyMutator {
|
||||
return &singleNodeCluster{}
|
||||
}
|
||||
|
||||
type singleNodeCluster struct{}
|
||||
|
||||
func (m *singleNodeCluster) Name() string {
|
||||
return "validate:SingleNodeCluster"
|
||||
}
|
||||
|
||||
const singleNodeWarningDetail = `num_workers should be 0 only for single-node clusters. To create a
|
||||
valid single node cluster please ensure that the following properties
|
||||
are correctly set in the cluster specification:
|
||||
|
||||
spark_conf:
|
||||
spark.databricks.cluster.profile: singleNode
|
||||
spark.master: local[*]
|
||||
|
||||
custom_tags:
|
||||
ResourceClass: SingleNode
|
||||
`
|
||||
|
||||
const singleNodeWarningSummary = `Single node cluster is not correctly configured`
|
||||
|
||||
func showSingleNodeClusterWarning(ctx context.Context, v dyn.Value) bool {
|
||||
// Check if the user has explicitly set the num_workers to 0. Skip the warning
|
||||
// if that's not the case.
|
||||
numWorkers, ok := v.Get("num_workers").AsInt()
|
||||
if !ok || numWorkers > 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Convenient type that contains the common fields from compute.ClusterSpec and
|
||||
// pipelines.PipelineCluster that we are interested in.
|
||||
type ClusterConf struct {
|
||||
SparkConf map[string]string `json:"spark_conf"`
|
||||
CustomTags map[string]string `json:"custom_tags"`
|
||||
PolicyId string `json:"policy_id"`
|
||||
}
|
||||
|
||||
conf := &ClusterConf{}
|
||||
err := convert.ToTyped(conf, v)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
// If the policy id is set, we don't want to show the warning. This is because
|
||||
// the user might have configured `spark_conf` and `custom_tags` correctly
|
||||
// in their cluster policy.
|
||||
if conf.PolicyId != "" {
|
||||
return false
|
||||
}
|
||||
|
||||
profile, ok := conf.SparkConf["spark.databricks.cluster.profile"]
|
||||
if !ok {
|
||||
log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile not found in single-node cluster spec")
|
||||
return true
|
||||
}
|
||||
if profile != "singleNode" {
|
||||
log.Warnf(ctx, "spark_conf spark.databricks.cluster.profile is not singleNode in single-node cluster spec: %s", profile)
|
||||
return true
|
||||
}
|
||||
|
||||
master, ok := conf.SparkConf["spark.master"]
|
||||
if !ok {
|
||||
log.Warnf(ctx, "spark_conf spark.master not found in single-node cluster spec")
|
||||
return true
|
||||
}
|
||||
if !strings.HasPrefix(master, "local") {
|
||||
log.Warnf(ctx, "spark_conf spark.master does not start with local in single-node cluster spec: %s", master)
|
||||
return true
|
||||
}
|
||||
|
||||
resourceClass, ok := conf.CustomTags["ResourceClass"]
|
||||
if !ok {
|
||||
log.Warnf(ctx, "custom_tag ResourceClass not found in single-node cluster spec")
|
||||
return true
|
||||
}
|
||||
if resourceClass != "SingleNode" {
|
||||
log.Warnf(ctx, "custom_tag ResourceClass is not SingleNode in single-node cluster spec: %s", resourceClass)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (m *singleNodeCluster) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
|
||||
diags := diag.Diagnostics{}
|
||||
|
||||
patterns := []dyn.Pattern{
|
||||
// Interactive clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("clusters"), dyn.AnyKey()),
|
||||
// Job clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("job_clusters"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
||||
// Job task clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("jobs"), dyn.AnyKey(), dyn.Key("tasks"), dyn.AnyIndex(), dyn.Key("new_cluster")),
|
||||
// Pipeline clusters
|
||||
dyn.NewPattern(dyn.Key("resources"), dyn.Key("pipelines"), dyn.AnyKey(), dyn.Key("clusters"), dyn.AnyIndex()),
|
||||
}
|
||||
|
||||
for _, p := range patterns {
|
||||
_, err := dyn.MapByPattern(rb.Config().Value(), p, func(p dyn.Path, v dyn.Value) (dyn.Value, error) {
|
||||
warning := diag.Diagnostic{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: v.Locations(),
|
||||
Paths: []dyn.Path{p},
|
||||
}
|
||||
|
||||
if showSingleNodeClusterWarning(ctx, v) {
|
||||
diags = append(diags, warning)
|
||||
}
|
||||
return v, nil
|
||||
})
|
||||
if err != nil {
|
||||
log.Debugf(ctx, "Error while applying single node cluster validation: %s", err)
|
||||
}
|
||||
}
|
||||
return diags
|
||||
}
|
|
@ -0,0 +1,440 @@
|
|||
package validate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/config/resources"
|
||||
"github.com/databricks/cli/bundle/internal/bundletest"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/dyn"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/databricks/databricks-sdk-go/service/pipelines"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestValidateSingleNodeClusterFail(t *testing.T) {
|
||||
failCases := []struct {
|
||||
name string
|
||||
sparkConf map[string]string
|
||||
customTags map[string]string
|
||||
}{
|
||||
{
|
||||
name: "no tags or conf",
|
||||
},
|
||||
{
|
||||
name: "no tags",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no conf",
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
{
|
||||
name: "invalid spark cluster profile",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "invalid",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
{
|
||||
name: "invalid spark.master",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "invalid",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
{
|
||||
name: "invalid tags",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "invalid"},
|
||||
},
|
||||
{
|
||||
name: "missing ResourceClass tag",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{"what": "ever"},
|
||||
},
|
||||
{
|
||||
name: "missing spark.master",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
{
|
||||
name: "missing spark.databricks.cluster.profile",
|
||||
sparkConf: map[string]string{
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{"ResourceClass": "SingleNode"},
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Interactive clusters.
|
||||
for _, tc := range failCases {
|
||||
t.Run("interactive_"+tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Clusters: map[string]*resources.Cluster{
|
||||
"foo": {
|
||||
ClusterSpec: &compute.ClusterSpec{
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.clusters.foo", []dyn.Location{{File: "a.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(0))
|
||||
})
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "a.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.NewPath(dyn.Key("resources"), dyn.Key("clusters"), dyn.Key("foo"))},
|
||||
},
|
||||
}, diags)
|
||||
})
|
||||
}
|
||||
|
||||
// Job clusters.
|
||||
for _, tc := range failCases {
|
||||
t.Run("job_"+tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
JobClusters: []jobs.JobCluster{
|
||||
{
|
||||
NewCluster: compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.jobs.foo.job_clusters[0].new_cluster", []dyn.Location{{File: "b.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "b.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.job_clusters[0].new_cluster")},
|
||||
},
|
||||
}, diags)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
// Job task clusters.
|
||||
for _, tc := range failCases {
|
||||
t.Run("task_"+tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.jobs.foo.tasks[0].new_cluster", []dyn.Location{{File: "c.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "c.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("resources.jobs.foo.tasks[0].new_cluster")},
|
||||
},
|
||||
}, diags)
|
||||
})
|
||||
}
|
||||
|
||||
// Pipeline clusters.
|
||||
for _, tc := range failCases {
|
||||
t.Run("pipeline_"+tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Pipelines: map[string]*resources.Pipeline{
|
||||
"foo": {
|
||||
PipelineSpec: &pipelines.PipelineSpec{
|
||||
Clusters: []pipelines.PipelineCluster{
|
||||
{
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
bundletest.SetLocation(b, "resources.pipelines.foo.clusters[0]", []dyn.Location{{File: "d.yml", Line: 1, Column: 1}})
|
||||
|
||||
// We can't set num_workers to 0 explicitly in the typed configuration.
|
||||
// Do it on the dyn.Value directly.
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(0))
|
||||
})
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Equal(t, diag.Diagnostics{
|
||||
{
|
||||
Severity: diag.Warning,
|
||||
Summary: singleNodeWarningSummary,
|
||||
Detail: singleNodeWarningDetail,
|
||||
Locations: []dyn.Location{{File: "d.yml", Line: 1, Column: 1}},
|
||||
Paths: []dyn.Path{dyn.MustPathFromString("resources.pipelines.foo.clusters[0]")},
|
||||
},
|
||||
}, diags)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestValidateSingleNodeClusterPass(t *testing.T) {
|
||||
zero := 0
|
||||
one := 1
|
||||
|
||||
passCases := []struct {
|
||||
name string
|
||||
numWorkers *int
|
||||
sparkConf map[string]string
|
||||
customTags map[string]string
|
||||
policyId string
|
||||
}{
|
||||
{
|
||||
name: "single node cluster",
|
||||
sparkConf: map[string]string{
|
||||
"spark.databricks.cluster.profile": "singleNode",
|
||||
"spark.master": "local[*]",
|
||||
},
|
||||
customTags: map[string]string{
|
||||
"ResourceClass": "SingleNode",
|
||||
},
|
||||
numWorkers: &zero,
|
||||
},
|
||||
{
|
||||
name: "num workers is not zero",
|
||||
numWorkers: &one,
|
||||
},
|
||||
{
|
||||
name: "num workers is not set",
|
||||
},
|
||||
{
|
||||
name: "policy id is not empty",
|
||||
policyId: "policy-abc",
|
||||
numWorkers: &zero,
|
||||
},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Interactive clusters.
|
||||
for _, tc := range passCases {
|
||||
t.Run("interactive_"+tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Clusters: map[string]*resources.Cluster{
|
||||
"foo": {
|
||||
ClusterSpec: &compute.ClusterSpec{
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.clusters.foo.num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
|
||||
// Job clusters.
|
||||
for _, tc := range passCases {
|
||||
t.Run("job_"+tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
JobClusters: []jobs.JobCluster{
|
||||
{
|
||||
NewCluster: compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.job_clusters[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
|
||||
// Job task clusters.
|
||||
for _, tc := range passCases {
|
||||
t.Run("task_"+tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Jobs: map[string]*resources.Job{
|
||||
"foo": {
|
||||
JobSettings: &jobs.JobSettings{
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
ClusterName: "my_cluster",
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.jobs.foo.tasks[0].new_cluster.num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
|
||||
// Pipeline clusters.
|
||||
for _, tc := range passCases {
|
||||
t.Run("pipeline_"+tc.name, func(t *testing.T) {
|
||||
b := &bundle.Bundle{
|
||||
Config: config.Root{
|
||||
Resources: config.Resources{
|
||||
Pipelines: map[string]*resources.Pipeline{
|
||||
"foo": {
|
||||
PipelineSpec: &pipelines.PipelineSpec{
|
||||
Clusters: []pipelines.PipelineCluster{
|
||||
{
|
||||
SparkConf: tc.sparkConf,
|
||||
CustomTags: tc.customTags,
|
||||
PolicyId: tc.policyId,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tc.numWorkers != nil {
|
||||
bundletest.Mutate(t, b, func(v dyn.Value) (dyn.Value, error) {
|
||||
return dyn.Set(v, "resources.pipelines.foo.clusters[0].num_workers", dyn.V(*tc.numWorkers))
|
||||
})
|
||||
}
|
||||
|
||||
diags := bundle.ApplyReadOnly(ctx, bundle.ReadOnly(b), SingleNodeCluster())
|
||||
assert.Empty(t, diags)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -36,6 +36,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
|
|||
ValidateSyncPatterns(),
|
||||
JobTaskClusterSpec(),
|
||||
ValidateFolderPermissions(),
|
||||
SingleNodeCluster(),
|
||||
))
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io/fs"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/permissions"
|
||||
"github.com/databricks/cli/libs/cmdio"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
|
@ -23,6 +24,11 @@ func (m *upload) Name() string {
|
|||
}
|
||||
|
||||
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
if config.IsExplicitlyEnabled(b.Config.Presets.SourceLinkedDeployment) {
|
||||
cmdio.LogString(ctx, "Source-linked deployment is enabled. Deployed resources reference the source files in your working tree instead of separate copies.")
|
||||
return nil
|
||||
}
|
||||
|
||||
cmdio.LogString(ctx, fmt.Sprintf("Uploading bundle files to %s...", b.Config.Workspace.FilePath))
|
||||
opts, err := GetSyncOptions(ctx, bundle.ReadOnly(b))
|
||||
if err != nil {
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/databricks/cli/bundle"
|
||||
"github.com/databricks/cli/bundle/config"
|
||||
"github.com/databricks/cli/bundle/libraries"
|
||||
"github.com/databricks/cli/libs/diag"
|
||||
"github.com/databricks/cli/libs/log"
|
||||
|
@ -22,6 +23,9 @@ func WrapperWarning() bundle.Mutator {
|
|||
|
||||
func (m *wrapperWarning) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
||||
if isPythonWheelWrapperOn(b) {
|
||||
if config.IsExplicitlyEnabled(b.Config.Presets.SourceLinkedDeployment) {
|
||||
return diag.Warningf("Python wheel notebook wrapper is not available when using source-linked deployment mode. You can disable this mode by setting 'presets.source_linked_deployment: false'")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -3,8 +3,10 @@ package generate
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
@ -90,7 +92,7 @@ func TestGeneratePipelineCommand(t *testing.T) {
|
|||
err := cmd.RunE(cmd, []string{})
|
||||
require.NoError(t, err)
|
||||
|
||||
data, err := os.ReadFile(filepath.Join(configDir, "test_pipeline.yml"))
|
||||
data, err := os.ReadFile(filepath.Join(configDir, "test_pipeline.pipeline.yml"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, fmt.Sprintf(`resources:
|
||||
pipelines:
|
||||
|
@ -186,7 +188,123 @@ func TestGenerateJobCommand(t *testing.T) {
|
|||
err := cmd.RunE(cmd, []string{})
|
||||
require.NoError(t, err)
|
||||
|
||||
data, err := os.ReadFile(filepath.Join(configDir, "test_job.yml"))
|
||||
data, err := os.ReadFile(filepath.Join(configDir, "test_job.job.yml"))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, fmt.Sprintf(`resources:
|
||||
jobs:
|
||||
test_job:
|
||||
name: test-job
|
||||
job_clusters:
|
||||
- new_cluster:
|
||||
custom_tags:
|
||||
"Tag1": "24X7-1234"
|
||||
- new_cluster:
|
||||
spark_conf:
|
||||
"spark.databricks.delta.preview.enabled": "true"
|
||||
tasks:
|
||||
- task_key: notebook_task
|
||||
notebook_task:
|
||||
notebook_path: %s
|
||||
parameters:
|
||||
- name: empty
|
||||
default: ""
|
||||
`, filepath.Join("..", "src", "notebook.py")), string(data))
|
||||
|
||||
data, err = os.ReadFile(filepath.Join(srcDir, "notebook.py"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "# Databricks notebook source\nNotebook content", string(data))
|
||||
}
|
||||
|
||||
func touchEmptyFile(t *testing.T, path string) {
|
||||
err := os.MkdirAll(filepath.Dir(path), 0700)
|
||||
require.NoError(t, err)
|
||||
f, err := os.Create(path)
|
||||
require.NoError(t, err)
|
||||
f.Close()
|
||||
}
|
||||
|
||||
func TestGenerateJobCommandOldFileRename(t *testing.T) {
|
||||
cmd := NewGenerateJobCommand()
|
||||
|
||||
root := t.TempDir()
|
||||
b := &bundle.Bundle{
|
||||
BundleRootPath: root,
|
||||
}
|
||||
|
||||
m := mocks.NewMockWorkspaceClient(t)
|
||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||
|
||||
jobsApi := m.GetMockJobsAPI()
|
||||
jobsApi.EXPECT().Get(mock.Anything, jobs.GetJobRequest{JobId: 1234}).Return(&jobs.Job{
|
||||
Settings: &jobs.JobSettings{
|
||||
Name: "test-job",
|
||||
JobClusters: []jobs.JobCluster{
|
||||
{NewCluster: compute.ClusterSpec{
|
||||
CustomTags: map[string]string{
|
||||
"Tag1": "24X7-1234",
|
||||
},
|
||||
}},
|
||||
{NewCluster: compute.ClusterSpec{
|
||||
SparkConf: map[string]string{
|
||||
"spark.databricks.delta.preview.enabled": "true",
|
||||
},
|
||||
}},
|
||||
},
|
||||
Tasks: []jobs.Task{
|
||||
{
|
||||
TaskKey: "notebook_task",
|
||||
NotebookTask: &jobs.NotebookTask{
|
||||
NotebookPath: "/test/notebook",
|
||||
},
|
||||
},
|
||||
},
|
||||
Parameters: []jobs.JobParameterDefinition{
|
||||
{
|
||||
Name: "empty",
|
||||
Default: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil)
|
||||
|
||||
workspaceApi := m.GetMockWorkspaceAPI()
|
||||
workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/notebook").Return(&workspace.ObjectInfo{
|
||||
ObjectType: workspace.ObjectTypeNotebook,
|
||||
Language: workspace.LanguagePython,
|
||||
Path: "/test/notebook",
|
||||
}, nil)
|
||||
|
||||
notebookContent := io.NopCloser(bytes.NewBufferString("# Databricks notebook source\nNotebook content"))
|
||||
workspaceApi.EXPECT().Download(mock.Anything, "/test/notebook", mock.Anything).Return(notebookContent, nil)
|
||||
|
||||
cmd.SetContext(bundle.Context(context.Background(), b))
|
||||
cmd.Flag("existing-job-id").Value.Set("1234")
|
||||
|
||||
configDir := filepath.Join(root, "resources")
|
||||
cmd.Flag("config-dir").Value.Set(configDir)
|
||||
|
||||
srcDir := filepath.Join(root, "src")
|
||||
cmd.Flag("source-dir").Value.Set(srcDir)
|
||||
|
||||
var key string
|
||||
cmd.Flags().StringVar(&key, "key", "test_job", "")
|
||||
|
||||
// Create an old generated file first
|
||||
oldFilename := filepath.Join(configDir, "test_job.yml")
|
||||
touchEmptyFile(t, oldFilename)
|
||||
|
||||
// Having an existing files require --force flag to regenerate them
|
||||
cmd.Flag("force").Value.Set("true")
|
||||
|
||||
err := cmd.RunE(cmd, []string{})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Make sure file do not exists after the run
|
||||
_, err = os.Stat(oldFilename)
|
||||
require.True(t, errors.Is(err, fs.ErrNotExist))
|
||||
|
||||
data, err := os.ReadFile(filepath.Join(configDir, "test_job.job.yml"))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, fmt.Sprintf(`resources:
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package generate
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
|
@ -83,7 +85,17 @@ func NewGenerateJobCommand() *cobra.Command {
|
|||
return err
|
||||
}
|
||||
|
||||
filename := filepath.Join(configDir, fmt.Sprintf("%s.yml", jobKey))
|
||||
oldFilename := filepath.Join(configDir, fmt.Sprintf("%s.yml", jobKey))
|
||||
filename := filepath.Join(configDir, fmt.Sprintf("%s.job.yml", jobKey))
|
||||
|
||||
// User might continuously run generate command to update their bundle jobs with any changes made in Databricks UI.
|
||||
// Due to changing in the generated file names, we need to first rename existing resource file to the new name.
|
||||
// Otherwise users can end up with duplicated resources.
|
||||
err = os.Rename(oldFilename, filename)
|
||||
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return fmt.Errorf("failed to rename file %s. DABs uses the resource type as a sub-extension for generated content, please rename it to %s, err: %w", oldFilename, filename, err)
|
||||
}
|
||||
|
||||
saver := yamlsaver.NewSaverWithStyle(map[string]yaml.Style{
|
||||
// Including all JobSettings and nested fields which are map[string]string type
|
||||
"spark_conf": yaml.DoubleQuotedStyle,
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package generate
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
|
@ -83,7 +85,17 @@ func NewGeneratePipelineCommand() *cobra.Command {
|
|||
return err
|
||||
}
|
||||
|
||||
filename := filepath.Join(configDir, fmt.Sprintf("%s.yml", pipelineKey))
|
||||
oldFilename := filepath.Join(configDir, fmt.Sprintf("%s.yml", pipelineKey))
|
||||
filename := filepath.Join(configDir, fmt.Sprintf("%s.pipeline.yml", pipelineKey))
|
||||
|
||||
// User might continuously run generate command to update their bundle jobs with any changes made in Databricks UI.
|
||||
// Due to changing in the generated file names, we need to first rename existing resource file to the new name.
|
||||
// Otherwise users can end up with duplicated resources.
|
||||
err = os.Rename(oldFilename, filename)
|
||||
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return fmt.Errorf("failed to rename file %s. DABs uses the resource type as a sub-extension for generated content, please rename it to %s, err: %w", oldFilename, filename, err)
|
||||
}
|
||||
|
||||
saver := yamlsaver.NewSaverWithStyle(
|
||||
// Including all PipelineSpec and nested fields which are map[string]string type
|
||||
map[string]yaml.Style{
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package bundle
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/fs"
|
||||
|
@ -11,6 +12,8 @@ import (
|
|||
|
||||
"github.com/databricks/cli/cmd/root"
|
||||
"github.com/databricks/cli/libs/cmdio"
|
||||
"github.com/databricks/cli/libs/dbr"
|
||||
"github.com/databricks/cli/libs/filer"
|
||||
"github.com/databricks/cli/libs/git"
|
||||
"github.com/databricks/cli/libs/template"
|
||||
"github.com/spf13/cobra"
|
||||
|
@ -147,6 +150,26 @@ func repoName(url string) string {
|
|||
return parts[len(parts)-1]
|
||||
}
|
||||
|
||||
func constructOutputFiler(ctx context.Context, outputDir string) (filer.Filer, error) {
|
||||
outputDir, err := filepath.Abs(outputDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// If the CLI is running on DBR and we're writing to the workspace file system,
|
||||
// use the extension-aware workspace filesystem filer to instantiate the template.
|
||||
//
|
||||
// It is not possible to write notebooks through the workspace filesystem's FUSE mount.
|
||||
// Therefore this is the only way we can initialize templates that contain notebooks
|
||||
// when running the CLI on DBR and initializing a template to the workspace.
|
||||
//
|
||||
if strings.HasPrefix(outputDir, "/Workspace/") && dbr.RunsOnRuntime(ctx) {
|
||||
return filer.NewWorkspaceFilesExtensionsClient(root.WorkspaceClient(ctx), outputDir)
|
||||
}
|
||||
|
||||
return filer.NewLocalClient(outputDir)
|
||||
}
|
||||
|
||||
func newInitCommand() *cobra.Command {
|
||||
cmd := &cobra.Command{
|
||||
Use: "init [TEMPLATE_PATH]",
|
||||
|
@ -201,6 +224,11 @@ See https://docs.databricks.com/en/dev-tools/bundles/templates.html for more inf
|
|||
templatePath = getNativeTemplateByDescription(description)
|
||||
}
|
||||
|
||||
outputFiler, err := constructOutputFiler(ctx, outputDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if templatePath == customTemplate {
|
||||
cmdio.LogString(ctx, "Please specify a path or Git repository to use a custom template.")
|
||||
cmdio.LogString(ctx, "See https://docs.databricks.com/en/dev-tools/bundles/templates.html to learn more about custom templates.")
|
||||
|
@ -230,7 +258,7 @@ See https://docs.databricks.com/en/dev-tools/bundles/templates.html for more inf
|
|||
|
||||
// skip downloading the repo because input arg is not a URL. We assume
|
||||
// it's a path on the local file system in that case
|
||||
return template.Materialize(ctx, configFile, templateFS, outputDir)
|
||||
return template.Materialize(ctx, configFile, templateFS, outputFiler)
|
||||
}
|
||||
|
||||
// Create a temporary directory with the name of the repository. The '*'
|
||||
|
@ -255,7 +283,7 @@ See https://docs.databricks.com/en/dev-tools/bundles/templates.html for more inf
|
|||
// Clean up downloaded repository once the template is materialized.
|
||||
defer os.RemoveAll(repoDir)
|
||||
templateFS := os.DirFS(filepath.Join(repoDir, templateDir))
|
||||
return template.Materialize(ctx, configFile, templateFS, outputDir)
|
||||
return template.Materialize(ctx, configFile, templateFS, outputFiler)
|
||||
}
|
||||
return cmd
|
||||
}
|
||||
|
|
|
@ -166,7 +166,7 @@ func TestAccGenerateAndBind(t *testing.T) {
|
|||
_, err = os.Stat(filepath.Join(bundleRoot, "src", "test.py"))
|
||||
require.NoError(t, err)
|
||||
|
||||
matches, err := filepath.Glob(filepath.Join(bundleRoot, "resources", "test_job_key.yml"))
|
||||
matches, err := filepath.Glob(filepath.Join(bundleRoot, "resources", "test_job_key.job.yml"))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, matches, 1)
|
||||
|
|
|
@ -11,6 +11,11 @@
|
|||
"node_type_id": {
|
||||
"type": "string",
|
||||
"description": "Node type id for job cluster"
|
||||
},
|
||||
"root_path": {
|
||||
"type": "string",
|
||||
"description": "Root path to deploy bundle to",
|
||||
"default": ""
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,11 @@ bundle:
|
|||
name: basic
|
||||
|
||||
workspace:
|
||||
{{ if .root_path }}
|
||||
root_path: "{{.root_path}}/.bundle/{{.unique_id}}"
|
||||
{{ else }}
|
||||
root_path: "~/.bundle/{{.unique_id}}"
|
||||
{{ end }}
|
||||
|
||||
resources:
|
||||
jobs:
|
||||
|
|
|
@ -1,2 +0,0 @@
|
|||
bundle:
|
||||
name: abc
|
|
@ -1,5 +1,8 @@
|
|||
bundle:
|
||||
name: "bundle-playground"
|
||||
name: recreate-pipeline
|
||||
|
||||
workspace:
|
||||
root_path: "~/.bundle/{{.unique_id}}"
|
||||
|
||||
variables:
|
||||
catalog:
|
||||
|
|
|
@ -1,5 +1,8 @@
|
|||
bundle:
|
||||
name: "bundle-playground"
|
||||
name: uc-schema
|
||||
|
||||
workspace:
|
||||
root_path: "~/.bundle/{{.unique_id}}"
|
||||
|
||||
resources:
|
||||
pipelines:
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
package bundle
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/databricks/cli/internal"
|
||||
"github.com/databricks/cli/internal/acc"
|
||||
"github.com/databricks/cli/libs/env"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestAccDeployBasicToSharedWorkspacePath(t *testing.T) {
|
||||
ctx, wt := acc.WorkspaceTest(t)
|
||||
|
||||
nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
|
||||
uniqueId := uuid.New().String()
|
||||
|
||||
currentUser, err := wt.W.CurrentUser.Me(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
bundleRoot, err := initTestTemplate(t, ctx, "basic", map[string]any{
|
||||
"unique_id": uniqueId,
|
||||
"node_type_id": nodeTypeId,
|
||||
"spark_version": defaultSparkVersion,
|
||||
"root_path": fmt.Sprintf("/Shared/%s", currentUser.UserName),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
err = destroyBundle(wt.T, ctx, bundleRoot)
|
||||
require.NoError(wt.T, err)
|
||||
})
|
||||
|
||||
err = deployBundle(wt.T, ctx, bundleRoot)
|
||||
require.NoError(wt.T, err)
|
||||
}
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/databricks/cli/internal"
|
||||
"github.com/databricks/cli/libs/cmdio"
|
||||
"github.com/databricks/cli/libs/env"
|
||||
"github.com/databricks/cli/libs/filer"
|
||||
"github.com/databricks/cli/libs/flags"
|
||||
"github.com/databricks/cli/libs/template"
|
||||
"github.com/databricks/cli/libs/vfs"
|
||||
|
@ -42,7 +43,9 @@ func initTestTemplateWithBundleRoot(t *testing.T, ctx context.Context, templateN
|
|||
cmd := cmdio.NewIO(flags.OutputJSON, strings.NewReader(""), os.Stdout, os.Stderr, "", "bundles")
|
||||
ctx = cmdio.InContext(ctx, cmd)
|
||||
|
||||
err = template.Materialize(ctx, configFilePath, os.DirFS(templateRoot), bundleRoot)
|
||||
out, err := filer.NewLocalClient(bundleRoot)
|
||||
require.NoError(t, err)
|
||||
err = template.Materialize(ctx, configFilePath, os.DirFS(templateRoot), out)
|
||||
return bundleRoot, err
|
||||
}
|
||||
|
||||
|
|
|
@ -21,8 +21,8 @@ const schemaFileName = "databricks_template_schema.json"
|
|||
// ctx: context containing a cmdio object. This is used to prompt the user
|
||||
// configFilePath: file path containing user defined config values
|
||||
// templateFS: root of the template definition
|
||||
// outputDir: root of directory where to initialize the template
|
||||
func Materialize(ctx context.Context, configFilePath string, templateFS fs.FS, outputDir string) error {
|
||||
// outputFiler: filer to use for writing the initialized template
|
||||
func Materialize(ctx context.Context, configFilePath string, templateFS fs.FS, outputFiler filer.Filer) error {
|
||||
if _, err := fs.Stat(templateFS, schemaFileName); errors.Is(err, fs.ErrNotExist) {
|
||||
return fmt.Errorf("not a bundle template: expected to find a template schema file at %s", schemaFileName)
|
||||
}
|
||||
|
@ -73,12 +73,7 @@ func Materialize(ctx context.Context, configFilePath string, templateFS fs.FS, o
|
|||
return err
|
||||
}
|
||||
|
||||
out, err := filer.NewLocalClient(outputDir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = r.persistToDisk(ctx, out)
|
||||
err = r.persistToDisk(ctx, outputFiler)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -19,6 +19,6 @@ func TestMaterializeForNonTemplateDirectory(t *testing.T) {
|
|||
ctx := root.SetWorkspaceClient(context.Background(), w)
|
||||
|
||||
// Try to materialize a non-template directory.
|
||||
err = Materialize(ctx, "", os.DirFS(tmpDir), "")
|
||||
err = Materialize(ctx, "", os.DirFS(tmpDir), nil)
|
||||
assert.EqualError(t, err, fmt.Sprintf("not a bundle template: expected to find a template schema file at %s", schemaFileName))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue