databricks-cli/bundle/config/resources/pipeline.go

94 lines
2.6 KiB
Go

package resources
import (
"context"
"strings"
"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/marshal"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/imdario/mergo"
)
// Pipeline is the bundle configuration resource describing a pipeline.
// It embeds the SDK's pipelines.PipelineSpec and layers bundle-specific
// fields (identifier, permissions, status, config paths) on top of it.
type Pipeline struct {
// ID is the pipeline identifier. It is tagged bundle:"readonly".
// NOTE(review): presumably filled in from deployment state rather than
// user-authored configuration — confirm.
ID string `json:"id,omitempty" bundle:"readonly"`
// Permissions lists the permissions configured for this pipeline.
Permissions []Permission `json:"permissions,omitempty"`
// ModifiedStatus is tagged bundle:"internal".
// NOTE(review): presumably tracks whether the resource was created,
// updated or deleted relative to the deployed state — confirm.
ModifiedStatus ModifiedStatus `json:"modified_status,omitempty" bundle:"internal"`
// Paths is embedded from the bundle's config/paths package.
paths.Paths
// PipelineSpec is embedded by pointer, so its fields (e.g. Clusters) are
// promoted onto Pipeline. Accessing a promoted field while this pointer
// is nil panics.
*pipelines.PipelineSpec
}
// UnmarshalJSON implements json.Unmarshaler by handing the raw bytes and the
// receiver to the Databricks SDK's marshal.Unmarshal helper.
func (p *Pipeline) UnmarshalJSON(b []byte) error {
	return marshal.Unmarshal(b, p)
}
// MarshalJSON implements json.Marshaler by delegating to the Databricks SDK's
// marshal.Marshal helper. It is declared on the value receiver, so it is in
// the method set of both Pipeline and *Pipeline.
func (p Pipeline) MarshalJSON() ([]byte, error) {
	return marshal.Marshal(p)
}
// MergeClusters merges cluster definitions that share the same label.
//
// The clusters field is a slice, and as such, overrides are appended to it.
// A cluster can be identified by its label, however, so the label is used to
// figure out which definitions are actually overrides and to merge them into
// the first definition that carries the same label.
//
// Note: the cluster label is optional and defaults to 'default'. Clusters
// without a label are therefore ALSO merged with each other. Labels are
// rewritten in place to their lowercased form so matching is case insensitive.
//
// A nil pipeline, or a pipeline whose embedded spec is nil, has no clusters to
// merge; in that case MergeClusters does nothing and returns nil. Otherwise it
// returns the first error reported by mergo.Merge; on error p.Clusters keeps
// its (label-normalized) original entries and is not replaced.
func (p *Pipeline) MergeClusters() error {
	// PipelineSpec is an embedded pointer: reading the promoted Clusters field
	// through a nil spec would panic with a nil pointer dereference.
	if p == nil || p.PipelineSpec == nil {
		return nil
	}

	clusters := make(map[string]*pipelines.PipelineCluster)
	// The capacity is fixed up front so that appends never reallocate; the
	// pointers stored in the map below rely on the backing array staying put.
	output := make([]pipelines.PipelineCluster, 0, len(p.Clusters))

	// Normalize cluster labels.
	// If empty, this defaults to "default".
	// To make matching case insensitive, labels are lowercased.
	for i := range p.Clusters {
		label := p.Clusters[i].Label
		if label == "" {
			label = "default"
		}
		p.Clusters[i].Label = strings.ToLower(label)
	}

	// Target overrides are always appended, so we can iterate in natural order to
	// first find the base definition, and merge instances we encounter later.
	for i := range p.Clusters {
		label := p.Clusters[i].Label

		// Register pipeline cluster with label if not yet seen before.
		ref, ok := clusters[label]
		if !ok {
			output = append(output, p.Clusters[i])
			clusters[label] = &output[len(output)-1]
			continue
		}

		// Merge this instance into the reference: non-empty values override
		// the base definition and slices are appended rather than replaced.
		err := mergo.Merge(ref, &p.Clusters[i], mergo.WithOverride, mergo.WithAppendSlice)
		if err != nil {
			return err
		}
	}

	// Overwrite resulting slice.
	p.Clusters = output
	return nil
}
// Exists looks up the pipeline with the given id through the workspace
// client's Pipelines.Get call.
//
// It returns (true, nil) when the lookup succeeds. When the lookup returns an
// error of any kind, a debug message stating that the pipeline does not exist
// is logged and (false, err) is returned with the underlying error.
func (p *Pipeline) Exists(ctx context.Context, w *databricks.WorkspaceClient, id string) (bool, error) {
	request := pipelines.GetPipelineRequest{PipelineId: id}
	if _, lookupErr := w.Pipelines.Get(ctx, request); lookupErr != nil {
		log.Debugf(ctx, "pipeline %s does not exist", id)
		return false, lookupErr
	}
	return true, nil
}
// TerraformResourceName returns the fixed string "databricks_pipeline", the
// resource name associated with pipelines on the Terraform side (per the
// method name).
func (p *Pipeline) TerraformResourceName() string {
	const resourceName = "databricks_pipeline"
	return resourceName
}