databricks bundle init template v2: optional stubs, DLT support (#700)

## Changes

This follows up on https://github.com/databricks/cli/pull/686. This PR
makes our stubs optional and adds DLT stubs:

```
$ databricks bundle init
Template to use [default-python]: default-python
Unique name for this project [my_project]: my_project
Include a stub (sample) notebook in 'my_project/src' [yes]: yes
Include a stub (sample) DLT pipeline in 'my_project/src' [yes]: yes
Include a stub (sample) Python package in 'my_project/src' [yes]: yes
Successfully initialized template
```
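
Each prompt above maps to a property in the template's `databricks_template_schema.json` (updated later in this diff); answers to the new `include_*` questions are validated against the schema's `^(yes|no)$` pattern.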

## Tests
Manual testing, plus matrix tests (`TestBuiltinTemplateValid`) covering all combinations of the new options.

---------

Co-authored-by: Andrew Nester <andrew.nester@databricks.com>
Co-authored-by: PaulCornellDB <paul.cornell@databricks.com>
Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
Commit f9e521b43e (parent a41b9e8bf2)
Authored by Lennart Kats (databricks) on 2023-09-06 11:52:31 +02:00; committed by GitHub.
20 changed files with 393 additions and 40 deletions


```diff
@@ -37,6 +37,10 @@ type Bundle struct {
 	// Stores an initialized copy of this bundle's Terraform wrapper.
 	Terraform *tfexec.Terraform
 
+	// Indicates that the Terraform definition based on this bundle is empty,
+	// i.e. that it would deploy no resources.
+	TerraformHasNoResources bool
+
 	// Stores the locker responsible for acquiring/releasing a deployment lock.
 	Locker *locker.Locker
```


```diff
@@ -21,6 +21,10 @@ func (m *populateCurrentUser) Name() string {
 }
 
 func (m *populateCurrentUser) Apply(ctx context.Context, b *bundle.Bundle) error {
+	if b.Config.Workspace.CurrentUser != nil {
+		return nil
+	}
+
 	w := b.WorkspaceClient()
 	me, err := w.CurrentUser.Me(ctx)
 	if err != nil {
```


```diff
@@ -16,6 +16,10 @@ func (w *apply) Name() string {
 }
 
 func (w *apply) Apply(ctx context.Context, b *bundle.Bundle) error {
+	if b.TerraformHasNoResources {
+		cmdio.LogString(ctx, "Note: there are no resources to deploy for this bundle")
+		return nil
+	}
 	tf := b.Terraform
 	if tf == nil {
 		return fmt.Errorf("terraform not initialized")
```


```diff
@@ -49,12 +49,14 @@ func convPermission(ac resources.Permission) schema.ResourcePermissionsAccessControl {
 //
 // NOTE: THIS IS CURRENTLY A HACK. WE NEED A BETTER WAY TO
 // CONVERT TO/FROM TERRAFORM COMPATIBLE FORMAT.
-func BundleToTerraform(config *config.Root) *schema.Root {
+func BundleToTerraform(config *config.Root) (*schema.Root, bool) {
 	tfroot := schema.NewRoot()
 	tfroot.Provider = schema.NewProviders()
 	tfroot.Resource = schema.NewResources()
+	noResources := true
 
 	for k, src := range config.Resources.Jobs {
+		noResources = false
 		var dst schema.ResourceJob
 		conv(src, &dst)
@@ -100,6 +102,7 @@ func BundleToTerraform(config *config.Root) *schema.Root {
 	}
 
 	for k, src := range config.Resources.Pipelines {
+		noResources = false
 		var dst schema.ResourcePipeline
 		conv(src, &dst)
@@ -127,6 +130,7 @@ func BundleToTerraform(config *config.Root) *schema.Root {
 	}
 
 	for k, src := range config.Resources.Models {
+		noResources = false
 		var dst schema.ResourceMlflowModel
 		conv(src, &dst)
 		tfroot.Resource.MlflowModel[k] = &dst
@@ -139,6 +143,7 @@ func BundleToTerraform(config *config.Root) *schema.Root {
 	}
 
 	for k, src := range config.Resources.Experiments {
+		noResources = false
 		var dst schema.ResourceMlflowExperiment
 		conv(src, &dst)
 		tfroot.Resource.MlflowExperiment[k] = &dst
@@ -150,7 +155,7 @@ func BundleToTerraform(config *config.Root) *schema.Root {
 		}
 	}
 
-	return tfroot
+	return tfroot, noResources
 }
 
 func TerraformToBundle(state *tfjson.State, config *config.Root) error {
```


```diff
@@ -40,7 +40,7 @@ func TestConvertJob(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 	assert.Equal(t, "my job", out.Resource.Job["my_job"].Name)
 	assert.Len(t, out.Resource.Job["my_job"].JobCluster, 1)
 	assert.Equal(t, "https://github.com/foo/bar", out.Resource.Job["my_job"].GitSource.Url)
@@ -65,7 +65,7 @@ func TestConvertJobPermissions(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 
 	assert.NotEmpty(t, out.Resource.Permissions["job_my_job"].JobId)
 	assert.Len(t, out.Resource.Permissions["job_my_job"].AccessControl, 1)
@@ -101,7 +101,7 @@ func TestConvertJobTaskLibraries(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 	assert.Equal(t, "my job", out.Resource.Job["my_job"].Name)
 	require.Len(t, out.Resource.Job["my_job"].Task, 1)
 	require.Len(t, out.Resource.Job["my_job"].Task[0].Library, 1)
@@ -135,7 +135,7 @@ func TestConvertPipeline(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 	assert.Equal(t, "my pipeline", out.Resource.Pipeline["my_pipeline"].Name)
 	assert.Len(t, out.Resource.Pipeline["my_pipeline"].Library, 2)
 	assert.Nil(t, out.Data)
@@ -159,7 +159,7 @@ func TestConvertPipelinePermissions(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 
 	assert.NotEmpty(t, out.Resource.Permissions["pipeline_my_pipeline"].PipelineId)
 	assert.Len(t, out.Resource.Permissions["pipeline_my_pipeline"].AccessControl, 1)
@@ -194,7 +194,7 @@ func TestConvertModel(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 	assert.Equal(t, "name", out.Resource.MlflowModel["my_model"].Name)
 	assert.Equal(t, "description", out.Resource.MlflowModel["my_model"].Description)
 	assert.Len(t, out.Resource.MlflowModel["my_model"].Tags, 2)
@@ -223,7 +223,7 @@ func TestConvertModelPermissions(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 
 	assert.NotEmpty(t, out.Resource.Permissions["mlflow_model_my_model"].RegisteredModelId)
 	assert.Len(t, out.Resource.Permissions["mlflow_model_my_model"].AccessControl, 1)
@@ -247,7 +247,7 @@ func TestConvertExperiment(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 	assert.Equal(t, "name", out.Resource.MlflowExperiment["my_experiment"].Name)
 	assert.Nil(t, out.Data)
 }
@@ -270,7 +270,7 @@ func TestConvertExperimentPermissions(t *testing.T) {
 		},
 	}
 
-	out := BundleToTerraform(&config)
+	out, _ := BundleToTerraform(&config)
 
 	assert.NotEmpty(t, out.Resource.Permissions["mlflow_experiment_my_experiment"].ExperimentId)
 	assert.Len(t, out.Resource.Permissions["mlflow_experiment_my_experiment"].AccessControl, 1)
```


```diff
@@ -21,7 +21,8 @@ func (w *write) Apply(ctx context.Context, b *bundle.Bundle) error {
 		return err
 	}
 
-	root := BundleToTerraform(&b.Config)
+	root, noResources := BundleToTerraform(&b.Config)
+	b.TerraformHasNoResources = noResources
 	f, err := os.Create(filepath.Join(dir, "bundle.tf.json"))
 	if err != nil {
 		return err
```
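
Taken together, `BundleToTerraform` now reports whether it converted any resources, the write mutator records that on the bundle, and the apply mutator short-circuits. A minimal, self-contained sketch of that lifecycle (simplified stand-in types, not the CLI's actual code):

```go
package main

import "fmt"

// Simplified stand-ins for the types touched in this diff.
type bundle struct {
	TerraformHasNoResources bool
	jobs                    map[string]string // stands in for config.Resources.*
}

// Mirrors BundleToTerraform's new second return value: true when the
// generated Terraform definition would contain no resources at all.
func bundleToTerraform(b *bundle) (string, bool) {
	noResources := true
	for range b.jobs {
		noResources = false
	}
	return "bundle.tf.json contents", noResources
}

// Mirrors the write mutator: convert, then record the emptiness flag.
func write(b *bundle) {
	_, noResources := bundleToTerraform(b)
	b.TerraformHasNoResources = noResources
}

// Mirrors the apply mutator: skip Terraform entirely for empty bundles.
func apply(b *bundle) {
	if b.TerraformHasNoResources {
		fmt.Println("Note: there are no resources to deploy for this bundle")
		return
	}
	fmt.Println("terraform apply ...")
}

func main() {
	empty := &bundle{jobs: map[string]string{}}
	write(empty)
	apply(empty) // prints the "no resources" note

	nonEmpty := &bundle{jobs: map[string]string{"my_job": "..."}}
	write(nonEmpty)
	apply(nonEmpty) // proceeds to terraform apply
}
```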


```diff
@@ -26,9 +26,10 @@ type pair struct {
 	v any
 }
 
+var cachedUser *iam.User
+var cachedIsServicePrincipal *bool
+
 func loadHelpers(ctx context.Context) template.FuncMap {
-	var user *iam.User
-	var is_service_principal *bool
 	w := root.WorkspaceClient(ctx)
 	return template.FuncMap{
 		"fail": func(format string, args ...any) (any, error) {
@@ -80,32 +81,32 @@ func loadHelpers(ctx context.Context) template.FuncMap {
 			return w.Config.Host, nil
 		},
 		"user_name": func() (string, error) {
-			if user == nil {
+			if cachedUser == nil {
 				var err error
-				user, err = w.CurrentUser.Me(ctx)
+				cachedUser, err = w.CurrentUser.Me(ctx)
 				if err != nil {
 					return "", err
 				}
 			}
-			result := user.UserName
+			result := cachedUser.UserName
 			if result == "" {
-				result = user.Id
+				result = cachedUser.Id
 			}
 			return result, nil
 		},
 		"is_service_principal": func() (bool, error) {
-			if is_service_principal != nil {
-				return *is_service_principal, nil
+			if cachedIsServicePrincipal != nil {
+				return *cachedIsServicePrincipal, nil
 			}
-			if user == nil {
+			if cachedUser == nil {
 				var err error
-				user, err = w.CurrentUser.Me(ctx)
+				cachedUser, err = w.CurrentUser.Me(ctx)
 				if err != nil {
 					return false, err
 				}
 			}
-			result := auth.IsServicePrincipal(user.Id)
-			is_service_principal = &result
+			result := auth.IsServicePrincipal(cachedUser.Id)
+			cachedIsServicePrincipal = &result
 			return result, nil
 		},
 	}
```
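
Moving the user lookup from closure variables to package-level caches keeps the same memoization, but lets the new template test below pre-seed `cachedUser` and `cachedIsServicePrincipal` so the helpers never hit the workspace API. A hedged sketch of the pattern, with hypothetical names (the real code caches an `*iam.User` and a `*bool`):

```go
package main

import "fmt"

// Package-level memo: tests can assign this directly to avoid a lookup.
var cachedName *string

func userName() string {
	if cachedName == nil {
		// Stands in for the expensive call, w.CurrentUser.Me(ctx).
		fmt.Println("performing lookup ...")
		n := "user@domain.com"
		cachedName = &n
	}
	return *cachedName
}

func main() {
	fmt.Println(userName()) // triggers the lookup
	fmt.Println(userName()) // served from the package-level cache

	// A test can pre-seed the cache so no lookup ever happens:
	seeded := "tester@example.com"
	cachedName = &seeded
	fmt.Println(userName())
}
```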


```diff
@@ -9,6 +9,7 @@ import (
 	"path"
 	"path/filepath"
 	"slices"
+	"sort"
 	"strings"
 	"text/template"
 
@@ -214,17 +215,22 @@ func (r *renderer) walk() error {
 	// Add skip function, which accumulates skip patterns relative to current
 	// directory
 	r.baseTemplate.Funcs(template.FuncMap{
-		"skip": func(relPattern string) string {
+		"skip": func(relPattern string) (string, error) {
 			// patterns are specified relative to current directory of the file
 			// the {{skip}} function is called from.
-			pattern := path.Join(currentDirectory, relPattern)
+			patternRaw := path.Join(currentDirectory, relPattern)
+			pattern, err := r.executeTemplate(patternRaw)
+			if err != nil {
+				return "", err
+			}
+
 			if !slices.Contains(r.skipPatterns, pattern) {
 				logger.Infof(r.ctx, "adding skip pattern: %s", pattern)
 				r.skipPatterns = append(r.skipPatterns, pattern)
 			}
 			// return empty string will print nothing at function call site
 			// when executing the template
-			return ""
+			return "", nil
 		},
 	})
@@ -239,6 +245,10 @@ func (r *renderer) walk() error {
 	if err != nil {
 		return err
 	}
+	// Sort by name to ensure deterministic ordering
+	sort.Slice(entries, func(i, j int) bool {
+		return entries[i].Name() < entries[j].Name()
+	})
 	for _, entry := range entries {
 		if entry.IsDir() {
 			// Add to slice, for BFS traversal
```
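
The notable change here is that `{{skip}}` now runs its argument through the template engine before recording it, so a pattern may itself contain directives such as `{{.project_name}}` — exactly what the new `__preamble` file below relies on. A standalone sketch of that double evaluation, using `text/template` directly rather than the renderer's actual code:

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

func main() {
	data := map[string]any{"project_name": "my_project"}

	// A skip pattern as written in __preamble: it still contains a directive.
	patternRaw := "{{.project_name}}/src/dlt_pipeline.ipynb"

	// Execute the pattern itself as a template to resolve the directive,
	// mirroring the renderer's new r.executeTemplate(patternRaw) step.
	var buf bytes.Buffer
	tmpl := template.Must(template.New("pattern").Parse(patternRaw))
	if err := tmpl.Execute(&buf, data); err != nil {
		panic(err)
	}

	fmt.Println(buf.String()) // my_project/src/dlt_pipeline.ipynb
}
```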


```diff
@@ -12,7 +12,14 @@ import (
 	"testing"
 	"text/template"
 
+	"github.com/databricks/cli/bundle"
+	bundleConfig "github.com/databricks/cli/bundle/config"
+	"github.com/databricks/cli/bundle/config/mutator"
+	"github.com/databricks/cli/bundle/phases"
 	"github.com/databricks/cli/cmd/root"
+	"github.com/databricks/databricks-sdk-go"
+	workspaceConfig "github.com/databricks/databricks-sdk-go/config"
+	"github.com/databricks/databricks-sdk-go/service/iam"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -29,6 +36,95 @@ func assertFilePermissions(t *testing.T, path string, perm fs.FileMode) {
 	assert.Equal(t, perm, info.Mode().Perm())
 }
 
+func assertBuiltinTemplateValid(t *testing.T, settings map[string]any, target string, isServicePrincipal bool, build bool, tempDir string) {
+	ctx := context.Background()
+
+	templatePath, err := prepareBuiltinTemplates("default-python", tempDir)
+	require.NoError(t, err)
+
+	w := &databricks.WorkspaceClient{
+		Config: &workspaceConfig.Config{Host: "https://myhost.com"},
+	}
+
+	// Prepare helpers
+	cachedUser = &iam.User{UserName: "user@domain.com"}
+	cachedIsServicePrincipal = &isServicePrincipal
+	ctx = root.SetWorkspaceClient(ctx, w)
+	helpers := loadHelpers(ctx)
+
+	renderer, err := newRenderer(ctx, settings, helpers, templatePath, "./testdata/template-in-path/library", tempDir)
+	require.NoError(t, err)
+
+	// Evaluate template
+	err = renderer.walk()
+	require.NoError(t, err)
+	err = renderer.persistToDisk()
+	require.NoError(t, err)
+
+	b, err := bundle.Load(ctx, filepath.Join(tempDir, "template", "my_project"))
+	require.NoError(t, err)
+
+	// Apply initialize / validation mutators
+	b.Config.Workspace.CurrentUser = &bundleConfig.User{User: cachedUser}
+	b.WorkspaceClient()
+	b.Config.Bundle.Terraform = &bundleConfig.Terraform{
+		ExecPath: "sh",
+	}
+	err = bundle.Apply(ctx, b, bundle.Seq(
+		bundle.Seq(mutator.DefaultMutators()...),
+		mutator.SelectTarget(target),
+		phases.Initialize(),
+	))
+	require.NoError(t, err)
+
+	// Apply build mutator
+	if build {
+		err = bundle.Apply(ctx, b, phases.Build())
+		require.NoError(t, err)
+	}
+}
+
+func TestBuiltinTemplateValid(t *testing.T) {
+	// Test option combinations
+	options := []string{"yes", "no"}
+	isServicePrincipal := false
+	build := false
+	for _, includeNotebook := range options {
+		for _, includeDlt := range options {
+			for _, includePython := range options {
+				for _, isServicePrincipal := range []bool{true, false} {
+					config := map[string]any{
+						"project_name":     "my_project",
+						"include_notebook": includeNotebook,
+						"include_dlt":      includeDlt,
+						"include_python":   includePython,
+					}
+					tempDir := t.TempDir()
+					assertBuiltinTemplateValid(t, config, "dev", isServicePrincipal, build, tempDir)
+				}
+			}
+		}
+	}
+
+	// Test prod mode + build
+	config := map[string]any{
+		"project_name":     "my_project",
+		"include_notebook": "yes",
+		"include_dlt":      "yes",
+		"include_python":   "yes",
+	}
+	isServicePrincipal = false
+	build = true
+
+	// On Windows, we can't always remove the resulting temp dir since background
+	// processes might have it open, so we use 'defer' for a best-effort cleanup
+	tempDir, err := os.MkdirTemp("", "templates")
+	require.NoError(t, err)
+	defer os.RemoveAll(tempDir)
+	assertBuiltinTemplateValid(t, config, "prod", isServicePrincipal, build, tempDir)
+}
+
 func TestRendererWithAssociatedTemplateInLibrary(t *testing.T) {
 	tmpDir := t.TempDir()
```
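
The new `TestBuiltinTemplateValid` walks every combination of the three `include_*` options and the service-principal flag (16 cases) against the `dev` target, then separately validates the `prod` target with `build` enabled.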


```diff
@@ -3,7 +3,32 @@
     "project_name": {
       "type": "string",
       "default": "my_project",
-      "description": "Unique name for this project"
+      "description": "Unique name for this project",
+      "order": 1
+    },
+    "include_notebook": {
+      "todo": "use an enum here, see https://github.com/databricks/cli/pull/668",
+      "type": "string",
+      "default": "yes",
+      "pattern": "^(yes|no)$",
+      "description": "Include a stub (sample) notebook in 'my_project/src'",
+      "order": 2
+    },
+    "include_dlt": {
+      "todo": "use an enum here, see https://github.com/databricks/cli/pull/668",
+      "type": "string",
+      "default": "yes",
+      "pattern": "^(yes|no)$",
+      "description": "Include a stub (sample) DLT pipeline in 'my_project/src'",
+      "order": 3
+    },
+    "include_python": {
+      "todo": "use an enum here, see https://github.com/databricks/cli/pull/668",
+      "type": "string",
+      "default": "yes",
+      "pattern": "^(yes|no)$",
+      "description": "Include a stub (sample) Python package in 'my_project/src'",
+      "order": 4
+    }
   }
 }
```
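
Until enums are supported (tracked by the `todo` fields above), the yes/no answers are constrained by each property's `pattern`. A tiny illustrative Go sketch of how such a pattern can gate input (not the CLI's actual prompt code):

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// The same expression used by the include_* properties above.
	pattern := regexp.MustCompile(`^(yes|no)$`)
	for _, answer := range []string{"yes", "no", "maybe"} {
		fmt.Printf("%-5s valid: %v\n", answer, pattern.MatchString(answer))
	}
}
```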


```diff
@@ -1,3 +1,6 @@
 {
-  "project_name": "my_project"
+  "project_name": "my_project",
+  "include_notebook": "yes",
+  "include_dlt": "yes",
+  "include_python": "yes"
 }
```


```diff
@@ -0,0 +1,38 @@
+# Preamble
+
+This file only contains template directives; it is skipped for the actual output.
+
+{{skip "__preamble"}}
+
+{{ $value := .project_name }}
+{{with (regexp "^[A-Za-z0-9_]*$")}}
+{{if not (.MatchString $value)}}
+{{fail "Invalid project_name: %s. Must consist of letters and underscores only." $value}}
+{{end}}
+{{end}}
+
+{{$notDLT := not (eq .include_dlt "yes")}}
+{{$notNotebook := not (eq .include_notebook "yes")}}
+{{$notPython := not (eq .include_python "yes")}}
+
+{{if $notPython}}
+  {{skip "{{.project_name}}/src/{{.project_name}}"}}
+  {{skip "{{.project_name}}/tests/test_main.py"}}
+  {{skip "{{.project_name}}/setup.py"}}
+  {{skip "{{.project_name}}/pytest.ini"}}
+{{end}}
+
+{{if $notDLT}}
+  {{skip "{{.project_name}}/src/dlt_pipeline.ipynb"}}
+  {{skip "{{.project_name}}/resources/{{.project_name}}_pipeline.yml"}}
+{{end}}
+
+{{if $notNotebook}}
+  {{skip "{{.project_name}}/src/notebook.ipynb"}}
+{{end}}
+
+{{if (and $notDLT $notNotebook $notPython)}}
+  {{skip "{{.project_name}}/resources/{{.project_name}}_job.yml"}}
+{{else}}
+  {{skip "{{.project_name}}/resources/.gitkeep"}}
+{{end}}
```
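
Note that the skip patterns above embed `{{.project_name}}` inside the string handed to `{{skip}}`; this relies on the renderer change earlier in this diff, which now executes each pattern as a template before matching it against output paths.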


````diff
@@ -28,10 +28,17 @@ The '{{.project_name}}' project was generated by using the default-python template.
    $ databricks bundle deploy --target prod
    ```
 
-5. Optionally, install developer tools such as the Databricks extension for Visual Studio Code from
-   https://docs.databricks.com/dev-tools/vscode-ext.html. Or read the "getting started" documentation for
-   **Databricks Connect** for instructions on running the included Python code from a different IDE.
+5. To run a job or pipeline, use the "run" command:
+   ```
+   $ databricks bundle run {{.project_name}}_job
+   ```
+
+6. Optionally, install developer tools such as the Databricks extension for Visual Studio Code from
+   https://docs.databricks.com/dev-tools/vscode-ext.html.
+{{- if (eq .include_python "yes") }} Or read the "getting started" documentation for
+   **Databricks Connect** for instructions on running the included Python code from a different IDE.
+{{- end}}
 
-6. For documentation on the Databricks asset bundles format used
+7. For documentation on the Databricks asset bundles format used
    for this project, and for CI/CD configuration, see
    https://docs.databricks.com/dev-tools/bundles/index.html.
````


```diff
@@ -0,0 +1 @@
+This folder is reserved for Databricks Asset Bundles resource definitions.
```


```diff
@@ -1,6 +1,5 @@
 # The main job for {{.project_name}}
-
 resources:
   jobs:
     {{.project_name}}_job:
       name: {{.project_name}}_job
@@ -10,20 +9,41 @@ resources:
         timezone_id: Europe/Amsterdam
 
       {{- if not is_service_principal}}
       email_notifications:
         on_failure:
           - {{user_name}}
-      {{else}}
       {{end -}}
 
      tasks:
+        {{- if eq .include_notebook "yes" }}
        - task_key: notebook_task
          job_cluster_key: job_cluster
          notebook_task:
            notebook_path: ../src/notebook.ipynb
+        {{end -}}
-        - task_key: python_wheel_task
+        {{- if (eq .include_dlt "yes") }}
+        - task_key: refresh_pipeline
+          {{- if (eq .include_notebook "yes" )}}
          depends_on:
            - task_key: notebook_task
+          {{- end}}
+          pipeline_task:
+            {{- /* TODO: we should find a way that doesn't use magics for the below, like ./{{project_name}}_pipeline.yml */}}
+            pipeline_id: ${resources.pipelines.{{.project_name}}_pipeline.id}
+        {{end -}}
+        {{- if (eq .include_python "yes") }}
+        - task_key: main_task
+          {{- if (eq .include_dlt "yes") }}
+          depends_on:
+            - task_key: refresh_pipeline
+          {{- else if (eq .include_notebook "yes" )}}
+          depends_on:
+            - task_key: notebook_task
+          {{end}}
          job_cluster_key: job_cluster
          python_wheel_task:
            package_name: {{.project_name}}
@@ -31,6 +51,8 @@ resources:
          libraries:
            - whl: ../dist/*.whl
+        {{else}}
+        {{end -}}
 
      job_clusters:
        - job_cluster_key: job_cluster
          new_cluster:
```
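
With all three stubs enabled, the rendered job wires its tasks into a linear chain: notebook_task, then refresh_pipeline (which triggers the DLT pipeline by ID), then main_task running the built wheel. Each `depends_on` edge is emitted only when both of its endpoints were included.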


```diff
@@ -0,0 +1,12 @@
+# The main pipeline for {{.project_name}}
+resources:
+  pipelines:
+    {{.project_name}}_pipeline:
+      name: "{{.project_name}}_pipeline"
+      target: "{{.project_name}}_${bundle.environment}"
+      libraries:
+        - notebook:
+            path: ../src/dlt_pipeline.ipynb
+
+      configuration:
+        "bundle.sourcePath": "/Workspace/${workspace.file_path}/src"
```


```diff
@@ -17,11 +17,15 @@
    },
    "outputs": [],
    "source": [
+{{- if (eq .include_python "yes") }}
     "import sys\n",
     "sys.path.append('../src')\n",
-    "from project import main\n",
+    "from {{.project_name}} import main\n",
     "\n",
-    "main.taxis.show(10)"
+    "main.get_taxis().show(10)"
+{{else}}
+    "spark.range(10)"
+{{end -}}
    ]
   }
  ],
```


```diff
@@ -0,0 +1,112 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "9a626959-61c8-4bba-84d2-2a4ecab1f7ec",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "source": [
+    "# DLT pipeline\n",
+    "\n",
+    "This Delta Live Tables (DLT) definition is executed using a pipeline defined in resources/{{.project_name}}_pipeline.yml."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "9198e987-5606-403d-9f6d-8f14e6a4017f",
+     "showTitle": false,
+     "title": ""
+    },
+    "jupyter": {
+{{- /* Collapse this cell by default. Just boring imports here! */}}
+     "source_hidden": true
+    }
+   },
+   "outputs": [],
+   "source": [
+{{- if (eq .include_python "yes") }}
+    "# Import DLT and make sure '{{.project_name}}' is on the Python path\n",
+    "import dlt\n",
+    "from pyspark.sql.functions import expr\n",
+    "from pyspark.sql import SparkSession\n",
+    "spark = SparkSession.builder.getOrCreate()\n",
+    "import sys\n",
+    "try:\n",
+    "  sys.path.append(spark.conf.get(\"bundle.sourcePath\"))\n",
+    "except:\n",
+    "  pass\n",
+    "from {{.project_name}} import main"
+{{else}}
+    "# Import DLT\n",
+    "import dlt\n",
+    "from pyspark.sql.functions import expr\n",
+    "from pyspark.sql import SparkSession\n",
+    "spark = SparkSession.builder.getOrCreate()"
+{{end -}}
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 0,
+   "metadata": {
+    "application/vnd.databricks.v1+cell": {
+     "cellMetadata": {},
+     "inputWidgets": {},
+     "nuid": "3fc19dba-61fd-4a89-8f8c-24fee63bfb14",
+     "showTitle": false,
+     "title": ""
+    }
+   },
+   "outputs": [],
+   "source": [
+{{- if (eq .include_python "yes") }}
+    "@dlt.view\n",
+    "def taxi_raw():\n",
+    "  return main.get_taxis()\n",
+{{else}}
+    "\n",
+    "@dlt.view\n",
+    "def taxi_raw():\n",
+    "  return spark.read.format(\"json\").load(\"/databricks-datasets/nyctaxi/sample/json/\")\n",
+{{end -}}
+    "\n",
+    "@dlt.table\n",
+    "def filtered_taxis():\n",
+    "  return dlt.read(\"taxi_raw\").filter(expr(\"fare_amount < 30\"))"
+   ]
+  }
+ ],
+ "metadata": {
+  "application/vnd.databricks.v1+notebook": {
+   "dashboards": [],
+   "language": "python",
+   "notebookMetadata": {
+    "pythonIndentUnit": 2
+   },
+   "notebookName": "dlt_pipeline",
+   "widgets": {}
+  },
+  "kernelspec": {
+   "display_name": "Python 3",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "name": "python",
+   "version": "3.11.4"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}
```


```diff
@@ -34,9 +34,13 @@
    },
    "outputs": [],
    "source": [
+{{- if (eq .include_python "yes") }}
     "from {{.project_name}} import main\n",
     "\n",
-    "main.get_taxis().show(10)\n"
+    "main.get_taxis().show(10)"
+{{else}}
+    "spark.range(10)"
+{{end -}}
    ]
   }
  ],
```


```diff
@@ -2,4 +2,4 @@ from {{.project_name}} import main
 
 def test_main():
     taxis = main.get_taxis()
-    assert taxis.count() == 5
+    assert taxis.count() > 5
```