mirror of https://github.com/databricks/cli.git
Add `filer.Filer` to read notebooks from WSFS without omitting their extension (#1457)
## Changes This PR adds a filer that'll allow us to read notebooks from the WSFS using their full paths (with the extension included). The filer relies on the existing workspace filer (and consequently the workspace import/export/list APIs). Using this filer along with a virtual filesystem layer (https://github.com/databricks/cli/pull/1452/files) will allow us to use our custom implementation (which preserves the notebook extensions) rather than the default mount available via DBR when the CLI is run from DBR. ## Tests Integration tests. --------- Co-authored-by: Pieter Noordhuis <pieter.noordhuis@databricks.com>
This commit is contained in:
parent
424499ec1d
commit
ec33a7c059
|
@ -3,9 +3,12 @@ package internal
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -37,6 +40,36 @@ func (f filerTest) assertContents(ctx context.Context, name string, contents str
|
|||
assert.Equal(f, contents, body.String())
|
||||
}
|
||||
|
||||
func (f filerTest) assertContentsJupyter(ctx context.Context, name string) {
|
||||
reader, err := f.Read(ctx, name)
|
||||
if !assert.NoError(f, err) {
|
||||
return
|
||||
}
|
||||
|
||||
defer reader.Close()
|
||||
|
||||
var body bytes.Buffer
|
||||
_, err = io.Copy(&body, reader)
|
||||
if !assert.NoError(f, err) {
|
||||
return
|
||||
}
|
||||
|
||||
var actual map[string]any
|
||||
err = json.Unmarshal(body.Bytes(), &actual)
|
||||
if !assert.NoError(f, err) {
|
||||
return
|
||||
}
|
||||
|
||||
// Since a roundtrip to the workspace changes a Jupyter notebook's payload,
|
||||
// the best we can do is assert that the nbformat is correct.
|
||||
assert.EqualValues(f, 4, actual["nbformat"])
|
||||
}
|
||||
|
||||
func (f filerTest) assertNotExists(ctx context.Context, name string) {
|
||||
_, err := f.Stat(ctx, name)
|
||||
assert.ErrorIs(f, err, fs.ErrNotExist)
|
||||
}
|
||||
|
||||
func commonFilerRecursiveDeleteTest(t *testing.T, ctx context.Context, f filer.Filer) {
|
||||
var err error
|
||||
|
||||
|
@ -94,6 +127,7 @@ func TestAccFilerRecursiveDelete(t *testing.T) {
|
|||
{"workspace files", setupWsfsFiler},
|
||||
{"dbfs", setupDbfsFiler},
|
||||
{"files", setupUcVolumesFiler},
|
||||
{"workspace files extensions", setupWsfsExtensionsFiler},
|
||||
} {
|
||||
tc := testCase
|
||||
|
||||
|
@ -204,6 +238,7 @@ func TestAccFilerReadWrite(t *testing.T) {
|
|||
{"workspace files", setupWsfsFiler},
|
||||
{"dbfs", setupDbfsFiler},
|
||||
{"files", setupUcVolumesFiler},
|
||||
{"workspace files extensions", setupWsfsExtensionsFiler},
|
||||
} {
|
||||
tc := testCase
|
||||
|
||||
|
@ -312,6 +347,7 @@ func TestAccFilerReadDir(t *testing.T) {
|
|||
{"workspace files", setupWsfsFiler},
|
||||
{"dbfs", setupDbfsFiler},
|
||||
{"files", setupUcVolumesFiler},
|
||||
{"workspace files extensions", setupWsfsExtensionsFiler},
|
||||
} {
|
||||
tc := testCase
|
||||
|
||||
|
@ -374,6 +410,8 @@ var jupyterNotebookContent2 = `
|
|||
`
|
||||
|
||||
func TestAccFilerWorkspaceNotebookConflict(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f, _ := setupWsfsFiler(t)
|
||||
ctx := context.Background()
|
||||
var err error
|
||||
|
@ -420,6 +458,8 @@ func TestAccFilerWorkspaceNotebookConflict(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
f, _ := setupWsfsFiler(t)
|
||||
ctx := context.Background()
|
||||
var err error
|
||||
|
@ -462,3 +502,309 @@ func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) {
|
|||
filerTest{t, f}.assertContents(ctx, "scalaNb", "// Databricks notebook source\n println(\"second upload\"))")
|
||||
filerTest{t, f}.assertContents(ctx, "jupyterNb", "# Databricks notebook source\nprint(\"Jupyter Notebook Version 2\")")
|
||||
}
|
||||
|
||||
func TestAccFilerWorkspaceFilesExtensionsReadDir(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
files := []struct {
|
||||
name string
|
||||
content string
|
||||
}{
|
||||
{"dir1/dir2/dir3/file.txt", "file content"},
|
||||
{"foo.py", "print('foo')"},
|
||||
{"foo.r", "print('foo')"},
|
||||
{"foo.scala", "println('foo')"},
|
||||
{"foo.sql", "SELECT 'foo'"},
|
||||
{"jupyterNb.ipynb", jupyterNotebookContent1},
|
||||
{"jupyterNb2.ipynb", jupyterNotebookContent2},
|
||||
{"pyNb.py", "# Databricks notebook source\nprint('first upload'))"},
|
||||
{"rNb.r", "# Databricks notebook source\nprint('first upload'))"},
|
||||
{"scalaNb.scala", "// Databricks notebook source\n println(\"first upload\"))"},
|
||||
{"sqlNb.sql", "-- Databricks notebook source\n SELECT \"first upload\""},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
wf, _ := setupWsfsExtensionsFiler(t)
|
||||
|
||||
for _, f := range files {
|
||||
err := wf.Write(ctx, f.name, strings.NewReader(f.content), filer.CreateParentDirectories)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Read entries
|
||||
entries, err := wf.ReadDir(ctx, ".")
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, entries, len(files))
|
||||
names := []string{}
|
||||
for _, e := range entries {
|
||||
names = append(names, e.Name())
|
||||
}
|
||||
assert.Equal(t, []string{
|
||||
"dir1",
|
||||
"foo.py",
|
||||
"foo.r",
|
||||
"foo.scala",
|
||||
"foo.sql",
|
||||
"jupyterNb.ipynb",
|
||||
"jupyterNb2.ipynb",
|
||||
"pyNb.py",
|
||||
"rNb.r",
|
||||
"scalaNb.scala",
|
||||
"sqlNb.sql",
|
||||
}, names)
|
||||
}
|
||||
|
||||
func setupFilerWithExtensionsTest(t *testing.T) filer.Filer {
|
||||
files := []struct {
|
||||
name string
|
||||
content string
|
||||
}{
|
||||
{"foo.py", "# Databricks notebook source\nprint('first upload'))"},
|
||||
{"bar.py", "print('foo')"},
|
||||
{"jupyter.ipynb", jupyterNotebookContent1},
|
||||
{"pretender", "not a notebook"},
|
||||
{"dir/file.txt", "file content"},
|
||||
{"scala-notebook.scala", "// Databricks notebook source\nprintln('first upload')"},
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
wf, _ := setupWsfsExtensionsFiler(t)
|
||||
|
||||
for _, f := range files {
|
||||
err := wf.Write(ctx, f.name, strings.NewReader(f.content), filer.CreateParentDirectories)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
return wf
|
||||
}
|
||||
|
||||
func TestAccFilerWorkspaceFilesExtensionsRead(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
wf := setupFilerWithExtensionsTest(t)
|
||||
|
||||
// Read contents of test fixtures as a sanity check.
|
||||
filerTest{t, wf}.assertContents(ctx, "foo.py", "# Databricks notebook source\nprint('first upload'))")
|
||||
filerTest{t, wf}.assertContents(ctx, "bar.py", "print('foo')")
|
||||
filerTest{t, wf}.assertContentsJupyter(ctx, "jupyter.ipynb")
|
||||
filerTest{t, wf}.assertContents(ctx, "dir/file.txt", "file content")
|
||||
filerTest{t, wf}.assertContents(ctx, "scala-notebook.scala", "// Databricks notebook source\nprintln('first upload')")
|
||||
filerTest{t, wf}.assertContents(ctx, "pretender", "not a notebook")
|
||||
|
||||
// Read non-existent file
|
||||
_, err := wf.Read(ctx, "non-existent.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
// Ensure we do not read a regular file as a notebook
|
||||
_, err = wf.Read(ctx, "pretender.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
_, err = wf.Read(ctx, "pretender.ipynb")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
// Read directory
|
||||
_, err = wf.Read(ctx, "dir")
|
||||
assert.ErrorIs(t, err, fs.ErrInvalid)
|
||||
|
||||
// Ensure we do not read a Scala notebook as a Python notebook
|
||||
_, err = wf.Read(ctx, "scala-notebook.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
}
|
||||
|
||||
func TestAccFilerWorkspaceFilesExtensionsDelete(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
wf := setupFilerWithExtensionsTest(t)
|
||||
|
||||
// Delete notebook
|
||||
err := wf.Delete(ctx, "foo.py")
|
||||
require.NoError(t, err)
|
||||
filerTest{t, wf}.assertNotExists(ctx, "foo.py")
|
||||
|
||||
// Delete file
|
||||
err = wf.Delete(ctx, "bar.py")
|
||||
require.NoError(t, err)
|
||||
filerTest{t, wf}.assertNotExists(ctx, "bar.py")
|
||||
|
||||
// Delete jupyter notebook
|
||||
err = wf.Delete(ctx, "jupyter.ipynb")
|
||||
require.NoError(t, err)
|
||||
filerTest{t, wf}.assertNotExists(ctx, "jupyter.ipynb")
|
||||
|
||||
// Delete non-existent file
|
||||
err = wf.Delete(ctx, "non-existent.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
// Ensure we do not delete a file as a notebook
|
||||
err = wf.Delete(ctx, "pretender.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
// Ensure we do not delete a Scala notebook as a Python notebook
|
||||
_, err = wf.Read(ctx, "scala-notebook.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
// Delete directory
|
||||
err = wf.Delete(ctx, "dir")
|
||||
assert.ErrorIs(t, err, fs.ErrInvalid)
|
||||
|
||||
// Delete directory recursively
|
||||
err = wf.Delete(ctx, "dir", filer.DeleteRecursively)
|
||||
require.NoError(t, err)
|
||||
filerTest{t, wf}.assertNotExists(ctx, "dir")
|
||||
}
|
||||
|
||||
func TestAccFilerWorkspaceFilesExtensionsStat(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
wf := setupFilerWithExtensionsTest(t)
|
||||
|
||||
// Stat on a notebook
|
||||
info, err := wf.Stat(ctx, "foo.py")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "foo.py", info.Name())
|
||||
assert.False(t, info.IsDir())
|
||||
|
||||
// Stat on a file
|
||||
info, err = wf.Stat(ctx, "bar.py")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "bar.py", info.Name())
|
||||
assert.False(t, info.IsDir())
|
||||
|
||||
// Stat on a Jupyter notebook
|
||||
info, err = wf.Stat(ctx, "jupyter.ipynb")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "jupyter.ipynb", info.Name())
|
||||
assert.False(t, info.IsDir())
|
||||
|
||||
// Stat on a directory
|
||||
info, err = wf.Stat(ctx, "dir")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, "dir", info.Name())
|
||||
assert.True(t, info.IsDir())
|
||||
|
||||
// Stat on a non-existent file
|
||||
_, err = wf.Stat(ctx, "non-existent.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
// Ensure we do not stat a file as a notebook
|
||||
_, err = wf.Stat(ctx, "pretender.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
// Ensure we do not stat a Scala notebook as a Python notebook
|
||||
_, err = wf.Stat(ctx, "scala-notebook.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
_, err = wf.Stat(ctx, "pretender.ipynb")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
}
|
||||
|
||||
func TestAccFilerWorkspaceFilesExtensionsErrorsOnDupName(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
tcases := []struct {
|
||||
files []struct{ name, content string }
|
||||
name string
|
||||
}{
|
||||
{
|
||||
name: "python",
|
||||
files: []struct{ name, content string }{
|
||||
{"foo.py", "print('foo')"},
|
||||
{"foo.py", "# Databricks notebook source\nprint('foo')"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "r",
|
||||
files: []struct{ name, content string }{
|
||||
{"foo.r", "print('foo')"},
|
||||
{"foo.r", "# Databricks notebook source\nprint('foo')"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sql",
|
||||
files: []struct{ name, content string }{
|
||||
{"foo.sql", "SELECT 'foo'"},
|
||||
{"foo.sql", "-- Databricks notebook source\nSELECT 'foo'"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "scala",
|
||||
files: []struct{ name, content string }{
|
||||
{"foo.scala", "println('foo')"},
|
||||
{"foo.scala", "// Databricks notebook source\nprintln('foo')"},
|
||||
},
|
||||
},
|
||||
// We don't need to test this for ipynb notebooks. The import API
|
||||
// fails when the file extension is .ipynb but the content is not a
|
||||
// valid juptyer notebook.
|
||||
}
|
||||
|
||||
for i := range tcases {
|
||||
tc := tcases[i]
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
wf, tmpDir := setupWsfsExtensionsFiler(t)
|
||||
|
||||
for _, f := range tc.files {
|
||||
err := wf.Write(ctx, f.name, strings.NewReader(f.content), filer.CreateParentDirectories)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
_, err := wf.ReadDir(ctx, ".")
|
||||
assert.ErrorAs(t, err, &filer.DuplicatePathError{})
|
||||
assert.ErrorContains(t, err, fmt.Sprintf("failed to read files from the workspace file system. Duplicate paths encountered. Both NOTEBOOK at %s and FILE at %s resolve to the same name %s. Changing the name of one of these objects will resolve this issue", path.Join(tmpDir, "foo"), path.Join(tmpDir, tc.files[0].name), tc.files[0].name))
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAccWorkspaceFilesExtensionsDirectoriesAreNotNotebooks(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
wf, _ := setupWsfsExtensionsFiler(t)
|
||||
|
||||
// Create a directory with an extension
|
||||
err := wf.Mkdir(ctx, "foo")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Reading foo.py should fail. foo is a directory, not a notebook.
|
||||
_, err = wf.Read(ctx, "foo.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
}
|
||||
|
||||
func TestAccWorkspaceFilesExtensions_ExportFormatIsPreserved(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctx := context.Background()
|
||||
wf, _ := setupWsfsExtensionsFiler(t)
|
||||
|
||||
// Case 1: Source Notebook
|
||||
err := wf.Write(ctx, "foo.py", strings.NewReader("# Databricks notebook source\nprint('foo')"))
|
||||
require.NoError(t, err)
|
||||
|
||||
// The source notebook should exist but not the Jupyter notebook
|
||||
filerTest{t, wf}.assertContents(ctx, "foo.py", "# Databricks notebook source\nprint('foo')")
|
||||
_, err = wf.Stat(ctx, "foo.ipynb")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
_, err = wf.Read(ctx, "foo.ipynb")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
err = wf.Delete(ctx, "foo.ipynb")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
|
||||
// Case 2: Jupyter Notebook
|
||||
err = wf.Write(ctx, "bar.ipynb", strings.NewReader(jupyterNotebookContent1))
|
||||
require.NoError(t, err)
|
||||
|
||||
// The Jupyter notebook should exist but not the source notebook
|
||||
filerTest{t, wf}.assertContentsJupyter(ctx, "bar.ipynb")
|
||||
_, err = wf.Stat(ctx, "bar.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
_, err = wf.Read(ctx, "bar.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
err = wf.Delete(ctx, "bar.py")
|
||||
assert.ErrorIs(t, err, fs.ErrNotExist)
|
||||
}
|
||||
|
|
|
@ -559,6 +559,17 @@ func setupWsfsFiler(t *testing.T) (filer.Filer, string) {
|
|||
return f, tmpdir
|
||||
}
|
||||
|
||||
func setupWsfsExtensionsFiler(t *testing.T) (filer.Filer, string) {
|
||||
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||
|
||||
w := databricks.Must(databricks.NewWorkspaceClient())
|
||||
tmpdir := TemporaryWorkspaceDir(t, w)
|
||||
f, err := filer.NewWorkspaceFilesExtensionsClient(w, tmpdir)
|
||||
require.NoError(t, err)
|
||||
|
||||
return f, tmpdir
|
||||
}
|
||||
|
||||
func setupDbfsFiler(t *testing.T) (filer.Filer, string) {
|
||||
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))
|
||||
|
||||
|
|
|
@ -0,0 +1,345 @@
|
|||
package filer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/databricks/cli/libs/log"
|
||||
"github.com/databricks/cli/libs/notebook"
|
||||
"github.com/databricks/databricks-sdk-go"
|
||||
"github.com/databricks/databricks-sdk-go/apierr"
|
||||
"github.com/databricks/databricks-sdk-go/client"
|
||||
"github.com/databricks/databricks-sdk-go/marshal"
|
||||
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||||
)
|
||||
|
||||
type workspaceFilesExtensionsClient struct {
|
||||
workspaceClient *databricks.WorkspaceClient
|
||||
apiClient *client.DatabricksClient
|
||||
|
||||
wsfs Filer
|
||||
root string
|
||||
}
|
||||
|
||||
var extensionsToLanguages = map[string]workspace.Language{
|
||||
".py": workspace.LanguagePython,
|
||||
".r": workspace.LanguageR,
|
||||
".scala": workspace.LanguageScala,
|
||||
".sql": workspace.LanguageSql,
|
||||
".ipynb": workspace.LanguagePython,
|
||||
}
|
||||
|
||||
// workspaceFileStatus defines a custom response body for the "/api/2.0/workspace/get-status" API.
|
||||
// The "repos_export_format" field is not exposed by the SDK.
|
||||
type workspaceFileStatus struct {
|
||||
*workspace.ObjectInfo
|
||||
|
||||
// The export format of the notebook. This is not exposed by the SDK.
|
||||
ReposExportFormat workspace.ExportFormat `json:"repos_export_format,omitempty"`
|
||||
|
||||
// Name of the file to be used in any API calls made using the workspace files
|
||||
// filer. For notebooks this path does not include the extension.
|
||||
nameForWorkspaceAPI string
|
||||
}
|
||||
|
||||
// A custom unmarsaller for the workspaceFileStatus struct. This is needed because
|
||||
// workspaceFileStatus embeds the workspace.ObjectInfo which itself has a custom
|
||||
// unmarshaller.
|
||||
// If a custom unmarshaller is not provided extra fields like ReposExportFormat
|
||||
// will not have values set.
|
||||
func (s *workspaceFileStatus) UnmarshalJSON(b []byte) error {
|
||||
return marshal.Unmarshal(b, s)
|
||||
}
|
||||
|
||||
func (s *workspaceFileStatus) MarshalJSON() ([]byte, error) {
|
||||
return marshal.Marshal(s)
|
||||
}
|
||||
|
||||
func (w *workspaceFilesExtensionsClient) stat(ctx context.Context, name string) (*workspaceFileStatus, error) {
|
||||
stat := &workspaceFileStatus{
|
||||
nameForWorkspaceAPI: name,
|
||||
}
|
||||
|
||||
// Perform bespoke API call because "return_export_info" is not exposed by the SDK.
|
||||
// We need "repos_export_format" to determine if the file is a py or a ipynb notebook.
|
||||
// This is not exposed by the SDK so we need to make a direct API call.
|
||||
err := w.apiClient.Do(
|
||||
ctx,
|
||||
http.MethodGet,
|
||||
"/api/2.0/workspace/get-status",
|
||||
nil,
|
||||
map[string]string{
|
||||
"path": path.Join(w.root, name),
|
||||
"return_export_info": "true",
|
||||
},
|
||||
stat,
|
||||
)
|
||||
if err != nil {
|
||||
// If we got an API error we deal with it below.
|
||||
var aerr *apierr.APIError
|
||||
if !errors.As(err, &aerr) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// This API returns a 404 if the specified path does not exist.
|
||||
if aerr.StatusCode == http.StatusNotFound {
|
||||
return nil, FileDoesNotExistError{path.Join(w.root, name)}
|
||||
}
|
||||
}
|
||||
return stat, err
|
||||
}
|
||||
|
||||
// This function returns the stat for the provided notebook. The stat object itself contains the path
|
||||
// with the extension since it is meant to be used in the context of a fs.FileInfo.
|
||||
func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithExt(ctx context.Context, name string) (*workspaceFileStatus, error) {
|
||||
ext := path.Ext(name)
|
||||
nameWithoutExt := strings.TrimSuffix(name, ext)
|
||||
|
||||
// File name does not have an extension associated with Databricks notebooks, return early.
|
||||
if _, ok := extensionsToLanguages[ext]; !ok {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// If the file could be a notebook, check if it is and has the correct language.
|
||||
stat, err := w.stat(ctx, nameWithoutExt)
|
||||
if err != nil {
|
||||
// If the file does not exist, return early.
|
||||
if errors.As(err, &FileDoesNotExistError{}) {
|
||||
return nil, nil
|
||||
}
|
||||
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Failed to fetch the status of object at %s: %s", name, path.Join(w.root, nameWithoutExt), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Not a notebook. Return early.
|
||||
if stat.ObjectType != workspace.ObjectTypeNotebook {
|
||||
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Found an object at %s but it is not a notebook. It is a %s.", name, path.Join(w.root, nameWithoutExt), stat.ObjectType)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Not the correct language. Return early.
|
||||
if stat.Language != extensionsToLanguages[ext] {
|
||||
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Found a notebook at %s but it is not of the correct language. Expected %s but found %s.", name, path.Join(w.root, nameWithoutExt), extensionsToLanguages[ext], stat.Language)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// When the extension is .py we expect the export format to be source.
|
||||
// If it's not, return early.
|
||||
if ext == ".py" && stat.ReposExportFormat != workspace.ExportFormatSource {
|
||||
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Found a notebook at %s but it is not exported as a source notebook. Its export format is %s.", name, path.Join(w.root, nameWithoutExt), stat.ReposExportFormat)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// When the extension is .ipynb we expect the export format to be Jupyter.
|
||||
// If it's not, return early.
|
||||
if ext == ".ipynb" && stat.ReposExportFormat != workspace.ExportFormatJupyter {
|
||||
log.Debugf(ctx, "attempting to determine if %s could be a notebook. Found a notebook at %s but it is not exported as a Jupyter notebook. Its export format is %s.", name, path.Join(w.root, nameWithoutExt), stat.ReposExportFormat)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Modify the stat object path to include the extension. This stat object will be used
|
||||
// to return the fs.FileInfo object in the stat method.
|
||||
stat.Path = stat.Path + ext
|
||||
return stat, nil
|
||||
}
|
||||
|
||||
func (w *workspaceFilesExtensionsClient) getNotebookStatByNameWithoutExt(ctx context.Context, name string) (*workspaceFileStatus, error) {
|
||||
stat, err := w.stat(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// We expect this internal function to only be called from [ReadDir] when we are sure
|
||||
// that the object is a notebook. Thus, this should never happen.
|
||||
if stat.ObjectType != workspace.ObjectTypeNotebook {
|
||||
return nil, fmt.Errorf("expected object at %s to be a notebook but it is a %s", path.Join(w.root, name), stat.ObjectType)
|
||||
}
|
||||
|
||||
// Get the extension for the notebook.
|
||||
ext := notebook.GetExtensionByLanguage(stat.ObjectInfo)
|
||||
|
||||
// If the notebook was exported as a Jupyter notebook, the extension should be .ipynb.
|
||||
if stat.Language == workspace.LanguagePython && stat.ReposExportFormat == workspace.ExportFormatJupyter {
|
||||
ext = ".ipynb"
|
||||
}
|
||||
|
||||
// Modify the stat object path to include the extension. This stat object will be used
|
||||
// to return the fs.DirEntry object in the ReadDir method.
|
||||
stat.Path = stat.Path + ext
|
||||
return stat, nil
|
||||
}
|
||||
|
||||
type DuplicatePathError struct {
|
||||
oi1 workspace.ObjectInfo
|
||||
oi2 workspace.ObjectInfo
|
||||
|
||||
commonName string
|
||||
}
|
||||
|
||||
func (e DuplicatePathError) Error() string {
|
||||
return fmt.Sprintf("failed to read files from the workspace file system. Duplicate paths encountered. Both %s at %s and %s at %s resolve to the same name %s. Changing the name of one of these objects will resolve this issue", e.oi1.ObjectType, e.oi1.Path, e.oi2.ObjectType, e.oi2.Path, e.commonName)
|
||||
}
|
||||
|
||||
// This is a filer for the workspace file system that allows you to pretend the
|
||||
// workspace file system is a traditional file system. It allows you to list, read, write,
|
||||
// delete, and stat notebooks (and files in general) in the workspace, using their paths
|
||||
// with the extension included.
|
||||
//
|
||||
// The ReadDir method returns a DuplicatePathError if this traditional file system view is
|
||||
// not possible. For example, a Python notebook called foo and a Python file called `foo.py`
|
||||
// would resolve to the same path `foo.py` in a tradition file system.
|
||||
//
|
||||
// Users of this filer should be careful when using the Write and Mkdir methods.
|
||||
// The underlying import API we use to upload notebooks and files returns opaque internal
|
||||
// errors for namespace clashes (e.g. a file and a notebook or a directory and a notebook).
|
||||
// Thus users of these methods should be careful to avoid such clashes.
|
||||
func NewWorkspaceFilesExtensionsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
|
||||
apiClient, err := client.New(w.Config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filer, err := NewWorkspaceFilesClient(w, root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &workspaceFilesExtensionsClient{
|
||||
workspaceClient: w,
|
||||
apiClient: apiClient,
|
||||
|
||||
wsfs: filer,
|
||||
root: root,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (w *workspaceFilesExtensionsClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
||||
entries, err := w.wsfs.ReadDir(ctx, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
seenPaths := make(map[string]workspace.ObjectInfo)
|
||||
for i := range entries {
|
||||
info, err := entries[i].Info()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sysInfo := info.Sys().(workspace.ObjectInfo)
|
||||
|
||||
// If the object is a notebook, include an extension in the entry.
|
||||
if sysInfo.ObjectType == workspace.ObjectTypeNotebook {
|
||||
stat, err := w.getNotebookStatByNameWithoutExt(ctx, entries[i].Name())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Replace the entry with the new entry that includes the extension.
|
||||
entries[i] = wsfsDirEntry{wsfsFileInfo{oi: *stat.ObjectInfo}}
|
||||
}
|
||||
|
||||
// Error if we have seen this path before in the current directory.
|
||||
// If not seen before, add it to the seen paths.
|
||||
if _, ok := seenPaths[entries[i].Name()]; ok {
|
||||
return nil, DuplicatePathError{
|
||||
oi1: seenPaths[entries[i].Name()],
|
||||
oi2: sysInfo,
|
||||
commonName: path.Join(name, entries[i].Name()),
|
||||
}
|
||||
}
|
||||
seenPaths[entries[i].Name()] = sysInfo
|
||||
}
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// Note: The import API returns opaque internal errors for namespace clashes
|
||||
// (e.g. a file and a notebook or a directory and a notebook). Thus users of this
|
||||
// method should be careful to avoid such clashes.
|
||||
func (w *workspaceFilesExtensionsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
|
||||
return w.wsfs.Write(ctx, name, reader, mode...)
|
||||
}
|
||||
|
||||
// Try to read the file as a regular file. If the file is not found, try to read it as a notebook.
|
||||
func (w *workspaceFilesExtensionsClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
|
||||
r, err := w.wsfs.Read(ctx, name)
|
||||
|
||||
// If the file is not found, it might be a notebook.
|
||||
if errors.As(err, &FileDoesNotExistError{}) {
|
||||
stat, serr := w.getNotebookStatByNameWithExt(ctx, name)
|
||||
if serr != nil {
|
||||
// Unable to stat. Return the stat error.
|
||||
return nil, serr
|
||||
}
|
||||
if stat == nil {
|
||||
// Not a notebook. Return the original error.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// The workspace files filer performs an additional stat call to make sure
|
||||
// the path is not a directory. We can skip this step since we already have
|
||||
// the stat object and know that the path is a notebook.
|
||||
return w.workspaceClient.Workspace.Download(
|
||||
ctx,
|
||||
path.Join(w.root, stat.nameForWorkspaceAPI),
|
||||
workspace.DownloadFormat(stat.ReposExportFormat),
|
||||
)
|
||||
}
|
||||
return r, err
|
||||
}
|
||||
|
||||
// Try to delete the file as a regular file. If the file is not found, try to delete it as a notebook.
|
||||
func (w *workspaceFilesExtensionsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
|
||||
err := w.wsfs.Delete(ctx, name, mode...)
|
||||
|
||||
// If the file is not found, it might be a notebook.
|
||||
if errors.As(err, &FileDoesNotExistError{}) {
|
||||
stat, serr := w.getNotebookStatByNameWithExt(ctx, name)
|
||||
if serr != nil {
|
||||
// Unable to stat. Return the stat error.
|
||||
return serr
|
||||
}
|
||||
if stat == nil {
|
||||
// Not a notebook. Return the original error.
|
||||
return err
|
||||
}
|
||||
|
||||
return w.wsfs.Delete(ctx, stat.nameForWorkspaceAPI, mode...)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// Try to stat the file as a regular file. If the file is not found, try to stat it as a notebook.
|
||||
func (w *workspaceFilesExtensionsClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
||||
info, err := w.wsfs.Stat(ctx, name)
|
||||
|
||||
// If the file is not found, it might be a notebook.
|
||||
if errors.As(err, &FileDoesNotExistError{}) {
|
||||
stat, serr := w.getNotebookStatByNameWithExt(ctx, name)
|
||||
if serr != nil {
|
||||
// Unable to stat. Return the stat error.
|
||||
return nil, serr
|
||||
}
|
||||
if stat == nil {
|
||||
// Not a notebook. Return the original error.
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return wsfsFileInfo{oi: *stat.ObjectInfo}, nil
|
||||
}
|
||||
|
||||
return info, err
|
||||
}
|
||||
|
||||
// Note: The import API returns opaque internal errors for namespace clashes
|
||||
// (e.g. a file and a notebook or a directory and a notebook). Thus users of this
|
||||
// method should be careful to avoid such clashes.
|
||||
func (w *workspaceFilesExtensionsClient) Mkdir(ctx context.Context, name string) error {
|
||||
return w.wsfs.Mkdir(ctx, name)
|
||||
}
|
Loading…
Reference in New Issue