mirror of https://github.com/databricks/cli.git
268 lines
7.2 KiB
Go
268 lines
7.2 KiB
Go
|
package python
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"context"
|
||
|
"encoding/base64"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"os"
|
||
|
"path"
|
||
|
"slices"
|
||
|
"strings"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"github.com/databricks/cli/bundle/run/output"
|
||
|
"github.com/databricks/cli/internal"
|
||
|
"github.com/databricks/cli/libs/filer"
|
||
|
"github.com/databricks/databricks-sdk-go"
|
||
|
"github.com/databricks/databricks-sdk-go/service/jobs"
|
||
|
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||
|
"github.com/stretchr/testify/require"
|
||
|
)
|
||
|
|
||
|
const PY_CONTENT = `# Databricks notebook source
|
||
|
import os
|
||
|
import sys
|
||
|
import json
|
||
|
|
||
|
out = {"PYTHONPATH": sys.path, "CWD": os.getcwd()}
|
||
|
json_object = json.dumps(out, indent = 4)
|
||
|
dbutils.notebook.exit(json_object)
|
||
|
`
|
||
|
|
||
|
const SPARK_PY_CONTENT = `
|
||
|
import os
|
||
|
import sys
|
||
|
import json
|
||
|
|
||
|
out = {"PYTHONPATH": sys.path, "CWD": os.getcwd()}
|
||
|
json_object = json.dumps(out, indent = 4)
|
||
|
print(json_object)
|
||
|
`
|
||
|
|
||
|
type testOutput struct {
|
||
|
PythonPath []string `json:"PYTHONPATH"`
|
||
|
Cwd string `json:"CWD"`
|
||
|
}
|
||
|
|
||
|
type testFiles struct {
|
||
|
w *databricks.WorkspaceClient
|
||
|
pyNotebookPath string
|
||
|
sparkPythonPath string
|
||
|
wheelPath string
|
||
|
}
|
||
|
|
||
|
type testOpts struct {
|
||
|
name string
|
||
|
includeNotebookTasks bool
|
||
|
includeSparkPythonTasks bool
|
||
|
includeWheelTasks bool
|
||
|
wheelSparkVersions []string
|
||
|
}
|
||
|
|
||
|
var sparkVersions = []string{
|
||
|
"11.3.x-scala2.12",
|
||
|
"12.2.x-scala2.12",
|
||
|
"13.0.x-scala2.12",
|
||
|
"13.1.x-scala2.12",
|
||
|
"13.2.x-scala2.12",
|
||
|
"13.3.x-scala2.12",
|
||
|
"14.0.x-scala2.12",
|
||
|
"14.1.x-scala2.12",
|
||
|
}
|
||
|
|
||
|
func TestAccRunPythonTaskWorkspace(t *testing.T) {
|
||
|
// TODO: remove RUN_PYTHON_TASKS_TEST when ready to be executed as part of nightly
|
||
|
internal.GetEnvOrSkipTest(t, "RUN_PYTHON_TASKS_TEST")
|
||
|
internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
|
||
|
|
||
|
unsupportedSparkVersionsForWheel := []string{
|
||
|
"11.3.x-scala2.12",
|
||
|
"12.2.x-scala2.12",
|
||
|
"13.0.x-scala2.12",
|
||
|
}
|
||
|
runPythonTasks(t, prepareWorkspaceFiles(t), testOpts{
|
||
|
name: "Python tasks from WSFS",
|
||
|
includeNotebookTasks: true,
|
||
|
includeSparkPythonTasks: true,
|
||
|
includeWheelTasks: true,
|
||
|
wheelSparkVersions: slices.DeleteFunc(slices.Clone(sparkVersions), func(s string) bool {
|
||
|
return slices.Contains(unsupportedSparkVersionsForWheel, s)
|
||
|
}),
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func TestAccRunPythonTaskDBFS(t *testing.T) {
|
||
|
// TODO: remove RUN_PYTHON_TASKS_TEST when ready to be executed as part of nightly
|
||
|
internal.GetEnvOrSkipTest(t, "RUN_PYTHON_TASKS_TEST")
|
||
|
internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
|
||
|
|
||
|
runPythonTasks(t, prepareDBFSFiles(t), testOpts{
|
||
|
name: "Python tasks from DBFS",
|
||
|
includeNotebookTasks: false,
|
||
|
includeSparkPythonTasks: true,
|
||
|
includeWheelTasks: true,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func TestAccRunPythonTaskRepo(t *testing.T) {
|
||
|
// TODO: remove RUN_PYTHON_TASKS_TEST when ready to be executed as part of nightly
|
||
|
internal.GetEnvOrSkipTest(t, "RUN_PYTHON_TASKS_TEST")
|
||
|
internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
|
||
|
|
||
|
runPythonTasks(t, prepareRepoFiles(t), testOpts{
|
||
|
name: "Python tasks from Repo",
|
||
|
includeNotebookTasks: true,
|
||
|
includeSparkPythonTasks: true,
|
||
|
includeWheelTasks: false,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func runPythonTasks(t *testing.T, tw *testFiles, opts testOpts) {
|
||
|
env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
|
||
|
t.Log(env)
|
||
|
|
||
|
w := tw.w
|
||
|
|
||
|
nodeTypeId := internal.GetNodeTypeId(env)
|
||
|
tasks := make([]jobs.SubmitTask, 0)
|
||
|
if opts.includeNotebookTasks {
|
||
|
tasks = append(tasks, internal.GenerateNotebookTasks(tw.pyNotebookPath, sparkVersions, nodeTypeId)...)
|
||
|
}
|
||
|
|
||
|
if opts.includeSparkPythonTasks {
|
||
|
tasks = append(tasks, internal.GenerateSparkPythonTasks(tw.sparkPythonPath, sparkVersions, nodeTypeId)...)
|
||
|
}
|
||
|
|
||
|
if opts.includeWheelTasks {
|
||
|
versions := sparkVersions
|
||
|
if len(opts.wheelSparkVersions) > 0 {
|
||
|
versions = opts.wheelSparkVersions
|
||
|
}
|
||
|
tasks = append(tasks, internal.GenerateWheelTasks(tw.wheelPath, versions, nodeTypeId)...)
|
||
|
}
|
||
|
|
||
|
ctx := context.Background()
|
||
|
run, err := w.Jobs.Submit(ctx, jobs.SubmitRun{
|
||
|
RunName: opts.name,
|
||
|
Tasks: tasks,
|
||
|
})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
_, err = w.Jobs.WaitGetRunJobTerminatedOrSkipped(ctx, run.RunId, time.Hour, nil)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
output, err := output.GetJobOutput(ctx, w, run.RunId)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
result := make(map[string]testOutput, 0)
|
||
|
for _, out := range output.TaskOutputs {
|
||
|
s, err := out.Output.String()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
tOut := testOutput{}
|
||
|
err = json.Unmarshal([]byte(s), &tOut)
|
||
|
if err != nil {
|
||
|
continue
|
||
|
}
|
||
|
result[out.TaskKey] = tOut
|
||
|
}
|
||
|
|
||
|
out, err := json.MarshalIndent(result, "", " ")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
t.Log("==== Run output ====")
|
||
|
t.Log(string(out))
|
||
|
}
|
||
|
|
||
|
func prepareWorkspaceFiles(t *testing.T) *testFiles {
|
||
|
ctx := context.Background()
|
||
|
w, err := databricks.NewWorkspaceClient()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
baseDir := internal.TemporaryWorkspaceDir(t, w)
|
||
|
pyNotebookPath := path.Join(baseDir, "test.py")
|
||
|
|
||
|
err = w.Workspace.Import(ctx, workspace.Import{
|
||
|
Path: pyNotebookPath,
|
||
|
Overwrite: true,
|
||
|
Language: workspace.LanguagePython,
|
||
|
Format: workspace.ImportFormatSource,
|
||
|
Content: base64.StdEncoding.EncodeToString([]byte(PY_CONTENT)),
|
||
|
})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
sparkPythonPath := path.Join(baseDir, "spark.py")
|
||
|
err = w.Workspace.Import(ctx, workspace.Import{
|
||
|
Path: sparkPythonPath,
|
||
|
Overwrite: true,
|
||
|
Format: workspace.ImportFormatAuto,
|
||
|
Content: base64.StdEncoding.EncodeToString([]byte(SPARK_PY_CONTENT)),
|
||
|
})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
raw, err := os.ReadFile("./testdata/my_test_code-0.0.1-py3-none-any.whl")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
wheelPath := path.Join(baseDir, "my_test_code-0.0.1-py3-none-any.whl")
|
||
|
err = w.Workspace.Import(ctx, workspace.Import{
|
||
|
Path: path.Join(baseDir, "my_test_code-0.0.1-py3-none-any.whl"),
|
||
|
Overwrite: true,
|
||
|
Format: workspace.ImportFormatAuto,
|
||
|
Content: base64.StdEncoding.EncodeToString(raw),
|
||
|
})
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
return &testFiles{
|
||
|
w: w,
|
||
|
pyNotebookPath: pyNotebookPath,
|
||
|
sparkPythonPath: sparkPythonPath,
|
||
|
wheelPath: path.Join("/Workspace", wheelPath),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func prepareDBFSFiles(t *testing.T) *testFiles {
|
||
|
ctx := context.Background()
|
||
|
w, err := databricks.NewWorkspaceClient()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
baseDir := internal.TemporaryDbfsDir(t, w)
|
||
|
f, err := filer.NewDbfsClient(w, baseDir)
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
err = f.Write(ctx, "test.py", strings.NewReader(PY_CONTENT))
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
err = f.Write(ctx, "spark.py", strings.NewReader(SPARK_PY_CONTENT))
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
raw, err := os.ReadFile("./testdata/my_test_code-0.0.1-py3-none-any.whl")
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
err = f.Write(ctx, "my_test_code-0.0.1-py3-none-any.whl", bytes.NewReader(raw))
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
return &testFiles{
|
||
|
w: w,
|
||
|
pyNotebookPath: path.Join(baseDir, "test.py"),
|
||
|
sparkPythonPath: fmt.Sprintf("dbfs:%s", path.Join(baseDir, "spark.py")),
|
||
|
wheelPath: fmt.Sprintf("dbfs:%s", path.Join(baseDir, "my_test_code-0.0.1-py3-none-any.whl")),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func prepareRepoFiles(t *testing.T) *testFiles {
|
||
|
w, err := databricks.NewWorkspaceClient()
|
||
|
require.NoError(t, err)
|
||
|
|
||
|
repo := internal.TemporaryRepo(t, w)
|
||
|
packagePath := "internal/python/testdata"
|
||
|
return &testFiles{
|
||
|
w: w,
|
||
|
pyNotebookPath: path.Join(repo, packagePath, "test"),
|
||
|
sparkPythonPath: path.Join(repo, packagePath, "spark.py"),
|
||
|
wheelPath: path.Join(repo, packagePath, "my_test_code-0.0.1-py3-none-any.whl"),
|
||
|
}
|
||
|
}
|