package utilities

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/dbfs"
)

// move to go sdk / replace with utility function once
// https://github.com/databricks/databricks-sdk-go/issues/57 is Done
// Tracked in https://github.com/databricks/bricks/issues/25
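// CreateDbfsFile writes contents to the DBFS file at path, streaming the data
// through the DBFS create / add-block / close handle API. If overwrite is set,
// an existing file at path is replaced.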
func CreateDbfsFile(ctx context.Context,
	w *databricks.WorkspaceClient,
	path string,
	contents []byte,
	overwrite bool,
) error {
	// see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#add-block
	const WRITE_BYTE_CHUNK_SIZE = 1e6
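	// Open a write handle for the file; the handle is used by the
	// subsequent add-block and close calls.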
	createResponse, err := w.Dbfs.Create(ctx,
		dbfs.Create{
			Overwrite: overwrite,
			Path:      path,
		},
	)
	if err != nil {
		return err
	}
	handle := createResponse.Handle
	buffer := bytes.NewBuffer(contents)
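	// Upload the contents in chunks of at most WRITE_BYTE_CHUNK_SIZE bytes;
	// the add-block endpoint expects each block as base64 encoded data
	// (see the API documentation linked above).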
	for {
		byteChunk := buffer.Next(WRITE_BYTE_CHUNK_SIZE)
		if len(byteChunk) == 0 {
			break
		}
		b64Data := base64.StdEncoding.EncodeToString(byteChunk)
		err := w.Dbfs.AddBlock(ctx,
			dbfs.AddBlock{
				Data:   b64Data,
				Handle: handle,
			},
		)
		if err != nil {
			return fmt.Errorf("cannot add block: %w", err)
		}
	}
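	// Close the handle to complete the upload.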
	err = w.Dbfs.Close(ctx,
		dbfs.Close{
			Handle: handle,
		},
	)
	if err != nil {
		return fmt.Errorf("cannot close handle: %w", err)
	}
	return nil
}

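// ReadDbfsFile returns the entire contents of the DBFS file at path,
// reading it in chunks via the DBFS read API.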
func ReadDbfsFile(ctx context.Context,
	w *databricks.WorkspaceClient,
	path string,
) (content []byte, err error) {
	// see https://docs.databricks.com/dev-tools/api/latest/dbfs.html#read
	const READ_BYTE_CHUNK_SIZE = 1e6
	fetchLoop := true
	offset := 0
	length := int(READ_BYTE_CHUNK_SIZE)
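	// Fetch the file in chunks of up to READ_BYTE_CHUNK_SIZE bytes until a
	// short or empty read signals the end of the file.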
	for fetchLoop {
		dbfsReadResponse, err := w.Dbfs.Read(ctx,
			dbfs.ReadRequest{
				Path:   path,
				Offset: offset,
				Length: length,
			},
		)
		if err != nil {
			return content, fmt.Errorf("cannot read %s: %w", path, err)
		}
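		// A short or empty read means we have reached the end of the file;
		// stop after processing this chunk.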
		if dbfsReadResponse.BytesRead == 0 || dbfsReadResponse.BytesRead < int64(length) {
			fetchLoop = false
		}
		decodedBytes, err := base64.StdEncoding.DecodeString(dbfsReadResponse.Data)
		if err != nil {
			return content, err
		}
		content = append(content, decodedBytes...)
		offset += length
	}
	return content, err
}