2024-09-27 09:32:54 +00:00
|
|
|
package trampoline
|
2023-08-30 12:21:39 +00:00
|
|
|
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"strings"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/libraries"
	"github.com/databricks/cli/libs/diag"
	"github.com/databricks/databricks-sdk-go/service/compute"
	"github.com/databricks/databricks-sdk-go/service/jobs"
)
|
|
|
|
|
|
|
|
// NOTEBOOK_TEMPLATE is the notebook source, written with Go template
// placeholders, that replaces a Python wheel task.
//
// The rendered notebook:
//   - runs "%pip install --force-reinstall" for the Whl of every entry in
//     .Libraries and then restarts Python;
//   - sets sys.argv to the rendered .Params list, and, when the
//     "__python_params" widget holds a non-empty JSON list, replaces every
//     argument after sys.argv[0] with that list;
//   - looks up the entry point named .Task.EntryPoint in the metadata of the
//     distribution .Task.PackageName and calls it with stdout captured;
//   - passes the captured output to dbutils.notebook.exit.
//
// The placeholder names match the keys produced by
// pythonTrampoline.GetTemplateData.
const NOTEBOOK_TEMPLATE = `# Databricks notebook source
%python
{{range .Libraries}}
%pip install --force-reinstall {{.Whl}}
{{end}}

dbutils.library.restartPython()

try:
	from importlib import metadata
except ImportError: # for Python<3.8
	import subprocess
	import sys

	subprocess.check_call([sys.executable, "-m", "pip", "install", "importlib-metadata"])
	import importlib_metadata as metadata

from contextlib import redirect_stdout
import io
import sys
import json

params = []
try:
	python_params = dbutils.widgets.get("__python_params")
	if python_params:
		params = json.loads(python_params)
except Exception as e:
	print(e)

sys.argv = [{{.Params}}]

if params:
	sys.argv = [sys.argv[0]] + params

entry = [ep for ep in metadata.distribution("{{.Task.PackageName}}").entry_points if ep.name == "{{.Task.EntryPoint}}"]

f = io.StringIO()
with redirect_stdout(f):
	if entry:
		entry[0].load()()
	else:
		raise ImportError("Entry point '{{.Task.EntryPoint}}' not found")
s = f.getvalue()
dbutils.notebook.exit(s)
`
|
|
|
|
|
Remove bundle.{Seq,If,Defer,newPhase,logString}, switch to regular functions (#2390)
## Changes
- Instead of constructing chains of mutators and then executing them,
execute them directly.
- Remove functionality related to chain-building: Seq, If, Defer,
newPhase, logString.
- Phases become functions that apply the changes directly rather than
construct mutator chains that will be called later.
- Add a helper ApplySeq to call multiple mutators, use it where
Apply+Seq were used before.
This is intended to be a refactoring without functional changes, but
there are a few behaviour changes:
- Since defer() is used to call unlock instead of bundle.Defer()
unlocking will now happen even in case of panics.
- In --debug, the phase names are still logged once before the start of
the phase but each entry no longer has 'seq' or phase name in it.
- The message "Deployment complete!" was printed even if
terraform.Apply() mutator had an error. It no longer does that.
## Motivation
The use of the chains was necessary when mutators were returning a list
of other mutators instead of calling them directly. But that has since
been removed, so now the chain machinery has no purpose anymore.
Use of direct functions simplifies the logic and makes bugs more
apparent and easy to fix.
Other improvements that this unlocks:
- Simpler stacktraces/debugging (breakpoints).
- Use of functions with narrowly scoped API: instead of mutators that
receive full bundle config, we can use focused functions that only deal
with sections they care about prepareGitSettings(currentGitSection) ->
updatedGitSection. This makes the data flow more apparent.
- Parallel computations across mutators (within phase): launch
goroutines fetching data from APIs at the beginning, process them once
they are ready.
## Tests
Existing tests.
2025-02-27 11:41:58 +00:00
|
|
|
// transformWheelTask is the stateless mutator value handed out by
// TransformWheelTask.
type transformWheelTask struct{}
|
|
|
|
|
|
|
|
func (transformWheelTask) Name() string {
|
|
|
|
return "TransformWheelTask"
|
|
|
|
}
|
|
|
|
|
2023-08-30 12:21:39 +00:00
|
|
|
// This mutator takes the wheel task and transforms it into notebook
|
|
|
|
// which installs uploaded wheels using %pip and then calling corresponding
|
|
|
|
// entry point.
|
|
|
|
func TransformWheelTask() bundle.Mutator {
|
Remove bundle.{Seq,If,Defer,newPhase,logString}, switch to regular functions (#2390)
## Changes
- Instead of constructing chains of mutators and then executing them,
execute them directly.
- Remove functionality related to chain-building: Seq, If, Defer,
newPhase, logString.
- Phases become functions that apply the changes directly rather than
construct mutator chains that will be called later.
- Add a helper ApplySeq to call multiple mutators, use it where
Apply+Seq were used before.
This is intended to be a refactoring without functional changes, but
there are a few behaviour changes:
- Since defer() is used to call unlock instead of bundle.Defer()
unlocking will now happen even in case of panics.
- In --debug, the phase names are are still logged once before start of
the phase but each entry no longer has 'seq' or phase name in it.
- The message "Deployment complete!" was printed even if
terraform.Apply() mutator had an error. It no longer does that.
## Motivation
The use of the chains was necessary when mutators were returning a list
of other mutators instead of calling them directly. But that has since
been removed, so now the chain machinery have no purpose anymore.
Use of direct functions simplifies the logic and makes bugs more
apparent and easy to fix.
Other improvements that this unlocks:
- Simpler stacktraces/debugging (breakpoints).
- Use of functions with narrowly scoped API: instead of mutators that
receive full bundle config, we can use focused functions that only deal
with sections they care about prepareGitSettings(currentGitSection) ->
updatedGitSection. This makes the data flow more apparent.
- Parallel computations across mutators (within phase): launch
goroutines fetching data from APIs at the beggining, process them once
they are ready.
## Tests
Existing tests.
2025-02-27 11:41:58 +00:00
|
|
|
return transformWheelTask{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (transformWheelTask) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
|
|
|
|
isEnabled := b.Config.Experimental != nil && b.Config.Experimental.PythonWheelWrapper
|
|
|
|
if !isEnabled {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
return bundle.Apply(ctx, b, NewTrampoline(
|
|
|
|
"python_wheel",
|
|
|
|
&pythonTrampoline{},
|
|
|
|
NOTEBOOK_TEMPLATE,
|
|
|
|
))
|
2023-08-30 12:21:39 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// pythonTrampoline provides the Python-wheel-specific hooks (task selection,
// template data and task clean-up) that are passed to NewTrampoline.
type pythonTrampoline struct{}
|
|
|
|
|
|
|
|
func (t *pythonTrampoline) CleanUp(task *jobs.Task) error {
|
|
|
|
task.PythonWheelTask = nil
|
2024-03-04 12:34:03 +00:00
|
|
|
|
|
|
|
nonWheelLibraries := make([]compute.Library, 0)
|
|
|
|
for _, l := range task.Libraries {
|
|
|
|
if l.Whl == "" {
|
|
|
|
nonWheelLibraries = append(nonWheelLibraries, l)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
task.Libraries = nonWheelLibraries
|
2023-08-30 12:21:39 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2024-09-27 09:32:54 +00:00
|
|
|
func (t *pythonTrampoline) GetTasks(b *bundle.Bundle) []TaskWithJobKey {
|
2023-08-30 12:21:39 +00:00
|
|
|
r := b.Config.Resources
|
2024-09-27 09:32:54 +00:00
|
|
|
result := make([]TaskWithJobKey, 0)
|
2023-08-30 12:21:39 +00:00
|
|
|
for k := range b.Config.Resources.Jobs {
|
|
|
|
tasks := r.Jobs[k].JobSettings.Tasks
|
|
|
|
for i := range tasks {
|
|
|
|
task := &tasks[i]
|
2023-08-30 13:51:15 +00:00
|
|
|
|
2023-09-08 13:45:21 +00:00
|
|
|
// Keep only Python wheel tasks with workspace libraries referenced.
|
|
|
|
// At this point of moment we don't have local paths in Libraries sections anymore
|
|
|
|
// Local paths have been replaced with the remote when the artifacts where uploaded
|
|
|
|
// in artifacts.UploadAll mutator.
|
2024-04-22 11:44:34 +00:00
|
|
|
if task.PythonWheelTask == nil || !needsTrampoline(*task) {
|
2023-08-30 13:51:15 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2024-09-27 09:32:54 +00:00
|
|
|
result = append(result, TaskWithJobKey{
|
2023-08-30 12:21:39 +00:00
|
|
|
JobKey: k,
|
|
|
|
Task: task,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return result
|
|
|
|
}
|
|
|
|
|
2024-04-22 11:44:34 +00:00
|
|
|
func needsTrampoline(task jobs.Task) bool {
|
2023-09-08 13:45:21 +00:00
|
|
|
return libraries.IsTaskWithWorkspaceLibraries(task)
|
|
|
|
}
|
|
|
|
|
2023-08-30 12:21:39 +00:00
|
|
|
func (t *pythonTrampoline) GetTemplateData(task *jobs.Task) (map[string]any, error) {
|
|
|
|
params, err := t.generateParameters(task.PythonWheelTask)
|
2024-03-04 12:34:03 +00:00
|
|
|
whlLibraries := make([]compute.Library, 0)
|
|
|
|
for _, l := range task.Libraries {
|
|
|
|
if l.Whl != "" {
|
|
|
|
whlLibraries = append(whlLibraries, l)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-08-30 12:21:39 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
data := map[string]any{
|
2024-03-04 12:34:03 +00:00
|
|
|
"Libraries": whlLibraries,
|
2023-08-30 12:21:39 +00:00
|
|
|
"Params": params,
|
|
|
|
"Task": task.PythonWheelTask,
|
|
|
|
}
|
|
|
|
|
|
|
|
return data, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (t *pythonTrampoline) generateParameters(task *jobs.PythonWheelTask) (string, error) {
|
|
|
|
if task.Parameters != nil && task.NamedParameters != nil {
|
2025-01-07 10:49:23 +00:00
|
|
|
return "", errors.New("not allowed to pass both paramaters and named_parameters")
|
2023-08-30 12:21:39 +00:00
|
|
|
}
|
2023-09-26 14:32:20 +00:00
|
|
|
params := append([]string{task.PackageName}, task.Parameters...)
|
2023-08-30 12:21:39 +00:00
|
|
|
for k, v := range task.NamedParameters {
|
|
|
|
params = append(params, fmt.Sprintf("%s=%s", k, v))
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := range params {
|
|
|
|
params[i] = strconv.Quote(params[i])
|
|
|
|
}
|
|
|
|
return strings.Join(params, ", "), nil
|
|
|
|
}
|