Use Databricks Go SDK v0.1.0 (#110)

This PR pins the version of the Databricks SDK for Go to v0.1.0.
This commit is contained in:
Serge Smertin 2022-12-01 12:17:36 +01:00 committed by GitHub
parent 34af98a8c3
commit 487bf6fd5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 18 additions and 123 deletions

@ -1 +0,0 @@
Subproject commit 9a36314ae27dd011abe0fefeedba018bb83cd90b

View File

@ -83,7 +83,7 @@ func RepositoryName() (string, error) {
} }
func RepoExists(remotePath string, ctx context.Context, w *databricks.WorkspaceClient) (bool, error) { func RepoExists(remotePath string, ctx context.Context, w *databricks.WorkspaceClient) (bool, error) {
repos, err := w.Repos.ListAll(ctx, repos.ListRequest{}) repos, err := w.Repos.ListAll(ctx, repos.List{})
if err != nil { if err != nil {
return false, fmt.Errorf("could not get repos: %s", err) return false, fmt.Errorf("could not get repos: %s", err)
} }

4
go.mod
View File

@ -4,7 +4,7 @@ go 1.18
require ( require (
github.com/atotto/clipboard v0.1.4 github.com/atotto/clipboard v0.1.4
github.com/databricks/databricks-sdk-go v0.0.0 github.com/databricks/databricks-sdk-go v0.1.0
github.com/ghodss/yaml v1.0.0 // MIT + NOTICE github.com/ghodss/yaml v1.0.0 // MIT + NOTICE
github.com/manifoldco/promptui v0.9.0 // BSD-3-Clause license github.com/manifoldco/promptui v0.9.0 // BSD-3-Clause license
github.com/mitchellh/go-homedir v1.1.0 // MIT github.com/mitchellh/go-homedir v1.1.0 // MIT
@ -51,5 +51,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )
replace github.com/databricks/databricks-sdk-go v0.0.0 => ./ext/databricks-sdk-go

2
go.sum
View File

@ -17,6 +17,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/databricks/databricks-sdk-go v0.1.0 h1:DvIBY1Ir0yknodV4si9vV0geMRhSa/+qG9o5CRTp8pM=
github.com/databricks/databricks-sdk-go v0.1.0/go.mod h1:n4we8UoagEFH0VQuxkhQ3WHdvG0S164NYxmX6pMrW98=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@ -104,13 +104,13 @@ func TestAccFullSync(t *testing.T) {
// First upload assertion // First upload assertion
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(objects) == 3 return len(objects) == 3
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
@ -127,13 +127,13 @@ func TestAccFullSync(t *testing.T) {
os.Create(filepath.Join(projectDir, "hello.txt")) os.Create(filepath.Join(projectDir, "hello.txt"))
os.Create(filepath.Join(projectDir, "world.txt")) os.Create(filepath.Join(projectDir, "world.txt"))
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(objects) == 5 return len(objects) == 5
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err = wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
@ -151,13 +151,13 @@ func TestAccFullSync(t *testing.T) {
// delete a file and assert // delete a file and assert
os.Remove(filepath.Join(projectDir, "hello.txt")) os.Remove(filepath.Join(projectDir, "hello.txt"))
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(objects) == 4 return len(objects) == 4
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err = wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
@ -281,13 +281,13 @@ func TestAccIncrementalSync(t *testing.T) {
// First upload assertion // First upload assertion
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(objects) == 2 return len(objects) == 2
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
@ -307,13 +307,13 @@ func TestAccIncrementalSync(t *testing.T) {
// new file upload assertion // new file upload assertion
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(objects) == 3 return len(objects) == 3
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err = wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
@ -330,13 +330,13 @@ func TestAccIncrementalSync(t *testing.T) {
// delete a file and assert // delete a file and assert
os.Remove(filepath.Join(projectDir, ".gitkeep")) os.Remove(filepath.Join(projectDir, ".gitkeep"))
assert.Eventually(t, func() bool { assert.Eventually(t, func() bool {
objects, err := wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err := wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)
return len(objects) == 2 return len(objects) == 2
}, 30*time.Second, 5*time.Second) }, 30*time.Second, 5*time.Second)
objects, err = wsc.Workspace.ListAll(ctx, workspace.ListRequest{ objects, err = wsc.Workspace.ListAll(ctx, workspace.List{
Path: repoPath, Path: repoPath,
}) })
assert.NoError(t, err) assert.NoError(t, err)

View File

@ -184,7 +184,7 @@ func getClusterIdFromClusterName(ctx context.Context,
wsc *databricks.WorkspaceClient, wsc *databricks.WorkspaceClient,
clusterName string, clusterName string,
) (clusterId string, err error) { ) (clusterId string, err error) {
clusterInfo, err := wsc.Clusters.GetClusterInfoByClusterName(ctx, clusterName) clusterInfo, err := wsc.Clusters.GetByClusterName(ctx, clusterName)
if err != nil { if err != nil {
return "", err return "", err
} }

View File

@ -3,14 +3,12 @@ package python
import ( import (
"context" "context"
"fmt" "fmt"
"io"
"log" "log"
"os" "os"
"path" "path"
"strings" "strings"
"github.com/databricks/bricks/project" "github.com/databricks/bricks/project"
"github.com/databricks/bricks/utilities"
) )
func BuildWheel(ctx context.Context, dir string) (string, error) { func BuildWheel(ctx context.Context, dir string) (string, error) {
@ -69,17 +67,8 @@ func UploadWheelToDBFSWithPEP503(ctx context.Context, dir string) (string, error
return "", err return "", err
} }
defer wf.Close() defer wf.Close()
raw, err := io.ReadAll(wf)
if err != nil {
return "", err
}
// err = dbfs.Create(dbfsLoc, raw, true) // err = dbfs.Create(dbfsLoc, raw, true)
err = utilities.CreateDbfsFile(ctx, err = wsc.Dbfs.Overwrite(ctx, dbfsLoc, wf)
wsc,
dbfsLoc,
raw,
true,
)
// TODO: maintain PEP503 compliance and update meta-files: // TODO: maintain PEP503 compliance and update meta-files:
// ${DBFSWheelLocation}/index.html and ${DBFSWheelLocation}/${NormalizedName}/index.html // ${DBFSWheelLocation}/index.html and ${DBFSWheelLocation}/${NormalizedName}/index.html
return dbfsLoc, err return dbfsLoc, err

View File

@ -1,93 +0,0 @@
package utilities
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/dbfs"
)
// CreateDbfsFile writes contents to the DBFS file at path using the
// create / add-block / close calls of the workspace client.
//
// The overwrite flag is forwarded unchanged to the create call. contents is
// sent base64-encoded in blocks of at most 1e6 bytes; empty contents result
// in no add-block calls, only create followed by close.
//
// An error from the create call is returned as-is. Add-block and close
// failures are wrapped with "cannot add block" and "cannot close handle"
// respectively.
//
// NOTE(review): when an add-block call fails the function returns without
// closing the handle - confirm whether that is intended.
//
// TODO: move to go sdk / replace with utility function once
// https://github.com/databricks/databricks-sdk-go/issues/57 is Done.
// Tracked in https://github.com/databricks/bricks/issues/25
func CreateDbfsFile(ctx context.Context,
	w *databricks.WorkspaceClient,
	path string,
	contents []byte,
	overwrite bool,
) error {
	// see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#add-block
	const blockSize = 1e6

	stream, err := w.Dbfs.Create(ctx, dbfs.Create{
		Path:      path,
		Overwrite: overwrite,
	})
	if err != nil {
		return err
	}
	handle := stream.Handle

	// Drain the payload one block at a time; Next returns an empty slice
	// once everything has been consumed, which ends the loop.
	pending := bytes.NewBuffer(contents)
	for block := pending.Next(blockSize); len(block) > 0; block = pending.Next(blockSize) {
		req := dbfs.AddBlock{
			Data:   base64.StdEncoding.EncodeToString(block),
			Handle: handle,
		}
		if err := w.Dbfs.AddBlock(ctx, req); err != nil {
			return fmt.Errorf("cannot add block: %w", err)
		}
	}

	if err := w.Dbfs.Close(ctx, dbfs.Close{Handle: handle}); err != nil {
		return fmt.Errorf("cannot close handle: %w", err)
	}
	return nil
}
// ReadDbfsFile fetches the DBFS file at path and returns its decoded bytes.
//
// The file is retrieved with repeated read calls, each requesting 1e6 bytes
// at an offset that advances by the requested length. Every response payload
// is base64-decoded and appended to content. Fetching stops after the first
// response that reports fewer bytes read than were requested.
//
// On failure the bytes gathered so far are returned together with the error:
// read failures are wrapped with "cannot read <path>", decode failures are
// returned unwrapped.
func ReadDbfsFile(ctx context.Context,
	w *databricks.WorkspaceClient,
	path string,
) (content []byte, err error) {
	// see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#read
	const chunkSize = 1e6

	length := int(chunkSize)
	for offset := 0; ; offset += length {
		resp, err := w.Dbfs.Read(ctx, dbfs.ReadRequest{
			Path:   path,
			Offset: offset,
			Length: length,
		})
		if err != nil {
			return content, fmt.Errorf("cannot read %s: %w", path, err)
		}

		chunk, err := base64.StdEncoding.DecodeString(resp.Data)
		if err != nil {
			return content, err
		}
		content = append(content, chunk...)

		// A short read (which includes a zero-byte read) is the last one.
		if resp.BytesRead < int64(length) {
			break
		}
	}
	return content, nil
}