mirror of https://github.com/databricks/cli.git
Added test to submit and run various Python tasks on multiple DBR versions (#806)
## Changes These tests allow us to get information for execution context (PYTHONPATH, CWD) for various Python tasks and different cluster setups. Note: this test won't be executed automatically as part of nightly builds since it requires RUN_PYTHON_TASKS_TEST env to be executed. ## Tests Integration test run successfully. --------- Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
This commit is contained in:
parent
452565cbd3
commit
79e271f859
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
|
@ -15,8 +14,6 @@ import (
|
|||
"github.com/databricks/cli/libs/filer"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/service/files"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -209,41 +206,12 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) {
|
|||
assert.False(t, entries[0].IsDir())
|
||||
}
|
||||
|
||||
func temporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
me, err := w.CurrentUser.Me(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
path := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-"))
|
||||
|
||||
// Ensure directory exists, but doesn't exist YET!
|
||||
// Otherwise we could inadvertently remove a directory that already exists on cleanup.
|
||||
t.Logf("mkdir %s", path)
|
||||
err = w.Workspace.MkdirsByPath(ctx, path)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Remove test directory on test completion.
|
||||
t.Cleanup(func() {
|
||||
t.Logf("rm -rf %s", path)
|
||||
err := w.Workspace.Delete(ctx, workspace.Delete{
|
||||
Path: path,
|
||||
Recursive: true,
|
||||
})
|
||||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("unable to remove temporary workspace directory %s: %#v", path, err)
|
||||
})
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
func setupWorkspaceFilesTest(t *testing.T) (context.Context, filer.Filer) {
|
||||
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||
|
||||
ctx := context.Background()
|
||||
w := databricks.Must(databricks.NewWorkspaceClient())
|
||||
tmpdir := temporaryWorkspaceDir(t, w)
|
||||
tmpdir := TemporaryWorkspaceDir(t, w)
|
||||
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -267,37 +235,12 @@ func TestAccFilerWorkspaceFilesReadDir(t *testing.T) {
|
|||
runFilerReadDirTest(t, ctx, f)
|
||||
}
|
||||
|
||||
func temporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-dbfs-"))
|
||||
|
||||
// This call fails if the path already exists.
|
||||
t.Logf("mkdir dbfs:%s", path)
|
||||
err := w.Dbfs.MkdirsByPath(ctx, path)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Remove test directory on test completion.
|
||||
t.Cleanup(func() {
|
||||
t.Logf("rm -rf dbfs:%s", path)
|
||||
err := w.Dbfs.Delete(ctx, files.Delete{
|
||||
Path: path,
|
||||
Recursive: true,
|
||||
})
|
||||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("unable to remove temporary dbfs directory %s: %#v", path, err)
|
||||
})
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
func setupFilerDbfsTest(t *testing.T) (context.Context, filer.Filer) {
|
||||
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||
|
||||
ctx := context.Background()
|
||||
w := databricks.Must(databricks.NewWorkspaceClient())
|
||||
tmpdir := temporaryDbfsDir(t, w)
|
||||
tmpdir := TemporaryDbfsDir(t, w)
|
||||
f, err := filer.NewDbfsClient(w, tmpdir)
|
||||
require.NoError(t, err)
|
||||
return ctx, f
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestAccFsCatForDbfs(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -54,7 +54,7 @@ func TestAccFsCatDoesNotSupportOutputModeJson(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -75,7 +75,7 @@ func setupDbfsFiler(t *testing.T) (filer.Filer, string) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -256,7 +256,7 @@ func TestAccFsCpErrorsWhenSourceIsDirWithoutRecursiveFlag(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
_, _, err = RequireErrorRun(t, "fs", "cp", "dbfs:"+tmpDir, "dbfs:/tmp")
|
||||
assert.Equal(t, fmt.Sprintf("source path %s is a directory. Please specify the --recursive flag", tmpDir), err.Error())
|
||||
|
|
|
@ -23,7 +23,7 @@ func TestAccFsLsForDbfs(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -58,7 +58,7 @@ func TestAccFsLsForDbfsWithAbsolutePaths(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -94,7 +94,7 @@ func TestAccFsLsForDbfsOnFile(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -114,7 +114,7 @@ func TestAccFsLsForDbfsOnEmptyDir(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json")
|
||||
assert.Equal(t, "", stderr.String())
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestAccFsMkdirCreatesDirectory(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -44,7 +44,7 @@ func TestAccFsMkdirCreatesMultipleDirectories(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -80,7 +80,7 @@ func TestAccFsMkdirWhenDirectoryAlreadyExists(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
// create directory "a"
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
|
@ -101,7 +101,7 @@ func TestAccFsMkdirWhenFileExistsAtPath(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
// create file hello
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
|
|
|
@ -20,7 +20,7 @@ func TestAccFsRmForFile(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -52,7 +52,7 @@ func TestAccFsRmForEmptyDirectory(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -84,7 +84,7 @@ func TestAccFsRmForNonEmptyDirectory(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
@ -120,7 +120,7 @@ func TestAccFsRmForNonEmptyDirectoryWithRecursiveFlag(t *testing.T) {
|
|||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
tmpDir := temporaryDbfsDir(t, w)
|
||||
tmpDir := TemporaryDbfsDir(t, w)
|
||||
|
||||
f, err := filer.NewDbfsClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -17,6 +17,12 @@ import (
|
|||
|
||||
"github.com/databricks/cli/cmd"
|
||||
_ "github.com/databricks/cli/cmd/version"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||
"github.com/databricks/databricks-sdk-go/service/files"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
@ -272,3 +278,158 @@ func writeFile(t *testing.T, name string, body string) string {
|
|||
f.Close()
|
||||
return f.Name()
|
||||
}
|
||||
|
||||
func GenerateNotebookTasks(notebookPath string, versions []string, nodeTypeId string) []jobs.SubmitTask {
|
||||
tasks := make([]jobs.SubmitTask, 0)
|
||||
for i := 0; i < len(versions); i++ {
|
||||
task := jobs.SubmitTask{
|
||||
TaskKey: fmt.Sprintf("notebook_%s", strings.ReplaceAll(versions[i], ".", "_")),
|
||||
NotebookTask: &jobs.NotebookTask{
|
||||
NotebookPath: notebookPath,
|
||||
},
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
SparkVersion: versions[i],
|
||||
NumWorkers: 1,
|
||||
NodeTypeId: nodeTypeId,
|
||||
DataSecurityMode: compute.DataSecurityModeUserIsolation,
|
||||
},
|
||||
}
|
||||
tasks = append(tasks, task)
|
||||
}
|
||||
|
||||
return tasks
|
||||
}
|
||||
|
||||
func GenerateSparkPythonTasks(notebookPath string, versions []string, nodeTypeId string) []jobs.SubmitTask {
|
||||
tasks := make([]jobs.SubmitTask, 0)
|
||||
for i := 0; i < len(versions); i++ {
|
||||
task := jobs.SubmitTask{
|
||||
TaskKey: fmt.Sprintf("spark_%s", strings.ReplaceAll(versions[i], ".", "_")),
|
||||
SparkPythonTask: &jobs.SparkPythonTask{
|
||||
PythonFile: notebookPath,
|
||||
},
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
SparkVersion: versions[i],
|
||||
NumWorkers: 1,
|
||||
NodeTypeId: nodeTypeId,
|
||||
DataSecurityMode: compute.DataSecurityModeUserIsolation,
|
||||
},
|
||||
}
|
||||
tasks = append(tasks, task)
|
||||
}
|
||||
|
||||
return tasks
|
||||
}
|
||||
|
||||
func GenerateWheelTasks(wheelPath string, versions []string, nodeTypeId string) []jobs.SubmitTask {
|
||||
tasks := make([]jobs.SubmitTask, 0)
|
||||
for i := 0; i < len(versions); i++ {
|
||||
task := jobs.SubmitTask{
|
||||
TaskKey: fmt.Sprintf("whl_%s", strings.ReplaceAll(versions[i], ".", "_")),
|
||||
PythonWheelTask: &jobs.PythonWheelTask{
|
||||
PackageName: "my_test_code",
|
||||
EntryPoint: "run",
|
||||
},
|
||||
NewCluster: &compute.ClusterSpec{
|
||||
SparkVersion: versions[i],
|
||||
NumWorkers: 1,
|
||||
NodeTypeId: nodeTypeId,
|
||||
DataSecurityMode: compute.DataSecurityModeUserIsolation,
|
||||
},
|
||||
Libraries: []compute.Library{
|
||||
{Whl: wheelPath},
|
||||
},
|
||||
}
|
||||
tasks = append(tasks, task)
|
||||
}
|
||||
|
||||
return tasks
|
||||
}
|
||||
|
||||
func TemporaryWorkspaceDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
me, err := w.CurrentUser.Me(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
basePath := fmt.Sprintf("/Users/%s/%s", me.UserName, RandomName("integration-test-wsfs-"))
|
||||
|
||||
t.Logf("Creating %s", basePath)
|
||||
err = w.Workspace.MkdirsByPath(ctx, basePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Remove test directory on test completion.
|
||||
t.Cleanup(func() {
|
||||
t.Logf("Removing %s", basePath)
|
||||
err := w.Workspace.Delete(ctx, workspace.Delete{
|
||||
Path: basePath,
|
||||
Recursive: true,
|
||||
})
|
||||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("Unable to remove temporary workspace directory %s: %#v", basePath, err)
|
||||
})
|
||||
|
||||
return basePath
|
||||
}
|
||||
|
||||
func TemporaryDbfsDir(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
path := fmt.Sprintf("/tmp/%s", RandomName("integration-test-dbfs-"))
|
||||
|
||||
t.Logf("Creating DBFS folder:%s", path)
|
||||
err := w.Dbfs.MkdirsByPath(ctx, path)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
t.Logf("Removing DBFS folder:%s", path)
|
||||
err := w.Dbfs.Delete(ctx, files.Delete{
|
||||
Path: path,
|
||||
Recursive: true,
|
||||
})
|
||||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("unable to remove temporary dbfs directory %s: %#v", path, err)
|
||||
})
|
||||
|
||||
return path
|
||||
}
|
||||
|
||||
func TemporaryRepo(t *testing.T, w *databricks.WorkspaceClient) string {
|
||||
ctx := context.Background()
|
||||
me, err := w.CurrentUser.Me(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
repoPath := fmt.Sprintf("/Repos/%s/%s", me.UserName, RandomName("integration-test-repo-"))
|
||||
|
||||
t.Logf("Creating repo:%s", repoPath)
|
||||
repoInfo, err := w.Repos.Create(ctx, workspace.CreateRepo{
|
||||
Url: "https://github.com/databricks/cli",
|
||||
Provider: "github",
|
||||
Path: repoPath,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
t.Logf("Removing repo: %s", repoPath)
|
||||
err := w.Repos.Delete(ctx, workspace.DeleteRepoRequest{
|
||||
RepoId: repoInfo.Id,
|
||||
})
|
||||
if err == nil || apierr.IsMissing(err) {
|
||||
return
|
||||
}
|
||||
t.Logf("unable to remove repo %s: %#v", repoPath, err)
|
||||
})
|
||||
|
||||
return repoPath
|
||||
}
|
||||
|
||||
func GetNodeTypeId(env string) string {
|
||||
if env == "gcp" {
|
||||
return "n1-standard-4"
|
||||
} else if env == "aws" {
|
||||
return "i3.xlarge"
|
||||
}
|
||||
return "Standard_DS4_v2"
|
||||
}
|
||||
|
|
|
@ -169,7 +169,7 @@ func setupLockerTest(ctx context.Context, t *testing.T) (*lockpkg.Locker, filer.
|
|||
require.NoError(t, err)
|
||||
|
||||
// create temp wsfs dir
|
||||
tmpDir := temporaryWorkspaceDir(t, w)
|
||||
tmpDir := TemporaryWorkspaceDir(t, w)
|
||||
f, err := filer.NewWorkspaceFilesClient(w, tmpDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -0,0 +1,267 @@
|
|||
package python
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/databricks/cli/bundle/run/output"
|
||||
"github.com/databricks/cli/internal"
|
||||
"github.com/databricks/cli/libs/filer"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const PY_CONTENT = `# Databricks notebook source
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
|
||||
out = {"PYTHONPATH": sys.path, "CWD": os.getcwd()}
|
||||
json_object = json.dumps(out, indent = 4)
|
||||
dbutils.notebook.exit(json_object)
|
||||
`
|
||||
|
||||
const SPARK_PY_CONTENT = `
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
|
||||
out = {"PYTHONPATH": sys.path, "CWD": os.getcwd()}
|
||||
json_object = json.dumps(out, indent = 4)
|
||||
print(json_object)
|
||||
`
|
||||
|
||||
type testOutput struct {
|
||||
PythonPath []string `json:"PYTHONPATH"`
|
||||
Cwd string `json:"CWD"`
|
||||
}
|
||||
|
||||
type testFiles struct {
|
||||
w *databricks.WorkspaceClient
|
||||
pyNotebookPath string
|
||||
sparkPythonPath string
|
||||
wheelPath string
|
||||
}
|
||||
|
||||
type testOpts struct {
|
||||
name string
|
||||
includeNotebookTasks bool
|
||||
includeSparkPythonTasks bool
|
||||
includeWheelTasks bool
|
||||
wheelSparkVersions []string
|
||||
}
|
||||
|
||||
var sparkVersions = []string{
|
||||
"11.3.x-scala2.12",
|
||||
"12.2.x-scala2.12",
|
||||
"13.0.x-scala2.12",
|
||||
"13.1.x-scala2.12",
|
||||
"13.2.x-scala2.12",
|
||||
"13.3.x-scala2.12",
|
||||
"14.0.x-scala2.12",
|
||||
"14.1.x-scala2.12",
|
||||
}
|
||||
|
||||
func TestAccRunPythonTaskWorkspace(t *testing.T) {
|
||||
// TODO: remove RUN_PYTHON_TASKS_TEST when ready to be executed as part of nightly
|
||||
internal.GetEnvOrSkipTest(t, "RUN_PYTHON_TASKS_TEST")
|
||||
internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
|
||||
|
||||
unsupportedSparkVersionsForWheel := []string{
|
||||
"11.3.x-scala2.12",
|
||||
"12.2.x-scala2.12",
|
||||
"13.0.x-scala2.12",
|
||||
}
|
||||
runPythonTasks(t, prepareWorkspaceFiles(t), testOpts{
|
||||
name: "Python tasks from WSFS",
|
||||
includeNotebookTasks: true,
|
||||
includeSparkPythonTasks: true,
|
||||
includeWheelTasks: true,
|
||||
wheelSparkVersions: slices.DeleteFunc(slices.Clone(sparkVersions), func(s string) bool {
|
||||
return slices.Contains(unsupportedSparkVersionsForWheel, s)
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
func TestAccRunPythonTaskDBFS(t *testing.T) {
|
||||
// TODO: remove RUN_PYTHON_TASKS_TEST when ready to be executed as part of nightly
|
||||
internal.GetEnvOrSkipTest(t, "RUN_PYTHON_TASKS_TEST")
|
||||
internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
|
||||
|
||||
runPythonTasks(t, prepareDBFSFiles(t), testOpts{
|
||||
name: "Python tasks from DBFS",
|
||||
includeNotebookTasks: false,
|
||||
includeSparkPythonTasks: true,
|
||||
includeWheelTasks: true,
|
||||
})
|
||||
}
|
||||
|
||||
func TestAccRunPythonTaskRepo(t *testing.T) {
|
||||
// TODO: remove RUN_PYTHON_TASKS_TEST when ready to be executed as part of nightly
|
||||
internal.GetEnvOrSkipTest(t, "RUN_PYTHON_TASKS_TEST")
|
||||
internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
|
||||
|
||||
runPythonTasks(t, prepareRepoFiles(t), testOpts{
|
||||
name: "Python tasks from Repo",
|
||||
includeNotebookTasks: true,
|
||||
includeSparkPythonTasks: true,
|
||||
includeWheelTasks: false,
|
||||
})
|
||||
}
|
||||
|
||||
func runPythonTasks(t *testing.T, tw *testFiles, opts testOpts) {
|
||||
env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
|
||||
t.Log(env)
|
||||
|
||||
w := tw.w
|
||||
|
||||
nodeTypeId := internal.GetNodeTypeId(env)
|
||||
tasks := make([]jobs.SubmitTask, 0)
|
||||
if opts.includeNotebookTasks {
|
||||
tasks = append(tasks, internal.GenerateNotebookTasks(tw.pyNotebookPath, sparkVersions, nodeTypeId)...)
|
||||
}
|
||||
|
||||
if opts.includeSparkPythonTasks {
|
||||
tasks = append(tasks, internal.GenerateSparkPythonTasks(tw.sparkPythonPath, sparkVersions, nodeTypeId)...)
|
||||
}
|
||||
|
||||
if opts.includeWheelTasks {
|
||||
versions := sparkVersions
|
||||
if len(opts.wheelSparkVersions) > 0 {
|
||||
versions = opts.wheelSparkVersions
|
||||
}
|
||||
tasks = append(tasks, internal.GenerateWheelTasks(tw.wheelPath, versions, nodeTypeId)...)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
run, err := w.Jobs.Submit(ctx, jobs.SubmitRun{
|
||||
RunName: opts.name,
|
||||
Tasks: tasks,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = w.Jobs.WaitGetRunJobTerminatedOrSkipped(ctx, run.RunId, time.Hour, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
output, err := output.GetJobOutput(ctx, w, run.RunId)
|
||||
require.NoError(t, err)
|
||||
|
||||
result := make(map[string]testOutput, 0)
|
||||
for _, out := range output.TaskOutputs {
|
||||
s, err := out.Output.String()
|
||||
require.NoError(t, err)
|
||||
|
||||
tOut := testOutput{}
|
||||
err = json.Unmarshal([]byte(s), &tOut)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
result[out.TaskKey] = tOut
|
||||
}
|
||||
|
||||
out, err := json.MarshalIndent(result, "", " ")
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Log("==== Run output ====")
|
||||
t.Log(string(out))
|
||||
}
|
||||
|
||||
func prepareWorkspaceFiles(t *testing.T) *testFiles {
|
||||
ctx := context.Background()
|
||||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
baseDir := internal.TemporaryWorkspaceDir(t, w)
|
||||
pyNotebookPath := path.Join(baseDir, "test.py")
|
||||
|
||||
err = w.Workspace.Import(ctx, workspace.Import{
|
||||
Path: pyNotebookPath,
|
||||
Overwrite: true,
|
||||
Language: workspace.LanguagePython,
|
||||
Format: workspace.ImportFormatSource,
|
||||
Content: base64.StdEncoding.EncodeToString([]byte(PY_CONTENT)),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
sparkPythonPath := path.Join(baseDir, "spark.py")
|
||||
err = w.Workspace.Import(ctx, workspace.Import{
|
||||
Path: sparkPythonPath,
|
||||
Overwrite: true,
|
||||
Format: workspace.ImportFormatAuto,
|
||||
Content: base64.StdEncoding.EncodeToString([]byte(SPARK_PY_CONTENT)),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
raw, err := os.ReadFile("./testdata/my_test_code-0.0.1-py3-none-any.whl")
|
||||
require.NoError(t, err)
|
||||
|
||||
wheelPath := path.Join(baseDir, "my_test_code-0.0.1-py3-none-any.whl")
|
||||
err = w.Workspace.Import(ctx, workspace.Import{
|
||||
Path: path.Join(baseDir, "my_test_code-0.0.1-py3-none-any.whl"),
|
||||
Overwrite: true,
|
||||
Format: workspace.ImportFormatAuto,
|
||||
Content: base64.StdEncoding.EncodeToString(raw),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
return &testFiles{
|
||||
w: w,
|
||||
pyNotebookPath: pyNotebookPath,
|
||||
sparkPythonPath: sparkPythonPath,
|
||||
wheelPath: path.Join("/Workspace", wheelPath),
|
||||
}
|
||||
}
|
||||
|
||||
func prepareDBFSFiles(t *testing.T) *testFiles {
|
||||
ctx := context.Background()
|
||||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
baseDir := internal.TemporaryDbfsDir(t, w)
|
||||
f, err := filer.NewDbfsClient(w, baseDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = f.Write(ctx, "test.py", strings.NewReader(PY_CONTENT))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = f.Write(ctx, "spark.py", strings.NewReader(SPARK_PY_CONTENT))
|
||||
require.NoError(t, err)
|
||||
|
||||
raw, err := os.ReadFile("./testdata/my_test_code-0.0.1-py3-none-any.whl")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = f.Write(ctx, "my_test_code-0.0.1-py3-none-any.whl", bytes.NewReader(raw))
|
||||
require.NoError(t, err)
|
||||
|
||||
return &testFiles{
|
||||
w: w,
|
||||
pyNotebookPath: path.Join(baseDir, "test.py"),
|
||||
sparkPythonPath: fmt.Sprintf("dbfs:%s", path.Join(baseDir, "spark.py")),
|
||||
wheelPath: fmt.Sprintf("dbfs:%s", path.Join(baseDir, "my_test_code-0.0.1-py3-none-any.whl")),
|
||||
}
|
||||
}
|
||||
|
||||
func prepareRepoFiles(t *testing.T) *testFiles {
|
||||
w, err := databricks.NewWorkspaceClient()
|
||||
require.NoError(t, err)
|
||||
|
||||
repo := internal.TemporaryRepo(t, w)
|
||||
packagePath := "internal/python/testdata"
|
||||
return &testFiles{
|
||||
w: w,
|
||||
pyNotebookPath: path.Join(repo, packagePath, "test"),
|
||||
sparkPythonPath: path.Join(repo, packagePath, "spark.py"),
|
||||
wheelPath: path.Join(repo, packagePath, "my_test_code-0.0.1-py3-none-any.whl"),
|
||||
}
|
||||
}
|
Binary file not shown.
|
@ -0,0 +1,7 @@
|
|||
import os
|
||||
import sys
|
||||
import json
|
||||
|
||||
out = {"PYTHONPATH": sys.path, "CWD": os.getcwd()}
|
||||
json_object = json.dumps(out, indent=4)
|
||||
print(json_object)
|
|
@ -0,0 +1,8 @@
|
|||
# Databricks notebook source
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
|
||||
out = {"PYTHONPATH": sys.path, "CWD": os.getcwd()}
|
||||
json_object = json.dumps(out, indent=4)
|
||||
dbutils.notebook.exit(json_object)
|
|
@ -75,7 +75,7 @@ func setupSyncTest(t *testing.T, args ...string) *syncTest {
|
|||
|
||||
w := databricks.Must(databricks.NewWorkspaceClient())
|
||||
localRoot := t.TempDir()
|
||||
remoteRoot := temporaryWorkspaceDir(t, w)
|
||||
remoteRoot := TemporaryWorkspaceDir(t, w)
|
||||
f, err := filer.NewWorkspaceFilesClient(w, remoteRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ func TestWorkpaceExportPrintsContents(t *testing.T) {
|
|||
|
||||
ctx := context.Background()
|
||||
w := databricks.Must(databricks.NewWorkspaceClient())
|
||||
tmpdir := temporaryWorkspaceDir(t, w)
|
||||
tmpdir := TemporaryWorkspaceDir(t, w)
|
||||
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -65,7 +65,7 @@ func setupWorkspaceImportExportTest(t *testing.T) (context.Context, filer.Filer,
|
|||
|
||||
ctx := context.Background()
|
||||
w := databricks.Must(databricks.NewWorkspaceClient())
|
||||
tmpdir := temporaryWorkspaceDir(t, w)
|
||||
tmpdir := TemporaryWorkspaceDir(t, w)
|
||||
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
|
Loading…
Reference in New Issue