fix silently

This commit is contained in:
Shreyas Goenka 2024-12-27 19:02:08 +05:30
parent 32ebc5cbb6
commit 893118e289
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
5 changed files with 340 additions and 364 deletions

View File

@ -0,0 +1,80 @@
package mutator
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/libs/diag"
)
type resolveSchemeDependency struct{}
// If a user defines a UC schema in the bundle, they can refer to it in DLT pipelines
// or UC Volumes using the `${resources.schemas.<schema_key>.name}` syntax. Using this
// syntax allows TF to capture the deploy time dependency this DLT pipeline or UC Volume
// has on the schema and deploy changes to the schema before deploying the pipeline or volume.
//
// This mutator translates any implicit schema references in DLT pipelines or UC Volumes
// to the explicit syntax.
func ResolveSchemaDependency() bundle.Mutator {
return &resolveSchemeDependency{}
}
func (m *resolveSchemeDependency) Name() string {
return "ResolveSchemaDependency"
}
func findSchema(b *bundle.Bundle, catalogName, name string) (string, *resources.Schema) {
if catalogName == "" || name == "" {
return "", nil
}
for k, s := range b.Config.Resources.Schemas {
if s.CatalogName == catalogName && s.Name == name {
return k, s
}
}
return "", nil
}
func resolveVolume(v *resources.Volume, b *bundle.Bundle) {
schemaK, schema := findSchema(b, v.CatalogName, v.SchemaName)
if schema == nil {
return
}
v.SchemaName = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
}
func resolvePipeline(p *resources.Pipeline, b *bundle.Bundle) {
// schema and target have the same semantics in the DLT API but are mutually
// exclusive. If schema is set, the pipeline is in direct publishing mode
// and can write tables to multiple schemas (vs target which is a single schema).
schemaName := p.Schema
if schemaName == "" {
schemaName = p.Target
}
schemaK, schema := findSchema(b, p.Catalog, schemaName)
if schema == nil {
return
}
if p.Schema != "" {
p.Schema = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
} else if p.Target != "" {
p.Target = fmt.Sprintf("${resources.schemas.%s.name}", schemaK)
}
}
func (m *resolveSchemeDependency) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
for _, p := range b.Config.Resources.Pipelines {
resolvePipeline(p, b)
}
for _, v := range b.Config.Resources.Volumes {
resolveVolume(v, b)
}
return nil
}

View File

@ -0,0 +1,259 @@
package mutator
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestResolveSchemaDependencyForVolume(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
},
Volumes: map[string]*resources.Volume{
"volume1": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "foobar",
},
},
"volume2": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog2",
SchemaName: "foobar",
},
},
"volume3": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "barfoo",
},
},
"volume4": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalogX",
SchemaName: "foobar",
},
},
"volume5": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
CatalogName: "catalog1",
SchemaName: "schemaX",
},
},
},
},
},
}
d := bundle.Apply(context.Background(), b, ResolveSchemaDependency())
require.Nil(t, d)
assert.Equal(t, b.Config.Resources.Volumes["volume1"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema1.name}")
assert.Equal(t, b.Config.Resources.Volumes["volume2"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema2.name}")
assert.Equal(t, b.Config.Resources.Volumes["volume3"].CreateVolumeRequestContent.SchemaName, "${resources.schemas.schema3.name}")
assert.Equal(t, b.Config.Resources.Volumes["volume4"].CreateVolumeRequestContent.SchemaName, "foobar")
assert.Equal(t, b.Config.Resources.Volumes["volume5"].CreateVolumeRequestContent.SchemaName, "schemaX")
}
func TestResolveSchemaDependencyForPipelinesWithTarget(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Schema: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Schema: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Schema: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Schema: "",
Name: "whatever",
},
},
},
},
},
}
d := bundle.Apply(context.Background(), b, ResolveSchemaDependency())
require.Nil(t, d)
assert.Equal(t, b.Config.Resources.Pipelines["pipeline1"].Schema, "${resources.schemas.schema1.name}")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline2"].Schema, "${resources.schemas.schema2.name}")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline3"].Schema, "${resources.schemas.schema3.name}")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline4"].Schema, "foobar")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline5"].Schema, "schemaX")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline6"].Schema, "foobar")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline7"].Schema, "")
for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Target)
}
}
func TestResolveSchemaDependencyForPipelinesWithSchema(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"schema1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "foobar",
},
},
"schema2": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog2",
Name: "foobar",
},
},
"schema3": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "catalog1",
Name: "barfoo",
},
},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "foobar",
},
},
"pipeline2": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog2",
Target: "foobar",
},
},
"pipeline3": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "barfoo",
},
},
"pipeline4": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalogX",
Target: "foobar",
},
},
"pipeline5": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "catalog1",
Target: "schemaX",
},
},
"pipeline6": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "foobar",
},
},
"pipeline7": {
PipelineSpec: &pipelines.PipelineSpec{
Catalog: "",
Target: "",
Name: "whatever",
},
},
},
},
},
}
d := bundle.Apply(context.Background(), b, ResolveSchemaDependency())
require.Nil(t, d)
assert.Equal(t, b.Config.Resources.Pipelines["pipeline1"].Target, "${resources.schemas.schema1.name}")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline2"].Target, "${resources.schemas.schema2.name}")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline3"].Target, "${resources.schemas.schema3.name}")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline4"].Target, "foobar")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline5"].Target, "schemaX")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline6"].Target, "foobar")
assert.Equal(t, b.Config.Resources.Pipelines["pipeline7"].Target, "")
for _, k := range []string{"pipeline1", "pipeline2", "pipeline3", "pipeline4", "pipeline5", "pipeline6", "pipeline7"} {
assert.Empty(t, b.Config.Resources.Pipelines[k].Schema)
}
}

View File

@ -1,136 +0,0 @@
package validate
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
)
// Validate that any references to UC schemas defined in the DAB use the ${resources.schemas...}
// syntax to capture the deploy time dependency.
func SchemaReferences() bundle.ReadOnlyMutator {
return &schemaReferences{}
}
type schemaReferences struct{}
func (v *schemaReferences) Name() string {
return "validate:schema_dependency"
}
func findSchemaInBundle(rb bundle.ReadOnlyBundle, catalogName, schemaName string) ([]dyn.Location, dyn.Path, bool) {
for k, s := range rb.Config().Resources.Schemas {
if s.CatalogName != catalogName || s.Name != schemaName {
continue
}
return rb.Config().GetLocations("resources.schemas." + k), dyn.NewPath(dyn.Key("resources"), dyn.Key("schemas"), dyn.Key(k)), true
}
return nil, nil, false
}
func (v *schemaReferences) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
diags := diag.Diagnostics{}
for k, p := range rb.Config().Resources.Pipelines {
// Skip if the pipeline uses hive metastore. The DLT API allows creating
// a pipeline without a schema or target when using hive metastore.
if p.Catalog == "" {
continue
}
schemaName := ""
fieldPath := dyn.Path{}
schemaLocation := []dyn.Location{}
switch {
case p.Schema == "" && p.Target == "":
// The error message is identical to the one DLT backend returns when
// a schema is not defined for a UC DLT pipeline (date: 20 Dec 2024).
diags = append(diags, diag.Diagnostic{
Severity: diag.Error,
Summary: "Unity Catalog pipeline should have a schema or target defined",
Detail: `The target or schema field is required for UC pipelines. Reason: DLT
requires specifying a target schema for UC pipelines. Please use the
TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING
TABLE statement if you do not wish to publish your dataset.`,
Locations: rb.Config().GetLocations("resources.pipelines." + k),
Paths: []dyn.Path{
dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")),
dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")),
},
})
continue
case p.Schema != "" && p.Target != "":
locations := rb.Config().GetLocations("resources.pipelines." + k + ".schema")
locations = append(locations, rb.Config().GetLocations("resources.pipelines."+k+".target")...)
// The Databricks Terraform provider already has client side validation
// that does not allow this today. Having this here allows us to float
// this validation on `bundle validate` and provide location information.
diags = append(diags, diag.Diagnostic{
Severity: diag.Error,
Summary: "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.",
Locations: locations,
Paths: []dyn.Path{
dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema")),
dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target")),
},
})
continue
case p.Schema != "":
schemaName = p.Schema
fieldPath = dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema"))
schemaLocation = rb.Config().GetLocations("resources.pipelines." + k + ".schema")
case p.Target != "":
schemaName = p.Target
fieldPath = dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target"))
schemaLocation = rb.Config().GetLocations("resources.pipelines." + k + ".target")
}
// Check if the schema is defined in the bundle.
matchLocations, matchPath, found := findSchemaInBundle(rb, p.Catalog, schemaName)
if !found {
continue
}
diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, schemaName),
Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema %q and deploy changes to the schema before deploying the pipeline.`, matchPath, schemaName),
Locations: append(schemaLocation, matchLocations...),
Paths: []dyn.Path{
fieldPath,
matchPath,
},
})
}
for k, v := range rb.Config().Resources.Volumes {
if v.CatalogName == "" || v.SchemaName == "" {
continue
}
matchLocations, matchPath, found := findSchemaInBundle(rb, v.CatalogName, v.SchemaName)
if !found {
continue
}
fieldLocations := rb.Config().GetLocations("resources.volumes." + k + ".schema_name")
diags = append(diags, diag.Diagnostic{
Severity: diag.Warning,
Summary: fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, v.SchemaName),
Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this Volume
has on the schema %q and deploy changes to the schema before deploying the Volume.`, matchPath, v.SchemaName),
Locations: append(matchLocations, fieldLocations...),
Paths: []dyn.Path{
dyn.NewPath(dyn.Key("resources"), dyn.Key("volumes"), dyn.Key(k), dyn.Key("schema")),
matchPath,
},
})
}
return diags
}

View File

@ -1,228 +0,0 @@
package validate
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
)
func TestValidateSchemaReferencesForPipelines(t *testing.T) {
pipelineTargetL := dyn.Location{File: "file1", Line: 1, Column: 1}
pipelineSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2}
pipelineL := dyn.Location{File: "file3", Line: 3, Column: 3}
schemaL := dyn.Location{File: "file4", Line: 4, Column: 4}
for _, tc := range []struct {
schemaV string
targetV string
catalogV string
want diag.Diagnostics
}{
{
schemaV: "",
targetV: "",
catalogV: "",
want: diag.Diagnostics{},
},
{
schemaV: "",
targetV: "",
catalogV: "main",
want: diag.Diagnostics{{
Summary: "Unity Catalog pipeline should have a schema or target defined",
Severity: diag.Error,
Detail: `The target or schema field is required for UC pipelines. Reason: DLT
requires specifying a target schema for UC pipelines. Please use the
TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING
TABLE statement if you do not wish to publish your dataset.`,
Locations: []dyn.Location{pipelineL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.pipelines.p1.schema"),
dyn.MustPathFromString("resources.pipelines.p1.target"),
},
}},
},
{
schemaV: "both",
targetV: "both",
catalogV: "main",
want: diag.Diagnostics{{
Severity: diag.Error,
Summary: "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.",
Locations: []dyn.Location{pipelineSchemaL, pipelineTargetL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.pipelines.p1.schema"),
dyn.MustPathFromString("resources.pipelines.p1.target"),
},
}},
},
{
schemaV: "schema1",
targetV: "",
catalogV: "other",
want: diag.Diagnostics{},
},
{
schemaV: "schema1",
targetV: "",
catalogV: "main",
want: diag.Diagnostics{{
Severity: diag.Warning,
Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`,
Locations: []dyn.Location{pipelineSchemaL, schemaL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.pipelines.p1.schema"),
dyn.MustPathFromString("resources.schemas.s1"),
},
}},
},
{
schemaV: "",
targetV: "schema1",
catalogV: "main",
want: diag.Diagnostics{{
Severity: diag.Warning,
Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`,
Locations: []dyn.Location{pipelineTargetL, schemaL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.pipelines.p1.target"),
dyn.MustPathFromString("resources.schemas.s1"),
},
}},
},
{
schemaV: "${resources.schemas.s1.name}",
targetV: "",
catalogV: "main",
want: diag.Diagnostics{},
},
{
schemaV: "",
targetV: "${resources.schemas.s1.name}",
catalogV: "main",
want: diag.Diagnostics{},
},
} {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"s1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "main",
Name: "schema1",
},
},
},
Pipelines: map[string]*resources.Pipeline{
"p1": {
PipelineSpec: &pipelines.PipelineSpec{
Name: "abc",
Schema: tc.schemaV,
Target: tc.targetV,
Catalog: tc.catalogV,
},
},
},
},
},
}
bundletest.SetLocation(b, "resources.schemas.s1", []dyn.Location{schemaL})
bundletest.SetLocation(b, "resources.pipelines.p1", []dyn.Location{pipelineL})
if tc.schemaV != "" {
bundletest.SetLocation(b, "resources.pipelines.p1.schema", []dyn.Location{pipelineSchemaL})
}
if tc.targetV != "" {
bundletest.SetLocation(b, "resources.pipelines.p1.target", []dyn.Location{pipelineTargetL})
}
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), SchemaReferences())
assert.Equal(t, tc.want, diags)
}
}
func TestValidateSchemaReferencesForVolumes(t *testing.T) {
schemaL := dyn.Location{File: "file1", Line: 1, Column: 1}
volumeSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2}
for _, tc := range []struct {
catalogV string
schemaV string
want diag.Diagnostics
}{
{
catalogV: "main",
schemaV: "schema1",
want: diag.Diagnostics{{
Severity: diag.Warning,
Summary: `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this Volume
has on the schema "schema1" and deploy changes to the schema before deploying the Volume.`,
Locations: []dyn.Location{schemaL, volumeSchemaL},
Paths: []dyn.Path{
dyn.MustPathFromString("resources.volumes.v1.schema"),
dyn.MustPathFromString("resources.schemas.s1"),
},
}},
},
{
catalogV: "main",
schemaV: "${resources.schemas.s1.name}",
want: diag.Diagnostics{},
},
{
catalogV: "main",
schemaV: "other",
want: diag.Diagnostics{},
},
{
catalogV: "other",
schemaV: "schema1",
want: diag.Diagnostics{},
},
} {
b := bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Schemas: map[string]*resources.Schema{
"s1": {
CreateSchema: &catalog.CreateSchema{
CatalogName: "main",
Name: "schema1",
},
},
},
Volumes: map[string]*resources.Volume{
"v1": {
CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
SchemaName: tc.schemaV,
CatalogName: tc.catalogV,
Name: "my_volume",
},
},
},
},
},
}
bundletest.SetLocation(&b, "resources.schemas.s1", []dyn.Location{schemaL})
bundletest.SetLocation(&b, "resources.volumes.v1.schema_name", []dyn.Location{volumeSchemaL})
diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(&b), SchemaReferences())
assert.Equal(t, tc.want, diags)
}
}

View File

@ -63,6 +63,7 @@ func Initialize() bundle.Mutator {
"workspace", "workspace",
"variables", "variables",
), ),
mutator.ResolveSchemaDependency(),
// Provide permission config errors & warnings after initializing all variables // Provide permission config errors & warnings after initializing all variables
permissions.PermissionDiagnostics(), permissions.PermissionDiagnostics(),
mutator.SetRunAs(), mutator.SetRunAs(),