Merge remote-tracking branch 'origin/transform-wheel-task' into mutator-with-wrappers

This commit is contained in:
kartikgupta-db 2023-08-29 13:51:13 +02:00
commit ccfce07aa6
No known key found for this signature in database
GPG Key ID: 6AD5FA11FACDEA39
5 changed files with 330 additions and 1 deletions

View File

@ -0,0 +1,95 @@
package mutator
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"text/template"
"github.com/databricks/cli/bundle"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
type fnTemplateData func(task *jobs.Task) (map[string]any, error)
type fnCleanUp func(task *jobs.Task)
type fnTasks func(b *bundle.Bundle) []*jobs.Task
type trampoline struct {
name string
getTasks fnTasks
templateData fnTemplateData
cleanUp fnCleanUp
template string
}
func NewTrampoline(
name string,
tasks fnTasks,
templateData fnTemplateData,
cleanUp fnCleanUp,
template string,
) *trampoline {
return &trampoline{name, tasks, templateData, cleanUp, template}
}
func (m *trampoline) Name() string {
return fmt.Sprintf("trampoline(%s)", m.name)
}
func (m *trampoline) Apply(ctx context.Context, b *bundle.Bundle) error {
tasks := m.getTasks(b)
for _, task := range tasks {
err := m.generateNotebookWrapper(b, task)
if err != nil {
return err
}
}
return nil
}
func (m *trampoline) generateNotebookWrapper(b *bundle.Bundle, task *jobs.Task) error {
internalDir, err := b.InternalDir()
if err != nil {
return err
}
notebookName := fmt.Sprintf("notebook_%s", task.TaskKey)
localNotebookPath := filepath.Join(internalDir, notebookName+".py")
err = os.MkdirAll(filepath.Dir(localNotebookPath), 0755)
if err != nil {
return err
}
f, err := os.Create(localNotebookPath)
if err != nil {
return err
}
defer f.Close()
data, err := m.templateData(task)
if err != nil {
return err
}
t, err := template.New(notebookName).Parse(m.template)
if err != nil {
return err
}
internalDirRel, err := filepath.Rel(b.Config.Path, internalDir)
if err != nil {
return err
}
m.cleanUp(task)
remotePath := path.Join(b.Config.Workspace.FilesPath, filepath.ToSlash(internalDirRel), notebookName)
task.NotebookTask = &jobs.NotebookTask{
NotebookPath: remotePath,
}
return t.Execute(f, data)
}

View File

@ -0,0 +1,90 @@
package mutator
import (
"context"
"fmt"
"os"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)
func getTasks(b *bundle.Bundle) []*jobs.Task {
tasks := make([]*jobs.Task, 0)
for k := range b.Config.Resources.Jobs["test"].Tasks {
tasks = append(tasks, &b.Config.Resources.Jobs["test"].Tasks[k])
}
return tasks
}
func templateData(task *jobs.Task) (map[string]any, error) {
if task.PythonWheelTask == nil {
return nil, fmt.Errorf("PythonWheelTask cannot be nil")
}
data := make(map[string]any)
data["MyName"] = "Trampoline"
return data, nil
}
func cleanUp(task *jobs.Task) {
task.PythonWheelTask = nil
}
func TestGenerateTrampoline(t *testing.T) {
tmpDir := t.TempDir()
tasks := []jobs.Task{
{
TaskKey: "to_trampoline",
PythonWheelTask: &jobs.PythonWheelTask{
PackageName: "test",
EntryPoint: "run",
}},
}
b := &bundle.Bundle{
Config: config.Root{
Path: tmpDir,
Bundle: config.Bundle{
Target: "development",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"test": {
Paths: resources.Paths{
ConfigFilePath: tmpDir,
},
JobSettings: &jobs.JobSettings{
Tasks: tasks,
},
},
},
},
},
}
ctx := context.Background()
trampoline := NewTrampoline("test_trampoline", getTasks, templateData, cleanUp, "Hello from {{.MyName}}")
err := bundle.Apply(ctx, b, trampoline)
require.NoError(t, err)
dir, err := b.InternalDir()
require.NoError(t, err)
filename := filepath.Join(dir, "notebook_to_trampoline.py")
bytes, err := os.ReadFile(filename)
require.NoError(t, err)
require.Equal(t, "Hello from Trampoline", string(bytes))
task := b.Config.Resources.Jobs["test"].Tasks[0]
require.Equal(t, task.NotebookTask.NotebookPath, ".databricks/bundle/development/.internal/notebook_to_trampoline")
require.Nil(t, task.PythonWheelTask)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/databricks/cli/bundle/deploy/lock"
"github.com/databricks/cli/bundle/deploy/terraform"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/bundle/python"
)
// The deploy phase deploys artifacts and resources.
@ -17,10 +18,11 @@ func Deploy() bundle.Mutator {
bundle.Defer(
bundle.Seq(
mutator.ValidateGitDetails(),
files.Upload(),
libraries.MatchWithArtifacts(),
artifacts.CleanUp(),
artifacts.UploadAll(),
python.TransformWheelTask(),
files.Upload(),
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),

View File

@ -0,0 +1,85 @@
package python
import (
"fmt"
"strconv"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
const NOTEBOOK_TEMPLATE = `# Databricks notebook source
%python
{{range .Libraries}}
%pip install --force-reinstall {{.Whl}}
{{end}}
from contextlib import redirect_stdout
import io
import sys
sys.argv = [{{.Params}}]
import pkg_resources
_func = pkg_resources.load_entry_point("{{.Task.PackageName}}", "console_scripts", "{{.Task.EntryPoint}}")
f = io.StringIO()
with redirect_stdout(f):
_func()
s = f.getvalue()
dbutils.notebook.exit(s)
`
// This mutator takes the wheel task and transforms it into notebook
// which installs uploaded wheels using %pip and then calling corresponding
// entry point.
func TransformWheelTask() bundle.Mutator {
return mutator.NewTrampoline(
"python_wheel",
getTasks,
generateTemplateData,
cleanUpTask,
NOTEBOOK_TEMPLATE,
)
}
func getTasks(b *bundle.Bundle) []*jobs.Task {
return libraries.FindAllWheelTasks(b)
}
func generateTemplateData(task *jobs.Task) (map[string]any, error) {
params, err := generateParameters(task.PythonWheelTask)
if err != nil {
return nil, err
}
data := map[string]any{
"Libraries": task.Libraries,
"Params": params,
"Task": task.PythonWheelTask,
}
return data, nil
}
func generateParameters(task *jobs.PythonWheelTask) (string, error) {
if task.Parameters != nil && task.NamedParameters != nil {
return "", fmt.Errorf("not allowed to pass both paramaters and named_parameters")
}
params := append([]string{"python"}, task.Parameters...)
for k, v := range task.NamedParameters {
params = append(params, fmt.Sprintf("%s=%s", k, v))
}
for i := range params {
params[i] = strconv.Quote(params[i])
}
return strings.Join(params, ", "), nil
}
func cleanUpTask(task *jobs.Task) {
task.PythonWheelTask = nil
task.Libraries = nil
}

View File

@ -0,0 +1,57 @@
package python
import (
"testing"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)
type testCase struct {
Actual []string
Expected string
}
type NamedParams map[string]string
type testCaseNamed struct {
Actual NamedParams
Expected string
}
var paramsTestCases []testCase = []testCase{
{[]string{}, `"python"`},
{[]string{"a"}, `"python", "a"`},
{[]string{"a", "b"}, `"python", "a", "b"`},
{[]string{"123!@#$%^&*()-="}, `"python", "123!@#$%^&*()-="`},
{[]string{`{"a": 1}`}, `"python", "{\"a\": 1}"`},
}
var paramsTestCasesNamed []testCaseNamed = []testCaseNamed{
{NamedParams{}, `"python"`},
{NamedParams{"a": "1"}, `"python", "a=1"`},
{NamedParams{"a": "1", "b": "2"}, `"python", "a=1", "b=2"`},
{NamedParams{"data": `{"a": 1}`}, `"python", "data={\"a\": 1}"`},
}
func TestGenerateParameters(t *testing.T) {
for _, c := range paramsTestCases {
task := &jobs.PythonWheelTask{Parameters: c.Actual}
result, err := generateParameters(task)
require.NoError(t, err)
require.Equal(t, c.Expected, result)
}
}
func TestGenerateNamedParameters(t *testing.T) {
for _, c := range paramsTestCasesNamed {
task := &jobs.PythonWheelTask{NamedParameters: c.Actual}
result, err := generateParameters(task)
require.NoError(t, err)
require.Equal(t, c.Expected, result)
}
}
func TestGenerateBoth(t *testing.T) {
task := &jobs.PythonWheelTask{NamedParameters: map[string]string{"a": "1"}, Parameters: []string{"b"}}
_, err := generateParameters(task)
require.Error(t, err)
}