mirror of https://github.com/databricks/cli.git
Added listing cluster filtering for cluster lookups (#1754)
## Changes We added a custom resolver for the cluster to add filtering for the cluster source when we list all clusters. Without the filtering listing could take a very long time (5-10 mins) which leads to lookup timeouts. ## Tests Existing unit tests passing
This commit is contained in:
parent
ceefa80d72
commit
02e83877f4
|
@ -116,6 +116,10 @@ func allResolvers() *resolvers {
|
||||||
{{range .Services -}}
|
{{range .Services -}}
|
||||||
{{- if in $allowlist .KebabName -}}
|
{{- if in $allowlist .KebabName -}}
|
||||||
r.{{.Singular.PascalName}} = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.{{.Singular.PascalName}} = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["{{.Singular.PascalName}}"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.{{.PascalName}}.GetBy{{range .NamedIdMap.NamePath}}{{.PascalName}}{{end}}(ctx, name)
|
entity, err := w.{{.PascalName}}.GetBy{{range .NamedIdMap.NamePath}}{{.PascalName}}{{end}}(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
|
|
@ -2,7 +2,6 @@ package mutator
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/databricks/cli/bundle"
|
"github.com/databricks/cli/bundle"
|
||||||
|
@ -44,11 +43,13 @@ func TestResolveClusterReference(t *testing.T) {
|
||||||
m := mocks.NewMockWorkspaceClient(t)
|
m := mocks.NewMockWorkspaceClient(t)
|
||||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||||
clusterApi := m.GetMockClustersAPI()
|
clusterApi := m.GetMockClustersAPI()
|
||||||
clusterApi.EXPECT().GetByClusterName(mock.Anything, clusterRef1).Return(&compute.ClusterDetails{
|
clusterApi.EXPECT().ListAll(mock.Anything, compute.ListClustersRequest{
|
||||||
ClusterId: "1234-5678-abcd",
|
FilterBy: &compute.ListClustersFilterBy{
|
||||||
}, nil)
|
ClusterSources: []compute.ClusterSource{compute.ClusterSourceApi, compute.ClusterSourceUi},
|
||||||
clusterApi.EXPECT().GetByClusterName(mock.Anything, clusterRef2).Return(&compute.ClusterDetails{
|
},
|
||||||
ClusterId: "9876-5432-xywz",
|
}).Return([]compute.ClusterDetails{
|
||||||
|
{ClusterId: "1234-5678-abcd", ClusterName: clusterRef1},
|
||||||
|
{ClusterId: "9876-5432-xywz", ClusterName: clusterRef2},
|
||||||
}, nil)
|
}, nil)
|
||||||
|
|
||||||
diags := bundle.Apply(context.Background(), b, ResolveResourceReferences())
|
diags := bundle.Apply(context.Background(), b, ResolveResourceReferences())
|
||||||
|
@ -78,10 +79,16 @@ func TestResolveNonExistentClusterReference(t *testing.T) {
|
||||||
m := mocks.NewMockWorkspaceClient(t)
|
m := mocks.NewMockWorkspaceClient(t)
|
||||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||||
clusterApi := m.GetMockClustersAPI()
|
clusterApi := m.GetMockClustersAPI()
|
||||||
clusterApi.EXPECT().GetByClusterName(mock.Anything, clusterRef).Return(nil, fmt.Errorf("ClusterDetails named '%s' does not exist", clusterRef))
|
clusterApi.EXPECT().ListAll(mock.Anything, compute.ListClustersRequest{
|
||||||
|
FilterBy: &compute.ListClustersFilterBy{
|
||||||
|
ClusterSources: []compute.ClusterSource{compute.ClusterSourceApi, compute.ClusterSourceUi},
|
||||||
|
},
|
||||||
|
}).Return([]compute.ClusterDetails{
|
||||||
|
{ClusterId: "1234-5678-abcd", ClusterName: "some other cluster"},
|
||||||
|
}, nil)
|
||||||
|
|
||||||
diags := bundle.Apply(context.Background(), b, ResolveResourceReferences())
|
diags := bundle.Apply(context.Background(), b, ResolveResourceReferences())
|
||||||
require.ErrorContains(t, diags.Error(), "failed to resolve cluster: Random, err: ClusterDetails named 'Random' does not exist")
|
require.ErrorContains(t, diags.Error(), "failed to resolve cluster: Random, err: cluster named 'Random' does not exist")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNoLookupIfVariableIsSet(t *testing.T) {
|
func TestNoLookupIfVariableIsSet(t *testing.T) {
|
||||||
|
@ -158,8 +165,14 @@ func TestResolveVariableReferencesInVariableLookups(t *testing.T) {
|
||||||
m := mocks.NewMockWorkspaceClient(t)
|
m := mocks.NewMockWorkspaceClient(t)
|
||||||
b.SetWorkpaceClient(m.WorkspaceClient)
|
b.SetWorkpaceClient(m.WorkspaceClient)
|
||||||
clusterApi := m.GetMockClustersAPI()
|
clusterApi := m.GetMockClustersAPI()
|
||||||
clusterApi.EXPECT().GetByClusterName(mock.Anything, "cluster-bar-dev").Return(&compute.ClusterDetails{
|
|
||||||
ClusterId: "1234-5678-abcd",
|
clusterApi.EXPECT().ListAll(mock.Anything, compute.ListClustersRequest{
|
||||||
|
FilterBy: &compute.ListClustersFilterBy{
|
||||||
|
ClusterSources: []compute.ClusterSource{compute.ClusterSourceApi, compute.ClusterSourceUi},
|
||||||
|
},
|
||||||
|
}).Return([]compute.ClusterDetails{
|
||||||
|
{ClusterId: "1234-5678-abcd", ClusterName: "cluster-bar-dev"},
|
||||||
|
{ClusterId: "9876-5432-xywz", ClusterName: "some other cluster"},
|
||||||
}, nil)
|
}, nil)
|
||||||
|
|
||||||
diags := bundle.Apply(context.Background(), b, bundle.Seq(ResolveVariableReferencesInLookup(), ResolveResourceReferences()))
|
diags := bundle.Apply(context.Background(), b, bundle.Seq(ResolveVariableReferencesInLookup(), ResolveResourceReferences()))
|
||||||
|
|
|
@ -220,6 +220,10 @@ type resolvers struct {
|
||||||
func allResolvers() *resolvers {
|
func allResolvers() *resolvers {
|
||||||
r := &resolvers{}
|
r := &resolvers{}
|
||||||
r.Alert = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.Alert = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["Alert"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.Alerts.GetByDisplayName(ctx, name)
|
entity, err := w.Alerts.GetByDisplayName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -228,6 +232,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.Id), nil
|
return fmt.Sprint(entity.Id), nil
|
||||||
}
|
}
|
||||||
r.ClusterPolicy = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.ClusterPolicy = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["ClusterPolicy"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.ClusterPolicies.GetByName(ctx, name)
|
entity, err := w.ClusterPolicies.GetByName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -236,6 +244,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.PolicyId), nil
|
return fmt.Sprint(entity.PolicyId), nil
|
||||||
}
|
}
|
||||||
r.Cluster = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.Cluster = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["Cluster"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.Clusters.GetByClusterName(ctx, name)
|
entity, err := w.Clusters.GetByClusterName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -244,6 +256,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.ClusterId), nil
|
return fmt.Sprint(entity.ClusterId), nil
|
||||||
}
|
}
|
||||||
r.Dashboard = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.Dashboard = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["Dashboard"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.Dashboards.GetByName(ctx, name)
|
entity, err := w.Dashboards.GetByName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -252,6 +268,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.Id), nil
|
return fmt.Sprint(entity.Id), nil
|
||||||
}
|
}
|
||||||
r.InstancePool = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.InstancePool = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["InstancePool"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.InstancePools.GetByInstancePoolName(ctx, name)
|
entity, err := w.InstancePools.GetByInstancePoolName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -260,6 +280,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.InstancePoolId), nil
|
return fmt.Sprint(entity.InstancePoolId), nil
|
||||||
}
|
}
|
||||||
r.Job = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.Job = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["Job"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.Jobs.GetBySettingsName(ctx, name)
|
entity, err := w.Jobs.GetBySettingsName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -268,6 +292,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.JobId), nil
|
return fmt.Sprint(entity.JobId), nil
|
||||||
}
|
}
|
||||||
r.Metastore = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.Metastore = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["Metastore"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.Metastores.GetByName(ctx, name)
|
entity, err := w.Metastores.GetByName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -276,6 +304,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.MetastoreId), nil
|
return fmt.Sprint(entity.MetastoreId), nil
|
||||||
}
|
}
|
||||||
r.Pipeline = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.Pipeline = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["Pipeline"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.Pipelines.GetByName(ctx, name)
|
entity, err := w.Pipelines.GetByName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -284,6 +316,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.PipelineId), nil
|
return fmt.Sprint(entity.PipelineId), nil
|
||||||
}
|
}
|
||||||
r.Query = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.Query = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["Query"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.Queries.GetByDisplayName(ctx, name)
|
entity, err := w.Queries.GetByDisplayName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -292,6 +328,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.Id), nil
|
return fmt.Sprint(entity.Id), nil
|
||||||
}
|
}
|
||||||
r.ServicePrincipal = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.ServicePrincipal = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["ServicePrincipal"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.ServicePrincipals.GetByDisplayName(ctx, name)
|
entity, err := w.ServicePrincipals.GetByDisplayName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -300,6 +340,10 @@ func allResolvers() *resolvers {
|
||||||
return fmt.Sprint(entity.ApplicationId), nil
|
return fmt.Sprint(entity.ApplicationId), nil
|
||||||
}
|
}
|
||||||
r.Warehouse = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
r.Warehouse = func(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
fn, ok := lookupOverrides["Warehouse"]
|
||||||
|
if ok {
|
||||||
|
return fn(ctx, w, name)
|
||||||
|
}
|
||||||
entity, err := w.Warehouses.GetByName(ctx, name)
|
entity, err := w.Warehouses.GetByName(ctx, name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
package variable
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/databricks/databricks-sdk-go"
|
||||||
|
"github.com/databricks/databricks-sdk-go/service/compute"
|
||||||
|
)
|
||||||
|
|
||||||
|
var lookupOverrides = map[string]resolverFunc{
|
||||||
|
"Cluster": resolveCluster,
|
||||||
|
}
|
||||||
|
|
||||||
|
// We added a custom resolver for the cluster to add filtering for the cluster source when we list all clusters.
|
||||||
|
// Without the filtering listing could take a very long time (5-10 mins) which leads to lookup timeouts.
|
||||||
|
func resolveCluster(ctx context.Context, w *databricks.WorkspaceClient, name string) (string, error) {
|
||||||
|
result, err := w.Clusters.ListAll(ctx, compute.ListClustersRequest{
|
||||||
|
FilterBy: &compute.ListClustersFilterBy{
|
||||||
|
ClusterSources: []compute.ClusterSource{compute.ClusterSourceApi, compute.ClusterSourceUi},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
tmp := map[string][]compute.ClusterDetails{}
|
||||||
|
for _, v := range result {
|
||||||
|
key := v.ClusterName
|
||||||
|
tmp[key] = append(tmp[key], v)
|
||||||
|
}
|
||||||
|
alternatives, ok := tmp[name]
|
||||||
|
if !ok || len(alternatives) == 0 {
|
||||||
|
return "", fmt.Errorf("cluster named '%s' does not exist", name)
|
||||||
|
}
|
||||||
|
if len(alternatives) > 1 {
|
||||||
|
return "", fmt.Errorf("there are %d instances of clusters named '%s'", len(alternatives), name)
|
||||||
|
}
|
||||||
|
return alternatives[0].ClusterId, nil
|
||||||
|
}
|
|
@ -124,8 +124,13 @@ func TestVariablesWithTargetLookupOverrides(t *testing.T) {
|
||||||
}, nil)
|
}, nil)
|
||||||
|
|
||||||
clustersApi := mockWorkspaceClient.GetMockClustersAPI()
|
clustersApi := mockWorkspaceClient.GetMockClustersAPI()
|
||||||
clustersApi.EXPECT().GetByClusterName(mock.Anything, "some-test-cluster").Return(&compute.ClusterDetails{
|
clustersApi.EXPECT().ListAll(mock.Anything, compute.ListClustersRequest{
|
||||||
ClusterId: "4321",
|
FilterBy: &compute.ListClustersFilterBy{
|
||||||
|
ClusterSources: []compute.ClusterSource{compute.ClusterSourceApi, compute.ClusterSourceUi},
|
||||||
|
},
|
||||||
|
}).Return([]compute.ClusterDetails{
|
||||||
|
{ClusterId: "4321", ClusterName: "some-test-cluster"},
|
||||||
|
{ClusterId: "9876", ClusterName: "some-other-cluster"},
|
||||||
}, nil)
|
}, nil)
|
||||||
|
|
||||||
clusterPoliciesApi := mockWorkspaceClient.GetMockClusterPoliciesAPI()
|
clusterPoliciesApi := mockWorkspaceClient.GetMockClusterPoliciesAPI()
|
||||||
|
|
Loading…
Reference in New Issue