databricks-cli/bundle/deploy/state_pull.go

134 lines
3.1 KiB
Go
Raw Normal View History

package deploy
import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"io/fs"
"os"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/deploy/files"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/cli/libs/log"
"github.com/databricks/cli/libs/sync"
)
// statePull is the mutator returned by StatePull; its Apply method pulls the
// remote deployment state file into the local cache directory.
type statePull struct {
// filerFactory builds the filer that Apply uses to read the remote
// deployment state file for a given bundle.
filerFactory FilerFactory
}
// Apply reads the remote deployment state file and, when isLocalStateStale
// reports that the local copy is stale, overwrites the local state file with
// the remote contents and saves a new sync snapshot built from them.
//
// It returns nil diagnostics early when the remote state file does not exist
// or when the local state is not stale. Any error encountered along the way
// is converted to diagnostics with diag.FromErr.
func (s *statePull) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
	remoteFiler, err := s.filerFactory(b)
	if err != nil {
		return diag.FromErr(err)
	}

	// Fetch the remote deployment state; a nil buffer means there is none yet.
	log.Infof(ctx, "Opening remote deployment state file")
	remoteBuf, err := s.remoteState(ctx, remoteFiler)
	switch {
	case err != nil:
		log.Infof(ctx, "Unable to open remote deployment state file: %s", err)
		return diag.FromErr(err)
	case remoteBuf == nil:
		log.Infof(ctx, "Remote deployment state file does not exist")
		return nil
	}

	localPath, err := getPathToStateFile(ctx, b)
	if err != nil {
		return diag.FromErr(err)
	}

	// The local file is opened read-write (and created if missing) because it
	// is first read by the staleness check and then possibly overwritten.
	localFile, err := os.OpenFile(localPath, os.O_CREATE|os.O_RDWR, 0600)
	if err != nil {
		return diag.FromErr(err)
	}
	defer localFile.Close()

	payload := remoteBuf.Bytes()
	if err := validateRemoteStateCompatibility(bytes.NewReader(payload)); err != nil {
		return diag.FromErr(err)
	}

	if stale := isLocalStateStale(localFile, bytes.NewReader(payload)); !stale {
		log.Infof(ctx, "Local deployment state is the same or newer, ignoring remote state")
		return nil
	}

	// Drop the old contents and rewind so the write below starts at offset 0.
	if err := localFile.Truncate(0); err != nil {
		return diag.FromErr(err)
	}
	if _, err := localFile.Seek(0, io.SeekStart); err != nil {
		return diag.FromErr(err)
	}

	// Replace the local state file contents with the remote payload.
	log.Infof(ctx, "Writing remote deployment state file to local cache directory")
	if _, err := io.Copy(localFile, bytes.NewReader(payload)); err != nil {
		return diag.FromErr(err)
	}

	var pulled DeploymentState
	if err := json.Unmarshal(payload, &pulled); err != nil {
		return diag.FromErr(err)
	}

	// Build a snapshot from the file list recorded in the deployment state.
	syncOpts, err := files.GetSyncOptions(ctx, bundle.ReadOnly(b))
	if err != nil {
		return diag.FromErr(err)
	}

	log.Infof(ctx, "Creating new snapshot")
	snap, err := sync.NewSnapshot(pulled.Files.ToSlice(b.SyncRoot), syncOpts)
	if err != nil {
		return diag.FromErr(err)
	}

	// Save the snapshot; its error, if any, becomes the returned diagnostics.
	log.Infof(ctx, "Persisting snapshot to disk")
	saveErr := snap.Save(ctx)
	return diag.FromErr(saveErr)
}
// remoteState reads the deployment state file through the given filer and
// returns its full contents buffered in memory.
//
// A read error matching fs.ErrNotExist is not treated as a failure: it yields
// a nil buffer and a nil error. Any other read or copy error is returned
// unchanged with a nil buffer.
func (s *statePull) remoteState(ctx context.Context, f filer.Filer) (*bytes.Buffer, error) {
	reader, err := f.Read(ctx, DeploymentStateFileName)
	if errors.Is(err, fs.ErrNotExist) {
		// On first deploy this file doesn't yet exist.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	contents := new(bytes.Buffer)
	if _, err := io.Copy(contents, reader); err != nil {
		return nil, err
	}
	return contents, nil
}
// Name returns the fixed identifier string of this mutator.
func (s *statePull) Name() string {
	const mutatorName = "deploy:state-pull"
	return mutatorName
}
// StatePull returns a mutator that pulls the deployment state from the
// Databricks workspace. The mutator obtains its filer through StateFiler.
func StatePull() bundle.Mutator {
	return &statePull{filerFactory: StateFiler}
}