Include a materialized copy of built-in templates (#2146)

## Changes

Include a materialized copy of built-in templates as reference output.

This updates the output comparison logic to work against an output
directory. The `doComparison` function now always works on real files.
It can now tell apart non-existing files and empty files (e.g., the
`.gitkeep` files in templates).
This commit is contained in:
Pieter Noordhuis 2025-01-17 16:03:59 +01:00 committed by GitHub
parent 0d5193a62c
commit 50f62692ce
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
65 changed files with 1302 additions and 80 deletions

View File

@ -89,6 +89,7 @@ func TestAccept(t *testing.T) {
require.NotNil(t, user) require.NotNil(t, user)
testdiff.PrepareReplacementsUser(t, &repls, *user) testdiff.PrepareReplacementsUser(t, &repls, *user)
testdiff.PrepareReplacementsWorkspaceClient(t, &repls, workspaceClient) testdiff.PrepareReplacementsWorkspaceClient(t, &repls, workspaceClient)
testdiff.PrepareReplacementsUUID(t, &repls)
testDirs := getTests(t) testDirs := getTests(t)
require.NotEmpty(t, testDirs) require.NotEmpty(t, testDirs)
@ -154,70 +155,86 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
require.NoError(t, err) require.NoError(t, err)
cmd.Env = append(os.Environ(), "GOCOVERDIR="+coverDir) cmd.Env = append(os.Environ(), "GOCOVERDIR="+coverDir)
} }
// Write combined output to a file
out, err := os.Create(filepath.Join(tmpDir, "output.txt"))
require.NoError(t, err)
cmd.Stdout = out
cmd.Stderr = out
cmd.Dir = tmpDir cmd.Dir = tmpDir
outB, err := cmd.CombinedOutput() err = cmd.Run()
out := formatOutput(string(outB), err) // Include exit code in output (if non-zero)
out = repls.Replace(out) formatOutput(out, err)
doComparison(t, filepath.Join(dir, "output.txt"), "script output", out) require.NoError(t, out.Close())
for key := range outputs { // Compare expected outputs
if key == "output.txt" { for relPath := range outputs {
// handled above doComparison(t, repls, dir, tmpDir, relPath)
continue
}
pathNew := filepath.Join(tmpDir, key)
newValBytes, err := os.ReadFile(pathNew)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
t.Errorf("%s: expected to find this file but could not (%s)", key, tmpDir)
} else {
t.Errorf("%s: could not read: %s", key, err)
}
continue
}
pathExpected := filepath.Join(dir, key)
newVal := repls.Replace(string(newValBytes))
doComparison(t, pathExpected, pathNew, newVal)
} }
// Make sure there are not unaccounted for new files // Make sure there are not unaccounted for new files
files, err := os.ReadDir(tmpDir) files, err := ListDir(t, tmpDir)
require.NoError(t, err) require.NoError(t, err)
for _, relPath := range files {
for _, f := range files { if _, ok := inputs[relPath]; ok {
name := f.Name()
if _, ok := inputs[name]; ok {
continue continue
} }
if _, ok := outputs[name]; ok { if _, ok := outputs[relPath]; ok {
continue continue
} }
t.Errorf("Unexpected output: %s", f) if strings.HasPrefix(relPath, "out") {
if strings.HasPrefix(name, "out") {
// We have a new file starting with "out" // We have a new file starting with "out"
// Show the contents & support overwrite mode for it: // Show the contents & support overwrite mode for it:
pathNew := filepath.Join(tmpDir, name) doComparison(t, repls, dir, tmpDir, relPath)
newVal := testutil.ReadFile(t, pathNew)
newVal = repls.Replace(newVal)
doComparison(t, filepath.Join(dir, name), filepath.Join(tmpDir, name), newVal)
} }
} }
} }
func doComparison(t *testing.T, pathExpected, pathNew, valueNew string) { func doComparison(t *testing.T, repls testdiff.ReplacementsContext, dirRef, dirNew, relPath string) {
valueNew = testdiff.NormalizeNewlines(valueNew) pathRef := filepath.Join(dirRef, relPath)
valueExpected := string(readIfExists(t, pathExpected)) pathNew := filepath.Join(dirNew, relPath)
valueExpected = testdiff.NormalizeNewlines(valueExpected) bufRef, okRef := readIfExists(t, pathRef)
testdiff.AssertEqualTexts(t, pathExpected, pathNew, valueExpected, valueNew) bufNew, okNew := readIfExists(t, pathNew)
if testdiff.OverwriteMode { if !okRef && !okNew {
if valueNew != "" { t.Errorf("Both files are missing: %s, %s", pathRef, pathNew)
t.Logf("Overwriting: %s", pathExpected) return
testutil.WriteFile(t, pathExpected, valueNew) }
} else {
t.Logf("Removing: %s", pathExpected) valueRef := testdiff.NormalizeNewlines(string(bufRef))
_ = os.Remove(pathExpected) valueNew := testdiff.NormalizeNewlines(string(bufNew))
// Apply replacements to the new value only.
// The reference value is stored after applying replacements.
valueNew = repls.Replace(valueNew)
// The test did not produce an expected output file.
if okRef && !okNew {
t.Errorf("Missing output file: %s", relPath)
testdiff.AssertEqualTexts(t, pathRef, pathNew, valueRef, valueNew)
if testdiff.OverwriteMode {
t.Logf("Removing output file: %s", relPath)
require.NoError(t, os.Remove(pathRef))
} }
return
}
// The test produced an unexpected output file.
if !okRef && okNew {
t.Errorf("Unexpected output file: %s", relPath)
testdiff.AssertEqualTexts(t, pathRef, pathNew, valueRef, valueNew)
if testdiff.OverwriteMode {
t.Logf("Writing output file: %s", relPath)
testutil.WriteFile(t, pathRef, valueNew)
}
return
}
// Compare the reference and new values.
equal := testdiff.AssertEqualTexts(t, pathRef, pathNew, valueRef, valueNew)
if !equal && testdiff.OverwriteMode {
t.Logf("Overwriting existing output file: %s", relPath)
testutil.WriteFile(t, pathRef, valueNew)
} }
} }
@ -234,13 +251,13 @@ func readMergedScriptContents(t *testing.T, dir string) string {
cleanups := []string{} cleanups := []string{}
for { for {
x := readIfExists(t, filepath.Join(dir, CleanupScript)) x, ok := readIfExists(t, filepath.Join(dir, CleanupScript))
if len(x) > 0 { if ok {
cleanups = append(cleanups, string(x)) cleanups = append(cleanups, string(x))
} }
x = readIfExists(t, filepath.Join(dir, PrepareScript)) x, ok = readIfExists(t, filepath.Join(dir, PrepareScript))
if len(x) > 0 { if ok {
prepares = append(prepares, string(x)) prepares = append(prepares, string(x))
} }
@ -316,29 +333,28 @@ func copyFile(src, dst string) error {
return err return err
} }
func formatOutput(out string, err error) string { func formatOutput(w io.Writer, err error) {
if err == nil { if err == nil {
return out return
} }
if exiterr, ok := err.(*exec.ExitError); ok { if exiterr, ok := err.(*exec.ExitError); ok {
exitCode := exiterr.ExitCode() exitCode := exiterr.ExitCode()
out += fmt.Sprintf("\nExit code: %d\n", exitCode) fmt.Fprintf(w, "\nExit code: %d\n", exitCode)
} else { } else {
out += fmt.Sprintf("\nError: %s\n", err) fmt.Fprintf(w, "\nError: %s\n", err)
} }
return out
} }
func readIfExists(t *testing.T, path string) []byte { func readIfExists(t *testing.T, path string) ([]byte, bool) {
data, err := os.ReadFile(path) data, err := os.ReadFile(path)
if err == nil { if err == nil {
return data return data, true
} }
if !errors.Is(err, os.ErrNotExist) { if !errors.Is(err, os.ErrNotExist) {
t.Fatalf("%s: %s", path, err) t.Fatalf("%s: %s", path, err)
} }
return []byte{} return []byte{}, false
} }
func CopyDir(src, dst string, inputs, outputs map[string]bool) error { func CopyDir(src, dst string, inputs, outputs map[string]bool) error {
@ -353,8 +369,10 @@ func CopyDir(src, dst string, inputs, outputs map[string]bool) error {
return err return err
} }
if strings.HasPrefix(name, "out") { if strings.HasPrefix(relPath, "out") {
outputs[relPath] = true if !info.IsDir() {
outputs[relPath] = true
}
return nil return nil
} else { } else {
inputs[relPath] = true inputs[relPath] = true
@ -373,3 +391,25 @@ func CopyDir(src, dst string, inputs, outputs map[string]bool) error {
return copyFile(path, destPath) return copyFile(path, destPath)
}) })
} }
func ListDir(t *testing.T, src string) ([]string, error) {
var files []string
err := filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
relPath, err := filepath.Rel(src, path)
if err != nil {
return err
}
files = append(files, relPath)
return nil
})
return files, err
}

View File

@ -1,5 +0,0 @@
trace $CLI bundle init dbt-sql --config-file ./input.json
cd my_dbt_sql
trace $CLI bundle validate -t dev
trace $CLI bundle validate -t prod

View File

@ -1 +0,0 @@
rm -fr my_dbt_sql

View File

@ -1 +0,0 @@
rm -fr my_default_python

View File

@ -1,5 +0,0 @@
trace $CLI bundle init default-sql --config-file ./input.json
cd my_default_sql
trace $CLI bundle validate -t dev
trace $CLI bundle validate -t prod

View File

@ -1 +0,0 @@
rm -fr my_default_sql

View File

@ -1,5 +1,5 @@
>>> $CLI bundle init dbt-sql --config-file ./input.json >>> $CLI bundle init dbt-sql --config-file ./input.json --output-dir output
Welcome to the dbt template for Databricks Asset Bundles! Welcome to the dbt template for Databricks Asset Bundles!

View File

@ -0,0 +1,2 @@
.databricks

View File

@ -0,0 +1,3 @@
# Typings for Pylance in Visual Studio Code
# see https://github.com/microsoft/pyright/blob/main/docs/builtins.md
from databricks.sdk.runtime import *

View File

@ -0,0 +1,6 @@
{
"recommendations": [
"redhat.vscode-yaml",
"innoverio.vscode-dbt-power-user",
]
}

View File

@ -0,0 +1,32 @@
{
"python.analysis.stubPath": ".vscode",
"jupyter.interactiveWindow.cellMarker.codeRegex": "^# COMMAND ----------|^# Databricks notebook source|^(#\\s*%%|#\\s*\\<codecell\\>|#\\s*In\\[\\d*?\\]|#\\s*In\\[ \\])",
"jupyter.interactiveWindow.cellMarker.default": "# COMMAND ----------",
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.analysis.extraPaths": ["src"],
"files.exclude": {
"**/*.egg-info": true,
"**/__pycache__": true,
".pytest_cache": true,
},
"python.envFile": "${workspaceFolder}/.databricks/.databricks.env",
"python.defaultInterpreterPath": "${workspaceFolder}/.venv/bin/python",
"sqltools.connections": [
{
"connectionMethod": "VS Code Extension (beta)",
"catalog": "hive_metastore",
"previewLimit": 50,
"driver": "Databricks",
"name": "databricks",
"path": "/sql/2.0/warehouses/f00dcafe"
}
],
"sqltools.autoConnectTo": "",
"[jinja-sql]": {
"editor.defaultFormatter": "innoverio.vscode-dbt-power-user"
}
}

View File

@ -0,0 +1,138 @@
# my_dbt_sql
The 'my_dbt_sql' project was generated by using the dbt template for
Databricks Asset Bundles. It follows the standard dbt project structure
and has an additional `resources` directory to define Databricks resources such as jobs
that run dbt models.
* Learn more about dbt and its standard project structure here: https://docs.getdbt.com/docs/build/projects.
* Learn more about Databricks Asset Bundles here: https://docs.databricks.com/en/dev-tools/bundles/index.html
The remainder of this file includes instructions for local development (using dbt)
and deployment to production (using Databricks Asset Bundles).
## Development setup
1. Install the Databricks CLI from https://docs.databricks.com/dev-tools/cli/databricks-cli.html
2. Authenticate to your Databricks workspace, if you have not done so already:
```
$ databricks configure
```
3. Install dbt
To install dbt, you need a recent version of Python. For the instructions below,
we assume `python3` refers to the Python version you want to use. On some systems,
you may need to refer to a different Python version, e.g. `python` or `/usr/bin/python`.
Run these instructions from the `my_dbt_sql` directory. We recommend making
use of a Python virtual environment and installing dbt as follows:
```
$ python3 -m venv .venv
$ . .venv/bin/activate
$ pip install -r requirements-dev.txt
```
4. Initialize your dbt profile
Use `dbt init` to initialize your profile.
```
$ dbt init
```
Note that dbt authentication uses personal access tokens by default
(see https://docs.databricks.com/dev-tools/auth/pat.html).
You can use OAuth as an alternative, but this currently requires manual configuration.
See https://github.com/databricks/dbt-databricks/blob/main/docs/oauth.md
for general instructions, or https://community.databricks.com/t5/technical-blog/using-dbt-core-with-oauth-on-azure-databricks/ba-p/46605
for advice on setting up OAuth for Azure Databricks.
To setup up additional profiles, such as a 'prod' profile,
see https://docs.getdbt.com/docs/core/connect-data-platform/connection-profiles.
5. Activate dbt so it can be used from the terminal
```
$ . .venv/bin/activate
```
## Local development with dbt
Use `dbt` to [run this project locally using a SQL warehouse](https://docs.databricks.com/partners/prep/dbt.html):
```
$ dbt seed
$ dbt run
```
(Did you get an error that the dbt command could not be found? You may need
to try the last step from the development setup above to re-activate
your Python virtual environment!)
To just evaluate a single model defined in a file called orders.sql, use:
```
$ dbt run --model orders
```
Use `dbt test` to run tests generated from yml files such as `models/schema.yml`
and any SQL tests from `tests/`
```
$ dbt test
```
## Production setup
Your production dbt profiles are defined in dbt_profiles/profiles.yml.
These profiles define the default catalog, schema, and any other
target-specific settings. Read more about dbt profiles on Databricks at
https://docs.databricks.com/en/workflows/jobs/how-to/use-dbt-in-workflows.html#advanced-run-dbt-with-a-custom-profile.
The target workspaces for staging and prod are defined in databricks.yml.
You can manually deploy based on these configurations (see below).
Or you can use CI/CD to automate deployment. See
https://docs.databricks.com/dev-tools/bundles/ci-cd.html for documentation
on CI/CD setup.
## Manually deploying to Databricks with Databricks Asset Bundles
Databricks Asset Bundles can be used to deploy to Databricks and to execute
dbt commands as a job using Databricks Workflows. See
https://docs.databricks.com/dev-tools/bundles/index.html to learn more.
Use the Databricks CLI to deploy a development copy of this project to a workspace:
```
$ databricks bundle deploy --target dev
```
(Note that "dev" is the default target, so the `--target` parameter
is optional here.)
This deploys everything that's defined for this project.
For example, the default template would deploy a job called
`[dev yourname] my_dbt_sql_job` to your workspace.
You can find that job by opening your workpace and clicking on **Workflows**.
You can also deploy to your production target directly from the command-line.
The warehouse, catalog, and schema for that target are configured in databricks.yml.
When deploying to this target, note that the default job at resources/my_dbt_sql.job.yml
has a schedule set that runs every day. The schedule is paused when deploying in development mode
(see https://docs.databricks.com/dev-tools/bundles/deployment-modes.html).
To deploy a production copy, type:
```
$ databricks bundle deploy --target prod
```
## IDE support
Optionally, install developer tools such as the Databricks extension for Visual Studio Code from
https://docs.databricks.com/dev-tools/vscode-ext.html. Third-party extensions
related to dbt may further enhance your dbt development experience!

View File

@ -0,0 +1,34 @@
# This file defines the structure of this project and how it is deployed
# to production using Databricks Asset Bundles.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
name: my_dbt_sql
uuid: <UUID>
include:
- resources/*.yml
# Deployment targets.
# The default schema, catalog, etc. for dbt are defined in dbt_profiles/profiles.yml
targets:
dev:
default: true
# The default target uses 'mode: development' to create a development copy.
# - Deployed resources get prefixed with '[dev my_user_name]'
# - Any job schedules and triggers are paused by default.
# See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
mode: development
workspace:
host: $DATABRICKS_URL
prod:
mode: production
workspace:
host: $DATABRICKS_URL
# We explicitly specify /Workspace/Users/$USERNAME to make sure we only have a single copy.
root_path: /Workspace/Users/$USERNAME/.bundle/${bundle.name}/${bundle.target}
permissions:
- user_name: $USERNAME
level: CAN_MANAGE
run_as:
user_name: $USERNAME

View File

@ -0,0 +1,38 @@
# This file defines dbt profiles for deployed dbt jobs.
my_dbt_sql:
target: dev # default target
outputs:
# Doing local development with the dbt CLI?
# Then you should create your own profile in your .dbt/profiles.yml using 'dbt init'
# (See README.md)
# The default target when deployed with the Databricks CLI
# N.B. when you use dbt from the command line, it uses the profile from .dbt/profiles.yml
dev:
type: databricks
method: http
catalog: main
schema: "{{ var('dev_schema') }}"
http_path: /sql/2.0/warehouses/f00dcafe
# The workspace host / token are provided by Databricks
# see databricks.yml for the workspace host used for 'dev'
host: "{{ env_var('DBT_HOST') }}"
token: "{{ env_var('DBT_ACCESS_TOKEN') }}"
# The production target when deployed with the Databricks CLI
prod:
type: databricks
method: http
catalog: main
schema: default
http_path: /sql/2.0/warehouses/f00dcafe
# The workspace host / token are provided by Databricks
# see databricks.yml for the workspace host used for 'prod'
host: "{{ env_var('DBT_HOST') }}"
token: "{{ env_var('DBT_ACCESS_TOKEN') }}"

View File

@ -0,0 +1,32 @@
name: 'my_dbt_sql'
version: '1.0.0'
config-version: 2
# This setting configures which "profile" dbt uses for this project.
profile: 'my_dbt_sql'
# These configurations specify where dbt should look for different types of files.
# For Databricks asset bundles, we put everything in src, as you may have
# non-dbt resources in your project.
model-paths: ["src/models"]
analysis-paths: ["src/analyses"]
test-paths: ["src/tests"]
seed-paths: ["src/seeds"]
macro-paths: ["src/macros"]
snapshot-paths: ["src/snapshots"]
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_packages"
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build all models in the example/
# directory as views by default. These settings can be overridden in the
# individual model files using the `{{ config(...) }}` macro.
models:
my_dbt_sql:
# Config indicated by + and applies to all files under models/example/
example:
+materialized: view

View File

@ -0,0 +1,23 @@
# This file defines prompts with defaults for dbt initializaton.
# It is used when the `dbt init` command is invoked.
#
fixed:
type: databricks
prompts:
host:
default: $DATABRICKS_HOST
token:
hint: 'personal access token to use, dapiXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
hide_input: true
http_path:
hint: 'HTTP path of SQL warehouse to use'
default: /sql/2.0/warehouses/f00dcafe
catalog:
hint: 'initial catalog'
default: main
schema:
hint: 'personal schema where dbt will build objects during development, example: $USERNAME'
threads:
hint: 'threads to use during development, 1 or more'
type: 'int'
default: 4

View File

@ -0,0 +1,3 @@
## requirements-dev.txt: dependencies for local development.
dbt-databricks>=1.8.0,<2.0.0

View File

@ -0,0 +1,43 @@
resources:
jobs:
my_dbt_sql_job:
name: my_dbt_sql_job
trigger:
# Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- $USERNAME
tasks:
- task_key: dbt
dbt_task:
project_directory: ../
# The default schema, catalog, etc. are defined in ../dbt_profiles/profiles.yml
profiles_directory: dbt_profiles/
commands:
# The dbt commands to run (see also dbt_profiles/profiles.yml; dev_schema is used in the dev profile)
- 'dbt deps --target=${bundle.target}'
- 'dbt seed --target=${bundle.target} --vars "{ dev_schema: ${workspace.current_user.short_name} }"'
- 'dbt run --target=${bundle.target} --vars "{ dev_schema: ${workspace.current_user.short_name} }"'
libraries:
- pypi:
package: dbt-databricks>=1.8.0,<2.0.0
new_cluster:
spark_version: 15.4.x-scala2.12
node_type_id: i3.xlarge
data_security_mode: SINGLE_USER
num_workers: 0
spark_conf:
spark.master: "local[*, 4]"
spark.databricks.cluster.profile: singleNode
custom_tags:
ResourceClass: SingleNode

View File

@ -0,0 +1,17 @@
-- This model file defines a materialized view called 'orders_daily'
--
-- Read more about materialized at https://docs.getdbt.com/reference/resource-configs/databricks-configs#materialized-views-and-streaming-tables
-- Current limitation: a "full refresh" is needed in case the definition below is changed; see https://github.com/databricks/dbt-databricks/issues/561.
{{ config(materialized = 'materialized_view') }}
select order_date, count(*) AS number_of_orders
from {{ ref('orders_raw') }}
-- During development, only process a smaller range of data
{% if target.name != 'prod' %}
where order_date >= '2019-08-01' and order_date < '2019-09-01'
{% endif %}
group by order_date

View File

@ -0,0 +1,16 @@
-- This model file defines a streaming table called 'orders_raw'
--
-- The streaming table below ingests all JSON files in /databricks-datasets/retail-org/sales_orders/
-- Read more about streaming tables at https://docs.getdbt.com/reference/resource-configs/databricks-configs#materialized-views-and-streaming-tables
-- Current limitation: a "full refresh" is needed in case the definition below is changed; see https://github.com/databricks/dbt-databricks/issues/561.
{{ config(materialized = 'streaming_table') }}
select
customer_name,
date(timestamp(from_unixtime(try_cast(order_datetime as bigint)))) as order_date,
order_number
from stream read_files(
"/databricks-datasets/retail-org/sales_orders/",
format => "json",
header => true
)

View File

@ -0,0 +1,21 @@
version: 2
models:
- name: orders_raw
description: "Raw ingested orders"
columns:
- name: customer_name
description: "The name of a customer"
data_tests:
- unique
- not_null
- name: orders_daily
description: "Number of orders by day"
columns:
- name: order_date
description: "The date on which orders took place"
data_tests:
- unique
- not_null

View File

@ -0,0 +1,5 @@
trace $CLI bundle init dbt-sql --config-file ./input.json --output-dir output
cd output/my_dbt_sql
trace $CLI bundle validate -t dev
trace $CLI bundle validate -t prod

View File

@ -1,5 +1,5 @@
>>> $CLI bundle init default-python --config-file ./input.json >>> $CLI bundle init default-python --config-file ./input.json --output-dir output
Welcome to the default Python template for Databricks Asset Bundles! Welcome to the default Python template for Databricks Asset Bundles!
Workspace to use (auto-detected, edit in 'my_default_python/databricks.yml'): $DATABRICKS_URL Workspace to use (auto-detected, edit in 'my_default_python/databricks.yml'): $DATABRICKS_URL

View File

@ -0,0 +1,8 @@
.databricks/
build/
dist/
__pycache__/
*.egg-info
.venv/
scratch/**
!scratch/README.md

View File

@ -0,0 +1,3 @@
# Typings for Pylance in Visual Studio Code
# see https://github.com/microsoft/pyright/blob/main/docs/builtins.md
from databricks.sdk.runtime import *

View File

@ -0,0 +1,7 @@
{
"recommendations": [
"databricks.databricks",
"ms-python.vscode-pylance",
"redhat.vscode-yaml"
]
}

View File

@ -0,0 +1,16 @@
{
"python.analysis.stubPath": ".vscode",
"jupyter.interactiveWindow.cellMarker.codeRegex": "^# COMMAND ----------|^# Databricks notebook source|^(#\\s*%%|#\\s*\\<codecell\\>|#\\s*In\\[\\d*?\\]|#\\s*In\\[ \\])",
"jupyter.interactiveWindow.cellMarker.default": "# COMMAND ----------",
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.analysis.extraPaths": ["src"],
"files.exclude": {
"**/*.egg-info": true,
"**/__pycache__": true,
".pytest_cache": true,
},
}

View File

@ -0,0 +1,47 @@
# my_default_python
The 'my_default_python' project was generated by using the default-python template.
## Getting started
1. Install the Databricks CLI from https://docs.databricks.com/dev-tools/cli/databricks-cli.html
2. Authenticate to your Databricks workspace, if you have not done so already:
```
$ databricks configure
```
3. To deploy a development copy of this project, type:
```
$ databricks bundle deploy --target dev
```
(Note that "dev" is the default target, so the `--target` parameter
is optional here.)
This deploys everything that's defined for this project.
For example, the default template would deploy a job called
`[dev yourname] my_default_python_job` to your workspace.
You can find that job by opening your workpace and clicking on **Workflows**.
4. Similarly, to deploy a production copy, type:
```
$ databricks bundle deploy --target prod
```
Note that the default job from the template has a schedule that runs every day
(defined in resources/my_default_python.job.yml). The schedule
is paused when deploying in development mode (see
https://docs.databricks.com/dev-tools/bundles/deployment-modes.html).
5. To run a job or pipeline, use the "run" command:
```
$ databricks bundle run
```
6. Optionally, install developer tools such as the Databricks extension for Visual Studio Code from
https://docs.databricks.com/dev-tools/vscode-ext.html. Or read the "getting started" documentation for
**Databricks Connect** for instructions on running the included Python code from a different IDE.
7. For documentation on the Databricks asset bundles format used
for this project, and for CI/CD configuration, see
https://docs.databricks.com/dev-tools/bundles/index.html.

View File

@ -0,0 +1,31 @@
# This is a Databricks asset bundle definition for my_default_python.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
name: my_default_python
uuid: <UUID>
include:
- resources/*.yml
targets:
dev:
# The default target uses 'mode: development' to create a development copy.
# - Deployed resources get prefixed with '[dev my_user_name]'
# - Any job schedules and triggers are paused by default.
# See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
mode: development
default: true
workspace:
host: $DATABRICKS_URL
prod:
mode: production
workspace:
host: $DATABRICKS_URL
# We explicitly specify /Workspace/Users/$USERNAME to make sure we only have a single copy.
root_path: /Workspace/Users/$USERNAME/.bundle/${bundle.name}/${bundle.target}
permissions:
- user_name: $USERNAME
level: CAN_MANAGE
run_as:
user_name: $USERNAME

View File

@ -0,0 +1,22 @@
# Fixtures
This folder is reserved for fixtures, such as CSV files.
Below is an example of how to load fixtures as a data frame:
```
import pandas as pd
import os
def get_absolute_path(*relative_parts):
if 'dbutils' in globals():
base_dir = os.path.dirname(dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()) # type: ignore
path = os.path.normpath(os.path.join(base_dir, *relative_parts))
return path if path.startswith("/Workspace") else "/Workspace" + path
else:
return os.path.join(*relative_parts)
csv_file = get_absolute_path("..", "fixtures", "mycsv.csv")
df = pd.read_csv(csv_file)
display(df)
```

View File

@ -0,0 +1,3 @@
[pytest]
testpaths = tests
pythonpath = src

View File

@ -0,0 +1,29 @@
## requirements-dev.txt: dependencies for local development.
##
## For defining dependencies used by jobs in Databricks Workflows, see
## https://docs.databricks.com/dev-tools/bundles/library-dependencies.html
## Add code completion support for DLT
databricks-dlt
## pytest is the default package used for testing
pytest
## Dependencies for building wheel files
setuptools
wheel
## databricks-connect can be used to run parts of this project locally.
## See https://docs.databricks.com/dev-tools/databricks-connect.html.
##
## databricks-connect is automatically installed if you're using Databricks
## extension for Visual Studio Code
## (https://docs.databricks.com/dev-tools/vscode-ext/dev-tasks/databricks-connect.html).
##
## To manually install databricks-connect, either follow the instructions
## at https://docs.databricks.com/dev-tools/databricks-connect.html
## to install the package system-wide. Or uncomment the line below to install a
## version of db-connect that corresponds to the Databricks Runtime version used
## for this project.
#
# databricks-connect>=15.4,<15.5

View File

@ -0,0 +1,49 @@
# The main job for my_default_python.
resources:
jobs:
my_default_python_job:
name: my_default_python_job
trigger:
# Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- $USERNAME
tasks:
- task_key: notebook_task
job_cluster_key: job_cluster
notebook_task:
notebook_path: ../src/notebook.ipynb
- task_key: refresh_pipeline
depends_on:
- task_key: notebook_task
pipeline_task:
pipeline_id: ${resources.pipelines.my_default_python_pipeline.id}
- task_key: main_task
depends_on:
- task_key: refresh_pipeline
job_cluster_key: job_cluster
python_wheel_task:
package_name: my_default_python
entry_point: main
libraries:
# By default we just include the .whl file generated for the my_default_python package.
# See https://docs.databricks.com/dev-tools/bundles/library-dependencies.html
# for more information on how to add other libraries.
- whl: ../dist/*.whl
job_clusters:
- job_cluster_key: job_cluster
new_cluster:
spark_version: 15.4.x-scala2.12
node_type_id: i3.xlarge
autoscale:
min_workers: 1
max_workers: 4

View File

@ -0,0 +1,13 @@
# The main pipeline for my_default_python
resources:
pipelines:
my_default_python_pipeline:
name: my_default_python_pipeline
catalog: main
target: my_default_python_${bundle.target}
libraries:
- notebook:
path: ../src/dlt_pipeline.ipynb
configuration:
bundle.sourcePath: ${workspace.file_path}/src

View File

@ -0,0 +1,4 @@
# scratch
This folder is reserved for personal, exploratory notebooks.
By default these are not committed to Git, as 'scratch' is listed in .gitignore.

View File

@ -0,0 +1,61 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "<UUID>",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"import sys\n",
"\n",
"sys.path.append(\"../src\")\n",
"from my_default_python import main\n",
"\n",
"main.get_taxis(spark).show(10)"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 2
},
"notebookName": "ipynb-notebook",
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@ -0,0 +1,41 @@
"""
setup.py configuration script describing how to build and package this project.
This file is primarily used by the setuptools library and typically should not
be executed directly. See README.md for how to deploy, test, and run
the my_default_python project.
"""
from setuptools import setup, find_packages
import sys
sys.path.append("./src")
import datetime
import my_default_python
local_version = datetime.datetime.utcnow().strftime("%Y%m%d.%H%M%S")
setup(
name="my_default_python",
# We use timestamp as Local version identifier (https://peps.python.org/pep-0440/#local-version-identifiers.)
# to ensure that changes to wheel package are picked up when used on all-purpose clusters
version=my_default_python.__version__ + "+" + local_version,
url="https://databricks.com",
author="$USERNAME",
description="wheel file based on my_default_python/src",
packages=find_packages(where="./src"),
package_dir={"": "src"},
entry_points={
"packages": [
"main=my_default_python.main:main",
],
},
install_requires=[
# Dependencies in case the output wheel file is used as a library dependency.
# For defining dependencies, when this package is used in Databricks, see:
# https://docs.databricks.com/dev-tools/bundles/library-dependencies.html
"setuptools"
],
)

View File

@ -0,0 +1,90 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "<UUID>",
"showTitle": false,
"title": ""
}
},
"source": [
"# DLT pipeline\n",
"\n",
"This Delta Live Tables (DLT) definition is executed using a pipeline defined in resources/my_default_python.pipeline.yml."
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "<UUID>",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"# Import DLT and src/my_default_python\n",
"import dlt\n",
"import sys\n",
"\n",
"sys.path.append(spark.conf.get(\"bundle.sourcePath\", \".\"))\n",
"from pyspark.sql.functions import expr\n",
"from my_default_python import main"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "<UUID>",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"@dlt.view\n",
"def taxi_raw():\n",
" return main.get_taxis(spark)\n",
"\n",
"\n",
"@dlt.table\n",
"def filtered_taxis():\n",
" return dlt.read(\"taxi_raw\").filter(expr(\"fare_amount < 30\"))"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 2
},
"notebookName": "dlt_pipeline",
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@ -0,0 +1,25 @@
from pyspark.sql import SparkSession, DataFrame
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
# Create a new Databricks Connect session. If this fails,
# check that you have configured Databricks Connect correctly.
# See https://docs.databricks.com/dev-tools/databricks-connect.html.
def get_spark() -> SparkSession:
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def main():
get_taxis(get_spark()).show(5)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,75 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "<UUID>",
"showTitle": false,
"title": ""
}
},
"source": [
"# Default notebook\n",
"\n",
"This default notebook is executed using Databricks Workflows as defined in resources/my_default_python.job.yml."
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "<UUID>",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"from my_default_python import main\n",
"\n",
"main.get_taxis(spark).show(10)"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 2
},
"notebookName": "notebook",
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@ -0,0 +1,6 @@
from my_default_python.main import get_taxis, get_spark
def test_main():
taxis = get_taxis(get_spark())
assert taxis.count() > 5

View File

@ -1,5 +1,5 @@
trace $CLI bundle init default-python --config-file ./input.json trace $CLI bundle init default-python --config-file ./input.json --output-dir output
cd my_default_python cd output/my_default_python
trace $CLI bundle validate -t dev trace $CLI bundle validate -t dev
trace $CLI bundle validate -t prod trace $CLI bundle validate -t prod

View File

@ -0,0 +1,2 @@
[format]
exclude = ["*.ipynb"]

View File

@ -1,5 +1,5 @@
>>> $CLI bundle init default-sql --config-file ./input.json >>> $CLI bundle init default-sql --config-file ./input.json --output-dir output
Welcome to the default SQL template for Databricks Asset Bundles! Welcome to the default SQL template for Databricks Asset Bundles!

View File

@ -0,0 +1,2 @@
.databricks

View File

@ -0,0 +1,7 @@
{
"recommendations": [
"databricks.databricks",
"redhat.vscode-yaml",
"databricks.sqltools-databricks-driver",
]
}

View File

@ -0,0 +1,27 @@
{
"python.analysis.stubPath": ".vscode",
"jupyter.interactiveWindow.cellMarker.codeRegex": "^# COMMAND ----------|^# Databricks notebook source|^(#\\s*%%|#\\s*\\<codecell\\>|#\\s*In\\[\\d*?\\]|#\\s*In\\[ \\])",
"jupyter.interactiveWindow.cellMarker.default": "# COMMAND ----------",
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python.analysis.extraPaths": ["src"],
"files.exclude": {
"**/*.egg-info": true,
"**/__pycache__": true,
".pytest_cache": true,
},
"sqltools.connections": [
{
"connectionMethod": "VS Code Extension (beta)",
"catalog": "main",
"previewLimit": 50,
"driver": "Databricks",
"name": "databricks",
"path": "/sql/2.0/warehouses/f00dcafe"
}
],
"sqltools.autoConnectTo": "",
}

View File

@ -0,0 +1,41 @@
# my_default_sql
The 'my_default_sql' project was generated by using the default-sql template.
## Getting started
1. Install the Databricks CLI from https://docs.databricks.com/dev-tools/cli/install.html
2. Authenticate to your Databricks workspace (if you have not done so already):
```
$ databricks configure
```
3. To deploy a development copy of this project, type:
```
$ databricks bundle deploy --target dev
```
(Note that "dev" is the default target, so the `--target` parameter
is optional here.)
This deploys everything that's defined for this project.
For example, the default template would deploy a job called
`[dev yourname] my_default_sql_job` to your workspace.
You can find that job by opening your workpace and clicking on **Workflows**.
4. Similarly, to deploy a production copy, type:
```
$ databricks bundle deploy --target prod
```
5. To run a job, use the "run" command:
```
$ databricks bundle run
```
6. Optionally, install developer tools such as the Databricks extension for Visual Studio Code from
https://docs.databricks.com/dev-tools/vscode-ext.html.
7. For documentation on the Databricks Asset Bundles format used
for this project, and for CI/CD configuration, see
https://docs.databricks.com/dev-tools/bundles/index.html.

View File

@ -0,0 +1,48 @@
# This is a Databricks asset bundle definition for my_default_sql.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
name: my_default_sql
uuid: <UUID>
include:
- resources/*.yml
# Variable declarations. These variables are assigned in the dev/prod targets below.
variables:
warehouse_id:
description: The warehouse to use
catalog:
description: The catalog to use
schema:
description: The schema to use
targets:
dev:
# The default target uses 'mode: development' to create a development copy.
# - Deployed resources get prefixed with '[dev my_user_name]'
# - Any job schedules and triggers are paused by default.
# See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
mode: development
default: true
workspace:
host: $DATABRICKS_URL
variables:
warehouse_id: f00dcafe
catalog: main
schema: ${workspace.current_user.short_name}
prod:
mode: production
workspace:
host: $DATABRICKS_URL
# We explicitly specify /Workspace/Users/$USERNAME to make sure we only have a single copy.
root_path: /Workspace/Users/$USERNAME/.bundle/${bundle.name}/${bundle.target}
variables:
warehouse_id: f00dcafe
catalog: main
schema: default
permissions:
- user_name: $USERNAME
level: CAN_MANAGE
run_as:
user_name: $USERNAME

View File

@ -0,0 +1,38 @@
# A job running SQL queries on a SQL warehouse
resources:
jobs:
my_default_sql_sql_job:
name: my_default_sql_sql_job
trigger:
# Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:
interval: 1
unit: DAYS
email_notifications:
on_failure:
- $USERNAME
parameters:
- name: catalog
default: ${var.catalog}
- name: schema
default: ${var.schema}
- name: bundle_target
default: ${bundle.target}
tasks:
- task_key: orders_raw
sql_task:
warehouse_id: ${var.warehouse_id}
file:
path: ../src/orders_raw.sql
- task_key: orders_daily
depends_on:
- task_key: orders_raw
sql_task:
warehouse_id: ${var.warehouse_id}
file:
path: ../src/orders_daily.sql

View File

@ -0,0 +1,4 @@
# scratch
This folder is reserved for personal, exploratory notebooks and SQL files.
By default these are not committed to Git, as 'scratch' is listed in .gitignore.

View File

@ -0,0 +1,35 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "<UUID>",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"%sql\n",
"SELECT * FROM json.`/databricks-datasets/nyctaxi/sample/json/`"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 2
},
"notebookName": "exploration",
"widgets": {}
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@ -0,0 +1,21 @@
-- This query is executed using Databricks Workflows (see resources/my_default_sql_sql.job.yml)
USE CATALOG {{catalog}};
USE IDENTIFIER({{schema}});
CREATE OR REPLACE MATERIALIZED VIEW
orders_daily
AS SELECT
order_date, count(*) AS number_of_orders
FROM
orders_raw
WHERE if(
{{bundle_target}} = "prod",
true,
-- During development, only process a smaller range of data
order_date >= '2019-08-01' AND order_date < '2019-09-01'
)
GROUP BY order_date

View File

@ -0,0 +1,19 @@
-- This query is executed using Databricks Workflows (see resources/my_default_sql_sql.job.yml)
--
-- The streaming table below ingests all JSON files in /databricks-datasets/retail-org/sales_orders/
-- See also https://docs.databricks.com/sql/language-manual/sql-ref-syntax-ddl-create-streaming-table.html
USE CATALOG {{catalog}};
USE IDENTIFIER({{schema}});
CREATE OR REFRESH STREAMING TABLE
orders_raw
AS SELECT
customer_name,
DATE(TIMESTAMP(FROM_UNIXTIME(TRY_CAST(order_datetime AS BIGINT)))) AS order_date,
order_number
FROM STREAM READ_FILES(
"/databricks-datasets/retail-org/sales_orders/",
format => "json",
header => true
)

View File

@ -0,0 +1,5 @@
trace $CLI bundle init default-sql --config-file ./input.json --output-dir output
cd output/my_default_sql
trace $CLI bundle validate -t dev
trace $CLI bundle validate -t prod

View File

@ -17,18 +17,20 @@ func UnifiedDiff(filename1, filename2, s1, s2 string) string {
return fmt.Sprint(gotextdiff.ToUnified(filename1, filename2, s1, edits)) return fmt.Sprint(gotextdiff.ToUnified(filename1, filename2, s1, edits))
} }
func AssertEqualTexts(t testutil.TestingT, filename1, filename2, expected, out string) { func AssertEqualTexts(t testutil.TestingT, filename1, filename2, expected, out string) bool {
t.Helper() t.Helper()
if len(out) < 1000 && len(expected) < 1000 { if len(out) < 1000 && len(expected) < 1000 {
// This shows full strings + diff which could be useful when debugging newlines // This shows full strings + diff which could be useful when debugging newlines
assert.Equal(t, expected, out, "%s vs %s", filename1, filename2) return assert.Equal(t, expected, out, "%s vs %s", filename1, filename2)
} else { } else {
// only show diff for large texts // only show diff for large texts
diff := UnifiedDiff(filename1, filename2, expected, out) diff := UnifiedDiff(filename1, filename2, expected, out)
if diff != "" { if diff != "" {
t.Errorf("Diff:\n" + diff) t.Errorf("Diff:\n" + diff)
return false
} }
} }
return true
} }
func AssertEqualJQ(t testutil.TestingT, expectedName, outName, expected, out string, ignorePaths []string) { func AssertEqualJQ(t testutil.TestingT, expectedName, outName, expected, out string, ignorePaths []string) {