mirror of https://github.com/databricks/cli.git
346 lines
12 KiB
Go
346 lines
12 KiB
Go
package filer
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"net/http"
|
|
"path"
|
|
"strings"
|
|
|
|
"github.com/databricks/cli/libs/log"
|
|
"github.com/databricks/cli/libs/notebook"
|
|
"github.com/databricks/databricks-sdk-go"
|
|
"github.com/databricks/databricks-sdk-go/apierr"
|
|
"github.com/databricks/databricks-sdk-go/client"
|
|
"github.com/databricks/databricks-sdk-go/marshal"
|
|
"github.com/databricks/databricks-sdk-go/service/workspace"
|
|
)
|
|
|
|
type workspaceFilesExtensionsClient struct {
|
|
workspaceClient *databricks.WorkspaceClient
|
|
apiClient *client.DatabricksClient
|
|
|
|
wsfs Filer
|
|
root string
|
|
}
|
|
|
|
var extensionsToLanguages = map[string]workspace.Language{
|
|
".py": workspace.LanguagePython,
|
|
".r": workspace.LanguageR,
|
|
".scala": workspace.LanguageScala,
|
|
".sql": workspace.LanguageSql,
|
|
".ipynb": workspace.LanguagePython,
|
|
}
|
|
|
|
// workspaceFileStatus defines a custom response body for the "/api/2.0/workspace/get-status" API.
|
|
// The "repos_export_format" field is not exposed by the SDK.
|
|
type workspaceFileStatus struct {
|
|
*workspace.ObjectInfo
|
|
|
|
// The export format of the notebook. This is not exposed by the SDK.
|
|
ReposExportFormat workspace.ExportFormat `json:"repos_export_format,omitempty"`
|
|
|
|
// Name of the file to be used in any API calls made using the workspace files
|
|
// filer. For notebooks this path does not include the extension.
|
|
nameForWorkspaceAPI string
|
|
}
|
|
|
|
// A custom unmarsaller for the workspaceFileStatus struct. This is needed because
|
|
// workspaceFileStatus embeds the workspace.ObjectInfo which itself has a custom
|
|
// unmarshaller.
|
|
// If a custom unmarshaller is not provided extra fields like ReposExportFormat
|
|
// will not have values set.
|
|
func (s *workspaceFileStatus) UnmarshalJSON(b []byte) error {
|
|
return marshal.Unmarshal(b, s)
|
|
}
|
|
|
|
func (s *workspaceFileStatus) MarshalJSON() ([]byte, error) {
|
|
return marshal.Marshal(s)
|
|
}
|
|
|
|
func (w *workspaceFilesExtensionsClient) stat(ctx context.Context, name string) (*workspaceFileStatus, error) {
|
|
stat := &workspaceFileStatus{
|
|
nameForWorkspaceAPI: name,
|
|
}
|
|
|
|
// Perform bespoke API call because "return_export_info" is not exposed by the SDK.
|
|
// We need "repos_export_format" to determine if the file is a py or a ipynb notebook.
|
|
// This is not exposed by the SDK so we need to make a direct API call.
|
|
err := w.apiClient.Do(
|
|
ctx,
|
|
http.MethodGet,
|
|
"/api/2.0/workspace/get-status",
|
|
nil,
|
|
map[string]string{
|
|
"path": path.Join(w.root, name),
|
|
"return_export_info": "true",
|
|
},
|
|
stat,
|
|
)
|
|
if err != nil {
|
|
// If we got an API error we deal with it below.
|
|
var aerr *apierr.APIError
|
|
if !errors.As(err, &aerr) {
|
|
return nil, err
|
|
}
|
|
|
|
// This API returns a 404 if the specified path does not exist.
|
|
if aerr.StatusCode == http.StatusNotFound {
|
|
return nil, FileDoesNotExistError{path.Join(w.root, name)}
|
|
}
|
|
}
|
|
return stat, err
|
|
}
|
|
|
|
// This function returns the stat for the provided notebook. The stat object itself contains the path
|
|
// with the extension since it is meant to be used in the context of a fs.FileInfo.
|
|
func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithExt(ctx context.Context, name string) (*workspaceFileStatus, error) {
|
|
ext := path.Ext(name)
|
|
nameWithoutExt := strings.TrimSuffix(name, ext)
|
|
|
|
// File name does not have an extension associated with Databricks notebooks, return early.
|
|
if _, ok := extensionsToLanguages[ext]; !ok {
|
|
return nil, nil
|
|
}
|
|
|
|
// If the file could be a notebook, check if it is and has the correct language.
|
|
stat, err := w.stat(ctx, nameWithoutExt)
|
|
if err != nil {
|
|
// If the file does not exist, return early.
|
|
if errors.As(err, &FileDoesNotExistError{}) {
|
|
return nil, nil
|
|
}
|
|
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Failed to fetch the status of object at %s: %s", name, path.Join(w.root, nameWithoutExt), err)
|
|
return nil, err
|
|
}
|
|
|
|
// Not a notebook. Return early.
|
|
if stat.ObjectType != workspace.ObjectTypeNotebook {
|
|
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Found an object at %s but it is not a notebook. It is a %s.", name, path.Join(w.root, nameWithoutExt), stat.ObjectType)
|
|
return nil, nil
|
|
}
|
|
|
|
// Not the correct language. Return early.
|
|
if stat.Language != extensionsToLanguages[ext] {
|
|
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Found a notebook at %s but it is not of the correct language. Expected %s but found %s.", name, path.Join(w.root, nameWithoutExt), extensionsToLanguages[ext], stat.Language)
|
|
return nil, nil
|
|
}
|
|
|
|
// When the extension is .py we expect the export format to be source.
|
|
// If it's not, return early.
|
|
if ext == ".py" && stat.ReposExportFormat != workspace.ExportFormatSource {
|
|
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Found a notebook at %s but it is not exported as a source notebook. Its export format is %s.", name, path.Join(w.root, nameWithoutExt), stat.ReposExportFormat)
|
|
return nil, nil
|
|
}
|
|
|
|
// When the extension is .ipynb we expect the export format to be Jupyter.
|
|
// If it's not, return early.
|
|
if ext == ".ipynb" && stat.ReposExportFormat != workspace.ExportFormatJupyter {
|
|
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Found a notebook at %s but it is not exported as a Jupyter notebook. Its export format is %s.", name, path.Join(w.root, nameWithoutExt), stat.ReposExportFormat)
|
|
return nil, nil
|
|
}
|
|
|
|
// Modify the stat object path to include the extension. This stat object will be used
|
|
// to return the fs.FileInfo object in the stat method.
|
|
stat.Path = stat.Path + ext
|
|
return stat, nil
|
|
}
|
|
|
|
func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithoutExt(ctx context.Context, name string) (*workspaceFileStatus, error) {
|
|
stat, err := w.stat(ctx, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// We expect this internal function to only be called from [ReadDir] when we are sure
|
|
// that the object is a notebook. Thus, this should never happen.
|
|
if stat.ObjectType != workspace.ObjectTypeNotebook {
|
|
return nil, fmt.Errorf("expected object at %s to be a notebook but it is a %s", path.Join(w.root, name), stat.ObjectType)
|
|
}
|
|
|
|
// Get the extension for the notebook.
|
|
ext := notebook.GetExtensionByLanguage(stat.ObjectInfo)
|
|
|
|
// If the notebook was exported as a Jupyter notebook, the extension should be .ipynb.
|
|
if stat.Language == workspace.LanguagePython && stat.ReposExportFormat == workspace.ExportFormatJupyter {
|
|
ext = ".ipynb"
|
|
}
|
|
|
|
// Modify the stat object path to include the extension. This stat object will be used
|
|
// to return the fs.DirEntry object in the ReadDir method.
|
|
stat.Path = stat.Path + ext
|
|
return stat, nil
|
|
}
|
|
|
|
type DuplicatePathError struct {
|
|
oi1 workspace.ObjectInfo
|
|
oi2 workspace.ObjectInfo
|
|
|
|
commonName string
|
|
}
|
|
|
|
func (e DuplicatePathError) Error() string {
|
|
return fmt.Sprintf("failed to read files from the workspace file system. Duplicate paths encountered. Both %s at %s and %s at %s resolve to the same name %s. Changing the name of one of these objects will resolve this issue", e.oi1.ObjectType, e.oi1.Path, e.oi2.ObjectType, e.oi2.Path, e.commonName)
|
|
}
|
|
|
|
// This is a filer for the workspace file system that allows you to pretend the
|
|
// workspace file system is a traditional file system. It allows you to list, read, write,
|
|
// delete, and stat notebooks (and files in general) in the workspace, using their paths
|
|
// with the extension included.
|
|
//
|
|
// The ReadDir method returns a DuplicatePathError if this traditional file system view is
|
|
// not possible. For example, a Python notebook called foo and a Python file called `foo.py`
|
|
// would resolve to the same path `foo.py` in a tradition file system.
|
|
//
|
|
// Users of this filer should be careful when using the Write and Mkdir methods.
|
|
// The underlying import API we use to upload notebooks and files returns opaque internal
|
|
// errors for namespace clashes (e.g. a file and a notebook or a directory and a notebook).
|
|
// Thus users of these methods should be careful to avoid such clashes.
|
|
func NewWorkspaceFilesExtensionsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
|
|
apiClient, err := client.New(w.Config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
filer, err := NewWorkspaceFilesClient(w, root)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &workspaceFilesExtensionsClient{
|
|
workspaceClient: w,
|
|
apiClient: apiClient,
|
|
|
|
wsfs: filer,
|
|
root: root,
|
|
}, nil
|
|
}
|
|
|
|
func (w *workspaceFilesExtensionsClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
|
entries, err := w.wsfs.ReadDir(ctx, name)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
seenPaths := make(map[string]workspace.ObjectInfo)
|
|
for i := range entries {
|
|
info, err := entries[i].Info()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
sysInfo := info.Sys().(workspace.ObjectInfo)
|
|
|
|
// If the object is a notebook, include an extension in the entry.
|
|
if sysInfo.ObjectType == workspace.ObjectTypeNotebook {
|
|
stat, err := w.getNotebookStatByNameWithoutExt(ctx, path.Join(name, entries[i].Name()))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Replace the entry with the new entry that includes the extension.
|
|
entries[i] = wsfsDirEntry{wsfsFileInfo{oi: *stat.ObjectInfo}}
|
|
}
|
|
|
|
// Error if we have seen this path before in the current directory.
|
|
// If not seen before, add it to the seen paths.
|
|
if _, ok := seenPaths[entries[i].Name()]; ok {
|
|
return nil, DuplicatePathError{
|
|
oi1: seenPaths[entries[i].Name()],
|
|
oi2: sysInfo,
|
|
commonName: path.Join(name, entries[i].Name()),
|
|
}
|
|
}
|
|
seenPaths[entries[i].Name()] = sysInfo
|
|
}
|
|
|
|
return entries, nil
|
|
}
|
|
|
|
// Note: The import API returns opaque internal errors for namespace clashes
|
|
// (e.g. a file and a notebook or a directory and a notebook). Thus users of this
|
|
// method should be careful to avoid such clashes.
|
|
func (w *workspaceFilesExtensionsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
|
|
return w.wsfs.Write(ctx, name, reader, mode...)
|
|
}
|
|
|
|
// Try to read the file as a regular file. If the file is not found, try to read it as a notebook.
|
|
func (w *workspaceFilesExtensionsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
|
|
r, err := w.wsfs.Read(ctx, name)
|
|
|
|
// If the file is not found, it might be a notebook.
|
|
if errors.As(err, &FileDoesNotExistError{}) {
|
|
stat, serr := w.getNotebookStatByNameWithExt(ctx, name)
|
|
if serr != nil {
|
|
// Unable to stat. Return the stat error.
|
|
return nil, serr
|
|
}
|
|
if stat == nil {
|
|
// Not a notebook. Return the original error.
|
|
return nil, err
|
|
}
|
|
|
|
// The workspace files filer performs an additional stat call to make sure
|
|
// the path is not a directory. We can skip this step since we already have
|
|
// the stat object and know that the path is a notebook.
|
|
return w.workspaceClient.Workspace.Download(
|
|
ctx,
|
|
path.Join(w.root, stat.nameForWorkspaceAPI),
|
|
workspace.DownloadFormat(stat.ReposExportFormat),
|
|
)
|
|
}
|
|
return r, err
|
|
}
|
|
|
|
// Try to delete the file as a regular file. If the file is not found, try to delete it as a notebook.
|
|
func (w *workspaceFilesExtensionsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
|
|
err := w.wsfs.Delete(ctx, name, mode...)
|
|
|
|
// If the file is not found, it might be a notebook.
|
|
if errors.As(err, &FileDoesNotExistError{}) {
|
|
stat, serr := w.getNotebookStatByNameWithExt(ctx, name)
|
|
if serr != nil {
|
|
// Unable to stat. Return the stat error.
|
|
return serr
|
|
}
|
|
if stat == nil {
|
|
// Not a notebook. Return the original error.
|
|
return err
|
|
}
|
|
|
|
return w.wsfs.Delete(ctx, stat.nameForWorkspaceAPI, mode...)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// Try to stat the file as a regular file. If the file is not found, try to stat it as a notebook.
|
|
func (w *workspaceFilesExtensionsClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
|
info, err := w.wsfs.Stat(ctx, name)
|
|
|
|
// If the file is not found, it might be a notebook.
|
|
if errors.As(err, &FileDoesNotExistError{}) {
|
|
stat, serr := w.getNotebookStatByNameWithExt(ctx, name)
|
|
if serr != nil {
|
|
// Unable to stat. Return the stat error.
|
|
return nil, serr
|
|
}
|
|
if stat == nil {
|
|
// Not a notebook. Return the original error.
|
|
return nil, err
|
|
}
|
|
|
|
return wsfsFileInfo{oi: *stat.ObjectInfo}, nil
|
|
}
|
|
|
|
return info, err
|
|
}
|
|
|
|
// Note: The import API returns opaque internal errors for namespace clashes
|
|
// (e.g. a file and a notebook or a directory and a notebook). Thus users of this
|
|
// method should be careful to avoid such clashes.
|
|
func (w *workspaceFilesExtensionsClient) Mkdir(ctx context.Context, name string) error {
|
|
return w.wsfs.Mkdir(ctx, name)
|
|
}
|