Update databricks-sdk-go to latest (#102)

This commit is contained in:
Pieter Noordhuis 2022-11-24 21:41:57 +01:00 committed by GitHub
parent 07f07694a4
commit 8e786d76a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 70 additions and 105 deletions

View File

@ -7,7 +7,7 @@ import (
"github.com/databricks/bricks/bundle/config" "github.com/databricks/bricks/bundle/config"
"github.com/databricks/bricks/bundle/config/mutator" "github.com/databricks/bricks/bundle/config/mutator"
"github.com/databricks/databricks-sdk-go/workspaces" "github.com/databricks/databricks-sdk-go"
) )
type Bundle struct { type Bundle struct {
@ -16,7 +16,7 @@ type Bundle struct {
// Store a pointer to the workspace client. // Store a pointer to the workspace client.
// It can be initialized on demand after loading the configuration. // It can be initialized on demand after loading the configuration.
clientOnce sync.Once clientOnce sync.Once
client *workspaces.WorkspacesClient client *databricks.WorkspaceClient
} }
func (b *Bundle) MutateForEnvironment(env string) error { func (b *Bundle) MutateForEnvironment(env string) error {
@ -59,9 +59,13 @@ func ConfigureForEnvironment(ctx context.Context, env string) (context.Context,
return Context(ctx, b), nil return Context(ctx, b), nil
} }
func (b *Bundle) WorkspaceClient() *workspaces.WorkspacesClient { func (b *Bundle) WorkspaceClient() *databricks.WorkspaceClient {
b.clientOnce.Do(func() { b.clientOnce.Do(func() {
b.client = b.Config.Workspace.Client() var err error
b.client, err = b.Config.Workspace.Client()
if err != nil {
panic(err)
}
}) })
return b.client return b.client
} }

View File

@ -1,8 +1,7 @@
package config package config
import ( import (
"github.com/databricks/databricks-sdk-go/databricks" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/workspaces"
) )
// Workspace defines configurables at the workspace level. // Workspace defines configurables at the workspace level.
@ -31,7 +30,7 @@ type Workspace struct {
AzureLoginAppID string `json:"azure_login_app_id,omitempty"` AzureLoginAppID string `json:"azure_login_app_id,omitempty"`
} }
func (w *Workspace) Client() *workspaces.WorkspacesClient { func (w *Workspace) Client() (*databricks.WorkspaceClient, error) {
config := databricks.Config{ config := databricks.Config{
// Generic // Generic
Host: w.Host, Host: w.Host,
@ -49,5 +48,5 @@ func (w *Workspace) Client() *workspaces.WorkspacesClient {
AzureLoginAppID: w.AzureLoginAppID, AzureLoginAppID: w.AzureLoginAppID,
} }
return workspaces.New(&config) return databricks.NewWorkspaceClient(&config)
} }

View File

@ -8,8 +8,8 @@ import (
"strings" "strings"
"github.com/databricks/bricks/cmd/root" "github.com/databricks/bricks/cmd/root"
"github.com/databricks/databricks-sdk-go/databricks" "github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/databricks/client" "github.com/databricks/databricks-sdk-go/config"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -52,7 +52,7 @@ func makeCommand(method string) *cobra.Command {
return err return err
} }
api, err := client.New(&databricks.Config{}) api, err := client.New(&config.Config{})
if err != nil { if err != nil {
return err return err
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"log" "log"
"net/http"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
@ -13,10 +14,10 @@ import (
"time" "time"
"github.com/databricks/bricks/project" "github.com/databricks/bricks/project"
"github.com/databricks/databricks-sdk-go/databricks/apierr" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/databricks/client" "github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/client"
"github.com/databricks/databricks-sdk-go/service/workspace" "github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/databricks/databricks-sdk-go/workspaces"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
@ -57,12 +58,12 @@ func putFile(ctx context.Context, path string, content io.Reader) error {
apiPath := fmt.Sprintf( apiPath := fmt.Sprintf(
"/api/2.0/workspace-files/import-file/%s?overwrite=true", "/api/2.0/workspace-files/import-file/%s?overwrite=true",
strings.TrimLeft(path, "/")) strings.TrimLeft(path, "/"))
return apiClient.Post(ctx, apiPath, content, nil) return apiClient.Do(ctx, http.MethodPost, apiPath, content, nil)
} }
// path: The remote path of the file in the workspace // path: The remote path of the file in the workspace
func deleteFile(ctx context.Context, path string, wsc *workspaces.WorkspacesClient) error { func deleteFile(ctx context.Context, path string, w *databricks.WorkspaceClient) error {
err := wsc.Workspace.Delete(ctx, err := w.Workspace.Delete(ctx,
workspace.Delete{ workspace.Delete{
Path: path, Path: path,
Recursive: true, Recursive: true,
@ -78,7 +79,7 @@ func deleteFile(ctx context.Context, path string, wsc *workspaces.WorkspacesClie
return err return err
} }
func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *workspaces.WorkspacesClient) func(localDiff diff) error { func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, w *databricks.WorkspaceClient) func(localDiff diff) error {
return func(d diff) error { return func(d diff) error {
// Abstraction over wait groups which allows you to get the errors // Abstraction over wait groups which allows you to get the errors
@ -95,7 +96,7 @@ func getRemoteSyncCallback(ctx context.Context, root, remoteDir string, wsc *wor
// is evaluated // is evaluated
remoteNameCopy := remoteName remoteNameCopy := remoteName
g.Go(func() error { g.Go(func() error {
err := deleteFile(ctx, path.Join(remoteDir, remoteNameCopy), wsc) err := deleteFile(ctx, path.Join(remoteDir, remoteNameCopy), w)
if err != nil { if err != nil {
return err return err
} }

@ -1 +1 @@
Subproject commit cf6bddd603989710180c05887006f1958c46afaa Subproject commit 9a36314ae27dd011abe0fefeedba018bb83cd90b

View File

@ -9,8 +9,8 @@ import (
"strings" "strings"
"github.com/databricks/bricks/folders" "github.com/databricks/bricks/folders"
"github.com/databricks/bricks/utilities" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/workspaces" "github.com/databricks/databricks-sdk-go/service/repos"
giturls "github.com/whilp/git-urls" giturls "github.com/whilp/git-urls"
"gopkg.in/ini.v1" "gopkg.in/ini.v1"
) )
@ -82,8 +82,8 @@ func RepositoryName() (string, error) {
return strings.TrimSuffix(base, ".git"), nil return strings.TrimSuffix(base, ".git"), nil
} }
func RepoExists(remotePath string, ctx context.Context, wsc *workspaces.WorkspacesClient) (bool, error) { func RepoExists(remotePath string, ctx context.Context, w *databricks.WorkspaceClient) (bool, error) {
repos, err := utilities.GetAllRepos(ctx, wsc, remotePath) repos, err := w.Repos.ListAll(ctx, repos.ListRequest{})
if err != nil { if err != nil {
return false, fmt.Errorf("could not get repos: %s", err) return false, fmt.Errorf("could not get repos: %s", err)
} }

View File

@ -15,9 +15,9 @@ import (
"github.com/databricks/bricks/cmd/sync" "github.com/databricks/bricks/cmd/sync"
"github.com/databricks/bricks/folders" "github.com/databricks/bricks/folders"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/repos" "github.com/databricks/databricks-sdk-go/service/repos"
"github.com/databricks/databricks-sdk-go/service/workspace" "github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/databricks/databricks-sdk-go/workspaces"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -39,7 +39,7 @@ func TestAccFullSync(t *testing.T) {
t.Log("bricks repo location: : ", bricksRepo) t.Log("bricks repo location: : ", bricksRepo)
assert.Equal(t, "bricks", filepath.Base(bricksRepo)) assert.Equal(t, "bricks", filepath.Base(bricksRepo))
wsc := workspaces.New() wsc := databricks.Must(databricks.NewWorkspaceClient())
ctx := context.Background() ctx := context.Background()
me, err := wsc.CurrentUser.Me(ctx) me, err := wsc.CurrentUser.Me(ctx)
assert.NoError(t, err) assert.NoError(t, err)
@ -104,18 +104,18 @@ func TestAccFullSync(t *testing.T) {
// First upload assertion // First upload assertion
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(repoContent.Objects) == 3 return len(objects) == 3
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
var files1 []string var files1 []string
for _, v := range repoContent.Objects { for _, v := range objects {
files1 = append(files1, filepath.Base(v.Path)) files1 = append(files1, filepath.Base(v.Path))
} }
assert.Len(t, files1, 3) assert.Len(t, files1, 3)
@ -127,18 +127,18 @@ func TestAccFullSync(t *testing.T) {
os.Create(filepath.Join(projectDir, "hello.txt")) os.Create(filepath.Join(projectDir, "hello.txt"))
os.Create(filepath.Join(projectDir, "world.txt")) os.Create(filepath.Join(projectDir, "world.txt"))
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(repoContent.Objects) == 5 return len(objects) == 5
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
var files2 []string var files2 []string
for _, v := range repoContent.Objects { for _, v := range objects {
files2 = append(files2, filepath.Base(v.Path)) files2 = append(files2, filepath.Base(v.Path))
} }
assert.Len(t, files2, 5) assert.Len(t, files2, 5)
@ -151,18 +151,18 @@ func TestAccFullSync(t *testing.T) {
// delete a file and assert // delete a file and assert
os.Remove(filepath.Join(projectDir, "hello.txt")) os.Remove(filepath.Join(projectDir, "hello.txt"))
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(repoContent.Objects) == 4 return len(objects) == 4
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
var files3 []string var files3 []string
for _, v := range repoContent.Objects { for _, v := range objects {
files3 = append(files3, filepath.Base(v.Path)) files3 = append(files3, filepath.Base(v.Path))
} }
assert.Len(t, files3, 4) assert.Len(t, files3, 4)
@ -211,7 +211,7 @@ func TestAccIncrementalSync(t *testing.T) {
t.Log("bricks repo location: : ", bricksRepo) t.Log("bricks repo location: : ", bricksRepo)
assert.Equal(t, "bricks", filepath.Base(bricksRepo)) assert.Equal(t, "bricks", filepath.Base(bricksRepo))
wsc := workspaces.New() wsc := databricks.Must(databricks.NewWorkspaceClient())
ctx := context.Background() ctx := context.Background()
me, err := wsc.CurrentUser.Me(ctx) me, err := wsc.CurrentUser.Me(ctx)
assert.NoError(t, err) assert.NoError(t, err)
@ -281,18 +281,18 @@ func TestAccIncrementalSync(t *testing.T) {
// First upload assertion // First upload assertion
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(repoContent.Objects) == 2 return len(objects) == 2
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
var files1 []string var files1 []string
for _, v := range repoContent.Objects { for _, v := range objects {
files1 = append(files1, filepath.Base(v.Path)) files1 = append(files1, filepath.Base(v.Path))
} }
assert.Len(t, files1, 2) assert.Len(t, files1, 2)
@ -307,18 +307,18 @@ func TestAccIncrementalSync(t *testing.T) {
// new file upload assertion // new file upload assertion
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(repoContent.Objects) == 3 return len(objects) == 3
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
var files2 []string var files2 []string
for _, v := range repoContent.Objects { for _, v := range objects {
files2 = append(files2, filepath.Base(v.Path)) files2 = append(files2, filepath.Base(v.Path))
} }
assert.Len(t, files2, 3) assert.Len(t, files2, 3)
@ -330,18 +330,18 @@ func TestAccIncrementalSync(t *testing.T) {
// delete a file and assert // delete a file and assert
os.Remove(filepath.Join(projectDir, ".gitkeep")) os.Remove(filepath.Join(projectDir, ".gitkeep"))
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
repoContent, err := wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(repoContent.Objects) == 2 return len(objects) == 2
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
repoContent, err = wsc.Workspace.List(ctx, workspace.ListRequest{ objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
var files3 []string var files3 []string
for _, v := range repoContent.Objects { for _, v := range objects {
files3 = append(files3, filepath.Base(v.Path)) files3 = append(files3, filepath.Base(v.Path))
} }
assert.Len(t, files3, 2) assert.Len(t, files3, 2)

View File

@ -8,11 +8,9 @@ import (
"sync" "sync"
"github.com/databricks/bricks/git" "github.com/databricks/bricks/git"
"github.com/databricks/databricks-sdk-go/databricks" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/clusters"
"github.com/databricks/databricks-sdk-go/service/commands" "github.com/databricks/databricks-sdk-go/service/commands"
"github.com/databricks/databricks-sdk-go/service/scim" "github.com/databricks/databricks-sdk-go/service/scim"
"github.com/databricks/databricks-sdk-go/workspaces"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -26,7 +24,7 @@ type project struct {
config *Config config *Config
environment *Environment environment *Environment
wsc *workspaces.WorkspacesClient wsc *databricks.WorkspaceClient
me *scim.User me *scim.User
fileSet *git.FileSet fileSet *git.FileSet
} }
@ -96,7 +94,7 @@ func (p *project) initializeWorkspacesClient(ctx context.Context) {
config.Profile = p.environment.Workspace.Profile config.Profile = p.environment.Workspace.Profile
} }
p.wsc = workspaces.New(&config) p.wsc = databricks.Must(databricks.NewWorkspaceClient(&config))
} }
// Get returns the project as configured on the context. // Get returns the project as configured on the context.
@ -110,7 +108,7 @@ func Get(ctx context.Context) *project {
} }
// Make sure to initialize the workspaces client on project init // Make sure to initialize the workspaces client on project init
func (p *project) WorkspacesClient() *workspaces.WorkspacesClient { func (p *project) WorkspacesClient() *databricks.WorkspaceClient {
return p.wsc return p.wsc
} }
@ -183,22 +181,14 @@ func (p *project) DeploymentIsolationPrefix() string {
} }
func getClusterIdFromClusterName(ctx context.Context, func getClusterIdFromClusterName(ctx context.Context,
wsc *workspaces.WorkspacesClient, wsc *databricks.WorkspaceClient,
clusterName string, clusterName string,
) (clusterId string, err error) { ) (clusterId string, err error) {
clusterId = "" clusterInfo, err := wsc.Clusters.GetClusterInfoByClusterName(ctx, clusterName)
clustersList, err := wsc.Clusters.List(ctx, clusters.ListRequest{})
if err != nil { if err != nil {
return return "", err
} }
for _, cluster := range clustersList.Clusters { return clusterInfo.ClusterId, nil
if cluster.ClusterName == clusterName {
clusterId = cluster.ClusterId
return
}
}
err = fmt.Errorf("could not find cluster with name: %s", clusterName)
return
} }
// Old version of getting development cluster details with isolation implemented. // Old version of getting development cluster details with isolation implemented.

View File

@ -6,22 +6,22 @@ import (
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/dbfs" "github.com/databricks/databricks-sdk-go/service/dbfs"
"github.com/databricks/databricks-sdk-go/workspaces"
) )
// move to go sdk / replace with utility function once // move to go sdk / replace with utility function once
// https://github.com/databricks/databricks-sdk-go/issues/57 is Done // https://github.com/databricks/databricks-sdk-go/issues/57 is Done
// Tracked in https://github.com/databricks/bricks/issues/25 // Tracked in https://github.com/databricks/bricks/issues/25
func CreateDbfsFile(ctx context.Context, func CreateDbfsFile(ctx context.Context,
wsc *workspaces.WorkspacesClient, w *databricks.WorkspaceClient,
path string, path string,
contents []byte, contents []byte,
overwrite bool, overwrite bool,
) error { ) error {
// see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#add-block // see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#add-block
const WRITE_BYTE_CHUNK_SIZE = 1e6 const WRITE_BYTE_CHUNK_SIZE = 1e6
createResponse, err := wsc.Dbfs.Create(ctx, createResponse, err := w.Dbfs.Create(ctx,
dbfs.Create{ dbfs.Create{
Overwrite: overwrite, Overwrite: overwrite,
Path: path, Path: path,
@ -38,7 +38,7 @@ func CreateDbfsFile(ctx context.Context,
break break
} }
b64Data := base64.StdEncoding.EncodeToString(byteChunk) b64Data := base64.StdEncoding.EncodeToString(byteChunk)
err := wsc.Dbfs.AddBlock(ctx, err := w.Dbfs.AddBlock(ctx,
dbfs.AddBlock{ dbfs.AddBlock{
Data: b64Data, Data: b64Data,
Handle: handle, Handle: handle,
@ -48,7 +48,7 @@ func CreateDbfsFile(ctx context.Context,
return fmt.Errorf("cannot add block: %w", err) return fmt.Errorf("cannot add block: %w", err)
} }
} }
err = wsc.Dbfs.Close(ctx, err = w.Dbfs.Close(ctx,
dbfs.Close{ dbfs.Close{
Handle: handle, Handle: handle,
}, },
@ -60,7 +60,7 @@ func CreateDbfsFile(ctx context.Context,
} }
func ReadDbfsFile(ctx context.Context, func ReadDbfsFile(ctx context.Context,
wsc *workspaces.WorkspacesClient, w *databricks.WorkspaceClient,
path string, path string,
) (content []byte, err error) { ) (content []byte, err error) {
// see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#read // see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#read
@ -69,7 +69,7 @@ func ReadDbfsFile(ctx context.Context,
offSet := 0 offSet := 0
length := int(READ_BYTE_CHUNK_SIZE) length := int(READ_BYTE_CHUNK_SIZE)
for fetchLoop { for fetchLoop {
dbfsReadReponse, err := wsc.Dbfs.Read(ctx, dbfsReadReponse, err := w.Dbfs.Read(ctx,
dbfs.ReadRequest{ dbfs.ReadRequest{
Path: path, Path: path,
Offset: offSet, Offset: offSet,

View File

@ -1,29 +0,0 @@
package utilities
import (
"context"
"github.com/databricks/databricks-sdk-go/service/repos"
"github.com/databricks/databricks-sdk-go/workspaces"
)
// Remove once this function is in go sdk
// https://github.com/databricks/databricks-sdk-go/issues/58
// Tracked in : https://github.com/databricks/bricks/issues/26
func GetAllRepos(ctx context.Context, wsc *workspaces.WorkspacesClient, pathPrefix string) (resultRepos []repos.RepoInfo, err error) {
nextPageToken := ""
for {
listReposResponse, err := wsc.Repos.List(ctx, repos.ListRequest{
PathPrefix: pathPrefix,
NextPageToken: nextPageToken,
})
if err != nil {
break
}
resultRepos = append(resultRepos, listReposResponse.Repos...)
if nextPageToken == "" {
break
}
}
return
}