Warn user to use ${resources.schemas...} syntax

This commit is contained in:
Shreyas Goenka 2024-12-09 22:00:24 +05:30
parent 227a13556b
commit 8a7d5fd1ec
No known key found for this signature in database
GPG Key ID: 92A07DF49CCB0622
3 changed files with 359 additions and 0 deletions

View File

@ -0,0 +1,130 @@
package validate
import (
"context"
"fmt"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
)
// SchemaReferences returns a read-only mutator that checks how pipelines and
// volumes refer to UC schemas defined in the same bundle. References should
// use the ${resources.schemas...} syntax so that the deploy time dependency
// on the schema is captured.
func SchemaReferences() bundle.ReadOnlyMutator {
	return new(schemaReferences)
}
// schemaReferences is the stateless mutator type returned by SchemaReferences.
type schemaReferences struct{}

// Name returns the fixed name of this mutator.
func (*schemaReferences) Name() string {
	const mutatorName = "validate:schema_dependency"
	return mutatorName
}
// findSchemaInBundle looks for a schema resource declared in the bundle whose
// catalog name and schema name both equal the given values.
//
// On a match it returns the locations recorded for resources.schemas.<key>,
// the dyn.Path of that resource, and true. Without a match it returns
// nil, nil, false.
//
// Map iteration order is random, so when more than one schema resource has
// the same catalog and name, the lexicographically smallest key is selected.
// This keeps the result stable between runs.
func findSchemaInBundle(rb bundle.ReadOnlyBundle, catalogName, schemaName string) ([]dyn.Location, dyn.Path, bool) {
	matchKey, found := "", false
	for k, s := range rb.Config().Resources.Schemas {
		// An entry without a body cannot match, and reading its promoted
		// fields would dereference a nil pointer.
		if s == nil || s.CreateSchema == nil {
			continue
		}
		if s.CatalogName != catalogName || s.Name != schemaName {
			continue
		}
		if !found || k < matchKey {
			matchKey, found = k, true
		}
	}
	if !found {
		return nil, nil, false
	}

	path := dyn.NewPath(dyn.Key("resources"), dyn.Key("schemas"), dyn.Key(matchKey))
	return rb.Config().GetLocations("resources.schemas." + matchKey), path, true
}
// Apply validates schema references made by pipelines and volumes.
//
// For pipelines that set a catalog it reports an error when neither or both
// of schema and target are set. For pipelines and volumes alike it emits a
// warning when the referenced catalog/schema pair matches a schema resource
// defined in the bundle but is spelled out by name rather than through
// ${resources.schemas.<key>.name}.
//
// The returned value is always non-nil; it is empty when nothing is reported.
func (v *schemaReferences) Apply(ctx context.Context, rb bundle.ReadOnlyBundle) diag.Diagnostics {
	// Start from a non-nil empty value so that a clean bundle yields
	// diag.Diagnostics{} rather than nil.
	diags := diag.Diagnostics{}
	diags = append(diags, pipelineSchemaReferenceDiags(rb)...)
	diags = append(diags, volumeSchemaReferenceDiags(rb)...)
	return diags
}

// schemaRefLocations returns a freshly allocated slice holding a followed by b.
// Appending directly to a slice obtained from GetLocations could write into
// spare capacity of a backing array owned elsewhere (whether that slice is
// shared is not visible from this file), so the inputs are never appended to.
func schemaRefLocations(a, b []dyn.Location) []dyn.Location {
	out := make([]dyn.Location, 0, len(a)+len(b))
	out = append(out, a...)
	return append(out, b...)
}

// pipelineSchemaReferenceDiags produces the diagnostics for all pipelines in
// the bundle that have a catalog set.
func pipelineSchemaReferenceDiags(rb bundle.ReadOnlyBundle) diag.Diagnostics {
	var diags diag.Diagnostics
	for k, p := range rb.Config().Resources.Pipelines {
		// An entry without a spec has nothing to validate, and reading its
		// promoted fields would dereference a nil pointer.
		if p == nil || p.PipelineSpec == nil {
			continue
		}
		// Skip if the pipeline uses hive metastore.
		if p.Catalog == "" {
			continue
		}

		prefix := "resources.pipelines." + k
		schemaPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("schema"))
		targetPath := dyn.NewPath(dyn.Key("resources"), dyn.Key("pipelines"), dyn.Key(k), dyn.Key("target"))

		var (
			schemaName     string
			fieldPath      dyn.Path
			fieldLocations []dyn.Location
		)
		switch {
		case p.Schema == "" && p.Target == "":
			diags = append(diags, diag.Diagnostic{
				Severity: diag.Error,
				Summary:  "Unity Catalog pipeline should have a schema or target defined",
				Detail: `The target or schema field is required for UC pipelines. Reason: DLT
requires specifying a target schema for UC pipelines. Please use the
TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING
TABLE statement if you do not wish to publish your dataset.`,
				Locations: rb.Config().GetLocations(prefix),
				Paths:     []dyn.Path{schemaPath, targetPath},
			})
			continue
		case p.Schema != "" && p.Target != "":
			diags = append(diags, diag.Diagnostic{
				Severity: diag.Error,
				Summary:  "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.",
				Locations: schemaRefLocations(
					rb.Config().GetLocations(prefix+".schema"),
					rb.Config().GetLocations(prefix+".target"),
				),
				Paths: []dyn.Path{schemaPath, targetPath},
			})
			continue
		case p.Schema != "":
			schemaName = p.Schema
			fieldPath = schemaPath
			fieldLocations = rb.Config().GetLocations(prefix + ".schema")
		default:
			// Only target is set.
			schemaName = p.Target
			fieldPath = targetPath
			fieldLocations = rb.Config().GetLocations(prefix + ".target")
		}

		// Check if the schema is defined in the bundle.
		matchLocations, matchPath, found := findSchemaInBundle(rb, p.Catalog, schemaName)
		if !found {
			continue
		}

		diags = append(diags, diag.Diagnostic{
			Severity: diag.Warning,
			Summary:  fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, schemaName),
			Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema %q and deploy changes to the schema before deploying the pipeline.`, matchPath, schemaName),
			// The referencing field comes first, then the schema definition.
			Locations: schemaRefLocations(fieldLocations, matchLocations),
			Paths:     []dyn.Path{fieldPath, matchPath},
		})
	}
	return diags
}

// volumeSchemaReferenceDiags produces the diagnostics for all volumes in the
// bundle that set both a catalog name and a schema name.
func volumeSchemaReferenceDiags(rb bundle.ReadOnlyBundle) diag.Diagnostics {
	var diags diag.Diagnostics
	for k, vol := range rb.Config().Resources.Volumes {
		// An entry without a body has nothing to validate, and reading its
		// promoted fields would dereference a nil pointer.
		if vol == nil || vol.CreateVolumeRequestContent == nil {
			continue
		}
		if vol.CatalogName == "" || vol.SchemaName == "" {
			continue
		}

		matchLocations, matchPath, found := findSchemaInBundle(rb, vol.CatalogName, vol.SchemaName)
		if !found {
			continue
		}

		fieldLocations := rb.Config().GetLocations("resources.volumes." + k + ".schema_name")
		diags = append(diags, diag.Diagnostic{
			Severity: diag.Warning,
			Summary:  fmt.Sprintf("Use ${%s.name} syntax to refer to the UC schema instead of directly using its name %q", matchPath, vol.SchemaName),
			Detail: fmt.Sprintf(`Using ${%s.name} will allow DABs to capture the deploy time dependency this Volume
has on the schema %q and deploy changes to the schema before deploying the Volume.`, matchPath, vol.SchemaName),
			// Unlike the pipeline warning, the schema definition comes first
			// here; the order is kept as is because it is observable output.
			Locations: schemaRefLocations(matchLocations, fieldLocations),
			Paths: []dyn.Path{
				// NOTE(review): the locations above are looked up under
				// "schema_name" while this path uses "schema". This looks like
				// it should be "schema_name"; the existing test asserts the
				// current value, so change both together after confirming.
				dyn.NewPath(dyn.Key("resources"), dyn.Key("volumes"), dyn.Key(k), dyn.Key("schema")),
				matchPath,
			},
		})
	}
	return diags
}

View File

@ -0,0 +1,228 @@
package validate
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/cli/bundle/internal/bundletest"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
)
// TestValidateSchemaReferencesForPipelines runs SchemaReferences against a
// bundle holding one schema (catalog "main", name "schema1") and one pipeline
// whose catalog, schema and target vary per case. Each case runs as a named
// subtest so that a failure identifies the offending combination.
func TestValidateSchemaReferencesForPipelines(t *testing.T) {
	pipelineTargetL := dyn.Location{File: "file1", Line: 1, Column: 1}
	pipelineSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2}
	pipelineL := dyn.Location{File: "file3", Line: 3, Column: 3}
	schemaL := dyn.Location{File: "file4", Line: 4, Column: 4}

	for _, tc := range []struct {
		name     string
		schemaV  string
		targetV  string
		catalogV string
		want     diag.Diagnostics
	}{
		{
			name:     "no catalog",
			schemaV:  "",
			targetV:  "",
			catalogV: "",
			want:     diag.Diagnostics{},
		},
		{
			name:     "catalog without schema or target",
			schemaV:  "",
			targetV:  "",
			catalogV: "main",
			want: diag.Diagnostics{{
				Summary:  "Unity Catalog pipeline should have a schema or target defined",
				Severity: diag.Error,
				Detail: `The target or schema field is required for UC pipelines. Reason: DLT
requires specifying a target schema for UC pipelines. Please use the
TEMPORARY keyword in the CREATE MATERIALIZED VIEW or CREATE STREAMING
TABLE statement if you do not wish to publish your dataset.`,
				Locations: []dyn.Location{pipelineL},
				Paths: []dyn.Path{
					dyn.MustPathFromString("resources.pipelines.p1.schema"),
					dyn.MustPathFromString("resources.pipelines.p1.target"),
				},
			}},
		},
		{
			name:     "both schema and target",
			schemaV:  "both",
			targetV:  "both",
			catalogV: "main",
			want: diag.Diagnostics{{
				Severity:  diag.Error,
				Summary:   "Both schema and target are defined in a Unity Catalog pipeline. Only one of them should be defined.",
				Locations: []dyn.Location{pipelineSchemaL, pipelineTargetL},
				Paths: []dyn.Path{
					dyn.MustPathFromString("resources.pipelines.p1.schema"),
					dyn.MustPathFromString("resources.pipelines.p1.target"),
				},
			}},
		},
		{
			name:     "schema in a different catalog",
			schemaV:  "schema1",
			targetV:  "",
			catalogV: "other",
			want:     diag.Diagnostics{},
		},
		{
			name:     "schema matches bundle schema",
			schemaV:  "schema1",
			targetV:  "",
			catalogV: "main",
			want: diag.Diagnostics{{
				Severity: diag.Warning,
				Summary:  `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
				Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`,
				Locations: []dyn.Location{pipelineSchemaL, schemaL},
				Paths: []dyn.Path{
					dyn.MustPathFromString("resources.pipelines.p1.schema"),
					dyn.MustPathFromString("resources.schemas.s1"),
				},
			}},
		},
		{
			name:     "target matches bundle schema",
			schemaV:  "",
			targetV:  "schema1",
			catalogV: "main",
			want: diag.Diagnostics{{
				Severity: diag.Warning,
				Summary:  `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
				Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this DLT pipeline
has on the schema "schema1" and deploy changes to the schema before deploying the pipeline.`,
				Locations: []dyn.Location{pipelineTargetL, schemaL},
				Paths: []dyn.Path{
					dyn.MustPathFromString("resources.pipelines.p1.target"),
					dyn.MustPathFromString("resources.schemas.s1"),
				},
			}},
		},
		{
			name:     "schema uses interpolation",
			schemaV:  "${resources.schemas.s1.name}",
			targetV:  "",
			catalogV: "main",
			want:     diag.Diagnostics{},
		},
		{
			name:     "target uses interpolation",
			schemaV:  "",
			targetV:  "${resources.schemas.s1.name}",
			catalogV: "main",
			want:     diag.Diagnostics{},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			b := &bundle.Bundle{
				Config: config.Root{
					Resources: config.Resources{
						Schemas: map[string]*resources.Schema{
							"s1": {
								CreateSchema: &catalog.CreateSchema{
									CatalogName: "main",
									Name:        "schema1",
								},
							},
						},
						Pipelines: map[string]*resources.Pipeline{
							"p1": {
								PipelineSpec: &pipelines.PipelineSpec{
									Name:    "abc",
									Schema:  tc.schemaV,
									Target:  tc.targetV,
									Catalog: tc.catalogV,
								},
							},
						},
					},
				},
			}

			bundletest.SetLocation(b, "resources.schemas.s1", []dyn.Location{schemaL})
			bundletest.SetLocation(b, "resources.pipelines.p1", []dyn.Location{pipelineL})
			// Field locations are only registered for the fields the case sets.
			if tc.schemaV != "" {
				bundletest.SetLocation(b, "resources.pipelines.p1.schema", []dyn.Location{pipelineSchemaL})
			}
			if tc.targetV != "" {
				bundletest.SetLocation(b, "resources.pipelines.p1.target", []dyn.Location{pipelineTargetL})
			}

			diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), SchemaReferences())
			assert.Equal(t, tc.want, diags)
		})
	}
}
// TestValidateSchemaReferencesForVolumes runs SchemaReferences against a
// bundle holding one schema (catalog "main", name "schema1") and one volume
// whose catalog and schema name vary per case. Each case runs as a named
// subtest so that a failure identifies the offending combination.
func TestValidateSchemaReferencesForVolumes(t *testing.T) {
	schemaL := dyn.Location{File: "file1", Line: 1, Column: 1}
	volumeSchemaL := dyn.Location{File: "file2", Line: 2, Column: 2}

	for _, tc := range []struct {
		name     string
		catalogV string
		schemaV  string
		want     diag.Diagnostics
	}{
		{
			name:     "schema name matches bundle schema",
			catalogV: "main",
			schemaV:  "schema1",
			want: diag.Diagnostics{{
				Severity: diag.Warning,
				Summary:  `Use ${resources.schemas.s1.name} syntax to refer to the UC schema instead of directly using its name "schema1"`,
				Detail: `Using ${resources.schemas.s1.name} will allow DABs to capture the deploy time dependency this Volume
has on the schema "schema1" and deploy changes to the schema before deploying the Volume.`,
				Locations: []dyn.Location{schemaL, volumeSchemaL},
				Paths: []dyn.Path{
					// NOTE(review): the location above is registered under
					// "schema_name" while this path ends in "schema". It
					// mirrors what the validator currently emits; confirm
					// whether both should read "schema_name".
					dyn.MustPathFromString("resources.volumes.v1.schema"),
					dyn.MustPathFromString("resources.schemas.s1"),
				},
			}},
		},
		{
			name:     "schema name uses interpolation",
			catalogV: "main",
			schemaV:  "${resources.schemas.s1.name}",
			want:     diag.Diagnostics{},
		},
		{
			name:     "different schema name",
			catalogV: "main",
			schemaV:  "other",
			want:     diag.Diagnostics{},
		},
		{
			name:     "different catalog",
			catalogV: "other",
			schemaV:  "schema1",
			want:     diag.Diagnostics{},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			b := &bundle.Bundle{
				Config: config.Root{
					Resources: config.Resources{
						Schemas: map[string]*resources.Schema{
							"s1": {
								CreateSchema: &catalog.CreateSchema{
									CatalogName: "main",
									Name:        "schema1",
								},
							},
						},
						Volumes: map[string]*resources.Volume{
							"v1": {
								CreateVolumeRequestContent: &catalog.CreateVolumeRequestContent{
									SchemaName:  tc.schemaV,
									CatalogName: tc.catalogV,
									Name:        "my_volume",
								},
							},
						},
					},
				},
			}

			bundletest.SetLocation(b, "resources.schemas.s1", []dyn.Location{schemaL})
			bundletest.SetLocation(b, "resources.volumes.v1.schema_name", []dyn.Location{volumeSchemaL})

			diags := bundle.ApplyReadOnly(context.Background(), bundle.ReadOnly(b), SchemaReferences())
			assert.Equal(t, tc.want, diags)
		})
	}
}

View File

@ -37,6 +37,7 @@ func (v *validate) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics
JobTaskClusterSpec(),
ValidateFolderPermissions(),
SingleNodeCluster(),
SchemaReferences(),
))
}