From c1f5b3c8e738f73af74d2b12eb8ce246e41c7612 Mon Sep 17 00:00:00 2001 From: Lennart Kats Date: Sat, 2 Nov 2024 09:52:09 +0100 Subject: [PATCH] Use a helper notebook to set catalog/schema --- bundle/config/mutator/apply_presets.go | 147 +++++++++++------- .../databricks_template_schema.json | 34 +++- .../{{.project_name}}/databricks.yml.tmpl | 15 +- .../resources/{{.project_name}}.job.yml.tmpl | 8 - .../{{.project_name}}.pipeline.yml.tmpl | 7 - .../src/apply_defaults.ipynb.tmpl | 83 ++++++++++ .../{{.project_name}}/src/notebook.ipynb.tmpl | 23 +-- .../src/{{.project_name}}/main.py.tmpl | 27 ++-- 8 files changed, 239 insertions(+), 105 deletions(-) create mode 100644 libs/template/templates/default-python/template/{{.project_name}}/src/apply_defaults.ipynb.tmpl diff --git a/bundle/config/mutator/apply_presets.go b/bundle/config/mutator/apply_presets.go index 860d080d9..a9d475cfb 100644 --- a/bundle/config/mutator/apply_presets.go +++ b/bundle/config/mutator/apply_presets.go @@ -93,11 +93,13 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos task.DbtTask.Catalog = t.Catalog } if task.DbtTask.Schema == "" { - task.DbtTask.Schema = t.Catalog + task.DbtTask.Schema = t.Schema } } } - diags = diags.Extend(validateJobUsesCatalogAndSchema(b, key, j)) + + diags = diags.Extend(addCatalogSchemaParameters(b, key, j, t)) + diags = diags.Extend(validateCatalogSchemaUsage(b, key, j)) } } @@ -116,7 +118,7 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos if t.TriggerPauseStatus == config.Paused { p.Continuous = false } - if t.Catalog != "" && p.Catalog == "" { + if t.Catalog != "" && p.Catalog == "" && p.Catalog != "hive_metastore" { p.Catalog = t.Catalog } if t.Schema != "" && p.Target == "" { @@ -182,13 +184,16 @@ func (m *applyPresets) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnos } e.Name = normalizePrefix(prefix) + e.Name - TODO: - - e.AiGateway.InferenceTableConfig.CatalogName - - e.AiGateway.InferenceTableConfig.SchemaName - - e.Config.AutoCaptureConfig.SchemaName - - e.Config.AutoCaptureConfig.CatalogName - - e.Config.ServedEntities[0].EntityName (__catalog_name__.__schema_name__.__model_name__.) - - e.Config.ServedModels[0].ModelName (__catalog_name__.__schema_name__.__model_name__.) + if t.Catalog != "" || t.Schema != "" { + // TODO: + // - e.AiGateway.InferenceTableConfig.CatalogName + // - e.AiGateway.InferenceTableConfig.SchemaName + // - e.Config.AutoCaptureConfig.SchemaName + // - e.Config.AutoCaptureConfig.CatalogName + // - e.Config.ServedEntities[0].EntityName (__catalog_name__.__schema_name__.__model_name__.) + // - e.Config.ServedModels[0].ModelName (__catalog_name__.__schema_name__.__model_name__.) + diags = diags.Extend(diag.Errorf("model serving endpoints are not supported with catalog/schema presets")) + } } @@ -306,57 +311,32 @@ func validateCatalogAndSchema(b *bundle.Bundle) diag.Diagnostics { } return nil } +func validateCatalogSchemaUsage(b *bundle.Bundle, key string, job *resources.Job) diag.Diagnostics { + for _, t := range job.Tasks { + if t.NotebookTask != nil { + // TODO: proper validation that warns + return diag.Diagnostics{{ + Summary: fmt.Sprintf("job %s must read catalog and schema from parameters for notebook %s", + key, t.NotebookTask.NotebookPath), + Severity: diag.Recommendation, + Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)}, + }} + } + } + + // TODO: validate that any notebooks actually read the catalog/schema arguments + // if ... { + // return diag.Diagnostics{{ + // Summary: fmt.Sprintf("job %s must pass catalog and schema presets as parameters as follows:\n"+ + // " ...", key), + // Severity: diag.Error, + // Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)}, + // }} + // } -func validateJobUsesCatalogAndSchema(b *bundle.Bundle, key string, job *resources.Job) diag.Diagnostics { - if !hasTasksRequiringParameters(job) { - return nil - } - if !hasParameter(job, "catalog") || !hasParameter(job, "schema") { - return diag.Diagnostics{{ - Summary: fmt.Sprintf("job %s must pass catalog and schema presets as parameters as follows:\n"+ - " parameters:\n"+ - " - name: catalog:\n"+ - " default: ${presets.catalog}\n"+ - " - name: schema\n"+ - " default: ${presets.schema}\n", key), - Severity: diag.Error, - Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)}, - }} - } return nil } -// hasTasksRequiringParameters determines if there is a task in this job that -// requires the 'catalog' and 'schema' parameters when they are enabled in presets. -func hasTasksRequiringParameters(job *resources.Job) bool { - for _, task := range job.Tasks { - // Allowlisted task types: these don't require catalog / schema to be passed as a paramater - if task.DbtTask != nil || task.ConditionTask != nil || task.RunJobTask != nil || task.ForEachTask != nil || task.PipelineTask != nil { - continue - } - // Alert tasks, query object tasks, etc. don't require a parameter; - // the catalog / schema is set inside those objects instead. - if task.SqlTask != nil && task.SqlTask.File == nil { - continue - } - return true - } - return false -} - -// hasParameter determines if a job has a parameter with the given name. -func hasParameter(job *resources.Job, name string) bool { - if job.Parameters == nil { - return false - } - for _, p := range job.Parameters { - if p.Name == name { - return true - } - } - return false -} - // toTagArray converts a map of tags to an array of tags. // We sort tags so ensure stable ordering. func toTagArray(tags map[string]string) []Tag { @@ -388,3 +368,56 @@ func normalizePrefix(prefix string) string { return textutil.NormalizeString(prefix) + suffix } + +// addCatalogSchemaParameters adds catalog and schema parameters to a job if they don't already exist. +// Returns any warning diagnostics for existing parameters. +func addCatalogSchemaParameters(b *bundle.Bundle, key string, job *resources.Job, t config.Presets) diag.Diagnostics { + var diags diag.Diagnostics + + // Check for existing catalog/schema parameters + hasCatalog := false + hasSchema := false + if job.Parameters != nil { + for _, param := range job.Parameters { + if param.Name == "catalog" { + hasCatalog = true + diags = diags.Extend(diag.Diagnostics{{ + Summary: fmt.Sprintf("job %s already has 'catalog' parameter defined; ignoring preset value", key), + Severity: diag.Warning, + Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)}, + }}) + } + if param.Name == "schema" { + hasSchema = true + diags = diags.Extend(diag.Diagnostics{{ + Summary: fmt.Sprintf("job %s already has 'schema' parameter defined; ignoring preset value", key), + Severity: diag.Warning, + Locations: []dyn.Location{b.Config.GetLocation("resources.jobs." + key)}, + }}) + } + } + } + + // Initialize parameters if nil + if job.Parameters == nil { + job.Parameters = []jobs.JobParameterDefinition{} + } + + // Add catalog parameter if not already present + if !hasCatalog && t.Catalog != "" { + job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ + Name: "catalog", + Default: t.Catalog, + }) + } + + // Add schema parameter if not already present + if !hasSchema && t.Schema != "" { + job.Parameters = append(job.Parameters, jobs.JobParameterDefinition{ + Name: "schema", + Default: t.Schema, + }) + } + + return diags +} diff --git a/libs/template/templates/default-python/databricks_template_schema.json b/libs/template/templates/default-python/databricks_template_schema.json index d53bad91a..711576247 100644 --- a/libs/template/templates/default-python/databricks_template_schema.json +++ b/libs/template/templates/default-python/databricks_template_schema.json @@ -29,7 +29,39 @@ "enum": ["yes", "no"], "description": "Include a stub (sample) Python package in '{{.project_name}}{{path_separator}}src'", "order": 4 + }, + "default_catalog": { + "type": "string", + "default": "{{default_catalog}}", + "pattern": "^\\w*$", + "pattern_match_failure_message": "Invalid catalog name.", + "description": "\nPlease provide an initial catalog{{if eq (default_catalog) \"\"}} (leave blank when not using Unity Catalog){{end}}.\ndefault_catalog", + "order": 5 + }, + "personal_schemas": { + "type": "string", + "description": "\nWould you like to use a personal schema for each user working on this project? (e.g., 'catalog.{{short_name}}')\npersonal_schemas", + "enum": [ + "yes, use a schema based on the current user name during development", + "no, use a shared schema during development" + ], + "order": 6 + }, + "shared_schema": { + "skip_prompt_if": { + "properties": { + "personal_schemas": { + "const": "yes, use a schema based on the current user name during development" + } + } + }, + "type": "string", + "default": "default", + "pattern": "^\\w+$", + "pattern_match_failure_message": "Invalid schema name.", + "description": "\nPlease provide an initial schema during development.\ndefault_schema", + "order": 7 } }, - "success_message": "Workspace to use (auto-detected, edit in '{{.project_name}}/databricks.yml'): {{workspace_host}}\n\n✨ Your new project has been created in the '{{.project_name}}' directory!\n\nPlease refer to the README.md file for \"getting started\" instructions.\nSee also the documentation at https://docs.databricks.com/dev-tools/bundles/index.html." + "success_message": "\nWorkspace to use (auto-detected, edit in '{{.project_name}}/databricks.yml').\nworkspace_host: {{workspace_host}}\n\n✨ Your new project has been created in the '{{.project_name}}' directory!\n\nPlease refer to the README.md file for \"getting started\" instructions.\nSee also the documentation at https://docs.databricks.com/dev-tools/bundles/index.html." } diff --git a/libs/template/templates/default-python/template/{{.project_name}}/databricks.yml.tmpl b/libs/template/templates/default-python/template/{{.project_name}}/databricks.yml.tmpl index 13e2e6c9c..2a0bdee09 100644 --- a/libs/template/templates/default-python/template/{{.project_name}}/databricks.yml.tmpl +++ b/libs/template/templates/default-python/template/{{.project_name}}/databricks.yml.tmpl @@ -6,6 +6,13 @@ bundle: include: - resources/*.yml +{{- $dev_schema := .shared_schema }} +{{- $prod_schema := .shared_schema }} +{{- if (regexp "^yes").MatchString .personal_schemas}} + {{- $dev_schema = "${workspace.current_user.short_name}"}} + {{- $prod_schema = "default"}} +{{- end}} + targets: dev: # The default target uses 'mode: development' to create a development copy. @@ -17,8 +24,8 @@ targets: workspace: host: {{workspace_host}} presets: - catalog: {{default_catalog}} - schema: default + catalog: {{.default_catalog}} + schema: {{$dev_schema}} prod: mode: production @@ -27,8 +34,8 @@ targets: # We explicitly specify /Workspace/Users/{{user_name}} to make sure we only have a single copy. root_path: /Workspace/Users/{{user_name}}/.bundle/${bundle.name}/${bundle.target} presets: - catalog: {{default_catalog}} - schema: default + catalog: {{.default_catalog}} + schema: {{$prod_schema}} permissions: - {{if is_service_principal}}service_principal{{else}}user{{end}}_name: {{user_name}} level: CAN_MANAGE diff --git a/libs/template/templates/default-python/template/{{.project_name}}/resources/{{.project_name}}.job.yml.tmpl b/libs/template/templates/default-python/template/{{.project_name}}/resources/{{.project_name}}.job.yml.tmpl index 17b209a11..0ea69a75a 100644 --- a/libs/template/templates/default-python/template/{{.project_name}}/resources/{{.project_name}}.job.yml.tmpl +++ b/libs/template/templates/default-python/template/{{.project_name}}/resources/{{.project_name}}.job.yml.tmpl @@ -10,14 +10,6 @@ resources: {{.project_name}}_job: name: {{.project_name}}_job - {{if or (eq .include_notebook "yes") (eq .include_python "yes") -}} - parameters: - - name: catalog - default: ${presets.catalog} - - name: schema - default: ${presets.schema} - - {{end -}} trigger: # Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: diff --git a/libs/template/templates/default-python/template/{{.project_name}}/resources/{{.project_name}}.pipeline.yml.tmpl b/libs/template/templates/default-python/template/{{.project_name}}/resources/{{.project_name}}.pipeline.yml.tmpl index 1c6b8607e..c3f94cb1c 100644 --- a/libs/template/templates/default-python/template/{{.project_name}}/resources/{{.project_name}}.pipeline.yml.tmpl +++ b/libs/template/templates/default-python/template/{{.project_name}}/resources/{{.project_name}}.pipeline.yml.tmpl @@ -3,13 +3,6 @@ resources: pipelines: {{.project_name}}_pipeline: name: {{.project_name}}_pipeline - {{- if or (eq default_catalog "") (eq default_catalog "hive_metastore")}} - ## Specify the 'catalog' field to configure this pipeline to make use of Unity Catalog: - # catalog: catalog_name - {{- else}} - catalog: {{default_catalog}} - {{- end}} - target: {{.project_name}}_${bundle.environment} libraries: - notebook: path: ../src/dlt_pipeline.ipynb diff --git a/libs/template/templates/default-python/template/{{.project_name}}/src/apply_defaults.ipynb.tmpl b/libs/template/templates/default-python/template/{{.project_name}}/src/apply_defaults.ipynb.tmpl new file mode 100644 index 000000000..a68e7d822 --- /dev/null +++ b/libs/template/templates/default-python/template/{{.project_name}}/src/apply_defaults.ipynb.tmpl @@ -0,0 +1,83 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "9a626959-61c8-4bba-84d2-2a4ecab1f7ec", + "showTitle": false, + "title": "" + } + }, + "source": [ + "# helper notebook: apply_defaults\n", + "\n", + "This helper notebook is used to create widgets that configure the default catalog\n", + "and schema.\n", + "\n", + "Usage:\n", + "\n", + "```\n", + "% run ../relative/path/to/apply_defaults\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 0, + "metadata": { + "application/vnd.databricks.v1+cell": { + "cellMetadata": {}, + "inputWidgets": {}, + "nuid": "9198e987-5606-403d-9f6d-8f14e6a4017f", + "showTitle": false, + "title": "" + } + }, + "outputs": [], + "source": [ + "# Load default catalog and schema as widget and set their values as the default catalog / schema\n", + "dbutils.widgets.text('catalog', '{{.default_catalog}}')\n", + "dbutils.widgets.text('schema', 'default')\n", + "catalog = dbutils.widgets.get('catalog')\n", + "schema = dbutils.widgets.get('schema')\n", + "spark.sql(f'USE {catalog}.{schema}')" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# Automatically reload imported modules when they change\n", + "%load_ext autoreload\n", + "%autoreload 2" + ] + } + ], + "metadata": { + "application/vnd.databricks.v1+notebook": { + "dashboards": [], + "language": "python", + "notebookMetadata": { + "pythonIndentUnit": 2 + }, + "notebookName": "dlt_pipeline", + "widgets": {} + }, + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} diff --git a/libs/template/templates/default-python/template/{{.project_name}}/src/notebook.ipynb.tmpl b/libs/template/templates/default-python/template/{{.project_name}}/src/notebook.ipynb.tmpl index 4163389b3..95f6b16a3 100644 --- a/libs/template/templates/default-python/template/{{.project_name}}/src/notebook.ipynb.tmpl +++ b/libs/template/templates/default-python/template/{{.project_name}}/src/notebook.ipynb.tmpl @@ -23,23 +23,8 @@ "metadata": {}, "outputs": [], "source": [ - "# Automatically reload this notebook when it is edited\n", - "%load_ext autoreload\n", - "%autoreload 2" - ] - }, - { - "cell_type": "code", - "execution_count": 2, - "metadata": {}, - "outputs": [], - "source": [ - "# Set the catalog and schema for the current session\n", - "dbutils.widgets.text('catalog', '{{default_catalog}}')\n", - "dbutils.widgets.text('schema', 'default')\n", - "catalog = dbutils.widgets.get('catalog')\n", - "schema = dbutils.widgets.get('schema')\n", - "spark.sql(f'USE {catalog}.{schema}')" + "# Load default catalog and schema as widget and set their values as the default catalog / schema\n", + "%run ./apply_defaults" ] }, { @@ -62,9 +47,9 @@ {{- if (eq .include_python "yes") }} "from {{.project_name}} import main\n", "\n", - "main.get_taxis(spark).show(10)" + "main.create_example_table()" {{else}} - "spark.range(10)" + "spark.sql("CREATE OR REPLACE TABLE example AS SELECT 'example table' AS text_column")" {{end -}} ] } diff --git a/libs/template/templates/default-python/template/{{.project_name}}/src/{{.project_name}}/main.py.tmpl b/libs/template/templates/default-python/template/{{.project_name}}/src/{{.project_name}}/main.py.tmpl index 1ac627a0c..80d447f6c 100644 --- a/libs/template/templates/default-python/template/{{.project_name}}/src/{{.project_name}}/main.py.tmpl +++ b/libs/template/templates/default-python/template/{{.project_name}}/src/{{.project_name}}/main.py.tmpl @@ -1,21 +1,30 @@ from pyspark.sql import SparkSession, DataFrame -def get_taxis(spark: SparkSession) -> DataFrame: - return spark.read.table("samples.nyctaxi.trips") - - -# Create a new Databricks Connect session. If this fails, -# check that you have configured Databricks Connect correctly. -# See https://docs.databricks.com/dev-tools/databricks-connect.html. def get_spark() -> SparkSession: + """ + Create a new Databricks Connect session. If this fails, + check that you have configured Databricks Connect correctly. + See https://docs.databricks.com/dev-tools/databricks-connect.html. + """ try: from databricks.connect import DatabricksSession return DatabricksSession.builder.getOrCreate() except ImportError: return SparkSession.builder.getOrCreate() +def get_taxis(spark: SparkSession) -> DataFrame: + return spark.read.table("samples.nyctaxi.trips") + +def create_example_table(): + """ + Create a table called 'example' in the default catalog and schema. + """ + get_spark().sql("CREATE OR REPLACE TABLE example AS SELECT 'example table' AS text_column") + def main(): - # Set the catalog and schema for the current session + # Set the catalog and schema for the current session. + # In the default template, these parameters are set + # using the 'catalog' and 'schema' presets in databricks.yml. parser = argparse.ArgumentParser() parser.add_argument('--catalog', '-c', required=True) parser.add_argument('--schema', '-s', required=True) @@ -23,7 +32,7 @@ def main(): spark = get_spark() spark.sql(f"USE {args.catalog}.{args.schema}") - get_taxis(spark).show(5) + create_example_table() if __name__ == '__main__': main()