diff --git a/bundle/bundle.go b/bundle/bundle.go index 69a2f6a4..076294d7 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -7,7 +7,7 @@ import ( "github.com/databricks/bricks/bundle/config" "github.com/databricks/bricks/bundle/config/mutator" - "github.com/databricks/databricks-sdk-go/workspaces" + "github.com/databricks/databricks-sdk-go" ) type Bundle struct { @@ -16,7 +16,7 @@ type Bundle struct { // Store a pointer to the workspace client. // It can be initialized on demand after loading the configuration. clientOnce sync.Once - client *workspaces.WorkspacesClient + client *databricks.WorkspaceClient } func (b *Bundle) MutateForEnvironment(env string) error { @@ -59,9 +59,13 @@ func ConfigureForEnvironment(ctx context.Context, env string) (context.Context, return Context(ctx, b), nil } -func (b *Bundle) WorkspaceClient() *workspaces.WorkspacesClient { +func (b *Bundle) WorkspaceClient() *databricks.WorkspaceClient { b.clientOnce.Do(func() { - b.client = b.Config.Workspace.Client() + var err error + b.client, err = b.Config.Workspace.Client() + if err != nil { + panic(err) + } }) return b.client } diff --git a/bundle/config/workspace.go b/bundle/config/workspace.go index 0da2acbb..45b244d9 100644 --- a/bundle/config/workspace.go +++ b/bundle/config/workspace.go @@ -1,8 +1,7 @@ package config import ( - "github.com/databricks/databricks-sdk-go/databricks" - "github.com/databricks/databricks-sdk-go/workspaces" + "github.com/databricks/databricks-sdk-go" ) // Workspace defines configurables at the workspace level. @@ -31,7 +30,7 @@ type Workspace struct { AzureLoginAppID string `json:"azure_login_app_id,omitempty"` } -func (w *Workspace) Client() *workspaces.WorkspacesClient { +func (w *Workspace) Client() (*databricks.WorkspaceClient, error) { config := databricks.Config{ // Generic Host: w.Host, @@ -49,5 +48,5 @@ func (w *Workspace) Client() *workspaces.WorkspacesClient { AzureLoginAppID: w.AzureLoginAppID, } - return workspaces.New(&config) + return databricks.NewWorkspaceClient(&config) } diff --git a/cmd/api/api.go b/cmd/api/api.go index 5b4d87e5..1ce91607 100644 --- a/cmd/api/api.go +++ b/cmd/api/api.go @@ -8,8 +8,8 @@ import ( "strings" "github.com/databricks/bricks/cmd/root" - "github.com/databricks/databricks-sdk-go/databricks" - "github.com/databricks/databricks-sdk-go/databricks/client" + "github.com/databricks/databricks-sdk-go/client" + "github.com/databricks/databricks-sdk-go/config" "github.com/spf13/cobra" ) @@ -52,7 +52,7 @@ func makeCommand(method string) *cobra.Command { return err } - api, err := client.New(&databricks.Config{}) + api, err := client.New(&config.Config{}) if err != nil { return err } diff --git a/cmd/sync/watchdog.go b/cmd/sync/watchdog.go index 19e55a32..03f8e120 100644 --- a/cmd/sync/watchdog.go +++ b/cmd/sync/watchdog.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "net/http" "os" "path" "path/filepath" @@ -13,10 +14,10 @@ import ( "time" "github.com/databricks/bricks/project" - "github.com/databricks/databricks-sdk-go/databricks/apierr" - "github.com/databricks/databricks-sdk-go/databricks/client" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/service/workspace" - "github.com/databricks/databricks-sdk-go/workspaces" "golang.org/x/sync/errgroup" ) @@ -57,12 +58,12 @@ func putFile(ctx context.Context, path string, content io.Reader) error { apiPath := fmt.Sprintf( "/api/2.0/workspace-files/import-file/%s?overwrite=true", strings.TrimLeft(path, "/")) - return apiClient.Post(ctx, apiPath, content, nil) + return apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) } // path: The remote path of the file in the workspace -func deleteFile(ctx context.Context, path string, wsc *workspaces.WorkspacesClient) error { - err := wsc.Workspace.Delete(ctx, +func deleteFile(ctx context.Context, path string, w *databricks.WorkspaceClient) error { + err := w.Workspace.Delete(ctx, workspace.Delete{ Path: path, Recursive: true, @@ -78,7 +79,7 @@ func deleteFile(ctx context.Context, path string, wsc *workspaces.WorkspacesClie return err } -func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *workspaces.WorkspacesClient) func(localDiff diff) error { +func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, w *databricks.WorkspaceClient) func(localDiff diff) error { return func(d diff) error { // Abstraction over wait groups which allows you to get the errors @@ -95,7 +96,7 @@ func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *wor // is evaluated remoteNameCopy := remoteName g.Go(func() error { - err := deleteFile(ctx, path.Join(remoteDir, remoteNameCopy), wsc) + err := deleteFile(ctx, path.Join(remoteDir, remoteNameCopy), w) if err != nil { return err } diff --git a/ext/databricks-sdk-go b/ext/databricks-sdk-go index cf6bddd6..9a36314a 160000 --- a/ext/databricks-sdk-go +++ b/ext/databricks-sdk-go @@ -1 +1 @@ -Subproject commit cf6bddd603989710180c05887006f1958c46afaa +Subproject commit 9a36314ae27dd011abe0fefeedba018bb83cd90b diff --git a/git/git.go b/git/git.go index 9d800b91..f713dda1 100644 --- a/git/git.go +++ b/git/git.go @@ -9,8 +9,8 @@ import ( "strings" "github.com/databricks/bricks/folders" - "github.com/databricks/bricks/utilities" - "github.com/databricks/databricks-sdk-go/workspaces" + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/service/repos" giturls "github.com/whilp/git-urls" "gopkg.in/ini.v1" ) @@ -82,8 +82,8 @@ func RepositoryName() (string, error) { return strings.TrimSuffix(base, ".git"), nil } -func RepoExists(remotePath string, ctx context.Context, wsc *workspaces.WorkspacesClient) (bool, error) { - repos, err := utilities.GetAllRepos(ctx, wsc, remotePath) +func RepoExists(remotePath string, ctx context.Context, w *databricks.WorkspaceClient) (bool, error) { + repos, err := w.Repos.ListAll(ctx, repos.ListRequest{}) if err != nil { return false, fmt.Errorf("could not get repos: %s", err) } diff --git a/internal/sync_test.go b/internal/sync_test.go index accf487e..0a40e608 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -15,9 +15,9 @@ import ( "github.com/databricks/bricks/cmd/sync" "github.com/databricks/bricks/folders" + "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/repos" "github.com/databricks/databricks-sdk-go/service/workspace" - "github.com/databricks/databricks-sdk-go/workspaces" "github.com/stretchr/testify/assert" ) @@ -39,7 +39,7 @@ func TestAccFullSync(t *testing.T) { t.Log("bricks repo location: : ", bricksRepo) assert.Equal(t, "bricks", filepath.Base(bricksRepo)) - wsc := workspaces.New() + wsc := databricks.Must(databricks.NewWorkspaceClient()) ctx := context.Background() me, err := wsc.CurrentUser.Me(ctx) assert.NoError(t, err) @@ -104,18 +104,18 @@ func TestAccFullSync(t *testing.T) { // First upload assertion assert.Eventually(t, func() bool { - repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) - return len(repoContent.Objects) == 3 + return len(objects) == 3 }, 30*time.Second, 5*time.Second) - repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) var files1 []string - for _, v := range repoContent.Objects { + for _, v := range objects { files1 = append(files1, filepath.Base(v.Path)) } assert.Len(t, files1, 3) @@ -127,18 +127,18 @@ func TestAccFullSync(t *testing.T) { os.Create(filepath.Join(projectDir, "hello.txt")) os.Create(filepath.Join(projectDir, "world.txt")) assert.Eventually(t, func() bool { - repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) - return len(repoContent.Objects) == 5 + return len(objects) == 5 }, 30*time.Second, 5*time.Second) - repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) var files2 []string - for _, v := range repoContent.Objects { + for _, v := range objects { files2 = append(files2, filepath.Base(v.Path)) } assert.Len(t, files2, 5) @@ -151,18 +151,18 @@ func TestAccFullSync(t *testing.T) { // delete a file and assert os.Remove(filepath.Join(projectDir, "hello.txt")) assert.Eventually(t, func() bool { - repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) - return len(repoContent.Objects) == 4 + return len(objects) == 4 }, 30*time.Second, 5*time.Second) - repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) var files3 []string - for _, v := range repoContent.Objects { + for _, v := range objects { files3 = append(files3, filepath.Base(v.Path)) } assert.Len(t, files3, 4) @@ -211,7 +211,7 @@ func TestAccIncrementalSync(t *testing.T) { t.Log("bricks repo location: : ", bricksRepo) assert.Equal(t, "bricks", filepath.Base(bricksRepo)) - wsc := workspaces.New() + wsc := databricks.Must(databricks.NewWorkspaceClient()) ctx := context.Background() me, err := wsc.CurrentUser.Me(ctx) assert.NoError(t, err) @@ -281,18 +281,18 @@ func TestAccIncrementalSync(t *testing.T) { // First upload assertion assert.Eventually(t, func() bool { - repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) - return len(repoContent.Objects) == 2 + return len(objects) == 2 }, 30*time.Second, 5*time.Second) - repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) var files1 []string - for _, v := range repoContent.Objects { + for _, v := range objects { files1 = append(files1, filepath.Base(v.Path)) } assert.Len(t, files1, 2) @@ -307,18 +307,18 @@ func TestAccIncrementalSync(t *testing.T) { // new file upload assertion assert.Eventually(t, func() bool { - repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) - return len(repoContent.Objects) == 3 + return len(objects) == 3 }, 30*time.Second, 5*time.Second) - repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) var files2 []string - for _, v := range repoContent.Objects { + for _, v := range objects { files2 = append(files2, filepath.Base(v.Path)) } assert.Len(t, files2, 3) @@ -330,18 +330,18 @@ func TestAccIncrementalSync(t *testing.T) { // delete a file and assert os.Remove(filepath.Join(projectDir, ".gitkeep")) assert.Eventually(t, func() bool { - repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) - return len(repoContent.Objects) == 2 + return len(objects) == 2 }, 30*time.Second, 5*time.Second) - repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{ + objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ Path: repoPath, }) assert.NoError(t, err) var files3 []string - for _, v := range repoContent.Objects { + for _, v := range objects { files3 = append(files3, filepath.Base(v.Path)) } assert.Len(t, files3, 2) diff --git a/project/project.go b/project/project.go index f6103ce9..0c9676bc 100644 --- a/project/project.go +++ b/project/project.go @@ -8,11 +8,9 @@ import ( "sync" "github.com/databricks/bricks/git" - "github.com/databricks/databricks-sdk-go/databricks" - "github.com/databricks/databricks-sdk-go/service/clusters" + "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/commands" "github.com/databricks/databricks-sdk-go/service/scim" - "github.com/databricks/databricks-sdk-go/workspaces" "github.com/spf13/cobra" ) @@ -26,7 +24,7 @@ type project struct { config *Config environment *Environment - wsc *workspaces.WorkspacesClient + wsc *databricks.WorkspaceClient me *scim.User fileSet *git.FileSet } @@ -96,7 +94,7 @@ func (p *project) initializeWorkspacesClient(ctx context.Context) { config.Profile = p.environment.Workspace.Profile } - p.wsc = workspaces.New(&config) + p.wsc = databricks.Must(databricks.NewWorkspaceClient(&config)) } // Get returns the project as configured on the context. @@ -110,7 +108,7 @@ func Get(ctx context.Context) *project { } // Make sure to initialize the workspaces client on project init -func (p *project) WorkspacesClient() *workspaces.WorkspacesClient { +func (p *project) WorkspacesClient() *databricks.WorkspaceClient { return p.wsc } @@ -183,22 +181,14 @@ func (p *project) DeploymentIsolationPrefix() string { } func getClusterIdFromClusterName(ctx context.Context, - wsc *workspaces.WorkspacesClient, + wsc *databricks.WorkspaceClient, clusterName string, ) (clusterId string, err error) { - clusterId = "" - clustersList, err := wsc.Clusters.List(ctx, clusters.ListRequest{}) + clusterInfo, err := wsc.Clusters.GetClusterInfoByClusterName(ctx, clusterName) if err != nil { - return + return "", err } - for _, cluster := range clustersList.Clusters { - if cluster.ClusterName == clusterName { - clusterId = cluster.ClusterId - return - } - } - err = fmt.Errorf("could not find cluster with name: %s", clusterName) - return + return clusterInfo.ClusterId, nil } // Old version of getting development cluster details with isolation implemented. diff --git a/utilities/dbfs.go b/utilities/dbfs.go index 985fac5a..26c772c6 100644 --- a/utilities/dbfs.go +++ b/utilities/dbfs.go @@ -6,22 +6,22 @@ import ( "encoding/base64" "fmt" + "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/service/dbfs" - "github.com/databricks/databricks-sdk-go/workspaces" ) // move to go sdk / replace with utility function once // https://github.com/databricks/databricks-sdk-go/issues/57 is Done // Tracked in https://github.com/databricks/bricks/issues/25 func CreateDbfsFile(ctx context.Context, - wsc *workspaces.WorkspacesClient, + w *databricks.WorkspaceClient, path string, contents []byte, overwrite bool, ) error { // see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#add-block const WRITE_BYTE_CHUNK_SIZE = 1e6 - createResponse, err := wsc.Dbfs.Create(ctx, + createResponse, err := w.Dbfs.Create(ctx, dbfs.Create{ Overwrite: overwrite, Path: path, @@ -38,7 +38,7 @@ func CreateDbfsFile(ctx context.Context, break } b64Data := base64.StdEncoding.EncodeToString(byteChunk) - err := wsc.Dbfs.AddBlock(ctx, + err := w.Dbfs.AddBlock(ctx, dbfs.AddBlock{ Data: b64Data, Handle: handle, @@ -48,7 +48,7 @@ func CreateDbfsFile(ctx context.Context, return fmt.Errorf("cannot add block: %w", err) } } - err = wsc.Dbfs.Close(ctx, + err = w.Dbfs.Close(ctx, dbfs.Close{ Handle: handle, }, @@ -60,7 +60,7 @@ func CreateDbfsFile(ctx context.Context, } func ReadDbfsFile(ctx context.Context, - wsc *workspaces.WorkspacesClient, + w *databricks.WorkspaceClient, path string, ) (content []byte, err error) { // see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#read @@ -69,7 +69,7 @@ func ReadDbfsFile(ctx context.Context, offSet := 0 length := int(READ_BYTE_CHUNK_SIZE) for fetchLoop { - dbfsReadReponse, err := wsc.Dbfs.Read(ctx, + dbfsReadReponse, err := w.Dbfs.Read(ctx, dbfs.ReadRequest{ Path: path, Offset: offSet, diff --git a/utilities/repos.go b/utilities/repos.go deleted file mode 100644 index 14807b54..00000000 --- a/utilities/repos.go +++ /dev/null @@ -1,29 +0,0 @@ -package utilities - -import ( - "context" - - "github.com/databricks/databricks-sdk-go/service/repos" - "github.com/databricks/databricks-sdk-go/workspaces" -) - -// Remove once this function is in go sdk -// https://github.com/databricks/databricks-sdk-go/issues/58 -// Tracked in : https://github.com/databricks/bricks/issues/26 -func GetAllRepos(ctx context.Context, wsc *workspaces.WorkspacesClient, pathPrefix string) (resultRepos []repos.RepoInfo, err error) { - nextPageToken := "" - for { - listReposResponse, err := wsc.Repos.List(ctx, repos.ListRequest{ - PathPrefix: pathPrefix, - NextPageToken: nextPageToken, - }) - if err != nil { - break - } - resultRepos = append(resultRepos, listReposResponse.Repos...) - if nextPageToken == "" { - break - } - } - return -}