Use a helper notebook to set catalog/schema

This commit is contained in:
Lennart Kats 2024-11-02 09:52:09 +01:00
parent 480d308a80
commit c1f5b3c8e7
No known key found for this signature in database
GPG Key ID: 1EB8B57673197023
8 changed files with 239 additions and 105 deletions

View File

@ -93,11 +93,13 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
task.DbtTask.Catalog = t.Catalog
}
if task.DbtTask.Schema == "" {
task.DbtTask.Schema = t.Catalog
task.DbtTask.Schema = t.Schema
}
}
}
diags = diags.Extend(validateJobUsesCatalogAndSchema(b, key, j))
diags = diags.Extend(addCatalogSchemaParameters(b, key, j, t))
diags = diags.Extend(validateCatalogSchemaUsage(b, key, j))
}
}
@ -116,7 +118,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
if t.TriggerPauseStatus == config.Paused {
p.Continuous = false
}
if t.Catalog != "" && p.Catalog == "" {
if t.Catalog != "" && p.Catalog == "" && p.Catalog != "hive_metastore" {
p.Catalog = t.Catalog
}
if t.Schema != "" && p.Target == "" {
@ -182,13 +184,16 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos
}
e.Name = normalizePrefix(prefix) + e.Name
TODO:
- e.AiGateway.InferenceTableConfig.CatalogName
- e.AiGateway.InferenceTableConfig.SchemaName
- e.Config.AutoCaptureConfig.SchemaName
- e.Config.AutoCaptureConfig.CatalogName
- e.Config.ServedEntities[0].EntityName (__catalog_name__.__schema_name__.__model_name__.)
- e.Config.ServedModels[0].ModelName (__catalog_name__.__schema_name__.__model_name__.)
if t.Catalog != "" || t.Schema != "" {
// TODO:
// - e.AiGateway.InferenceTableConfig.CatalogName
// - e.AiGateway.InferenceTableConfig.SchemaName
// - e.Config.AutoCaptureConfig.SchemaName
// - e.Config.AutoCaptureConfig.CatalogName
// - e.Config.ServedEntities[0].EntityName (__catalog_name__.__schema_name__.__model_name__.)
// - e.Config.ServedModels[0].ModelName (__catalog_name__.__schema_name__.__model_name__.)
diags = diags.Extend(diag.Errorf("model serving endpoints are not supported with catalog/schema presets"))
}
}
@ -306,57 +311,32 @@ func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics {
}
return nil
}
func validateCatalogSchemaUsage(b *bundle.Bundle, key string, job *resources.Job) diag.Diagnostics {
for _, t := range job.Tasks {
if t.NotebookTask != nil {
// TODO: proper validation that warns
return diag.Diagnostics{{
Summary: fmt.Sprintf("job %s must read catalog and schema from parameters for notebook %s",
key, t.NotebookTask.NotebookPath),
Severity: diag.Recommendation,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}}
}
}
// TODO: validate that any notebooks actually read the catalog/schema arguments
// if ... {
// return diag.Diagnostics{{
// Summary: fmt.Sprintf("job %s must pass catalog and schema presets as parameters as follows:\n"+
// " ...", key),
// Severity: diag.Error,
// Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
// }}
// }
func validateJobUsesCatalogAndSchema(b *bundle.Bundle, key string, job *resources.Job) diag.Diagnostics {
if !hasTasksRequiringParameters(job) {
return nil
}
if !hasParameter(job, "catalog") || !hasParameter(job, "schema") {
return diag.Diagnostics{{
Summary: fmt.Sprintf("job %s must pass catalog and schema presets as parameters as follows:\n"+
" parameters:\n"+
" - name: catalog:\n"+
" default: ${presets.catalog}\n"+
" - name: schema\n"+
" default: ${presets.schema}\n", key),
Severity: diag.Error,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}}
}
return nil
}
// hasTasksRequiringParameters determines if there is a task in this job that
// requires the 'catalog' and 'schema' parameters when they are enabled in presets.
func hasTasksRequiringParameters(job *resources.Job) bool {
for _, task := range job.Tasks {
// Allowlisted task types: these don't require catalog / schema to be passed as a paramater
if task.DbtTask != nil || task.ConditionTask != nil || task.RunJobTask != nil || task.ForEachTask != nil || task.PipelineTask != nil {
continue
}
// Alert tasks, query object tasks, etc. don't require a parameter;
// the catalog / schema is set inside those objects instead.
if task.SqlTask != nil && task.SqlTask.File == nil {
continue
}
return true
}
return false
}
// hasParameter determines if a job has a parameter with the given name.
func hasParameter(job *resources.Job, name string) bool {
if job.Parameters == nil {
return false
}
for _, p := range job.Parameters {
if p.Name == name {
return true
}
}
return false
}
// toTagArray converts a map of tags to an array of tags.
// We sort tags so ensure stable ordering.
func toTagArray(tags map[string]string) []Tag {
@ -388,3 +368,56 @@ func normalizePrefix(prefix string) string {
return textutil.NormalizeString(prefix) + suffix
}
// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist.
// Returns any warning diagnostics for existing parameters.
func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics {
var diags diag.Diagnostics
// Check for existing catalog/schema parameters
hasCatalog := false
hasSchema := false
if job.Parameters != nil {
for _, param := range job.Parameters {
if param.Name == "catalog" {
hasCatalog = true
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key),
Severity: diag.Warning,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}})
}
if param.Name == "schema" {
hasSchema = true
diags = diags.Extend(diag.Diagnostics{{
Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key),
Severity: diag.Warning,
Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)},
}})
}
}
}
// Initialize parameters if nil
if job.Parameters == nil {
job.Parameters = []jobs.JobParameterDefinition{}
}
// Add catalog parameter if not already present
if !hasCatalog && t.Catalog != "" {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "catalog",
Default: t.Catalog,
})
}
// Add schema parameter if not already present
if !hasSchema && t.Schema != "" {
job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{
Name: "schema",
Default: t.Schema,
})
}
return diags
}

View File

@ -29,7 +29,39 @@
"enum": ["yes", "no"],
"description": "Include a stub (sample) Python package in '{{.project_name}}{{path_separator}}src'",
"order": 4
},
"default_catalog": {
"type": "string",
"default": "{{default_catalog}}",
"pattern": "^\\w*$",
"pattern_match_failure_message": "Invalid catalog name.",
"description": "\nPlease provide an initial catalog{{if eq (default_catalog) \"\"}} (leave blank when not using Unity Catalog){{end}}.\ndefault_catalog",
"order": 5
},
"personal_schemas": {
"type": "string",
"description": "\nWould you like to use a personal schema for each user working on this project? (e.g., 'catalog.{{short_name}}')\npersonal_schemas",
"enum": [
"yes, use a schema based on the current user name during development",
"no, use a shared schema during development"
],
"order": 6
},
"shared_schema": {
"skip_prompt_if": {
"properties": {
"personal_schemas": {
"const": "yes, use a schema based on the current user name during development"
}
}
},
"type": "string",
"default": "default",
"pattern": "^\\w+$",
"pattern_match_failure_message": "Invalid schema name.",
"description": "\nPlease provide an initial schema during development.\ndefault_schema",
"order": 7
}
},
"success_message": "Workspace to use (auto-detected, edit in '{{.project_name}}/databricks.yml'): {{workspace_host}}\n\n✨ Your new project has been created in the '{{.project_name}}' directory!\n\nPlease refer to the README.md file for \"getting started\" instructions.\nSee also the documentation at https://docs.databricks.com/dev-tools/bundles/index.html."
"success_message": "\nWorkspace to use (auto-detected, edit in '{{.project_name}}/databricks.yml').\nworkspace_host: {{workspace_host}}\n\n✨ Your new project has been created in the '{{.project_name}}' directory!\n\nPlease refer to the README.md file for \"getting started\" instructions.\nSee also the documentation at https://docs.databricks.com/dev-tools/bundles/index.html."
}

View File

@ -6,6 +6,13 @@ bundle:
include:
- resources/*.yml
{{- $dev_schema := .shared_schema }}
{{- $prod_schema := .shared_schema }}
{{- if (regexp "^yes").MatchString .personal_schemas}}
{{- $dev_schema = "${workspace.current_user.short_name}"}}
{{- $prod_schema = "default"}}
{{- end}}
targets:
dev:
# The default target uses 'mode: development' to create a development copy.
@ -17,8 +24,8 @@ targets:
workspace:
host: {{workspace_host}}
presets:
catalog: {{default_catalog}}
schema: default
catalog: {{.default_catalog}}
schema: {{$dev_schema}}
prod:
mode: production
@ -27,8 +34,8 @@ targets:
# We explicitly specify /Workspace/Users/{{user_name}} to make sure we only have a single copy.
root_path: /Workspace/Users/{{user_name}}/.bundle/${bundle.name}/${bundle.target}
presets:
catalog: {{default_catalog}}
schema: default
catalog: {{.default_catalog}}
schema: {{$prod_schema}}
permissions:
- {{if is_service_principal}}service_principal{{else}}user{{end}}_name: {{user_name}}
level: CAN_MANAGE

View File

@ -10,14 +10,6 @@ resources:
{{.project_name}}_job:
name: {{.project_name}}_job
{{if or (eq .include_notebook "yes") (eq .include_python "yes") -}}
parameters:
- name: catalog
default: ${presets.catalog}
- name: schema
default: ${presets.schema}
{{end -}}
trigger:
# Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger
periodic:

View File

@ -3,13 +3,6 @@ resources:
pipelines:
{{.project_name}}_pipeline:
name: {{.project_name}}_pipeline
{{- if or (eq default_catalog "") (eq default_catalog "hive_metastore")}}
## Specify the 'catalog' field to configure this pipeline to make use of Unity Catalog:
# catalog: catalog_name
{{- else}}
catalog: {{default_catalog}}
{{- end}}
target: {{.project_name}}_${bundle.environment}
libraries:
- notebook:
path: ../src/dlt_pipeline.ipynb

View File

@ -0,0 +1,83 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "9a626959-61c8-4bba-84d2-2a4ecab1f7ec",
"showTitle": false,
"title": ""
}
},
"source": [
"# helper notebook: apply_defaults\n",
"\n",
"This helper notebook is used to create widgets that configure the default catalog\n",
"and schema.\n",
"\n",
"Usage:\n",
"\n",
"```\n",
"% run ../relative/path/to/apply_defaults\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 0,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {},
"inputWidgets": {},
"nuid": "9198e987-5606-403d-9f6d-8f14e6a4017f",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"# Load default catalog and schema as widget and set their values as the default catalog / schema\n",
"dbutils.widgets.text('catalog', '{{.default_catalog}}')\n",
"dbutils.widgets.text('schema', 'default')\n",
"catalog = dbutils.widgets.get('catalog')\n",
"schema = dbutils.widgets.get('schema')\n",
"spark.sql(f'USE {catalog}.{schema}')"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Automatically reload imported modules when they change\n",
"%load_ext autoreload\n",
"%autoreload 2"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"language": "python",
"notebookMetadata": {
"pythonIndentUnit": 2
},
"notebookName": "dlt_pipeline",
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.4"
}
},
"nbformat": 4,
"nbformat_minor": 0
}

View File

@ -23,23 +23,8 @@
"metadata": {},
"outputs": [],
"source": [
"# Automatically reload this notebook when it is edited\n",
"%load_ext autoreload\n",
"%autoreload 2"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Set the catalog and schema for the current session\n",
"dbutils.widgets.text('catalog', '{{default_catalog}}')\n",
"dbutils.widgets.text('schema', 'default')\n",
"catalog = dbutils.widgets.get('catalog')\n",
"schema = dbutils.widgets.get('schema')\n",
"spark.sql(f'USE {catalog}.{schema}')"
"# Load default catalog and schema as widget and set their values as the default catalog / schema\n",
"%run ./apply_defaults"
]
},
{
@ -62,9 +47,9 @@
{{- if (eq .include_python "yes") }}
"from {{.project_name}} import main\n",
"\n",
"main.get_taxis(spark).show(10)"
"main.create_example_table()"
{{else}}
"spark.range(10)"
"spark.sql("CREATE OR REPLACE TABLE example AS SELECT 'example table' AS text_column")"
{{end -}}
]
}

View File

@ -1,21 +1,30 @@
from pyspark.sql import SparkSession, DataFrame
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
# Create a new Databricks Connect session. If this fails,
# check that you have configured Databricks Connect correctly.
# See https://docs.databricks.com/dev-tools/databricks-connect.html.
def get_spark() -> SparkSession:
"""
Create a new Databricks Connect session. If this fails,
check that you have configured Databricks Connect correctly.
See https://docs.databricks.com/dev-tools/databricks-connect.html.
"""
try:
from databricks.connect import DatabricksSession
return DatabricksSession.builder.getOrCreate()
except ImportError:
return SparkSession.builder.getOrCreate()
def get_taxis(spark: SparkSession) -> DataFrame:
return spark.read.table("samples.nyctaxi.trips")
def create_example_table():
"""
Create a table called 'example' in the default catalog and schema.
"""
get_spark().sql("CREATE OR REPLACE TABLE example AS SELECT 'example table' AS text_column")
def main():
# Set the catalog and schema for the current session
# Set the catalog and schema for the current session.
# In the default template, these parameters are set
# using the 'catalog' and 'schema' presets in databricks.yml.
parser = argparse.ArgumentParser()
parser.add_argument('--catalog', '-c', required=True)
parser.add_argument('--schema', '-s', required=True)
@ -23,7 +32,7 @@ def main():
spark = get_spark()
spark.sql(f"USE {args.catalog}.{args.schema}")
get_taxis(spark).show(5)
create_example_table()
if __name__ == '__main__':
main()