From 1a390f7b0b412135a50c3724d361fd2e144f913c Mon Sep 17 00:00:00 2001 From: Andrew Nester Date: Fri, 17 Nov 2023 15:51:46 +0100 Subject: [PATCH] Added progress bar when uploading files to Databricks --- bundle/artifacts/artifacts.go | 4 +- cmd/fs/cp.go | 20 +++++---- cmd/fs/helpers.go | 4 +- cmd/workspace/workspace/export_dir.go | 2 +- cmd/workspace/workspace/import_dir.go | 11 +++-- go.mod | 3 ++ go.sum | 8 ++++ internal/filer_test.go | 64 +++++++++++++-------------- internal/fs_cat_test.go | 4 +- internal/fs_cp_test.go | 22 ++++----- internal/fs_ls_test.go | 10 ++--- internal/fs_mkdir_test.go | 2 +- internal/fs_rm_test.go | 6 +-- internal/python/python_tasks_test.go | 6 +-- internal/workspace_test.go | 32 +++++++------- libs/filer/dbfs_client.go | 22 ++++++++- libs/filer/filer.go | 4 +- libs/filer/files_client.go | 31 ++++++++++++- libs/filer/fs_test.go | 2 +- libs/filer/local_client.go | 2 +- libs/filer/workspace_files_client.go | 32 ++++++++++++-- libs/locker/locker.go | 4 +- libs/sync/watchdog.go | 2 +- 23 files changed, 194 insertions(+), 103 deletions(-) diff --git a/bundle/artifacts/artifacts.go b/bundle/artifacts/artifacts.go index e703668e..0b0a7e7a 100644 --- a/bundle/artifacts/artifacts.go +++ b/bundle/artifacts/artifacts.go @@ -101,7 +101,7 @@ func (m *basicUpload) Apply(ctx context.Context, b *bundle.Bundle) error { return err } - client, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), uploadPath) + client, err := filer.NewWorkspaceFilesClientWithProgressLogging(b.WorkspaceClient(), uploadPath) if err != nil { return err } @@ -151,7 +151,7 @@ func uploadArtifactFile(ctx context.Context, file string, uploadPath string, cli return "", fmt.Errorf("unable to import %s: %w", remotePath, err) } - err = client.Write(ctx, relPath, bytes.NewReader(raw), filer.OverwriteIfExists, filer.CreateParentDirectories) + err = client.Write(ctx, relPath, bytes.NewReader(raw), int64(len(raw)), filer.OverwriteIfExists, filer.CreateParentDirectories) if err != nil 
{ return "", fmt.Errorf("unable to import %s: %w", remotePath, err) } diff --git a/cmd/fs/cp.go b/cmd/fs/cp.go index 294d2dab..8b98a082 100644 --- a/cmd/fs/cp.go +++ b/cmd/fs/cp.go @@ -45,8 +45,12 @@ func (c *copy) cpWriteCallback(sourceDir, targetDir string) fs.WalkDirFunc { if d.IsDir() { return c.targetFiler.Mkdir(c.ctx, targetPath) } + info, err := d.Info() + if err != nil { + return err + } - return c.cpFileToFile(sourcePath, targetPath) + return c.cpFileToFile(sourcePath, targetPath, info.Size()) } } @@ -59,14 +63,14 @@ func (c *copy) cpDirToDir(sourceDir, targetDir string) error { return fs.WalkDir(sourceFs, sourceDir, c.cpWriteCallback(sourceDir, targetDir)) } -func (c *copy) cpFileToDir(sourcePath, targetDir string) error { +func (c *copy) cpFileToDir(sourcePath, targetDir string, size int64) error { fileName := path.Base(sourcePath) targetPath := path.Join(targetDir, fileName) - return c.cpFileToFile(sourcePath, targetPath) + return c.cpFileToFile(sourcePath, targetPath, size) } -func (c *copy) cpFileToFile(sourcePath, targetPath string) error { +func (c *copy) cpFileToFile(sourcePath, targetPath string, size int64) error { // Get reader for file at source path r, err := c.sourceFiler.Read(c.ctx, sourcePath) if err != nil { @@ -75,12 +79,12 @@ func (c *copy) cpFileToFile(sourcePath, targetPath string) error { defer r.Close() if c.overwrite { - err = c.targetFiler.Write(c.ctx, targetPath, r, filer.OverwriteIfExists) + err = c.targetFiler.Write(c.ctx, targetPath, r, size, filer.OverwriteIfExists) if err != nil { return err } } else { - err = c.targetFiler.Write(c.ctx, targetPath, r) + err = c.targetFiler.Write(c.ctx, targetPath, r, size) // skip if file already exists if err != nil && errors.Is(err, fs.ErrExist) { return c.emitFileSkippedEvent(sourcePath, targetPath) @@ -196,11 +200,11 @@ func newCpCommand() *cobra.Command { // case 2: source path is a file, and target path is a directory. 
In this case // we copy the file to inside the directory if targetInfo, err := targetFiler.Stat(ctx, targetPath); err == nil && targetInfo.IsDir() { - return c.cpFileToDir(sourcePath, targetPath) + return c.cpFileToDir(sourcePath, targetPath, sourceInfo.Size()) } // case 3: source path is a file, and target path is a file - return c.cpFileToFile(sourcePath, targetPath) + return c.cpFileToFile(sourcePath, targetPath, sourceInfo.Size()) } return cmd diff --git a/cmd/fs/helpers.go b/cmd/fs/helpers.go index 43d65b5d..67ee4cdf 100644 --- a/cmd/fs/helpers.go +++ b/cmd/fs/helpers.go @@ -37,12 +37,12 @@ func filerForPath(ctx context.Context, fullPath string) (filer.Filer, string, er // If the specified path has the "Volumes" prefix, use the Files API. if strings.HasPrefix(path, "/Volumes/") { - f, err := filer.NewFilesClient(w, "/") + f, err := filer.NewFilesClientWithProgressLogging(w, "/") return f, path, err } // The file is a dbfs file, and uses the DBFS APIs - f, err := filer.NewDbfsClient(w, "/") + f, err := filer.NewDbfsClientWithProgressLogging(w, "/") return f, path, err } diff --git a/cmd/workspace/workspace/export_dir.go b/cmd/workspace/workspace/export_dir.go index 4f50a96e..7d023535 100644 --- a/cmd/workspace/workspace/export_dir.go +++ b/cmd/workspace/workspace/export_dir.go @@ -116,7 +116,7 @@ func newExportDir() *cobra.Command { opts.targetDir = args[1] // Initialize a filer and a file system on the source directory - workspaceFiler, err := filer.NewWorkspaceFilesClient(w, opts.sourceDir) + workspaceFiler, err := filer.NewWorkspaceFilesClientWithProgressLogging(w, opts.sourceDir) if err != nil { return err } diff --git a/cmd/workspace/workspace/import_dir.go b/cmd/workspace/workspace/import_dir.go index bc0b8066..cedcc915 100644 --- a/cmd/workspace/workspace/import_dir.go +++ b/cmd/workspace/workspace/import_dir.go @@ -80,15 +80,18 @@ func (opts importDirOptions) callback(ctx context.Context, workspaceFiler filer. 
return err } defer f.Close() - + info, err := d.Info() + if err != nil { + return err + } // Create file in WSFS if overwrite { - err = workspaceFiler.Write(ctx, nameForApiCall, f, filer.OverwriteIfExists) + err = workspaceFiler.Write(ctx, nameForApiCall, f, info.Size(), filer.OverwriteIfExists) if err != nil { return err } } else { - err = workspaceFiler.Write(ctx, nameForApiCall, f) + err = workspaceFiler.Write(ctx, nameForApiCall, f, info.Size()) if errors.Is(err, fs.ErrExist) { // Emit file skipped event with the appropriate template fileSkippedEvent := newFileSkippedEvent(localName, path.Join(targetDir, remoteName)) @@ -129,7 +132,7 @@ Notebooks will have their extensions (one of .scala, .py, .sql, .ipynb, .r) stri opts.targetDir = args[1] // Initialize a filer rooted at targetDir - workspaceFiler, err := filer.NewWorkspaceFilesClient(w, opts.targetDir) + workspaceFiler, err := filer.NewWorkspaceFilesClientWithProgressLogging(w, opts.targetDir) if err != nil { return err } diff --git a/go.mod b/go.mod index 7cef4cd4..9e7f3427 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,10 @@ require ( github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rivo/uniseg v0.4.4 // indirect + github.com/schollz/progressbar/v3 v3.14.1 // indirect github.com/zclconf/go-cty v1.14.1 // indirect go.opencensus.io v0.24.0 // indirect golang.org/x/crypto v0.15.0 // indirect diff --git a/go.sum b/go.sum index 25409bd6..ec793179 100644 --- a/go.sum +++ b/go.sum @@ -108,6 +108,7 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 
h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= +github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4= github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/manifoldco/promptui v0.9.0 h1:3V4HzJk1TtXW1MTZMP7mdlwbBpIinw3HztaIlYthEiA= @@ -120,6 +121,8 @@ github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOA github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db h1:62I3jR2EmQ4l5rM/4FEfDWcRD+abF5XlKShorW5LRoQ= +github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db/go.mod h1:l0dey0ia/Uv7NcFFVbCLtqEBQbrT4OCwCSKTEv6enCw= github.com/nwidger/jsoncolor v0.3.2 h1:rVJJlwAWDJShnbTYOQ5RM7yTA20INyKXlJ/fg4JMhHQ= github.com/nwidger/jsoncolor v0.3.2/go.mod h1:Cs34umxLbJvgBMnVNVqhji9BhoT/N/KinHqZptQ7cf4= github.com/pjbgf/sha1cd v0.3.0 h1:4D5XXmUUBUl/xQ6IjCkEAbqXskkq/4O7LmGn0AqMDs4= @@ -129,9 +132,13 @@ github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzL github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= +github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= 
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06 h1:OkMGxebDjyw0ULyrTYWeN0UNCCkmCWfjPnIA2W6oviI= github.com/sabhiram/go-gitignore v0.0.0-20210923224102-525f6e181f06/go.mod h1:+ePHsJ1keEjQtpvf9HHw0f4ZeJ0TLRsxhunSI2hYJSs= +github.com/schollz/progressbar/v3 v3.14.1 h1:VD+MJPCr4s3wdhTc7OEJ/Z3dAeBzJ7yKH/P4lC5yRTI= +github.com/schollz/progressbar/v3 v3.14.1/go.mod h1:Zc9xXneTzWXF81TGoqL71u0sBPjULtEHYtj/WVgVy8E= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/skeema/knownhosts v1.2.0 h1:h9r9cf0+u7wSE+M183ZtMGgOJKiL96brpaz5ekfJCpM= @@ -143,6 +150,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= diff --git a/internal/filer_test.go b/internal/filer_test.go index b1af6886..78737127 100644 --- a/internal/filer_test.go +++ b/internal/filer_test.go @@ -44,7 +44,7 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { var err error // Write should fail because the root path doesn't yet exist. 
- err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`)) + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), -1) assert.True(t, errors.As(err, &filer.NoSuchDirectoryError{})) assert.True(t, errors.Is(err, fs.ErrNotExist)) @@ -60,22 +60,22 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.ErrorIs(t, err, fs.ErrInvalid) // Write with CreateParentDirectories flag should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), -1, filer.CreateParentDirectories) assert.NoError(t, err) filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`) // Write should fail because there is an existing file at the specified path. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`)) + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), -1) assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) assert.True(t, errors.Is(err, fs.ErrExist)) // Write with OverwriteIfExists should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists) + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), -1, filer.OverwriteIfExists) assert.NoError(t, err) filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`) // Write should succeed if there is no existing file at the specified path. - err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`)) + err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`), -1) assert.NoError(t, err) // Stat on a directory should succeed. @@ -134,7 +134,7 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.Len(t, entries, 0) // Write a file. 
- err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`)) + err = f.Write(ctx, "/hello.txt", strings.NewReader(`hello world`), -1) require.NoError(t, err) // Create a directory. @@ -142,7 +142,7 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { require.NoError(t, err) // Write a file. - err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`)) + err = f.Write(ctx, "/dir/world.txt", strings.NewReader(`hello world`), -1) require.NoError(t, err) // Create a nested directory (check that it creates intermediate directories). @@ -197,7 +197,7 @@ func runFilerReadDirTest(t *testing.T, ctx context.Context, f filer.Filer) { assert.Len(t, entries, 0) // Expect one entry for a directory with a file in it - err = f.Write(ctx, "dir-with-one-file/my-file.txt", strings.NewReader("abc"), filer.CreateParentDirectories) + err = f.Write(ctx, "dir-with-one-file/my-file.txt", strings.NewReader("abc"), -1, filer.CreateParentDirectories) require.NoError(t, err) entries, err = f.ReadDir(ctx, "dir-with-one-file") assert.NoError(t, err) @@ -309,15 +309,15 @@ func TestAccFilerWorkspaceNotebookConflict(t *testing.T) { var err error // Upload the notebooks - err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('first upload'))")) + err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('first upload'))"), -1) require.NoError(t, err) - err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('first upload'))")) + err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('first upload'))"), -1) require.NoError(t, err) - err = f.Write(ctx, "sqlNb.sql", strings.NewReader("-- Databricks notebook source\n SELECT \"first upload\"")) + err = f.Write(ctx, "sqlNb.sql", strings.NewReader("-- Databricks notebook source\n SELECT \"first upload\""), -1) require.NoError(t, err) - err = f.Write(ctx, "scalaNb.scala", strings.NewReader("// 
Databricks notebook source\n println(\"first upload\"))")) + err = f.Write(ctx, "scalaNb.scala", strings.NewReader("// Databricks notebook source\n println(\"first upload\"))"), -1) require.NoError(t, err) - err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent1)) + err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent1), -1) require.NoError(t, err) // Assert contents after initial upload @@ -328,23 +328,23 @@ func TestAccFilerWorkspaceNotebookConflict(t *testing.T) { filerTest{t, f}.assertContents(ctx, "jupyterNb", "# Databricks notebook source\nprint(\"Jupyter Notebook Version 1\")") // Assert uploading a second time fails due to overwrite mode missing - err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('second upload'))")) + err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('second upload'))"), -1) assert.ErrorIs(t, err, fs.ErrExist) assert.Regexp(t, regexp.MustCompile(`file already exists: .*/pyNb$`), err.Error()) - err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('second upload'))")) + err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('second upload'))"), -1) assert.ErrorIs(t, err, fs.ErrExist) assert.Regexp(t, regexp.MustCompile(`file already exists: .*/rNb$`), err.Error()) - err = f.Write(ctx, "sqlNb.sql", strings.NewReader("# Databricks notebook source\n SELECT \"second upload\")")) + err = f.Write(ctx, "sqlNb.sql", strings.NewReader("# Databricks notebook source\n SELECT \"second upload\")"), -1) assert.ErrorIs(t, err, fs.ErrExist) assert.Regexp(t, regexp.MustCompile(`file already exists: .*/sqlNb$`), err.Error()) - err = f.Write(ctx, "scalaNb.scala", strings.NewReader("# Databricks notebook source\n println(\"second upload\"))")) + err = f.Write(ctx, "scalaNb.scala", strings.NewReader("# Databricks notebook source\n println(\"second upload\"))"), -1) assert.ErrorIs(t, 
err, fs.ErrExist) assert.Regexp(t, regexp.MustCompile(`file already exists: .*/scalaNb$`), err.Error()) - err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent2)) + err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent2), -1) assert.ErrorIs(t, err, fs.ErrExist) assert.Regexp(t, regexp.MustCompile(`file already exists: .*/jupyterNb$`), err.Error()) } @@ -354,15 +354,15 @@ func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) { var err error // Upload notebooks - err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('first upload'))")) + err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('first upload'))"), -1) require.NoError(t, err) - err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('first upload'))")) + err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('first upload'))"), -1) require.NoError(t, err) - err = f.Write(ctx, "sqlNb.sql", strings.NewReader("-- Databricks notebook source\n SELECT \"first upload\"")) + err = f.Write(ctx, "sqlNb.sql", strings.NewReader("-- Databricks notebook source\n SELECT \"first upload\""), -1) require.NoError(t, err) - err = f.Write(ctx, "scalaNb.scala", strings.NewReader("// Databricks notebook source\n println(\"first upload\"))")) + err = f.Write(ctx, "scalaNb.scala", strings.NewReader("// Databricks notebook source\n println(\"first upload\"))"), -1) require.NoError(t, err) - err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent1)) + err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent1), -1) require.NoError(t, err) // Assert contents after initial upload @@ -373,15 +373,15 @@ func TestAccFilerWorkspaceNotebookWithOverwriteFlag(t *testing.T) { filerTest{t, f}.assertContents(ctx, "jupyterNb", "# Databricks notebook source\nprint(\"Jupyter Notebook Version 1\")") // Upload notebooks a second 
time, overwriting the initial uplaods - err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('second upload'))"), filer.OverwriteIfExists) + err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint('second upload'))"), -1, filer.OverwriteIfExists) require.NoError(t, err) - err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('second upload'))"), filer.OverwriteIfExists) + err = f.Write(ctx, "rNb.r", strings.NewReader("# Databricks notebook source\nprint('second upload'))"), -1, filer.OverwriteIfExists) require.NoError(t, err) - err = f.Write(ctx, "sqlNb.sql", strings.NewReader("-- Databricks notebook source\n SELECT \"second upload\""), filer.OverwriteIfExists) + err = f.Write(ctx, "sqlNb.sql", strings.NewReader("-- Databricks notebook source\n SELECT \"second upload\""), -1, filer.OverwriteIfExists) require.NoError(t, err) - err = f.Write(ctx, "scalaNb.scala", strings.NewReader("// Databricks notebook source\n println(\"second upload\"))"), filer.OverwriteIfExists) + err = f.Write(ctx, "scalaNb.scala", strings.NewReader("// Databricks notebook source\n println(\"second upload\"))"), -1, filer.OverwriteIfExists) require.NoError(t, err) - err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent2), filer.OverwriteIfExists) + err = f.Write(ctx, "jupyterNb.ipynb", strings.NewReader(jupyterNotebookContent2), -1, filer.OverwriteIfExists) require.NoError(t, err) // Assert contents have been overwritten @@ -459,22 +459,22 @@ func TestAccFilerFilesApiReadWrite(t *testing.T) { // assert.ErrorIs(t, err, fs.ErrInvalid) // Write with CreateParentDirectories flag should succeed. 
- err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories) + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), -1, filer.CreateParentDirectories) assert.NoError(t, err) filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello world`) // Write should fail because there is an existing file at the specified path. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`)) + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), -1) assert.True(t, errors.As(err, &filer.FileAlreadyExistsError{})) assert.True(t, errors.Is(err, fs.ErrExist)) // Write with OverwriteIfExists should succeed. - err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), filer.OverwriteIfExists) + err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello universe`), -1, filer.OverwriteIfExists) assert.NoError(t, err) filerTest{t, f}.assertContents(ctx, "/foo/bar", `hello universe`) // Write should succeed if there is no existing file at the specified path. - err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`)) + err = f.Write(ctx, "/foo/qux", strings.NewReader(`hello universe`), -1) assert.NoError(t, err) // Stat on a directory should succeed. 
diff --git a/internal/fs_cat_test.go b/internal/fs_cat_test.go index 2c979ea7..91e9df6f 100644 --- a/internal/fs_cat_test.go +++ b/internal/fs_cat_test.go @@ -25,7 +25,7 @@ func TestAccFsCatForDbfs(t *testing.T) { f, err := filer.NewDbfsClient(w, tmpDir) require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) + err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), -1, filer.CreateParentDirectories) require.NoError(t, err) stdout, stderr := RequireSuccessfulRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "a", "hello.txt")) @@ -59,7 +59,7 @@ func TestAccFsCatDoesNotSupportOutputModeJson(t *testing.T) { f, err := filer.NewDbfsClient(w, tmpDir) require.NoError(t, err) - err = f.Write(ctx, "hello.txt", strings.NewReader("abc")) + err = f.Write(ctx, "hello.txt", strings.NewReader("abc"), -1) require.NoError(t, err) _, _, err = RequireErrorRun(t, "fs", "cat", "dbfs:"+path.Join(tmpDir, "hello.txt"), "--output=json") diff --git a/internal/fs_cp_test.go b/internal/fs_cp_test.go index 3b73b48d..57b2fa62 100644 --- a/internal/fs_cp_test.go +++ b/internal/fs_cp_test.go @@ -18,18 +18,18 @@ import ( func setupSourceDir(t *testing.T, ctx context.Context, f filer.Filer) { var err error - err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint(123)")) + err = f.Write(ctx, "pyNb.py", strings.NewReader("# Databricks notebook source\nprint(123)"), -1) require.NoError(t, err) - err = f.Write(ctx, "query.sql", strings.NewReader("SELECT 1")) + err = f.Write(ctx, "query.sql", strings.NewReader("SELECT 1"), -1) require.NoError(t, err) - err = f.Write(ctx, "a/b/c/hello.txt", strings.NewReader("hello, world\n"), filer.CreateParentDirectories) + err = f.Write(ctx, "a/b/c/hello.txt", strings.NewReader("hello, world\n"), -1, filer.CreateParentDirectories) require.NoError(t, err) } func setupSourceFile(t *testing.T, ctx context.Context, f filer.Filer) { - err := f.Write(ctx, "foo.txt", 
strings.NewReader("abc")) + err := f.Write(ctx, "foo.txt", strings.NewReader("abc"), -1) require.NoError(t, err) } @@ -150,7 +150,7 @@ func TestAccFsCpDirToDirFileNotOverwritten(t *testing.T) { setupSourceDir(t, ctx, sourceFiler) // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), -1, filer.CreateParentDirectories) require.NoError(t, err) RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive") @@ -170,7 +170,7 @@ func TestAccFsCpFileToDirFileNotOverwritten(t *testing.T) { setupSourceDir(t, ctx, sourceFiler) // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this should not be overwritten"), -1, filer.CreateParentDirectories) require.NoError(t, err) RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c")) @@ -188,7 +188,7 @@ func TestAccFsCpFileToFileFileNotOverwritten(t *testing.T) { setupSourceDir(t, ctx, sourceFiler) // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this should not be overwritten"), filer.CreateParentDirectories) + err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this should not be overwritten"), -1, filer.CreateParentDirectories) require.NoError(t, err) RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--recursive") @@ -206,7 +206,7 @@ func TestAccFsCpDirToDirWithOverwriteFlag(t *testing.T) { setupSourceDir(t, ctx, sourceFiler) // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", 
strings.NewReader("this will be overwritten"), filer.CreateParentDirectories) + err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten"), -1, filer.CreateParentDirectories) require.NoError(t, err) RequireSuccessfulRun(t, "fs", "cp", sourceDir, targetDir, "--recursive", "--overwrite") @@ -224,7 +224,7 @@ func TestAccFsCpFileToFileWithOverwriteFlag(t *testing.T) { setupSourceDir(t, ctx, sourceFiler) // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this will be overwritten. Such is life."), filer.CreateParentDirectories) + err := targetFiler.Write(ctx, "a/b/c/hola.txt", strings.NewReader("this will be overwritten. Such is life."), -1, filer.CreateParentDirectories) require.NoError(t, err) RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c/hola.txt"), "--overwrite") @@ -242,7 +242,7 @@ func TestAccFsCpFileToDirWithOverwriteFlag(t *testing.T) { setupSourceDir(t, ctx, sourceFiler) // Write a conflicting file to target - err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten :') "), filer.CreateParentDirectories) + err := targetFiler.Write(ctx, "a/b/c/hello.txt", strings.NewReader("this will be overwritten :') "), -1, filer.CreateParentDirectories) require.NoError(t, err) RequireSuccessfulRun(t, "fs", "cp", path.Join(sourceDir, "a/b/c/hello.txt"), path.Join(targetDir, "a/b/c"), "--recursive", "--overwrite") @@ -279,7 +279,7 @@ func TestAccFsCpSourceIsDirectoryButTargetIsFile(t *testing.T) { setupSourceDir(t, ctx, sourceFiler) // Write a conflicting file to target - err := targetFiler.Write(ctx, "my_target", strings.NewReader("I'll block any attempts to recursively copy"), filer.CreateParentDirectories) + err := targetFiler.Write(ctx, "my_target", strings.NewReader("I'll block any attempts to recursively copy"), -1, filer.CreateParentDirectories) require.NoError(t, err) _, _, err = 
RequireErrorRun(t, "fs", "cp", sourceDir, path.Join(targetDir, "my_target"), "--recursive", "--overwrite") diff --git a/internal/fs_ls_test.go b/internal/fs_ls_test.go index 9e02b09c..041844d7 100644 --- a/internal/fs_ls_test.go +++ b/internal/fs_ls_test.go @@ -30,9 +30,9 @@ func TestAccFsLsForDbfs(t *testing.T) { err = f.Mkdir(ctx, "a") require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) + err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), -1, filer.CreateParentDirectories) require.NoError(t, err) - err = f.Write(ctx, "bye.txt", strings.NewReader("def")) + err = f.Write(ctx, "bye.txt", strings.NewReader("def"), -1) require.NoError(t, err) stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json") @@ -65,9 +65,9 @@ func TestAccFsLsForDbfsWithAbsolutePaths(t *testing.T) { err = f.Mkdir(ctx, "a") require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) + err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), -1, filer.CreateParentDirectories) require.NoError(t, err) - err = f.Write(ctx, "bye.txt", strings.NewReader("def")) + err = f.Write(ctx, "bye.txt", strings.NewReader("def"), -1) require.NoError(t, err) stdout, stderr := RequireSuccessfulRun(t, "fs", "ls", "dbfs:"+tmpDir, "--output=json", "--absolute") @@ -101,7 +101,7 @@ func TestAccFsLsForDbfsOnFile(t *testing.T) { err = f.Mkdir(ctx, "a") require.NoError(t, err) - err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), filer.CreateParentDirectories) + err = f.Write(ctx, "a/hello.txt", strings.NewReader("abc"), -1, filer.CreateParentDirectories) require.NoError(t, err) _, _, err = RequireErrorRun(t, "fs", "ls", "dbfs:"+path.Join(tmpDir, "a", "hello.txt"), "--output=json") diff --git a/internal/fs_mkdir_test.go b/internal/fs_mkdir_test.go index af0e9d18..c18b930a 100644 --- a/internal/fs_mkdir_test.go +++ b/internal/fs_mkdir_test.go @@ -106,7 
+106,7 @@ func TestAccFsMkdirWhenFileExistsAtPath(t *testing.T) { // create file hello f, err := filer.NewDbfsClient(w, tmpDir) require.NoError(t, err) - err = f.Write(ctx, "hello", strings.NewReader("abc")) + err = f.Write(ctx, "hello", strings.NewReader("abc"), -1) require.NoError(t, err) // assert run fails diff --git a/internal/fs_rm_test.go b/internal/fs_rm_test.go index d70827d1..7b8e0363 100644 --- a/internal/fs_rm_test.go +++ b/internal/fs_rm_test.go @@ -26,7 +26,7 @@ func TestAccFsRmForFile(t *testing.T) { require.NoError(t, err) // create file to delete - err = f.Write(ctx, "hello.txt", strings.NewReader("abc")) + err = f.Write(ctx, "hello.txt", strings.NewReader("abc"), -1) require.NoError(t, err) // check file was created @@ -90,7 +90,7 @@ func TestAccFsRmForNonEmptyDirectory(t *testing.T) { require.NoError(t, err) // create file in dir - err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), filer.CreateParentDirectories) + err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), -1, filer.CreateParentDirectories) require.NoError(t, err) // check file was created @@ -126,7 +126,7 @@ func TestAccFsRmForNonEmptyDirectoryWithRecursiveFlag(t *testing.T) { require.NoError(t, err) // create file in dir - err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), filer.CreateParentDirectories) + err = f.Write(ctx, "avacado/guacamole", strings.NewReader("abc"), -1, filer.CreateParentDirectories) require.NoError(t, err) // check file was created diff --git a/internal/python/python_tasks_test.go b/internal/python/python_tasks_test.go index fde9b37f..dc58ca25 100644 --- a/internal/python/python_tasks_test.go +++ b/internal/python/python_tasks_test.go @@ -232,16 +232,16 @@ func prepareDBFSFiles(t *testing.T) *testFiles { f, err := filer.NewDbfsClient(w, baseDir) require.NoError(t, err) - err = f.Write(ctx, "test.py", strings.NewReader(PY_CONTENT)) + err = f.Write(ctx, "test.py", strings.NewReader(PY_CONTENT), -1) require.NoError(t, err) 
- err = f.Write(ctx, "spark.py", strings.NewReader(SPARK_PY_CONTENT)) + err = f.Write(ctx, "spark.py", strings.NewReader(SPARK_PY_CONTENT), -1) require.NoError(t, err) raw, err := os.ReadFile("./testdata/my_test_code-0.0.1-py3-none-any.whl") require.NoError(t, err) - err = f.Write(ctx, "my_test_code-0.0.1-py3-none-any.whl", bytes.NewReader(raw)) + err = f.Write(ctx, "my_test_code-0.0.1-py3-none-any.whl", bytes.NewReader(raw), -1) require.NoError(t, err) return &testFiles{ diff --git a/internal/workspace_test.go b/internal/workspace_test.go index a6e641b6..54b291c2 100644 --- a/internal/workspace_test.go +++ b/internal/workspace_test.go @@ -53,7 +53,7 @@ func TestWorkpaceExportPrintsContents(t *testing.T) { // Write file to workspace contents := "#!/usr/bin/bash\necho hello, world\n" - err = f.Write(ctx, "file-a", strings.NewReader(contents)) + err = f.Write(ctx, "file-a", strings.NewReader(contents), -1) require.NoError(t, err) // Run export @@ -110,15 +110,15 @@ func TestAccExportDir(t *testing.T) { var err error // Write test data to the workspace - err = f.Write(ctx, "file-a", strings.NewReader("abc")) + err = f.Write(ctx, "file-a", strings.NewReader("abc"), -1) require.NoError(t, err) - err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source")) + err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source"), -1) require.NoError(t, err) - err = f.Write(ctx, "sqlNotebook.sql", strings.NewReader("-- Databricks notebook source")) + err = f.Write(ctx, "sqlNotebook.sql", strings.NewReader("-- Databricks notebook source"), -1) require.NoError(t, err) - err = f.Write(ctx, "scalaNotebook.scala", strings.NewReader("// Databricks notebook source")) + err = f.Write(ctx, "scalaNotebook.scala", strings.NewReader("// Databricks notebook source"), -1) require.NoError(t, err) - err = f.Write(ctx, "rNotebook.r", strings.NewReader("# Databricks notebook source")) + err = f.Write(ctx, "rNotebook.r", strings.NewReader("# 
Databricks notebook source"), -1) require.NoError(t, err) - err = f.Write(ctx, "a/b/c/file-b", strings.NewReader("def"), filer.CreateParentDirectories) + err = f.Write(ctx, "a/b/c/file-b", strings.NewReader("def"), -1, filer.CreateParentDirectories) require.NoError(t, err) @@ -142,7 +142,7 @@ func TestAccExportDirDoesNotOverwrite(t *testing.T) { var err error // Write remote file - err = f.Write(ctx, "file-a", strings.NewReader("content from workspace")) + err = f.Write(ctx, "file-a", strings.NewReader("content from workspace"), -1) require.NoError(t, err) // Write local file @@ -163,7 +163,7 @@ func TestAccExportDirWithOverwriteFlag(t *testing.T) { var err error // Write remote file - err = f.Write(ctx, "file-a", strings.NewReader("content from workspace")) + err = f.Write(ctx, "file-a", strings.NewReader("content from workspace"), -1) require.NoError(t, err) // Write local file @@ -197,9 +197,9 @@ func TestAccImportDirDoesNotOverwrite(t *testing.T) { var err error // create preexisting files in the workspace - err = workspaceFiler.Write(ctx, "file-a", strings.NewReader("old file")) + err = workspaceFiler.Write(ctx, "file-a", strings.NewReader("old file"), -1) require.NoError(t, err) - err = workspaceFiler.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source\nprint(\"old notebook\")")) + err = workspaceFiler.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source\nprint(\"old notebook\")"), -1) require.NoError(t, err) // Assert contents of pre existing files @@ -225,9 +225,9 @@ func TestAccImportDirWithOverwriteFlag(t *testing.T) { var err error // create preexisting files in the workspace - err = workspaceFiler.Write(ctx, "file-a", strings.NewReader("old file")) + err = workspaceFiler.Write(ctx, "file-a", strings.NewReader("old file"), -1) require.NoError(t, err) - err = workspaceFiler.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source\nprint(\"old notebook\")")) + err = workspaceFiler.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source\nprint(\"old notebook\")"), -1) 
require.NoError(t, err) // Assert contents of pre existing files @@ -254,7 +254,7 @@ func TestAccExport(t *testing.T) { var err error // Export vanilla file - err = f.Write(ctx, "file-a", strings.NewReader("abc")) + err = f.Write(ctx, "file-a", strings.NewReader("abc"), -1) require.NoError(t, err) stdout, _ := RequireSuccessfulRun(t, "workspace", "export", path.Join(sourceDir, "file-a")) b, err := io.ReadAll(&stdout) @@ -262,7 +262,7 @@ func TestAccExport(t *testing.T) { assert.Equal(t, "abc", string(b)) // Export python notebook - err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source")) + err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source"), -1) require.NoError(t, err) stdout, _ = RequireSuccessfulRun(t, "workspace", "export", path.Join(sourceDir, "pyNotebook")) b, err = io.ReadAll(&stdout) @@ -284,7 +284,7 @@ func TestAccExportWithFileFlag(t *testing.T) { var err error // Export vanilla file - err = f.Write(ctx, "file-a", strings.NewReader("abc")) + err = f.Write(ctx, "file-a", strings.NewReader("abc"), -1) require.NoError(t, err) stdout, _ := RequireSuccessfulRun(t, "workspace", "export", path.Join(sourceDir, "file-a"), "--file", filepath.Join(localTmpDir, "file.txt")) b, err := io.ReadAll(&stdout) @@ -294,7 +294,7 @@ func TestAccExportWithFileFlag(t *testing.T) { assertLocalFileContents(t, filepath.Join(localTmpDir, "file.txt"), "abc") // Export python notebook - err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source")) + err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source"), -1) require.NoError(t, err) stdout, _ = RequireSuccessfulRun(t, "workspace", "export", path.Join(sourceDir, "pyNotebook"), "--file", filepath.Join(localTmpDir, "pyNb.py")) b, err = io.ReadAll(&stdout) diff --git a/libs/filer/dbfs_client.go b/libs/filer/dbfs_client.go index 38e8f9f3..996088f8 100644 --- a/libs/filer/dbfs_client.go +++ b/libs/filer/dbfs_client.go @@ 
-15,6 +15,7 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/service/files" + progressbar "github.com/schollz/progressbar/v3" ) // Type that implements fs.DirEntry for DBFS. @@ -69,6 +70,8 @@ type DbfsClient struct { // File operations will be relative to this path. root WorkspaceRootPath + + bar *progressbar.ProgressBar } func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { @@ -79,7 +82,16 @@ func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) { }, nil } -func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { +func NewDbfsClientWithProgressLogging(w *databricks.WorkspaceClient, root string) (Filer, error) { + return &DbfsClient{ + workspaceClient: w, + + root: NewWorkspaceRootPath(root), + bar: progressbar.DefaultBytes(100, "uploading"), + }, nil +} + +func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, size int64, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { return err @@ -131,7 +143,13 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m return err } - _, err = io.Copy(handle, reader) + var writer io.Writer = handle + if w.bar != nil { + w.bar.ChangeMax64(size) + writer = io.MultiWriter(handle, w.bar) + } + + _, err = io.Copy(writer, reader) cerr := handle.Close() if err == nil { err = cerr diff --git a/libs/filer/filer.go b/libs/filer/filer.go index 8267dc34..02041502 100644 --- a/libs/filer/filer.go +++ b/libs/filer/filer.go @@ -108,7 +108,9 @@ func (err CannotDeleteRootError) Is(other error) bool { type Filer interface { // Write file at `path`. // Use the mode to further specify behavior. 
- Write(ctx context.Context, path string, reader io.Reader, mode ...WriteMode) error + // size is used to indicate size of the content being uploaded and visualise the write progress with percentage + // it can be set to -1 if the size is unknown. + Write(ctx context.Context, path string, reader io.Reader, size int64, mode ...WriteMode) error // Read file at `path`. Read(ctx context.Context, path string) (io.ReadCloser, error) diff --git a/libs/filer/files_client.go b/libs/filer/files_client.go index 17884d57..5d1ed931 100644 --- a/libs/filer/files_client.go +++ b/libs/filer/files_client.go @@ -17,6 +17,7 @@ import ( "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/apierr" "github.com/databricks/databricks-sdk-go/client" + "github.com/schollz/progressbar/v3" ) // Type that implements fs.FileInfo for the Files API. @@ -61,6 +62,8 @@ type FilesClient struct { // File operations will be relative to this path. root WorkspaceRootPath + + bar *progressbar.ProgressBar } func filesNotImplementedError(fn string) error { @@ -81,6 +84,22 @@ func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) { }, nil } +func NewFilesClientWithProgressLogging(w *databricks.WorkspaceClient, root string) (Filer, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, err + } + + return &FilesClient{ + workspaceClient: w, + apiClient: apiClient, + + root: NewWorkspaceRootPath(root), + + bar: progressbar.DefaultBytes(100, "uploading"), + }, nil +} + func (w *FilesClient) urlPath(name string) (string, string, error) { absPath, err := w.root.Join(name) if err != nil { @@ -96,7 +115,7 @@ func (w *FilesClient) urlPath(name string) (string, string, error) { return absPath, urlPath, nil } -func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { +func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, size int64, mode ...WriteMode) error { 
absPath, urlPath, err := w.urlPath(name) if err != nil { return err @@ -105,7 +124,15 @@ func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, overwrite := slices.Contains(mode, OverwriteIfExists) urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite) headers := map[string]string{"Content-Type": "application/octet-stream"} - err = w.apiClient.Do(ctx, http.MethodPut, urlPath, headers, reader, nil) + + r := reader + if w.bar != nil { + w.bar.ChangeMax64(size) + reader := progressbar.NewReader(r, w.bar) + r = &reader + } + + err = w.apiClient.Do(ctx, http.MethodPut, urlPath, headers, r, nil) // Return early on success. if err == nil { diff --git a/libs/filer/fs_test.go b/libs/filer/fs_test.go index 03ed312b..a8c8ce46 100644 --- a/libs/filer/fs_test.go +++ b/libs/filer/fs_test.go @@ -66,7 +66,7 @@ type fakeFiler struct { entries map[string]fakeFileInfo } -func (f *fakeFiler) Write(ctx context.Context, p string, reader io.Reader, mode ...WriteMode) error { +func (f *fakeFiler) Write(ctx context.Context, p string, reader io.Reader, size int64, mode ...WriteMode) error { return fmt.Errorf("not implemented") } diff --git a/libs/filer/local_client.go b/libs/filer/local_client.go index 958b6277..60a83414 100644 --- a/libs/filer/local_client.go +++ b/libs/filer/local_client.go @@ -21,7 +21,7 @@ func NewLocalClient(root string) (Filer, error) { }, nil } -func (w *LocalClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { +func (w *LocalClient) Write(ctx context.Context, name string, reader io.Reader, size int64, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { return err diff --git a/libs/filer/workspace_files_client.go b/libs/filer/workspace_files_client.go index 41e35d9d..c2f688a7 100644 --- a/libs/filer/workspace_files_client.go +++ b/libs/filer/workspace_files_client.go @@ -20,6 +20,7 @@ import ( "github.com/databricks/databricks-sdk-go/apierr" 
"github.com/databricks/databricks-sdk-go/client" "github.com/databricks/databricks-sdk-go/service/workspace" + "github.com/schollz/progressbar/v3" ) // Type that implements fs.DirEntry for WSFS. @@ -79,6 +80,8 @@ type WorkspaceFilesClient struct { // File operations will be relative to this path. root WorkspaceRootPath + + bar *progressbar.ProgressBar } func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) { @@ -95,7 +98,23 @@ func NewWorkspaceFilesClient(w *databricks.WorkspaceClient, root string) (Filer, }, nil } -func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error { +func NewWorkspaceFilesClientWithProgressLogging(w *databricks.WorkspaceClient, root string) (Filer, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, err + } + + return &WorkspaceFilesClient{ + workspaceClient: w, + apiClient: apiClient, + + root: NewWorkspaceRootPath(root), + + bar: progressbar.DefaultBytes(100), + }, nil +} + +func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io.Reader, size int64, mode ...WriteMode) error { absPath, err := w.root.Join(name) if err != nil { return err @@ -115,7 +134,14 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io return err } - err = w.apiClient.Do(ctx, http.MethodPost, urlPath, nil, body, nil) + var r io.Reader = bytes.NewBuffer(body) + if w.bar != nil { + w.bar.ChangeMax64(size) + reader := progressbar.NewReader(r, w.bar) + r = &reader + } + + err = w.apiClient.Do(ctx, http.MethodPost, urlPath, nil, r, nil) // Return early on success. if err == nil { @@ -141,7 +167,7 @@ func (w *WorkspaceFilesClient) Write(ctx context.Context, name string, reader io } // Retry without CreateParentDirectories mode flag. - return w.Write(ctx, name, bytes.NewReader(body), sliceWithout(mode, CreateParentDirectories)...) 
+ return w.Write(ctx, name, bytes.NewReader(body), int64(len(body)), sliceWithout(mode, CreateParentDirectories)...) } // This API returns 409 if the file already exists, when the object type is file diff --git a/libs/locker/locker.go b/libs/locker/locker.go index b0d65c42..0c56933d 100644 --- a/libs/locker/locker.go +++ b/libs/locker/locker.go @@ -118,7 +118,7 @@ func (locker *Locker) Write(ctx context.Context, pathToFile string, content []by if !locker.Active { return fmt.Errorf("failed to put file. deploy lock not held") } - return locker.filer.Write(ctx, pathToFile, bytes.NewReader(content), filer.OverwriteIfExists, filer.CreateParentDirectories) + return locker.filer.Write(ctx, pathToFile, bytes.NewReader(content), int64(len(content)), filer.OverwriteIfExists, filer.CreateParentDirectories) } func (locker *Locker) Read(ctx context.Context, path string) (io.ReadCloser, error) { @@ -150,7 +150,7 @@ func (locker *Locker) Lock(ctx context.Context, isForced bool) error { modes = append(modes, filer.OverwriteIfExists) } - err = locker.filer.Write(ctx, LockFileName, bytes.NewReader(buf), modes...) + err = locker.filer.Write(ctx, LockFileName, bytes.NewReader(buf), int64(len(buf)), modes...) if err != nil { // If the write failed because the lock file already exists, don't return // the error and instead fall through to [assertLockHeld] below. diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index b0c96e01..f9fcce75 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -67,7 +67,7 @@ func (s *Sync) applyPut(ctx context.Context, localName string) error { defer localFile.Close() opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} - err = s.filer.Write(ctx, localName, localFile, opts...) + err = s.filer.Write(ctx, localName, localFile, -1, opts...) if err != nil { return err }