diff --git a/cmd/deploy/deploy.go b/cmd/deploy/deploy.go new file mode 100644 index 00000000..bb759123 --- /dev/null +++ b/cmd/deploy/deploy.go @@ -0,0 +1,32 @@ +package deploy + +import ( + "github.com/databricks/bricks/cmd/root" + "github.com/databricks/bricks/project" + "github.com/spf13/cobra" +) + +// launchCmd represents the launch command +var deployCmd = &cobra.Command{ + Use: "deploy", + Short: "deploys a DAB", + // Long: `Reads a file and executes it on dev cluster`, + // Args: cobra.ExactArgs(1), + PreRunE: project.Configure, + Run: func(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + deployMutex := &DeployMutex{ + User: "shreyas.goenka@databricks.com", + // TODO: Adjust this using a command line arguement + IsForced: false, + // TODO: pass through cmd line arg + ProjectRoot: "/Repos/shreyas.goenka@databricks.com/test-dbx", + } + deployMutex.Lock(ctx) + defer deployMutex.Unlock(ctx) + }, +} + +func init() { + root.RootCmd.AddCommand(deployCmd) +} diff --git a/cmd/deploy/lock.go b/cmd/deploy/lock.go new file mode 100644 index 00000000..8edf0be5 --- /dev/null +++ b/cmd/deploy/lock.go @@ -0,0 +1,110 @@ +package deploy + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "path/filepath" + "strings" + "time" + + "github.com/databricks/bricks/project" + "github.com/databricks/databricks-sdk-go/databricks/client" + "github.com/databricks/databricks-sdk-go/service/workspace" +) + +// TODO: create a mutex for dbfs too +type Mutex interface { + Lock(ctx context.Context) error + Unlock(ctx context.Context) error +} + +type DeployMutex struct { + User string + // Remove if we do not need this and its enoguh to assert that AcquireLock did not return an error + AcquisitionTime time.Time + // TODO: Add a timestamp demostrating the time of lock acquisition + IsForced bool + ProjectRoot string +} + +// TODO: Consider makeing aquire lock a blocking operation with a timeout. Incase of a race conflict, all lower priority +// lock claims would delete themselves and still one person would end up with the lock + +// TODO: Lets keep a history of all the deployments made. +// 1. When waiting for lock, we can also show that from whom are we waiting to gain a lock from and when they accesses the lock +// 2. For force, delete all the other locks and +func (mu *DeployMutex) Lock(ctx context.Context) error { + prj := project.Get(ctx) + wsc := prj.WorkspacesClient() + lockFilePath := filepath.Join(mu.ProjectRoot, ".bundle/deploy.lock") + + // Do I need to dereference this pointer to marshal ? + mu.AcquisitionTime = time.Now() + mutexMetadata, err := json.Marshal(*mu) + if err != nil { + return err + } + mutexMetadataReader := bytes.NewReader(mutexMetadata) + + // create .bundle dir in remote project root + err = wsc.Workspace.MkdirsByPath(ctx, filepath.Dir(lockFilePath)) + if err != nil { + return fmt.Errorf("could not mkdir to put file: %s", err) + } + + // Try to create the lock file. If we get an error, that means the lock + // file already exists + apiClient, err := client.New(wsc.Config) + if err != nil { + return err + } + importApiPath := fmt.Sprintf( + "/api/2.0/workspace-files/import-file/%s?overwrite=false", + strings.TrimLeft(lockFilePath, "/")) + err = apiClient.Post(ctx, importApiPath, mutexMetadataReader, nil) + + if err != nil && strings.Contains(err.Error(), fmt.Sprintf("%s already exists", lockFilePath)) { + expectApiPath := fmt.Sprintf( + "/api/2.0/workspace-files/%s", + strings.TrimLeft(lockFilePath, "/")) + + var res interface{} + + err = apiClient.Get(ctx, expectApiPath, nil, &res) + // TODO: handle the race condition when the file gets deleted before you can read + // it. + // NOTE: https://databricks.atlassian.net/browse/ES-510449 + // Do some error parsing maybe, add on previos error for more context and + // add on suggestion for the user to retry deployment + if err != nil { + return err + } + lockJson, err := json.Marshal(res) + if err != nil { + return err + } + ownerMutex := DeployMutex{} + err = json.Unmarshal(lockJson, &ownerMutex) + if err != nil { + return err + } + // TODO: add isForce to message here and convert timestamp to human readable format + return fmt.Errorf("cannot deploy. %s has been deploying since %v. Use --force to forcibly deploy your bundle", ownerMutex.User, ownerMutex.AcquisitionTime) + } + return nil +} + +func (mu *DeployMutex) Unlock(ctx context.Context) error { + prj := project.Get(ctx) + wsc := prj.WorkspacesClient() + lockFilePath := filepath.Join(mu.ProjectRoot, ".bundle/deploy.lock") + err := wsc.Workspace.Delete(ctx, + workspace.Delete{ + Path: lockFilePath, + Recursive: false, + }, + ) + return err +} diff --git a/cmd/deploy/lock_test.go b/cmd/deploy/lock_test.go new file mode 100644 index 00000000..7301ce13 --- /dev/null +++ b/cmd/deploy/lock_test.go @@ -0,0 +1,8 @@ +package deploy + +// TODO: maybe move this to integraton tests + +func TestAccLock(t *testing.T) { + +} + diff --git a/main.go b/main.go index 805c5bb6..feb460fc 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( _ "github.com/databricks/bricks/cmd/api" _ "github.com/databricks/bricks/cmd/configure" + _ "github.com/databricks/bricks/cmd/deploy" _ "github.com/databricks/bricks/cmd/fs" _ "github.com/databricks/bricks/cmd/init" _ "github.com/databricks/bricks/cmd/launch" diff --git a/project/project.go b/project/project.go index f6103ce9..c9fadee2 100644 --- a/project/project.go +++ b/project/project.go @@ -155,6 +155,7 @@ func (p *project) Environment() Environment { } func (p *project) Me() (*scim.User, error) { + // QQ: Why is there a lock here? p.mu.Lock() defer p.mu.Unlock() if p.me != nil {