2022-12-14 14:37:14 +00:00
|
|
|
package filer
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
2023-02-20 15:00:20 +00:00
|
|
|
"errors"
|
2022-12-14 14:37:14 +00:00
|
|
|
"fmt"
|
|
|
|
"io"
|
2023-05-31 12:22:26 +00:00
|
|
|
"io/fs"
|
2022-12-14 14:37:14 +00:00
|
|
|
"net/http"
|
2023-03-08 13:27:05 +00:00
|
|
|
"net/url"
|
2022-12-14 14:37:14 +00:00
|
|
|
"path"
|
2023-06-12 19:03:46 +00:00
|
|
|
"regexp"
|
2023-08-15 13:50:40 +00:00
|
|
|
"slices"
|
2023-05-31 09:11:17 +00:00
|
|
|
"sort"
|
2022-12-14 14:37:14 +00:00
|
|
|
"strings"
|
2023-05-31 09:11:17 +00:00
|
|
|
"time"
|
2022-12-14 14:37:14 +00:00
|
|
|
|
|
|
|
"github.com/databricks/databricks-sdk-go"
|
|
|
|
"github.com/databricks/databricks-sdk-go/apierr"
|
|
|
|
"github.com/databricks/databricks-sdk-go/client"
|
2024-07-05 11:32:29 +00:00
|
|
|
"github.com/databricks/databricks-sdk-go/marshal"
|
2022-12-14 14:37:14 +00:00
|
|
|
"github.com/databricks/databricks-sdk-go/service/workspace"
|
|
|
|
)
|
|
|
|
|
2023-05-31 12:22:26 +00:00
|
|
|
// Type that implements fs.DirEntry for WSFS.
|
|
|
|
type wsfsDirEntry struct {
|
|
|
|
wsfsFileInfo
|
|
|
|
}
|
|
|
|
|
|
|
|
func (entry wsfsDirEntry) Type() fs.FileMode {
|
|
|
|
return entry.wsfsFileInfo.Mode()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (entry wsfsDirEntry) Info() (fs.FileInfo, error) {
|
|
|
|
return entry.wsfsFileInfo, nil
|
|
|
|
}
|
|
|
|
|
2024-06-24 10:15:13 +00:00
|
|
|
func wsfsDirEntriesFromObjectInfos(objects []workspace.ObjectInfo) []fs.DirEntry {
|
|
|
|
info := make([]fs.DirEntry, len(objects))
|
|
|
|
for i, v := range objects {
|
2024-07-05 11:32:29 +00:00
|
|
|
info[i] = wsfsDirEntry{wsfsFileInfo{ObjectInfo: v}}
|
2024-06-24 10:15:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Sort by name for parity with os.ReadDir.
|
|
|
|
sort.Slice(info, func(i, j int) bool { return info[i].Name() < info[j].Name() })
|
|
|
|
return info
|
|
|
|
}
|
|
|
|
|
2023-05-31 12:22:26 +00:00
|
|
|
// Type that implements fs.FileInfo for WSFS.
|
|
|
|
type wsfsFileInfo struct {
|
2024-07-05 11:32:29 +00:00
|
|
|
workspace.ObjectInfo
|
|
|
|
|
|
|
|
// The export format of a notebook. This is not exposed by the SDK.
|
|
|
|
ReposExportFormat workspace.ExportFormat `json:"repos_export_format,omitempty"`
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (info wsfsFileInfo) Name() string {
|
2024-07-05 11:32:29 +00:00
|
|
|
return path.Base(info.ObjectInfo.Path)
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (info wsfsFileInfo) Size() int64 {
|
2024-07-05 11:32:29 +00:00
|
|
|
return info.ObjectInfo.Size
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (info wsfsFileInfo) Mode() fs.FileMode {
|
2024-07-05 11:32:29 +00:00
|
|
|
switch info.ObjectInfo.ObjectType {
|
2024-06-24 10:15:13 +00:00
|
|
|
case workspace.ObjectTypeDirectory, workspace.ObjectTypeRepo:
|
2023-05-31 12:22:26 +00:00
|
|
|
return fs.ModeDir
|
|
|
|
default:
|
|
|
|
return fs.ModePerm
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (info wsfsFileInfo) ModTime() time.Time {
|
2024-07-05 11:32:29 +00:00
|
|
|
return time.UnixMilli(info.ObjectInfo.ModifiedAt)
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (info wsfsFileInfo) IsDir() bool {
|
2024-06-24 10:15:13 +00:00
|
|
|
return info.Mode() == fs.ModeDir
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (info wsfsFileInfo) Sys() any {
|
2024-07-05 11:32:29 +00:00
|
|
|
return info.ObjectInfo
|
|
|
|
}
|
|
|
|
|
|
|
|
// UnmarshalJSON is a custom unmarshaller for the wsfsFileInfo struct.
|
|
|
|
// It must be defined for this type because otherwise the implementation
|
|
|
|
// of the embedded ObjectInfo type will be used.
|
|
|
|
func (info *wsfsFileInfo) UnmarshalJSON(b []byte) error {
|
|
|
|
return marshal.Unmarshal(b, info)
|
|
|
|
}
|
|
|
|
|
|
|
|
// MarshalJSON is a custom marshaller for the wsfsFileInfo struct.
|
|
|
|
// It must be defined for this type because otherwise the implementation
|
|
|
|
// of the embedded ObjectInfo type will be used.
|
|
|
|
func (info *wsfsFileInfo) MarshalJSON() ([]byte, error) {
|
|
|
|
return marshal.Marshal(info)
|
2023-05-31 12:22:26 +00:00
|
|
|
}
|
|
|
|
|
2022-12-15 16:16:07 +00:00
|
|
|
// WorkspaceFilesClient implements the files-in-workspace API.
|
|
|
|
|
|
|
|
// NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled.
|
|
|
|
// It can access any workspace path if files-in-workspace is enabled.
|
2022-12-14 14:37:14 +00:00
|
|
|
type WorkspaceFilesClient struct {
|
|
|
|
workspaceClient *databricks.WorkspaceClient
|
|
|
|
apiClient *client.DatabricksClient
|
|
|
|
|
|
|
|
// File operations will be relative to this path.
|
2023-06-23 14:07:09 +00:00
|
|
|
root WorkspaceRootPath
|
2022-12-14 14:37:14 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
|
|
|
|
apiClient, err := client.New(w.Config)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return &WorkspaceFilesClient{
|
|
|
|
workspaceClient: w,
|
|
|
|
apiClient: apiClient,
|
|
|
|
|
2023-06-23 14:07:09 +00:00
|
|
|
root: NewWorkspaceRootPath(root),
|
2022-12-14 14:37:14 +00:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
|
2022-12-14 22:41:37 +00:00
|
|
|
absPath, err := w.root.Join(name)
|
2022-12-14 14:37:14 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Remove leading "/" so we can use it in the URL.
|
|
|
|
overwrite := slices.Contains(mode, OverwriteIfExists)
|
|
|
|
urlPath := fmt.Sprintf(
|
|
|
|
"/api/2.0/workspace-files/import-file/%s?overwrite=%t",
|
2023-03-17 16:42:35 +00:00
|
|
|
url.PathEscape(strings.TrimLeft(absPath, "/")),
|
2022-12-14 14:37:14 +00:00
|
|
|
overwrite,
|
|
|
|
)
|
|
|
|
|
|
|
|
// Buffer the file contents because we may need to retry below and we cannot read twice.
|
|
|
|
body, err := io.ReadAll(reader)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-09-05 09:43:57 +00:00
|
|
|
err = w.apiClient.Do(ctx, http.MethodPost, urlPath, nil, body, nil)
|
2022-12-14 14:37:14 +00:00
|
|
|
|
2023-05-31 11:24:20 +00:00
|
|
|
// Return early on success.
|
|
|
|
if err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Special handling of this error only if it is an API error.
|
2023-02-20 15:00:20 +00:00
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
2022-12-14 14:37:14 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 404 if the parent directory does not exist.
|
|
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
|
|
if !slices.Contains(mode, CreateParentDirectories) {
|
|
|
|
return NoSuchDirectoryError{path.Dir(absPath)}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create parent directory.
|
|
|
|
err = w.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(absPath))
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("unable to mkdir to write file %s: %w", absPath, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Retry without CreateParentDirectories mode flag.
|
|
|
|
return w.Write(ctx, name, bytes.NewReader(body), sliceWithout(mode, CreateParentDirectories)...)
|
|
|
|
}
|
|
|
|
|
2023-06-12 19:03:46 +00:00
|
|
|
// This API returns 409 if the file already exists, when the object type is file
|
2022-12-14 14:37:14 +00:00
|
|
|
if aerr.StatusCode == http.StatusConflict {
|
|
|
|
return FileAlreadyExistsError{absPath}
|
|
|
|
}
|
|
|
|
|
2023-06-12 19:03:46 +00:00
|
|
|
// This API returns 400 if the file already exists, when the object type is notebook
|
|
|
|
regex := regexp.MustCompile(`Path \((.*)\) already exists.`)
|
|
|
|
if aerr.StatusCode == http.StatusBadRequest && regex.Match([]byte(aerr.Message)) {
|
|
|
|
// Parse file path from regex capture group
|
|
|
|
matches := regex.FindStringSubmatch(aerr.Message)
|
|
|
|
if len(matches) == 2 {
|
|
|
|
return FileAlreadyExistsError{matches[1]}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Default to path specified to filer.Write if regex capture fails
|
|
|
|
return FileAlreadyExistsError{absPath}
|
|
|
|
}
|
|
|
|
|
2022-12-14 14:37:14 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-12 13:53:58 +00:00
|
|
|
func (w *WorkspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
|
2022-12-14 22:41:37 +00:00
|
|
|
absPath, err := w.root.Join(name)
|
2022-12-14 14:37:14 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-06-08 16:15:12 +00:00
|
|
|
// This stat call serves two purposes:
|
|
|
|
// 1. Checks file at path exists, and throws an error if it does not
|
|
|
|
// 2. Allows us to error out if the path is a directory. This is needed
|
|
|
|
// because the /workspace/export API does not error out, and returns the directory
|
|
|
|
// as a DBC archive even if format "SOURCE" is specified
|
|
|
|
stat, err := w.Stat(ctx, name)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if stat.IsDir() {
|
|
|
|
return nil, NotAFile{absPath}
|
2023-05-31 11:24:20 +00:00
|
|
|
}
|
|
|
|
|
2023-06-08 16:15:12 +00:00
|
|
|
// Export file contents. Note the /workspace/export API has a limit of 10MBs
|
|
|
|
// for the file size
|
2023-06-23 13:17:39 +00:00
|
|
|
return w.workspaceClient.Workspace.Download(ctx, absPath)
|
2022-12-14 14:37:14 +00:00
|
|
|
}
|
|
|
|
|
2023-06-06 06:27:47 +00:00
|
|
|
func (w *WorkspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
|
2022-12-14 22:41:37 +00:00
|
|
|
absPath, err := w.root.Join(name)
|
2022-12-14 14:37:14 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-06 06:27:47 +00:00
|
|
|
// Illegal to delete the root path.
|
|
|
|
if absPath == w.root.rootPath {
|
|
|
|
return CannotDeleteRootError{}
|
|
|
|
}
|
|
|
|
|
|
|
|
recursive := false
|
|
|
|
if slices.Contains(mode, DeleteRecursively) {
|
|
|
|
recursive = true
|
|
|
|
}
|
|
|
|
|
2023-05-31 11:24:20 +00:00
|
|
|
err = w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{
|
2022-12-14 14:37:14 +00:00
|
|
|
Path: absPath,
|
2023-06-06 06:27:47 +00:00
|
|
|
Recursive: recursive,
|
2022-12-14 14:37:14 +00:00
|
|
|
})
|
2023-05-31 11:24:20 +00:00
|
|
|
|
|
|
|
// Return early on success.
|
|
|
|
if err == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Special handling of this error only if it is an API error.
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-06-06 06:27:47 +00:00
|
|
|
switch aerr.StatusCode {
|
|
|
|
case http.StatusBadRequest:
|
|
|
|
if aerr.ErrorCode == "DIRECTORY_NOT_EMPTY" {
|
|
|
|
return DirectoryNotEmptyError{absPath}
|
|
|
|
}
|
|
|
|
case http.StatusNotFound:
|
2023-05-31 11:24:20 +00:00
|
|
|
return FileDoesNotExistError{absPath}
|
|
|
|
}
|
|
|
|
|
|
|
|
return err
|
2022-12-14 14:37:14 +00:00
|
|
|
}
|
2023-05-31 09:11:17 +00:00
|
|
|
|
2023-05-31 12:22:26 +00:00
|
|
|
func (w *WorkspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
2023-05-31 09:11:17 +00:00
|
|
|
absPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
objects, err := w.workspaceClient.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{
|
|
|
|
Path: absPath,
|
|
|
|
})
|
2023-06-02 10:28:35 +00:00
|
|
|
|
|
|
|
if len(objects) == 1 && objects[0].Path == absPath {
|
|
|
|
return nil, NotADirectory{absPath}
|
|
|
|
}
|
|
|
|
|
2023-05-31 09:11:17 +00:00
|
|
|
if err != nil {
|
|
|
|
// If we got an API error we deal with it below.
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 404 if the specified path does not exist.
|
|
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
|
|
return nil, NoSuchDirectoryError{path.Dir(absPath)}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-06-24 10:15:13 +00:00
|
|
|
// Convert to fs.DirEntry.
|
|
|
|
return wsfsDirEntriesFromObjectInfos(objects), nil
|
2023-05-31 09:11:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (w *WorkspaceFilesClient) Mkdir(ctx context.Context, name string) error {
|
|
|
|
dirPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return w.workspaceClient.Workspace.Mkdirs(ctx, workspace.Mkdirs{
|
|
|
|
Path: dirPath,
|
|
|
|
})
|
|
|
|
}
|
2023-06-01 18:23:22 +00:00
|
|
|
|
|
|
|
func (w *WorkspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
|
|
|
absPath, err := w.root.Join(name)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2024-07-05 11:32:29 +00:00
|
|
|
var stat wsfsFileInfo
|
|
|
|
|
|
|
|
// Perform bespoke API call because "return_export_info" is not exposed by the SDK.
|
|
|
|
// We need "repos_export_format" to determine if the file is a py or a ipynb notebook.
|
|
|
|
// This is not exposed by the SDK so we need to make a direct API call.
|
|
|
|
err = w.apiClient.Do(
|
|
|
|
ctx,
|
|
|
|
http.MethodGet,
|
|
|
|
"/api/2.0/workspace/get-status",
|
|
|
|
nil,
|
|
|
|
map[string]string{
|
|
|
|
"path": absPath,
|
|
|
|
"return_export_info": "true",
|
|
|
|
},
|
|
|
|
&stat,
|
|
|
|
)
|
2023-06-01 18:23:22 +00:00
|
|
|
if err != nil {
|
|
|
|
// If we got an API error we deal with it below.
|
|
|
|
var aerr *apierr.APIError
|
|
|
|
if !errors.As(err, &aerr) {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This API returns a 404 if the specified path does not exist.
|
|
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
|
|
return nil, FileDoesNotExistError{absPath}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-07-05 11:32:29 +00:00
|
|
|
return stat, nil
|
2023-06-01 18:23:22 +00:00
|
|
|
}
|