mirror of https://github.com/databricks/cli.git
Switch to use GET workspace-files/{name} instead of workspace/export for state files (#2423)
## Changes Switch to use GET workspace-files/{name} instead of workspace/export for state files. ## Why `/api/2.0/workspace-files/{name}` has a higher limit, which allows exporting state files larger than 10 MB (the current limit for `workspace/export`). We don't use the same API for reads in other places, and we don't fully replace the existing Filer, because for notebooks it doesn't correctly get the file content and returns a "File Not Found" error instead. ## Tests All existing tests pass
This commit is contained in:
parent
549b226cbc
commit
41961226be
|
@ -0,0 +1,21 @@
|
||||||
|
bundle:
|
||||||
|
name: state
|
||||||
|
|
||||||
|
resources:
|
||||||
|
jobs:
|
||||||
|
test:
|
||||||
|
name: "test"
|
||||||
|
tasks:
|
||||||
|
- task_key: "test-task"
|
||||||
|
spark_python_task:
|
||||||
|
python_file: ./test.py
|
||||||
|
new_cluster:
|
||||||
|
spark_version: 15.4.x-scala2.12
|
||||||
|
node_type_id: i3.xlarge
|
||||||
|
data_security_mode: SINGLE_USER
|
||||||
|
num_workers: 0
|
||||||
|
spark_conf:
|
||||||
|
spark.master: "local[*, 4]"
|
||||||
|
spark.databricks.cluster.profile: singleNode
|
||||||
|
custom_tags:
|
||||||
|
ResourceClass: SingleNode
|
|
@ -0,0 +1,4 @@
|
||||||
|
{
|
||||||
|
"method": "GET",
|
||||||
|
"path": "/api/2.0/workspace-files/Workspace/Users/[USERNAME]/.bundle/state/default/state/terraform.tfstate"
|
||||||
|
}
|
|
@ -0,0 +1,12 @@
|
||||||
|
|
||||||
|
>>> [CLI] bundle deploy
|
||||||
|
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/state/default/files...
|
||||||
|
Deploying resources...
|
||||||
|
Updating deployment state...
|
||||||
|
Deployment complete!
|
||||||
|
|
||||||
|
>>> [CLI] bundle deploy
|
||||||
|
Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/state/default/files...
|
||||||
|
Deploying resources...
|
||||||
|
Updating deployment state...
|
||||||
|
Deployment complete!
|
|
@ -0,0 +1,4 @@
|
||||||
|
trace $CLI bundle deploy
|
||||||
|
trace $CLI bundle deploy # We do 2 deploys because only 2nd deploy will pull state from remote after 1st created it
|
||||||
|
jq 'select(.path == "/api/2.0/workspace-files/Workspace/Users/[USERNAME]/.bundle/state/default/state/terraform.tfstate")' out.requests.txt > out.state.txt
|
||||||
|
rm out.requests.txt
|
|
@ -0,0 +1 @@
|
||||||
|
print("Hello world!")
|
|
@ -0,0 +1,2 @@
|
||||||
|
Cloud = false
|
||||||
|
RecordRequests = true
|
|
@ -111,6 +111,11 @@ func AddHandlers(server *testserver.Server) {
|
||||||
return ""
|
return ""
|
||||||
})
|
})
|
||||||
|
|
||||||
|
server.Handle("GET", "/api/2.0/workspace-files/{path:.*}", func(req testserver.Request) any {
|
||||||
|
path := req.Vars["path"]
|
||||||
|
return req.Workspace.WorkspaceFilesExportFile(path)
|
||||||
|
})
|
||||||
|
|
||||||
server.Handle("GET", "/api/2.1/unity-catalog/current-metastore-assignment", func(req testserver.Request) any {
|
server.Handle("GET", "/api/2.1/unity-catalog/current-metastore-assignment", func(req testserver.Request) any {
|
||||||
return testMetastore
|
return testMetastore
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,14 +1,94 @@
|
||||||
package deploy
|
package deploy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/fs"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/databricks/cli/bundle"
|
"github.com/databricks/cli/bundle"
|
||||||
"github.com/databricks/cli/libs/filer"
|
"github.com/databricks/cli/libs/filer"
|
||||||
|
"github.com/databricks/databricks-sdk-go/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FilerFactory is a function that returns a filer.Filer.
|
// FilerFactory is a function that returns a filer.Filer.
|
||||||
type FilerFactory func(b *bundle.Bundle) (filer.Filer, error)
|
type FilerFactory func(b *bundle.Bundle) (filer.Filer, error)
|
||||||
|
|
||||||
// StateFiler returns a filer.Filer that can be used to read/write state files.
|
type stateFiler struct {
|
||||||
func StateFiler(b *bundle.Bundle) (filer.Filer, error) {
|
filer filer.Filer
|
||||||
return filer.NewWorkspaceFilesClient(b.WorkspaceClient(), b.Config.Workspace.StatePath)
|
|
||||||
|
apiClient *client.DatabricksClient
|
||||||
|
root filer.WorkspaceRootPath
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s stateFiler) Delete(ctx context.Context, path string, mode ...filer.DeleteMode) error {
|
||||||
|
return s.filer.Delete(ctx, path, mode...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mkdir implements filer.Filer.
|
||||||
|
func (s stateFiler) Mkdir(ctx context.Context, path string) error {
|
||||||
|
return s.filer.Mkdir(ctx, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s stateFiler) Read(ctx context.Context, path string) (io.ReadCloser, error) {
|
||||||
|
absPath, err := s.root.Join(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stat, err := s.Stat(ctx, path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if stat.IsDir() {
|
||||||
|
return nil, fmt.Errorf("not a file: %s", absPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
urlPath := "/api/2.0/workspace-files/" + url.PathEscape(strings.TrimLeft(absPath, "/"))
|
||||||
|
err = s.apiClient.Do(ctx, http.MethodGet, urlPath, nil, nil, nil, &buf)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return io.NopCloser(&buf), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s stateFiler) ReadDir(ctx context.Context, path string) ([]fs.DirEntry, error) {
|
||||||
|
return s.filer.ReadDir(ctx, path)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s stateFiler) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
||||||
|
return s.filer.Stat(ctx, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s stateFiler) Write(ctx context.Context, path string, reader io.Reader, mode ...filer.WriteMode) error {
|
||||||
|
return s.filer.Write(ctx, path, reader, mode...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StateFiler returns a filer.Filer that can be used to read/write state files.
|
||||||
|
// We use a custom workspace filer which uses workspace-files API to read state files.
|
||||||
|
// This API has a higher than 10 MB limits and allows to export large state files.
|
||||||
|
// We don't use the same API for read because it doesn't correct get the file content for notebooks and returns
|
||||||
|
// "File Not Found" error instead.
|
||||||
|
func StateFiler(b *bundle.Bundle) (filer.Filer, error) {
|
||||||
|
f, err := filer.NewWorkspaceFilesClient(b.WorkspaceClient(), b.Config.Workspace.StatePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
apiClient, err := client.New(b.WorkspaceClient().Config)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create API client: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return stateFiler{
|
||||||
|
filer: f,
|
||||||
|
root: filer.NewWorkspaceRootPath(b.Config.Workspace.StatePath),
|
||||||
|
apiClient: apiClient,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,8 +10,6 @@ import (
|
||||||
"github.com/databricks/cli/libs/log"
|
"github.com/databricks/cli/libs/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const MaxStateFileSize = 10 * 1024 * 1024 // 10MB
|
|
||||||
|
|
||||||
type statePush struct {
|
type statePush struct {
|
||||||
filerFactory FilerFactory
|
filerFactory FilerFactory
|
||||||
}
|
}
|
||||||
|
@ -37,17 +35,6 @@ func (s *statePush) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostic
|
||||||
}
|
}
|
||||||
defer local.Close()
|
defer local.Close()
|
||||||
|
|
||||||
if !b.Config.Bundle.Force {
|
|
||||||
state, err := local.Stat()
|
|
||||||
if err != nil {
|
|
||||||
return diag.FromErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if state.Size() > MaxStateFileSize {
|
|
||||||
return diag.Errorf("Deployment state file size exceeds the maximum allowed size of %d bytes. Please reduce the number of resources in your bundle, split your bundle into multiple or re-run the command with --force flag.", MaxStateFileSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Infof(ctx, "Writing local deployment state file to remote state directory")
|
log.Infof(ctx, "Writing local deployment state file to remote state directory")
|
||||||
err = f.Write(ctx, DeploymentStateFileName, local, filer.CreateParentDirectories, filer.OverwriteIfExists)
|
err = f.Write(ctx, DeploymentStateFileName, local, filer.CreateParentDirectories, filer.OverwriteIfExists)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -47,17 +47,6 @@ func (l *statePush) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostic
|
||||||
}
|
}
|
||||||
defer local.Close()
|
defer local.Close()
|
||||||
|
|
||||||
if !b.Config.Bundle.Force {
|
|
||||||
state, err := local.Stat()
|
|
||||||
if err != nil {
|
|
||||||
return diag.FromErr(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if state.Size() > deploy.MaxStateFileSize {
|
|
||||||
return diag.Errorf("Terraform state file size exceeds the maximum allowed size of %d bytes. Please reduce the number of resources in your bundle, split your bundle into multiple or re-run the command with --force flag", deploy.MaxStateFileSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Upload state file from local cache directory to filer.
|
// Upload state file from local cache directory to filer.
|
||||||
cmdio.LogString(ctx, "Updating deployment state...")
|
cmdio.LogString(ctx, "Updating deployment state...")
|
||||||
log.Infof(ctx, "Writing local state file to remote state directory")
|
log.Infof(ctx, "Writing local state file to remote state directory")
|
||||||
|
|
|
@ -3,7 +3,6 @@ package terraform
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -60,29 +59,3 @@ func TestStatePush(t *testing.T) {
|
||||||
diags := bundle.Apply(ctx, b, m)
|
diags := bundle.Apply(ctx, b, m)
|
||||||
assert.NoError(t, diags.Error())
|
assert.NoError(t, diags.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestStatePushLargeState(t *testing.T) {
|
|
||||||
mock := mockfiler.NewMockFiler(t)
|
|
||||||
m := &statePush{
|
|
||||||
identityFiler(mock),
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
b := statePushTestBundle(t)
|
|
||||||
|
|
||||||
largeState := map[string]any{}
|
|
||||||
for i := range 1000000 {
|
|
||||||
largeState[fmt.Sprintf("field_%d", i)] = i
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write a stale local state file.
|
|
||||||
writeLocalState(t, ctx, b, largeState)
|
|
||||||
diags := bundle.Apply(ctx, b, m)
|
|
||||||
assert.ErrorContains(t, diags.Error(), "Terraform state file size exceeds the maximum allowed size of 10485760 bytes. Please reduce the number of resources in your bundle, split your bundle into multiple or re-run the command with --force flag")
|
|
||||||
|
|
||||||
// Force the write.
|
|
||||||
b = statePushTestBundle(t)
|
|
||||||
b.Config.Bundle.Force = true
|
|
||||||
diags = bundle.Apply(ctx, b, m)
|
|
||||||
assert.NoError(t, diags.Error())
|
|
||||||
}
|
|
||||||
|
|
|
@ -83,6 +83,13 @@ func (s *FakeWorkspace) WorkspaceFilesImportFile(path string, body []byte) {
|
||||||
s.files[path] = body
|
s.files[path] = body
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *FakeWorkspace) WorkspaceFilesExportFile(path string) []byte {
|
||||||
|
if !strings.HasPrefix(path, "/") {
|
||||||
|
path = "/" + path
|
||||||
|
}
|
||||||
|
return s.files[path]
|
||||||
|
}
|
||||||
|
|
||||||
func (s *FakeWorkspace) JobsCreate(request jobs.CreateJob) Response {
|
func (s *FakeWorkspace) JobsCreate(request jobs.CreateJob) Response {
|
||||||
jobId := s.nextJobId
|
jobId := s.nextJobId
|
||||||
s.nextJobId++
|
s.nextJobId++
|
||||||
|
|
Loading…
Reference in New Issue