package filer import ( "bytes" "context" "errors" "fmt" "io" "io/fs" "net/http" "net/url" "path" "regexp" "slices" "sort" "strings" "time" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/marshal" "github.com/databricks/databricks-sdk-go/service/workspace" ) // Type that implements fs.DirEntry for WSFS. type wsfsDirEntry struct { wsfsFileInfo } func (entry wsfsDirEntry) Type() fs.FileMode { return entry.wsfsFileInfo.Mode() } func (entry wsfsDirEntry) Info() (fs.FileInfo, error) { return entry.wsfsFileInfo, nil } func wsfsDirEntriesFromObjectInfos(objects []workspace.ObjectInfo) []fs.DirEntry { info := make([]fs.DirEntry, len(objects)) for i, v := range objects { info[i] = wsfsDirEntry{wsfsFileInfo{ObjectInfo: v}} } // Sort by name for parity with os.ReadDir. sort.Slice(info, func(i, j int) bool { return info[i].Name() < info[j].Name() }) return info } // Type that implements fs.FileInfo for WSFS. type wsfsFileInfo struct { workspace.ObjectInfo // The export format of a notebook. This is not exposed by the SDK. ReposExportFormat workspace.ExportFormat `json:"repos_export_format,omitempty"` } func (info wsfsFileInfo) Name() string { return path.Base(info.ObjectInfo.Path) } func (info wsfsFileInfo) Size() int64 { return info.ObjectInfo.Size } func (info wsfsFileInfo) Mode() fs.FileMode { switch info.ObjectInfo.ObjectType { case workspace.ObjectTypeDirectory, workspace.ObjectTypeRepo: return fs.ModeDir default: return fs.ModePerm } } func (info wsfsFileInfo) ModTime() time.Time { return time.UnixMilli(info.ObjectInfo.ModifiedAt) } func (info wsfsFileInfo) IsDir() bool { return info.Mode() == fs.ModeDir } func (info wsfsFileInfo) Sys() any { return info.ObjectInfo } func (info wsfsFileInfo) WorkspaceObjectInfo() workspace.ObjectInfo { return info.ObjectInfo } // UnmarshalJSON is a custom unmarshaller for the wsfsFileInfo struct. // It must be defined for this type because otherwise the implementation // of the embedded ObjectInfo type will be used. func (info *wsfsFileInfo) UnmarshalJSON(b []byte) error { return marshal.Unmarshal(b, info) } // MarshalJSON is a custom marshaller for the wsfsFileInfo struct. // It must be defined for this type because otherwise the implementation // of the embedded ObjectInfo type will be used. func (info *wsfsFileInfo) MarshalJSON() ([]byte, error) { return marshal.Marshal(info) } // Interface for *client.DatabricksClient from the Databricks Go SDK. Abstracted // as an interface to allow for mocking in tests. type apiClient interface { Do(ctx context.Context, method, path string, headers map[string]string, request, response any, visitors ...func(*http.Request) error) error } // WorkspaceFilesClient implements the files-in-workspace API. // NOTE: This API is available for files under /Repos if a workspace has files-in-repos enabled. // It can access any workspace path if files-in-workspace is enabled. type workspaceFilesClient struct { workspaceClient *databricks.WorkspaceClient apiClient apiClient // File operations will be relative to this path. root WorkspaceRootPath } func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) { apiClient, err := client.New(w.Config) if err != nil { return nil, err } return &workspaceFilesClient{ workspaceClient: w, apiClient: apiClient, root: NewWorkspaceRootPath(root), }, nil } func (w *workspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { return err } // Remove leading "/" so we can use it in the URL. overwrite := slices.Contains(mode, OverwriteIfExists) urlPath := fmt.Sprintf( "/api/2.0/workspace-files/import-file/%s?overwrite=%t", url.PathEscape(strings.TrimLeft(absPath, "/")), overwrite, ) // Buffer the file contents because we may need to retry below and we cannot read twice. body, err := io.ReadAll(reader) if err != nil { return err } err = w.apiClient.Do(ctx, http.MethodPost, urlPath, nil, body, nil) // Return early on success. if err == nil { return nil } // Special handling of this error only if it is an API error. var aerr *apierr.APIError if !errors.As(err, &aerr) { return err } // This API returns a 404 if the parent directory does not exist. if aerr.StatusCode == http.StatusNotFound { if !slices.Contains(mode, CreateParentDirectories) { return NoSuchDirectoryError{path.Dir(absPath)} } // Create parent directory. err = w.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(absPath)) if err != nil { if errors.As(err, &aerr) && aerr.StatusCode == http.StatusForbidden { return PermissionError{absPath} } return fmt.Errorf("unable to mkdir to write file %s: %w", absPath, err) } // Retry without CreateParentDirectories mode flag. return w.Write(ctx, name, bytes.NewReader(body), sliceWithout(mode, CreateParentDirectories)...) } // This API returns 409 if the file already exists, when the object type is file if aerr.StatusCode == http.StatusConflict { return FileAlreadyExistsError{absPath} } // This API returns 400 if the file already exists, when the object type is notebook regex := regexp.MustCompile(`Path \((.*)\) already exists.`) if aerr.StatusCode == http.StatusBadRequest && regex.Match([]byte(aerr.Message)) { // Parse file path from regex capture group matches := regex.FindStringSubmatch(aerr.Message) if len(matches) == 2 { return FileAlreadyExistsError{matches[1]} } // Default to path specified to filer.Write if regex capture fails return FileAlreadyExistsError{absPath} } // This API returns StatusForbidden when you have read access but don't have write access to a file if aerr.StatusCode == http.StatusForbidden { return PermissionError{absPath} } return err } func (w *workspaceFilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err } // This stat call serves two purposes: // 1. Checks file at path exists, and throws an error if it does not // 2. Allows us to error out if the path is a directory. This is needed // because the /workspace/export API does not error out, and returns the directory // as a DBC archive even if format "SOURCE" is specified stat, err := w.Stat(ctx, name) if err != nil { return nil, err } if stat.IsDir() { return nil, NotAFile{absPath} } // Export file contents. Note the /workspace/export API has a limit of 10MBs // for the file size return w.workspaceClient.Workspace.Download(ctx, absPath) } func (w *workspaceFilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error { absPath, err := w.root.Join(name) if err != nil { return err } // Illegal to delete the root path. if absPath == w.root.rootPath { return CannotDeleteRootError{} } recursive := false if slices.Contains(mode, DeleteRecursively) { recursive = true } err = w.workspaceClient.Workspace.Delete(ctx, workspace.Delete{ Path: absPath, Recursive: recursive, }) // Return early on success. if err == nil { return nil } // Special handling of this error only if it is an API error. var aerr *apierr.APIError if !errors.As(err, &aerr) { return err } switch aerr.StatusCode { case http.StatusBadRequest: if aerr.ErrorCode == "DIRECTORY_NOT_EMPTY" { return DirectoryNotEmptyError{absPath} } case http.StatusNotFound: return FileDoesNotExistError{absPath} } return err } func (w *workspaceFilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err } objects, err := w.workspaceClient.Workspace.ListAll(ctx, workspace.ListWorkspaceRequest{ Path: absPath, }) if len(objects) == 1 && objects[0].Path == absPath { return nil, NotADirectory{absPath} } if err != nil { // If we got an API error we deal with it below. var aerr *apierr.APIError if !errors.As(err, &aerr) { return nil, err } // NOTE: This API returns a 404 if the specified path does not exist, // but can also do so if we don't have read access. if aerr.StatusCode == http.StatusNotFound { return nil, NoSuchDirectoryError{path.Dir(absPath)} } return nil, err } // Convert to fs.DirEntry. return wsfsDirEntriesFromObjectInfos(objects), nil } func (w *workspaceFilesClient) Mkdir(ctx context.Context, name string) error { dirPath, err := w.root.Join(name) if err != nil { return err } return w.workspaceClient.Workspace.Mkdirs(ctx, workspace.Mkdirs{ Path: dirPath, }) } func (w *workspaceFilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) { absPath, err := w.root.Join(name) if err != nil { return nil, err } var stat wsfsFileInfo // Perform bespoke API call because "return_export_info" is not exposed by the SDK. // We need "repos_export_format" to determine if the file is a py or a ipynb notebook. // This is not exposed by the SDK so we need to make a direct API call. err = w.apiClient.Do( ctx, http.MethodGet, "/api/2.0/workspace/get-status", nil, map[string]string{ "path": absPath, "return_export_info": "true", }, &stat, ) if err != nil { // If we got an API error we deal with it below. var aerr *apierr.APIError if !errors.As(err, &aerr) { return nil, err } // This API returns a 404 if the specified path does not exist. if aerr.StatusCode == http.StatusNotFound { return nil, FileDoesNotExistError{absPath} } } return stat, nil }