Fixed panic when loading incorrectly defined jobs (#1402)

## Changes
If only key was defined for a job in YAML config, validate previously
failed with segfault.

This PR validates that jobs are correctly defined and returns an error
if not.

## Tests
Added regression test
This commit is contained in:
Andrew Nester 2024-05-17 12:10:17 +02:00 committed by GitHub
parent 04e56aa472
commit a014d50a6a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 239 additions and 6 deletions

View File

@ -56,7 +56,11 @@ func TestDefaultQueueingApplyEnableQueueing(t *testing.T) {
Config: config.Root{ Config: config.Root{
Resources: config.Resources{ Resources: config.Resources{
Jobs: map[string]*resources.Job{ Jobs: map[string]*resources.Job{
"job": {}, "job": {
JobSettings: &jobs.JobSettings{
Name: "job",
},
},
}, },
}, },
}, },
@ -77,7 +81,11 @@ func TestDefaultQueueingApplyWithMultipleJobs(t *testing.T) {
Queue: &jobs.QueueSettings{Enabled: false}, Queue: &jobs.QueueSettings{Enabled: false},
}, },
}, },
"job2": {}, "job2": {
JobSettings: &jobs.JobSettings{
Name: "job",
},
},
"job3": { "job3": {
JobSettings: &jobs.JobSettings{ JobSettings: &jobs.JobSettings{
Queue: &jobs.QueueSettings{Enabled: true}, Queue: &jobs.QueueSettings{Enabled: true},

View File

@ -126,6 +126,47 @@ func (r *Resources) VerifyUniqueResourceIdentifiers() (*UniqueResourceIdTracker,
return tracker, nil return tracker, nil
} }
type resource struct {
resource ConfigResource
resource_type string
key string
}
func (r *Resources) allResources() []resource {
all := make([]resource, 0)
for k, e := range r.Jobs {
all = append(all, resource{resource_type: "job", resource: e, key: k})
}
for k, e := range r.Pipelines {
all = append(all, resource{resource_type: "pipeline", resource: e, key: k})
}
for k, e := range r.Models {
all = append(all, resource{resource_type: "model", resource: e, key: k})
}
for k, e := range r.Experiments {
all = append(all, resource{resource_type: "experiment", resource: e, key: k})
}
for k, e := range r.ModelServingEndpoints {
all = append(all, resource{resource_type: "serving endpoint", resource: e, key: k})
}
for k, e := range r.RegisteredModels {
all = append(all, resource{resource_type: "registered model", resource: e, key: k})
}
return all
}
func (r *Resources) VerifyAllResourcesDefined() error {
all := r.allResources()
for _, e := range all {
err := e.resource.Validate()
if err != nil {
return fmt.Errorf("%s %s is not defined", e.resource_type, e.key)
}
}
return nil
}
// ConfigureConfigFilePath sets the specified path for all resources contained in this instance. // ConfigureConfigFilePath sets the specified path for all resources contained in this instance.
// This property is used to correctly resolve paths relative to the path // This property is used to correctly resolve paths relative to the path
// of the configuration file they were defined in. // of the configuration file they were defined in.
@ -153,6 +194,7 @@ func (r *Resources) ConfigureConfigFilePath() {
type ConfigResource interface { type ConfigResource interface {
Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error)
TerraformResourceName() string TerraformResourceName() string
Validate() error
} }
func (r *Resources) FindResourceByConfigKey(key string) (ConfigResource, error) { func (r *Resources) FindResourceByConfigKey(key string) (ConfigResource, error) {

View File

@ -2,6 +2,7 @@ package resources
import ( import (
"context" "context"
"fmt"
"strconv" "strconv"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
@ -47,3 +48,11 @@ func (j *Job) Exists(ctx context.Context, w *databricks.WorkspaceClient, id stri
func (j *Job) TerraformResourceName() string { func (j *Job) TerraformResourceName() string {
return "databricks_job" return "databricks_job"
} }
func (j *Job) Validate() error {
if j == nil || !j.DynamicValue.IsValid() || j.JobSettings == nil {
return fmt.Errorf("job is not defined")
}
return nil
}

View File

@ -1,7 +1,12 @@
package resources package resources
import ( import (
"context"
"fmt"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/ml" "github.com/databricks/databricks-sdk-go/service/ml"
) )
@ -23,3 +28,26 @@ func (s *MlflowExperiment) UnmarshalJSON(b []byte) error {
func (s MlflowExperiment) MarshalJSON() ([]byte, error) { func (s MlflowExperiment) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s) return marshal.Marshal(s)
} }
func (s *MlflowExperiment) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
_, err := w.Experiments.GetExperiment(ctx, ml.GetExperimentRequest{
ExperimentId: id,
})
if err != nil {
log.Debugf(ctx, "experiment %s does not exist", id)
return false, err
}
return true, nil
}
func (s *MlflowExperiment) TerraformResourceName() string {
return "databricks_mlflow_experiment"
}
func (s *MlflowExperiment) Validate() error {
if s == nil || !s.DynamicValue.IsValid() {
return fmt.Errorf("experiment is not defined")
}
return nil
}

View File

@ -1,7 +1,12 @@
package resources package resources
import ( import (
"context"
"fmt"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/ml" "github.com/databricks/databricks-sdk-go/service/ml"
) )
@ -23,3 +28,26 @@ func (s *MlflowModel) UnmarshalJSON(b []byte) error {
func (s MlflowModel) MarshalJSON() ([]byte, error) { func (s MlflowModel) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s) return marshal.Marshal(s)
} }
func (s *MlflowModel) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
_, err := w.ModelRegistry.GetModel(ctx, ml.GetModelRequest{
Name: id,
})
if err != nil {
log.Debugf(ctx, "model %s does not exist", id)
return false, err
}
return true, nil
}
func (s *MlflowModel) TerraformResourceName() string {
return "databricks_mlflow_model"
}
func (s *MlflowModel) Validate() error {
if s == nil || !s.DynamicValue.IsValid() {
return fmt.Errorf("model is not defined")
}
return nil
}

View File

@ -1,7 +1,12 @@
package resources package resources
import ( import (
"context"
"fmt"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/serving" "github.com/databricks/databricks-sdk-go/service/serving"
) )
@ -33,3 +38,26 @@ func (s *ModelServingEndpoint) UnmarshalJSON(b []byte) error {
func (s ModelServingEndpoint) MarshalJSON() ([]byte, error) { func (s ModelServingEndpoint) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s) return marshal.Marshal(s)
} }
func (s *ModelServingEndpoint) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
_, err := w.ServingEndpoints.Get(ctx, serving.GetServingEndpointRequest{
Name: id,
})
if err != nil {
log.Debugf(ctx, "serving endpoint %s does not exist", id)
return false, err
}
return true, nil
}
func (s *ModelServingEndpoint) TerraformResourceName() string {
return "databricks_model_serving"
}
func (s *ModelServingEndpoint) Validate() error {
if s == nil || !s.DynamicValue.IsValid() {
return fmt.Errorf("serving endpoint is not defined")
}
return nil
}

View File

@ -2,6 +2,7 @@ package resources
import ( import (
"context" "context"
"fmt"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/log"
@ -42,3 +43,11 @@ func (p *Pipeline) Exists(ctx context.Context, w *databricks.WorkspaceClient, id
func (p *Pipeline) TerraformResourceName() string { func (p *Pipeline) TerraformResourceName() string {
return "databricks_pipeline" return "databricks_pipeline"
} }
func (p *Pipeline) Validate() error {
if p == nil || !p.DynamicValue.IsValid() {
return fmt.Errorf("pipeline is not defined")
}
return nil
}

View File

@ -1,7 +1,12 @@
package resources package resources
import ( import (
"context"
"fmt"
"github.com/databricks/cli/bundle/config/paths" "github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
) )
@ -34,3 +39,26 @@ func (s *RegisteredModel) UnmarshalJSON(b []byte) error {
func (s RegisteredModel) MarshalJSON() ([]byte, error) { func (s RegisteredModel) MarshalJSON() ([]byte, error) {
return marshal.Marshal(s) return marshal.Marshal(s)
} }
func (s *RegisteredModel) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
_, err := w.RegisteredModels.Get(ctx, catalog.GetRegisteredModelRequest{
FullName: id,
})
if err != nil {
log.Debugf(ctx, "registered model %s does not exist", id)
return false, err
}
return true, nil
}
func (s *RegisteredModel) TerraformResourceName() string {
return "databricks_registered_model"
}
func (s *RegisteredModel) Validate() error {
if s == nil || !s.DynamicValue.IsValid() {
return fmt.Errorf("registered model is not defined")
}
return nil
}

View File

@ -138,6 +138,14 @@ func (r *Root) updateWithDynamicValue(nv dyn.Value) error {
// Assign the normalized configuration tree. // Assign the normalized configuration tree.
r.value = nv r.value = nv
// At the moment the check has to be done as part of updateWithDynamicValue
// because otherwise ConfigureConfigFilePath will fail with a panic.
// In the future, we should move this check to a separate mutator in initialise phase.
err = r.Resources.VerifyAllResourcesDefined()
if err != nil {
return err
}
// Assign config file paths after converting to typed configuration. // Assign config file paths after converting to typed configuration.
r.ConfigureConfigFilePath() r.ConfigureConfigFilePath()
return nil return nil

View File

@ -8,6 +8,7 @@ import (
"github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -45,9 +46,15 @@ func testFixture(userName string) *bundle.Bundle {
Resources: config.Resources{ Resources: config.Resources{
Jobs: map[string]*resources.Job{ Jobs: map[string]*resources.Job{
"job1": { "job1": {
JobSettings: &jobs.JobSettings{
Name: "job1",
},
Permissions: p, Permissions: p,
}, },
"job2": { "job2": {
JobSettings: &jobs.JobSettings{
Name: "job2",
},
Permissions: p, Permissions: p,
}, },
}, },

View File

@ -7,6 +7,7 @@ import (
"github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config" "github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources" "github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -23,8 +24,16 @@ func TestApplyBundlePermissions(t *testing.T) {
}, },
Resources: config.Resources{ Resources: config.Resources{
Jobs: map[string]*resources.Job{ Jobs: map[string]*resources.Job{
"job_1": {}, "job_1": {
"job_2": {}, JobSettings: &jobs.JobSettings{
Name: "job_1",
},
},
"job_2": {
JobSettings: &jobs.JobSettings{
Name: "job_2",
},
},
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline_1": {}, "pipeline_1": {},
@ -109,11 +118,17 @@ func TestWarningOnOverlapPermission(t *testing.T) {
Resources: config.Resources{ Resources: config.Resources{
Jobs: map[string]*resources.Job{ Jobs: map[string]*resources.Job{
"job_1": { "job_1": {
JobSettings: &jobs.JobSettings{
Name: "job_1",
},
Permissions: []resources.Permission{ Permissions: []resources.Permission{
{Level: CAN_VIEW, UserName: "TestUser"}, {Level: CAN_VIEW, UserName: "TestUser"},
}, },
}, },
"job_2": { "job_2": {
JobSettings: &jobs.JobSettings{
Name: "job_2",
},
Permissions: []resources.Permission{ Permissions: []resources.Permission{
{Level: CAN_VIEW, UserName: "TestUser2"}, {Level: CAN_VIEW, UserName: "TestUser2"},
}, },

View File

@ -30,8 +30,8 @@ func TestApplyWorkspaceRootPermissions(t *testing.T) {
}, },
Resources: config.Resources{ Resources: config.Resources{
Jobs: map[string]*resources.Job{ Jobs: map[string]*resources.Job{
"job_1": {JobSettings: &jobs.JobSettings{}}, "job_1": {JobSettings: &jobs.JobSettings{Name: "job_1"}},
"job_2": {JobSettings: &jobs.JobSettings{}}, "job_2": {JobSettings: &jobs.JobSettings{Name: "job_2"}},
}, },
Pipelines: map[string]*resources.Pipeline{ Pipelines: map[string]*resources.Pipeline{
"pipeline_1": {PipelineSpec: &pipelines.PipelineSpec{}}, "pipeline_1": {PipelineSpec: &pipelines.PipelineSpec{}},

View File

@ -2,3 +2,4 @@ resources:
jobs: jobs:
my_first_job: my_first_job:
id: 1 id: 1
name: "My First Job"

View File

@ -2,3 +2,4 @@ resources:
jobs: jobs:
my_second_job: my_second_job:
id: 2 id: 2
name: "My Second Job"

View File

@ -2,3 +2,4 @@ resources:
jobs: jobs:
my_job: my_job:
id: 1 id: 1
name: "My Job"

View File

@ -0,0 +1,8 @@
bundle:
name: undefined-job
resources:
jobs:
undefined:
test:
name: "Test Job"

View File

@ -0,0 +1,12 @@
package config_tests
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestUndefinedJobLoadsWithError(t *testing.T) {
_, diags := loadTargetWithDiags("./undefined_job", "default")
assert.ErrorContains(t, diags.Error(), "job undefined is not defined")
}