From 487bf6fd5cc88ce191749abb7427fc0785bfa0d8 Mon Sep 17 00:00:00 2001 From: Serge Smertin <259697+nfx@users.noreply.github.com> Date: Thu, 1 Dec 2022 12:17:36 +0100 Subject: [PATCH] Use Databricks Go SDK v0.1.0 (#110) This PR pins the version of Databricks SDK for Go to v0.1.0 --- ext/databricks-sdk-go | 1 - git/git.go | 2 +- go.mod | 4 +- go.sum | 2 + internal/sync_test.go | 24 +++++------ project/project.go | 2 +- python/wheel.go | 13 +----- utilities/dbfs.go | 93 ------------------------------------------- 8 files changed, 18 insertions(+), 123 deletions(-) delete mode 160000 ext/databricks-sdk-go delete mode 100644 utilities/dbfs.go diff --git a/ext/databricks-sdk-go b/ext/databricks-sdk-go deleted file mode 160000 index 9a36314a..00000000 --- a/ext/databricks-sdk-go +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 9a36314ae27dd011abe0fefeedba018bb83cd90b diff --git a/git/git.go b/git/git.go index f713dda1..33d322fa 100644 --- a/git/git.go +++ b/git/git.go @@ -83,7 +83,7 @@ func RepositoryName() (string, error) { } func RepoExists(remotePath string, ctx context.Context, w *databricks.WorkspaceClient) (bool, error) { - repos, err := w.Repos.ListAll(ctx, repos.ListRequest{}) + repos, err := w.Repos.ListAll(ctx, repos.List{}) if err != nil { return false, fmt.Errorf("could not get repos: %s", err) } diff --git a/go.mod b/go.mod index 920467e3..5a41cbf8 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/atotto/clipboard v0.1.4 - github.com/databricks/databricks-sdk-go v0.0.0 + github.com/databricks/databricks-sdk-go v0.1.0 github.com/ghodss/yaml v1.0.0 // MIT + NOTICE github.com/manifoldco/promptui v0.9.0 // BSD-3-Clause license github.com/mitchellh/go-homedir v1.1.0 // MIT @@ -51,5 +51,3 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/databricks/databricks-sdk-go v0.0.0 => ./ext/databricks-sdk-go diff --git a/go.sum b/go.sum index b6818512..260960b9 100644 --- a/go.sum +++ b/go.sum @@ -17,6 +17,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/databricks/databricks-sdk-go v0.1.0 h1:DvIBY1Ir0yknodV4si9vV0geMRhSa/+qG9o5CRTp8pM= +github.com/databricks/databricks-sdk-go v0.1.0/go.mod h1:n4we8UoagEFH0VQuxkhQ3WHdvG0S164NYxmX6pMrW98= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/internal/sync_test.go b/internal/sync_test.go index 0a40e608..b42be3e8 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -104,13 +104,13 @@ func TestAccFullSync(t *testing.T) { // First upload assertion assert.Eventually(t, func() bool { - objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) return len(objects) == 3 }, 30*time.Second, 5*time.Second) - objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) @@ -127,13 +127,13 @@ func TestAccFullSync(t *testing.T) { os.Create(filepath.Join(projectDir, "hello.txt")) os.Create(filepath.Join(projectDir, "world.txt")) assert.Eventually(t, func() bool { - objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) return len(objects) == 5 }, 30*time.Second, 5*time.Second) - objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err = wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) @@ -151,13 +151,13 @@ func TestAccFullSync(t *testing.T) { // delete a file and assert os.Remove(filepath.Join(projectDir, "hello.txt")) assert.Eventually(t, func() bool { - objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) return len(objects) == 4 }, 30*time.Second, 5*time.Second) - objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err = wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) @@ -281,13 +281,13 @@ func TestAccIncrementalSync(t *testing.T) { // First upload assertion assert.Eventually(t, func() bool { - objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) return len(objects) == 2 }, 30*time.Second, 5*time.Second) - objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) @@ -307,13 +307,13 @@ func TestAccIncrementalSync(t *testing.T) { // new file upload assertion assert.Eventually(t, func() bool { - objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) return len(objects) == 3 }, 30*time.Second, 5*time.Second) - objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err = wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) @@ -330,13 +330,13 @@ func TestAccIncrementalSync(t *testing.T) { // delete a file and assert os.Remove(filepath.Join(projectDir, ".gitkeep")) assert.Eventually(t, func() bool { - objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err := wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) return len(objects) == 2 }, 30*time.Second, 5*time.Second) - objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ + objects, err = wsc.Workspace.ListAll(ctx, workspace.List{ Path: repoPath, }) assert.NoError(t, err) diff --git a/project/project.go b/project/project.go index 0c9676bc..00b93935 100644 --- a/project/project.go +++ b/project/project.go @@ -184,7 +184,7 @@ func getClusterIdFromClusterName(ctx context.Context, wsc *databricks.WorkspaceClient, clusterName string, ) (clusterId string, err error) { - clusterInfo, err := wsc.Clusters.GetClusterInfoByClusterName(ctx, clusterName) + clusterInfo, err := wsc.Clusters.GetByClusterName(ctx, clusterName) if err != nil { return "", err } diff --git a/python/wheel.go b/python/wheel.go index 61e2a792..c6c8942f 100644 --- a/python/wheel.go +++ b/python/wheel.go @@ -3,14 +3,12 @@ package python import ( "context" "fmt" - "io" "log" "os" "path" "strings" "github.com/databricks/bricks/project" - "github.com/databricks/bricks/utilities" ) func BuildWheel(ctx context.Context, dir string) (string, error) { @@ -69,17 +67,8 @@ func UploadWheelToDBFSWithPEP503(ctx context.Context, dir string) (string, error return "", err } defer wf.Close() - raw, err := io.ReadAll(wf) - if err != nil { - return "", err - } // err = dbfs.Create(dbfsLoc, raw, true) - err = utilities.CreateDbfsFile(ctx, - wsc, - dbfsLoc, - raw, - true, - ) + err = wsc.Dbfs.Overwrite(ctx, dbfsLoc, wf) // TODO: maintain PEP503 compliance and update meta-files: // ${DBFSWheelLocation}/index.html and ${DBFSWheelLocation}/${NormalizedName}/index.html return dbfsLoc, err diff --git a/utilities/dbfs.go b/utilities/dbfs.go deleted file mode 100644 index 26c772c6..00000000 --- a/utilities/dbfs.go +++ /dev/null @@ -1,93 +0,0 @@ -package utilities - -import ( - "bytes" - "context" - "encoding/base64" - "fmt" - - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/service/dbfs" -) - -// move to go sdk / replace with utility function once -// https://github.com/databricks/databricks-sdk-go/issues/57 is Done -// Tracked in https://github.com/databricks/bricks/issues/25 -func CreateDbfsFile(ctx context.Context, - w *databricks.WorkspaceClient, - path string, - contents []byte, - overwrite bool, -) error { - // see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#add-block - const WRITE_BYTE_CHUNK_SIZE = 1e6 - createResponse, err := w.Dbfs.Create(ctx, - dbfs.Create{ - Overwrite: overwrite, - Path: path, - }, - ) - if err != nil { - return err - } - handle := createResponse.Handle - buffer := bytes.NewBuffer(contents) - for { - byteChunk := buffer.Next(WRITE_BYTE_CHUNK_SIZE) - if len(byteChunk) == 0 { - break - } - b64Data := base64.StdEncoding.EncodeToString(byteChunk) - err := w.Dbfs.AddBlock(ctx, - dbfs.AddBlock{ - Data: b64Data, - Handle: handle, - }, - ) - if err != nil { - return fmt.Errorf("cannot add block: %w", err) - } - } - err = w.Dbfs.Close(ctx, - dbfs.Close{ - Handle: handle, - }, - ) - if err != nil { - return fmt.Errorf("cannot close handle: %w", err) - } - return nil -} - -func ReadDbfsFile(ctx context.Context, - w *databricks.WorkspaceClient, - path string, -) (content []byte, err error) { - // see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#read - const READ_BYTE_CHUNK_SIZE = 1e6 - fetchLoop := true - offSet := 0 - length := int(READ_BYTE_CHUNK_SIZE) - for fetchLoop { - dbfsReadReponse, err := w.Dbfs.Read(ctx, - dbfs.ReadRequest{ - Path: path, - Offset: offSet, - Length: length, - }, - ) - if err != nil { - return content, fmt.Errorf("cannot read %s: %w", path, err) - } - if dbfsReadReponse.BytesRead == 0 || dbfsReadReponse.BytesRead < int64(length) { - fetchLoop = false - } - decodedBytes, err := base64.StdEncoding.DecodeString(dbfsReadReponse.Data) - if err != nil { - return content, err - } - content = append(content, decodedBytes...) - offSet += length - } - return content, err -}