diff --git a/bundle/config/mutator/trampoline.go b/bundle/config/mutator/trampoline.go new file mode 100644 index 000000000..6265c28d5 --- /dev/null +++ b/bundle/config/mutator/trampoline.go @@ -0,0 +1,95 @@ +package mutator + +import ( + "context" + "fmt" + "os" + "path" + "path/filepath" + "text/template" + + "github.com/databricks/cli/bundle" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +type fnTemplateData func(task *jobs.Task) (map[string]any, error) +type fnCleanUp func(task *jobs.Task) +type fnTasks func(b *bundle.Bundle) []*jobs.Task + +type trampoline struct { + name string + getTasks fnTasks + templateData fnTemplateData + cleanUp fnCleanUp + template string +} + +func NewTrampoline( + name string, + tasks fnTasks, + templateData fnTemplateData, + cleanUp fnCleanUp, + template string, +) *trampoline { + return &trampoline{name, tasks, templateData, cleanUp, template} +} + +func (m *trampoline) Name() string { + return fmt.Sprintf("trampoline(%s)", m.name) +} + +func (m *trampoline) Apply(ctx context.Context, b *bundle.Bundle) error { + tasks := m.getTasks(b) + for _, task := range tasks { + err := m.generateNotebookWrapper(b, task) + if err != nil { + return err + } + } + return nil +} + +func (m *trampoline) generateNotebookWrapper(b *bundle.Bundle, task *jobs.Task) error { + internalDir, err := b.InternalDir() + if err != nil { + return err + } + + notebookName := fmt.Sprintf("notebook_%s", task.TaskKey) + localNotebookPath := filepath.Join(internalDir, notebookName+".py") + + err = os.MkdirAll(filepath.Dir(localNotebookPath), 0755) + if err != nil { + return err + } + + f, err := os.Create(localNotebookPath) + if err != nil { + return err + } + defer f.Close() + + data, err := m.templateData(task) + if err != nil { + return err + } + + t, err := template.New(notebookName).Parse(m.template) + if err != nil { + return err + } + + internalDirRel, err := filepath.Rel(b.Config.Path, internalDir) + if err != nil { + return err + } + + m.cleanUp(task) + remotePath := path.Join(b.Config.Workspace.FilesPath, filepath.ToSlash(internalDirRel), notebookName) + + task.NotebookTask = &jobs.NotebookTask{ + NotebookPath: remotePath, + } + + return t.Execute(f, data) +} diff --git a/bundle/config/mutator/trampoline_test.go b/bundle/config/mutator/trampoline_test.go new file mode 100644 index 000000000..9c176ff06 --- /dev/null +++ b/bundle/config/mutator/trampoline_test.go @@ -0,0 +1,90 @@ +package mutator + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/require" +) + +func getTasks(b *bundle.Bundle) []*jobs.Task { + tasks := make([]*jobs.Task, 0) + for k := range b.Config.Resources.Jobs["test"].Tasks { + tasks = append(tasks, &b.Config.Resources.Jobs["test"].Tasks[k]) + } + + return tasks +} + +func templateData(task *jobs.Task) (map[string]any, error) { + if task.PythonWheelTask == nil { + return nil, fmt.Errorf("PythonWheelTask cannot be nil") + } + + data := make(map[string]any) + data["MyName"] = "Trampoline" + return data, nil +} + +func cleanUp(task *jobs.Task) { + task.PythonWheelTask = nil +} + +func TestGenerateTrampoline(t *testing.T) { + tmpDir := t.TempDir() + + tasks := []jobs.Task{ + { + TaskKey: "to_trampoline", + PythonWheelTask: &jobs.PythonWheelTask{ + PackageName: "test", + EntryPoint: "run", + }}, + } + + b := &bundle.Bundle{ + Config: config.Root{ + Path: tmpDir, + Bundle: config.Bundle{ + Target: "development", + }, + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "test": { + Paths: resources.Paths{ + ConfigFilePath: tmpDir, + }, + JobSettings: &jobs.JobSettings{ + Tasks: tasks, + }, + }, + }, + }, + }, + } + ctx := context.Background() + + trampoline := NewTrampoline("test_trampoline", getTasks, templateData, cleanUp, "Hello from {{.MyName}}") + err := bundle.Apply(ctx, b, trampoline) + require.NoError(t, err) + + dir, err := b.InternalDir() + require.NoError(t, err) + filename := filepath.Join(dir, "notebook_to_trampoline.py") + + bytes, err := os.ReadFile(filename) + require.NoError(t, err) + + require.Equal(t, "Hello from Trampoline", string(bytes)) + + task := b.Config.Resources.Jobs["test"].Tasks[0] + require.Equal(t, task.NotebookTask.NotebookPath, ".databricks/bundle/development/.internal/notebook_to_trampoline") + require.Nil(t, task.PythonWheelTask) +} diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 011bb4b2b..5a9a7f2fe 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -8,6 +8,7 @@ import ( "github.com/databricks/cli/bundle/deploy/lock" "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/libraries" + "github.com/databricks/cli/bundle/python" ) // The deploy phase deploys artifacts and resources. @@ -17,10 +18,11 @@ func Deploy() bundle.Mutator { bundle.Defer( bundle.Seq( mutator.ValidateGitDetails(), - files.Upload(), libraries.MatchWithArtifacts(), artifacts.CleanUp(), artifacts.UploadAll(), + python.TransformWheelTask(), + files.Upload(), terraform.Interpolate(), terraform.Write(), terraform.StatePull(), diff --git a/bundle/python/transform.go b/bundle/python/transform.go new file mode 100644 index 000000000..c7938bab6 --- /dev/null +++ b/bundle/python/transform.go @@ -0,0 +1,85 @@ +package python + +import ( + "fmt" + "strconv" + "strings" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config/mutator" + "github.com/databricks/cli/bundle/libraries" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +const NOTEBOOK_TEMPLATE = `# Databricks notebook source +%python +{{range .Libraries}} +%pip install --force-reinstall {{.Whl}} +{{end}} + +from contextlib import redirect_stdout +import io +import sys +sys.argv = [{{.Params}}] + +import pkg_resources +_func = pkg_resources.load_entry_point("{{.Task.PackageName}}", "console_scripts", "{{.Task.EntryPoint}}") + +f = io.StringIO() +with redirect_stdout(f): + _func() +s = f.getvalue() +dbutils.notebook.exit(s) +` + +// This mutator takes the wheel task and transforms it into notebook +// which installs uploaded wheels using %pip and then calling corresponding +// entry point. +func TransformWheelTask() bundle.Mutator { + return mutator.NewTrampoline( + "python_wheel", + getTasks, + generateTemplateData, + cleanUpTask, + NOTEBOOK_TEMPLATE, + ) +} + +func getTasks(b *bundle.Bundle) []*jobs.Task { + return libraries.FindAllWheelTasks(b) +} + +func generateTemplateData(task *jobs.Task) (map[string]any, error) { + params, err := generateParameters(task.PythonWheelTask) + if err != nil { + return nil, err + } + + data := map[string]any{ + "Libraries": task.Libraries, + "Params": params, + "Task": task.PythonWheelTask, + } + + return data, nil +} + +func generateParameters(task *jobs.PythonWheelTask) (string, error) { + if task.Parameters != nil && task.NamedParameters != nil { + return "", fmt.Errorf("not allowed to pass both paramaters and named_parameters") + } + params := append([]string{"python"}, task.Parameters...) + for k, v := range task.NamedParameters { + params = append(params, fmt.Sprintf("%s=%s", k, v)) + } + + for i := range params { + params[i] = strconv.Quote(params[i]) + } + return strings.Join(params, ", "), nil +} + +func cleanUpTask(task *jobs.Task) { + task.PythonWheelTask = nil + task.Libraries = nil +} diff --git a/bundle/python/transform_test.go b/bundle/python/transform_test.go new file mode 100644 index 000000000..ada64e7a9 --- /dev/null +++ b/bundle/python/transform_test.go @@ -0,0 +1,57 @@ +package python + +import ( + "testing" + + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/require" +) + +type testCase struct { + Actual []string + Expected string +} +type NamedParams map[string]string +type testCaseNamed struct { + Actual NamedParams + Expected string +} + +var paramsTestCases []testCase = []testCase{ + {[]string{}, `"python"`}, + {[]string{"a"}, `"python", "a"`}, + {[]string{"a", "b"}, `"python", "a", "b"`}, + {[]string{"123!@#$%^&*()-="}, `"python", "123!@#$%^&*()-="`}, + {[]string{`{"a": 1}`}, `"python", "{\"a\": 1}"`}, +} + +var paramsTestCasesNamed []testCaseNamed = []testCaseNamed{ + {NamedParams{}, `"python"`}, + {NamedParams{"a": "1"}, `"python", "a=1"`}, + {NamedParams{"a": "1", "b": "2"}, `"python", "a=1", "b=2"`}, + {NamedParams{"data": `{"a": 1}`}, `"python", "data={\"a\": 1}"`}, +} + +func TestGenerateParameters(t *testing.T) { + for _, c := range paramsTestCases { + task := &jobs.PythonWheelTask{Parameters: c.Actual} + result, err := generateParameters(task) + require.NoError(t, err) + require.Equal(t, c.Expected, result) + } +} + +func TestGenerateNamedParameters(t *testing.T) { + for _, c := range paramsTestCasesNamed { + task := &jobs.PythonWheelTask{NamedParameters: c.Actual} + result, err := generateParameters(task) + require.NoError(t, err) + require.Equal(t, c.Expected, result) + } +} + +func TestGenerateBoth(t *testing.T) { + task := &jobs.PythonWheelTask{NamedParameters: map[string]string{"a": "1"}, Parameters: []string{"b"}} + _, err := generateParameters(task) + require.Error(t, err) +}