2023-11-09 16:38:45 +00:00
|
|
|
package cfgpickers
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"regexp"
|
|
|
|
"strings"
|
|
|
|
|
|
|
|
"github.com/databricks/cli/libs/cmdio"
|
|
|
|
"github.com/databricks/databricks-sdk-go"
|
|
|
|
"github.com/databricks/databricks-sdk-go/service/compute"
|
|
|
|
"github.com/databricks/databricks-sdk-go/service/iam"
|
|
|
|
"github.com/fatih/color"
|
|
|
|
"github.com/manifoldco/promptui"
|
|
|
|
"golang.org/x/mod/semver"
|
|
|
|
)
|
|
|
|
|
|
|
|
var minUcRuntime = canonicalVersion("v12.0")
|
|
|
|
|
|
|
|
var dbrVersionRegex = regexp.MustCompile(`^(\d+\.\d+)\.x-.*`)
|
|
|
|
var dbrSnapshotVersionRegex = regexp.MustCompile(`^(\d+)\.x-snapshot.*`)
|
|
|
|
|
|
|
|
func canonicalVersion(v string) string {
|
|
|
|
return semver.Canonical("v" + strings.TrimPrefix(v, "v"))
|
|
|
|
}
|
|
|
|
|
|
|
|
func GetRuntimeVersion(cluster compute.ClusterDetails) (string, bool) {
|
|
|
|
match := dbrVersionRegex.FindStringSubmatch(cluster.SparkVersion)
|
|
|
|
if len(match) < 1 {
|
|
|
|
match = dbrSnapshotVersionRegex.FindStringSubmatch(cluster.SparkVersion)
|
|
|
|
if len(match) > 1 {
|
|
|
|
// we return 14.999 for 14.x-snapshot for semver.Compare() to work properly
|
|
|
|
return fmt.Sprintf("%s.999", match[1]), true
|
|
|
|
}
|
|
|
|
return "", false
|
|
|
|
}
|
|
|
|
return match[1], true
|
|
|
|
}
|
|
|
|
|
|
|
|
func IsCompatibleWithUC(cluster compute.ClusterDetails, minVersion string) bool {
|
|
|
|
minVersion = canonicalVersion(minVersion)
|
|
|
|
if semver.Compare(minUcRuntime, minVersion) >= 0 {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
runtimeVersion, ok := GetRuntimeVersion(cluster)
|
|
|
|
if !ok {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
clusterRuntime := canonicalVersion(runtimeVersion)
|
|
|
|
if semver.Compare(minVersion, clusterRuntime) > 0 {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
switch cluster.DataSecurityMode {
|
|
|
|
case compute.DataSecurityModeUserIsolation, compute.DataSecurityModeSingleUser:
|
|
|
|
return true
|
|
|
|
default:
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
var ErrNoCompatibleClusters = errors.New("no compatible clusters found")
|
|
|
|
|
|
|
|
type compatibleCluster struct {
|
|
|
|
compute.ClusterDetails
|
|
|
|
versionName string
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v compatibleCluster) Access() string {
|
|
|
|
switch v.DataSecurityMode {
|
|
|
|
case compute.DataSecurityModeUserIsolation:
|
|
|
|
return "Shared"
|
|
|
|
case compute.DataSecurityModeSingleUser:
|
|
|
|
return "Assigned"
|
|
|
|
default:
|
|
|
|
return "Unknown"
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v compatibleCluster) Runtime() string {
|
|
|
|
runtime, _, _ := strings.Cut(v.versionName, " (")
|
|
|
|
return runtime
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v compatibleCluster) State() string {
|
|
|
|
state := v.ClusterDetails.State
|
|
|
|
switch state {
|
|
|
|
case compute.StateRunning, compute.StateResizing:
|
|
|
|
return color.GreenString(state.String())
|
|
|
|
case compute.StateError, compute.StateTerminated, compute.StateTerminating, compute.StateUnknown:
|
|
|
|
return color.RedString(state.String())
|
|
|
|
default:
|
|
|
|
return color.BlueString(state.String())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type clusterFilter func(cluster *compute.ClusterDetails, me *iam.User) bool
|
|
|
|
|
|
|
|
func WithDatabricksConnect(minVersion string) func(*compute.ClusterDetails, *iam.User) bool {
|
|
|
|
return func(cluster *compute.ClusterDetails, me *iam.User) bool {
|
|
|
|
if !IsCompatibleWithUC(*cluster, minVersion) {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
switch cluster.ClusterSource {
|
|
|
|
case compute.ClusterSourceJob,
|
|
|
|
compute.ClusterSourceModels,
|
|
|
|
compute.ClusterSourcePipeline,
|
|
|
|
compute.ClusterSourcePipelineMaintenance,
|
|
|
|
compute.ClusterSourceSql:
|
|
|
|
// only UI and API clusters are usable for DBConnect.
|
|
|
|
// `CanUseClient: "NOTEBOOKS"`` didn't seem to have an effect.
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
if cluster.SingleUserName != "" && cluster.SingleUserName != me.UserName {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-11-30 09:59:11 +00:00
|
|
|
// WithoutSystemClusters removes clusters created for system purposes (e.g. job runs, pipeline maintenance, etc.).
|
|
|
|
// It does this by keeping only clusters created through the UI or an API call.
|
|
|
|
func WithoutSystemClusters() func(*compute.ClusterDetails, *iam.User) bool {
|
|
|
|
return func(cluster *compute.ClusterDetails, me *iam.User) bool {
|
|
|
|
switch cluster.ClusterSource {
|
|
|
|
case compute.ClusterSourceApi, compute.ClusterSourceUi:
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-11-09 16:38:45 +00:00
|
|
|
func loadInteractiveClusters(ctx context.Context, w *databricks.WorkspaceClient, filters []clusterFilter) ([]compatibleCluster, error) {
|
|
|
|
promptSpinner := cmdio.Spinner(ctx)
|
|
|
|
promptSpinner <- "Loading list of clusters to select from"
|
|
|
|
defer close(promptSpinner)
|
|
|
|
all, err := w.Clusters.ListAll(ctx, compute.ListClustersRequest{
|
|
|
|
CanUseClient: "NOTEBOOKS",
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("list clusters: %w", err)
|
|
|
|
}
|
|
|
|
me, err := w.CurrentUser.Me(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("current user: %w", err)
|
|
|
|
}
|
|
|
|
versions := map[string]string{}
|
|
|
|
sv, err := w.Clusters.SparkVersions(ctx)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("list runtime versions: %w", err)
|
|
|
|
}
|
|
|
|
for _, v := range sv.Versions {
|
|
|
|
versions[v.Key] = v.Name
|
|
|
|
}
|
|
|
|
var compatible []compatibleCluster
|
|
|
|
for _, cluster := range all {
|
|
|
|
var skip bool
|
|
|
|
for _, filter := range filters {
|
|
|
|
if !filter(&cluster, me) {
|
|
|
|
skip = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if skip {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
compatible = append(compatible, compatibleCluster{
|
|
|
|
ClusterDetails: cluster,
|
|
|
|
versionName: versions[cluster.SparkVersion],
|
|
|
|
})
|
|
|
|
}
|
|
|
|
return compatible, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func AskForCluster(ctx context.Context, w *databricks.WorkspaceClient, filters ...clusterFilter) (string, error) {
|
|
|
|
compatible, err := loadInteractiveClusters(ctx, w, filters)
|
|
|
|
if err != nil {
|
|
|
|
return "", fmt.Errorf("load: %w", err)
|
|
|
|
}
|
|
|
|
if len(compatible) == 0 {
|
|
|
|
return "", ErrNoCompatibleClusters
|
|
|
|
}
|
|
|
|
if len(compatible) == 1 {
|
|
|
|
return compatible[0].ClusterId, nil
|
|
|
|
}
|
|
|
|
i, _, err := cmdio.RunSelect(ctx, &promptui.Select{
|
|
|
|
Label: "Choose compatible cluster",
|
|
|
|
Items: compatible,
|
|
|
|
Searcher: func(input string, idx int) bool {
|
|
|
|
lower := strings.ToLower(compatible[idx].ClusterName)
|
|
|
|
return strings.Contains(lower, input)
|
|
|
|
},
|
|
|
|
StartInSearchMode: true,
|
|
|
|
Templates: &promptui.SelectTemplates{
|
|
|
|
Label: "{{.ClusterName | faint}}",
|
|
|
|
Active: `{{.ClusterName | bold}} ({{.State}} {{.Access}} Runtime {{.Runtime}}) ({{.ClusterId | faint}})`,
|
|
|
|
Inactive: `{{.ClusterName}} ({{.State}} {{.Access}} Runtime {{.Runtime}})`,
|
|
|
|
Selected: `{{ "Configured cluster" | faint }}: {{ .ClusterName | bold }} ({{.ClusterId | faint}})`,
|
|
|
|
},
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
return compatible[i].ClusterId, nil
|
|
|
|
}
|