databricks-cli/bundle/python/transform.go

157 lines
3.5 KiB
Go
Raw Normal View History

package python
import (
	"context"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"text/template"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/libraries"
	"github.com/databricks/databricks-sdk-go/service/compute"
	"github.com/databricks/databricks-sdk-go/service/jobs"
)
2023-08-14 15:36:37 +00:00
// NOTEBOOK_TEMPLATE is the text/template source of the Python notebook that
// stands in for a wheel task. When rendered it:
//   - installs every entry of .Libraries with %pip, using its .Whl path;
//   - assigns the already-quoted, comma-separated .Params to sys.argv;
//   - loads the "console_scripts" entry point .Task.EntryPoint of the package
//     .Task.PackageName and calls it with stdout redirected into a buffer;
//   - hands the captured output to dbutils.notebook.exit.
//
// The call under `with redirect_stdout(f):` must stay indented: Python
// rejects the notebook with an IndentationError otherwise.
const NOTEBOOK_TEMPLATE = `# Databricks notebook source
%python
{{range .Libraries}}
%pip install --force-reinstall {{.Whl}}
{{end}}
from contextlib import redirect_stdout
import io
import sys
sys.argv = [{{.Params}}]
import pkg_resources
_func = pkg_resources.load_entry_point("{{.Task.PackageName}}", "console_scripts", "{{.Task.EntryPoint}}")
f = io.StringIO()
with redirect_stdout(f):
    _func()
s = f.getvalue()
dbutils.notebook.exit(s)
`
2023-08-14 15:36:37 +00:00
// TransformWheelTask returns a mutator that turns each Python wheel task into
// a notebook task. The generated notebook installs the uploaded wheels with
// %pip and then calls the corresponding entry point.
func TransformWheelTask() bundle.Mutator {
	return new(transform)
}
// transform is the stateless mutator handed out by TransformWheelTask.
type transform struct{}

// Name returns the identifier of this mutator.
func (m *transform) Name() string {
	const name = "python.TransformWheelTask"
	return name
}
func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error {
wheelTasks := libraries.FindAllWheelTasks(b)
for _, wheelTask := range wheelTasks {
2023-08-17 15:50:00 +00:00
err := generateNotebookTrampoline(b, wheelTask)
2023-08-14 15:36:37 +00:00
if err != nil {
return err
}
2023-08-17 15:50:00 +00:00
}
return nil
}
2023-08-14 15:36:37 +00:00
2023-08-17 15:50:00 +00:00
func generateNotebookTrampoline(b *bundle.Bundle, wheelTask *jobs.Task) error {
taskDefinition := wheelTask.PythonWheelTask
libraries := wheelTask.Libraries
2023-08-17 15:50:00 +00:00
wheelTask.PythonWheelTask = nil
wheelTask.Libraries = nil
2023-08-17 15:50:00 +00:00
filename, err := generateNotebookWrapper(b, taskDefinition, libraries)
if err != nil {
return err
}
2023-08-17 15:50:00 +00:00
internalDir, err := getInternalDir(b)
if err != nil {
return err
}
2023-08-17 15:50:00 +00:00
internalDirRel, err := filepath.Rel(b.Config.Path, internalDir)
if err != nil {
return err
}
parts := []string{b.Config.Workspace.FilesPath}
parts = append(parts, strings.Split(internalDirRel, string(os.PathSeparator))...)
parts = append(parts, filename)
wheelTask.NotebookTask = &jobs.NotebookTask{
NotebookPath: path.Join(parts...),
}
return nil
}
2023-08-16 13:50:00 +00:00
// getInternalDir returns the ".internal" directory inside the bundle's cache
// directory, or the error reported by b.CacheDir.
func getInternalDir(b *bundle.Bundle) (string, error) {
	cache, err := b.CacheDir()
	if err != nil {
		return "", err
	}
	return filepath.Join(cache, ".internal"), nil
}
2023-08-14 15:36:37 +00:00
func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) {
2023-08-16 13:50:00 +00:00
internalDir, err := getInternalDir(b)
2023-08-14 15:36:37 +00:00
if err != nil {
return "", err
}
2023-08-16 13:50:00 +00:00
2023-08-14 15:36:37 +00:00
notebookName := fmt.Sprintf("notebook_%s_%s", task.PackageName, task.EntryPoint)
path := filepath.Join(internalDir, notebookName+".py")
2023-08-14 15:36:37 +00:00
err = os.MkdirAll(filepath.Dir(path), 0755)
if err != nil {
return "", err
}
2023-08-14 15:36:37 +00:00
f, err := os.Create(path)
if err != nil {
return "", err
}
defer f.Close()
2023-08-16 14:34:46 +00:00
params, err := generateParameters(task)
if err != nil {
return "", err
}
2023-08-14 15:36:37 +00:00
data := map[string]any{
"Libraries": libraries,
2023-08-16 14:34:46 +00:00
"Params": params,
2023-08-14 15:36:37 +00:00
"Task": task,
}
t, err := template.New("notebook").Parse(NOTEBOOK_TEMPLATE)
if err != nil {
return "", err
}
return notebookName, t.Execute(f, data)
}
2023-08-16 14:34:46 +00:00
func generateParameters(task *jobs.PythonWheelTask) (string, error) {
if task.Parameters != nil && task.NamedParameters != nil {
return "", fmt.Errorf("not allowed to pass both paramaters and named_parameters")
}
params := append([]string{"python"}, task.Parameters...)
for k, v := range task.NamedParameters {
params = append(params, fmt.Sprintf("%s=%s", k, v))
}
for i := range params {
params[i] = `"` + params[i] + `"`
}
2023-08-16 14:34:46 +00:00
return strings.Join(params, ", "), nil
}