Allow artifacts (JARs, wheels) to be uploaded to UC Volumes (#1591)

## Changes
This change allows to specify UC volumes path as an artifact paths so
all artifacts (JARs, wheels) are uploaded to UC Volumes.

Example configuration is here:
```
bundle:
  name: jar-bundle

workspace:
  host: https://foo.com
  artifact_path: /Volumes/main/default/foobar

artifacts:
  my_java_code:
    path: ./sample-java
    build: "javac PrintArgs.java && jar cvfm PrintArgs.jar META-INF/MANIFEST.MF PrintArgs.class"
    files:
      - source: ./sample-java/PrintArgs.jar

resources:
  jobs:
    jar_job:
      name: "Test Spark Jar Job"
      tasks:
        - task_key: TestSparkJarTask
          new_cluster:
            num_workers: 1
            spark_version: "14.3.x-scala2.12"
            node_type_id: "i3.xlarge"
          spark_jar_task:
            main_class_name: PrintArgs
          libraries:
            - jar: ./sample-java/PrintArgs.jar
```
## Tests
Manually + added E2E test for Java jobs

E2E test is temporarily skipped until auth related issues for UC for
tests are resolved
This commit is contained in:
Andrew Nester 2024-07-16 10:57:04 +02:00 committed by GitHub
parent 61cb0f2695
commit 434bcbb018
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 357 additions and 38 deletions

View File

@ -8,6 +8,7 @@ import (
"os"
"path"
"path/filepath"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/artifacts/whl"
@ -17,6 +18,7 @@ import (
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
)
type mutatorFactory = func(name string) bundle.Mutator
@ -103,7 +105,7 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
return diag.FromErr(err)
}
client, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), uploadPath)
client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath)
if err != nil {
return diag.FromErr(err)
}
@ -116,6 +118,17 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnost
return nil
}
func getFilerForArtifacts(w *databricks.WorkspaceClient, uploadPath string) (filer.Filer, error) {
if isVolumesPath(uploadPath) {
return filer.NewFilesClient(w, uploadPath)
}
return filer.NewWorkspaceFilesClient(w, uploadPath)
}
func isVolumesPath(path string) bool {
return strings.HasPrefix(path, "/Volumes/")
}
func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, uploadPath string, client filer.Filer) error {
for i := range a.Files {
f := &a.Files[i]
@ -130,14 +143,15 @@ func uploadArtifact(ctx context.Context, b *bundle.Bundle, a *config.Artifact, u
log.Infof(ctx, "Upload succeeded")
f.RemotePath = path.Join(uploadPath, filepath.Base(f.Source))
remotePath := f.RemotePath
// TODO: confirm if we still need to update the remote path to start with /Workspace
if !strings.HasPrefix(f.RemotePath, "/Workspace/") && !strings.HasPrefix(f.RemotePath, "/Volumes/") {
wsfsBase := "/Workspace"
remotePath := path.Join(wsfsBase, f.RemotePath)
remotePath = path.Join(wsfsBase, f.RemotePath)
}
for _, job := range b.Config.Resources.Jobs {
rewriteArtifactPath(b, f, job, remotePath)
}
}

View File

@ -17,7 +17,7 @@ import (
"github.com/stretchr/testify/require"
)
func TestArtifactUpload(t *testing.T) {
func TestArtifactUploadForWorkspace(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
@ -105,3 +105,92 @@ func TestArtifactUpload(t *testing.T) {
require.Equal(t, "/Workspace/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl)
require.Equal(t, "/Workspace/Users/foo@bar.com/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl)
}
func TestArtifactUploadForVolumes(t *testing.T) {
tmpDir := t.TempDir()
whlFolder := filepath.Join(tmpDir, "whl")
testutil.Touch(t, whlFolder, "source.whl")
whlLocalPath := filepath.Join(whlFolder, "source.whl")
b := &bundle.Bundle{
RootPath: tmpDir,
Config: config.Root{
Workspace: config.Workspace{
ArtifactPath: "/Volumes/foo/bar/artifacts",
},
Artifacts: config.Artifacts{
"whl": {
Type: config.ArtifactPythonWheel,
Files: []config.ArtifactFile{
{Source: whlLocalPath},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Volumes/some/path/mywheel.whl",
},
},
},
{
ForEachTask: &jobs.ForEachTask{
Task: jobs.Task{
Libraries: []compute.Library{
{
Whl: filepath.Join("whl", "*.whl"),
},
{
Whl: "/Volumes/some/path/mywheel.whl",
},
},
},
},
},
},
Environments: []jobs.JobEnvironment{
{
Spec: &compute.Environment{
Dependencies: []string{
filepath.Join("whl", "source.whl"),
"/Volumes/some/path/mywheel.whl",
},
},
},
},
},
},
},
},
},
}
artifact := b.Config.Artifacts["whl"]
mockFiler := mockfiler.NewMockFiler(t)
mockFiler.EXPECT().Write(
mock.Anything,
filepath.Join("source.whl"),
mock.AnythingOfType("*bytes.Reader"),
filer.OverwriteIfExists,
filer.CreateParentDirectories,
).Return(nil)
err := uploadArtifact(context.Background(), b, artifact, "/Volumes/foo/bar/artifacts", mockFiler)
require.NoError(t, err)
// Test that libraries path is updated
require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[0].Whl)
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[0].Libraries[1].Whl)
require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[0])
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Environments[0].Spec.Dependencies[1])
require.Equal(t, "/Volumes/foo/bar/artifacts/source.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[0].Whl)
require.Equal(t, "/Volumes/some/path/mywheel.whl", b.Config.Resources.Jobs["job"].JobSettings.Tasks[1].ForEachTask.Task.Libraries[1].Whl)
}

View File

@ -44,27 +44,6 @@ func (m *build) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
}
}
// Expand any glob reference in files source path
files := make([]config.ArtifactFile, 0, len(artifact.Files))
for _, f := range artifact.Files {
matches, err := filepath.Glob(f.Source)
if err != nil {
return diag.Errorf("unable to find files for %s: %v", f.Source, err)
}
if len(matches) == 0 {
return diag.Errorf("no files found for %s", f.Source)
}
for _, match := range matches {
files = append(files, config.ArtifactFile{
Source: match,
})
}
}
artifact.Files = files
// Skip building if build command is not specified or infered
if artifact.BuildCommand == "" {
// If no build command was specified or infered and there is no
@ -72,7 +51,11 @@ func (m *build) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
if len(artifact.Files) == 0 {
return diag.Errorf("misconfigured artifact: please specify 'build' or 'files' property")
}
return nil
// We can skip calling build mutator if there is no build command
// But we still need to expand glob references in files source path.
diags := expandGlobReference(artifact)
return diags
}
// If artifact path is not provided, use bundle root dir
@ -85,5 +68,40 @@ func (m *build) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
artifact.Path = filepath.Join(dirPath, artifact.Path)
}
return bundle.Apply(ctx, b, getBuildMutator(artifact.Type, m.name))
diags := bundle.Apply(ctx, b, getBuildMutator(artifact.Type, m.name))
if diags.HasError() {
return diags
}
// We need to expand glob reference after build mutator is applied because
// if we do it before, any files that are generated by build command will
// not be included into artifact.Files and thus will not be uploaded.
d := expandGlobReference(artifact)
return diags.Extend(d)
}
func expandGlobReference(artifact *config.Artifact) diag.Diagnostics {
var diags diag.Diagnostics
// Expand any glob reference in files source path
files := make([]config.ArtifactFile, 0, len(artifact.Files))
for _, f := range artifact.Files {
matches, err := filepath.Glob(f.Source)
if err != nil {
return diags.Extend(diag.Errorf("unable to find files for %s: %v", f.Source, err))
}
if len(matches) == 0 {
return diags.Extend(diag.Errorf("no files found for %s", f.Source))
}
for _, match := range matches {
files = append(files, config.ArtifactFile{
Source: match,
})
}
}
artifact.Files = files
return diags
}

View File

@ -6,7 +6,8 @@ import (
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
)
func UploadAll() bundle.Mutator {
@ -57,12 +58,18 @@ func (m *cleanUp) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
return diag.FromErr(err)
}
b.WorkspaceClient().Workspace.Delete(ctx, workspace.Delete{
Path: uploadPath,
Recursive: true,
})
client, err := getFilerForArtifacts(b.WorkspaceClient(), uploadPath)
if err != nil {
return diag.FromErr(err)
}
err = b.WorkspaceClient().Workspace.MkdirsByPath(ctx, uploadPath)
// We intentionally ignore the error because it is not critical to the deployment
err = client.Delete(ctx, ".", filer.DeleteRecursively)
if err != nil {
log.Errorf(ctx, "failed to delete %s: %v", uploadPath, err)
}
err = client.Mkdir(ctx, ".")
if err != nil {
return diag.Errorf("unable to create directory for %s: %v", uploadPath, err)
}

View File

@ -153,3 +153,72 @@ func TestAccUploadArtifactFileToCorrectRemotePathWithEnvironments(t *testing.T)
b.Config.Resources.Jobs["test"].JobSettings.Environments[0].Spec.Dependencies[0],
)
}
func TestAccUploadArtifactFileToCorrectRemotePathForVolumes(t *testing.T) {
ctx, wt := acc.WorkspaceTest(t)
w := wt.W
if os.Getenv("TEST_METASTORE_ID") == "" {
t.Skip("Skipping tests that require a UC Volume when metastore id is not set.")
}
volumePath := internal.TemporaryUcVolume(t, w)
dir := t.TempDir()
whlPath := filepath.Join(dir, "dist", "test.whl")
touchEmptyFile(t, whlPath)
b := &bundle.Bundle{
RootPath: dir,
Config: config.Root{
Bundle: config.Bundle{
Target: "whatever",
},
Workspace: config.Workspace{
ArtifactPath: volumePath,
},
Artifacts: config.Artifacts{
"test": &config.Artifact{
Type: "whl",
Files: []config.ArtifactFile{
{
Source: whlPath,
},
},
},
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"test": {
JobSettings: &jobs.JobSettings{
Tasks: []jobs.Task{
{
Libraries: []compute.Library{
{
Whl: "dist/test.whl",
},
},
},
},
},
},
},
},
},
}
diags := bundle.Apply(ctx, b, artifacts.BasicUpload("test"))
require.NoError(t, diags.Error())
// The remote path attribute on the artifact file should have been set.
require.Regexp(t,
regexp.MustCompile(path.Join(regexp.QuoteMeta(volumePath), `.internal/test\.whl`)),
b.Config.Artifacts["test"].Files[0].RemotePath,
)
// The task library path should have been updated to the remote path.
require.Regexp(t,
regexp.MustCompile(path.Join(regexp.QuoteMeta(volumePath), `.internal/test\.whl`)),
b.Config.Resources.Jobs["test"].JobSettings.Tasks[0].Libraries[0].Whl,
)
}

View File

@ -0,0 +1,29 @@
{
"properties": {
"project_name": {
"type": "string",
"default": "my_java_project",
"description": "Unique name for this project"
},
"spark_version": {
"type": "string",
"description": "Spark version used for job cluster"
},
"node_type_id": {
"type": "string",
"description": "Node type id for job cluster"
},
"unique_id": {
"type": "string",
"description": "Unique ID for job name"
},
"root": {
"type": "string",
"description": "Path to the root of the template"
},
"artifact_path": {
"type": "string",
"description": "Path to the remote base path for artifacts"
}
}
}

View File

@ -0,0 +1,28 @@
bundle:
name: spark-jar-task
workspace:
root_path: "~/.bundle/{{.unique_id}}"
artifact_path: {{.artifact_path}}
artifacts:
my_java_code:
path: ./{{.project_name}}
build: "javac PrintArgs.java && jar cvfm PrintArgs.jar META-INF/MANIFEST.MF PrintArgs.class"
files:
- source: ./{{.project_name}}/PrintArgs.jar
resources:
jobs:
jar_job:
name: "[${bundle.target}] Test Spark Jar Job {{.unique_id}}"
tasks:
- task_key: TestSparkJarTask
new_cluster:
num_workers: 1
spark_version: "{{.spark_version}}"
node_type_id: "{{.node_type_id}}"
spark_jar_task:
main_class_name: PrintArgs
libraries:
- jar: ./{{.project_name}}/PrintArgs.jar

View File

@ -0,0 +1 @@
Main-Class: PrintArgs

View File

@ -0,0 +1,8 @@
import java.util.Arrays;
public class PrintArgs {
public static void main(String[] args) {
System.out.println("Hello from Jar!");
System.out.println(Arrays.toString(args));
}
}

View File

@ -21,9 +21,13 @@ import (
const defaultSparkVersion = "13.3.x-snapshot-scala2.12"
func initTestTemplate(t *testing.T, ctx context.Context, templateName string, config map[string]any) (string, error) {
bundleRoot := t.TempDir()
return initTestTemplateWithBundleRoot(t, ctx, templateName, config, bundleRoot)
}
func initTestTemplateWithBundleRoot(t *testing.T, ctx context.Context, templateName string, config map[string]any, bundleRoot string) (string, error) {
templateRoot := filepath.Join("bundles", templateName)
bundleRoot := t.TempDir()
configFilePath, err := writeConfigFile(t, config)
if err != nil {
return "", err

View File

@ -0,0 +1,52 @@
package bundle
import (
"os"
"testing"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/internal/acc"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
func runSparkJarTest(t *testing.T, sparkVersion string) {
t.Skip("Temporarily skipping the test until auth / permission issues for UC volumes are resolved.")
env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
t.Log(env)
if os.Getenv("TEST_METASTORE_ID") == "" {
t.Skip("Skipping tests that require a UC Volume when metastore id is not set.")
}
ctx, wt := acc.WorkspaceTest(t)
w := wt.W
volumePath := internal.TemporaryUcVolume(t, w)
nodeTypeId := internal.GetNodeTypeId(env)
tmpDir := t.TempDir()
bundleRoot, err := initTestTemplateWithBundleRoot(t, ctx, "spark_jar_task", map[string]any{
"node_type_id": nodeTypeId,
"unique_id": uuid.New().String(),
"spark_version": sparkVersion,
"root": tmpDir,
"artifact_path": volumePath,
}, tmpDir)
require.NoError(t, err)
err = deployBundle(t, ctx, bundleRoot)
require.NoError(t, err)
t.Cleanup(func() {
destroyBundle(t, ctx, bundleRoot)
})
out, err := runResource(t, ctx, bundleRoot, "jar_job")
require.NoError(t, err)
require.Contains(t, out, "Hello from Jar!")
}
func TestAccSparkJarTaskDeployAndRunOnVolumes(t *testing.T) {
runSparkJarTest(t, "14.3.x-scala2.12")
}

View File

@ -472,7 +472,7 @@ func TemporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string {
}
// Create a new UC volume in a catalog called "main" in the workspace.
func temporaryUcVolume(t *testing.T, w *databricks.WorkspaceClient) string {
func TemporaryUcVolume(t *testing.T, w *databricks.WorkspaceClient) string {
ctx := context.Background()
// Create a schema
@ -607,7 +607,7 @@ func setupUcVolumesFiler(t *testing.T) (filer.Filer, string) {
w, err := databricks.NewWorkspaceClient()
require.NoError(t, err)
tmpDir := temporaryUcVolume(t, w)
tmpDir := TemporaryUcVolume(t, w)
f, err := filer.NewFilesClient(w, tmpDir)
require.NoError(t, err)