diff --git a/bundle/artifacts/all.go b/bundle/artifacts/all.go
new file mode 100644
index 00000000..066ecbc2
--- /dev/null
+++ b/bundle/artifacts/all.go
@@ -0,0 +1,41 @@
+package artifacts
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/databricks/bricks/bundle"
+	"golang.org/x/exp/maps"
+	"golang.org/x/exp/slices"
+)
+
+// all is an internal proxy for producing a list of mutators for all artifacts.
+// It is used to produce the [BuildAll] and [UploadAll] mutators.
+type all struct {
+	name string
+	fn   func(name string) (bundle.Mutator, error)
+}
+
+func (m *all) Name() string {
+	return fmt.Sprintf("artifacts.%sAll", m.name)
+}
+
+func (m *all) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
+	var out []bundle.Mutator
+
+	// Iterate with stable ordering.
+	keys := maps.Keys(b.Config.Artifacts)
+	slices.Sort(keys)
+
+	for _, name := range keys {
+		m, err := m.fn(name)
+		if err != nil {
+			return nil, err
+		}
+		if m != nil {
+			out = append(out, m)
+		}
+	}
+
+	return out, nil
+}
diff --git a/bundle/artifacts/build.go b/bundle/artifacts/build.go
new file mode 100644
index 00000000..cdddb93a
--- /dev/null
+++ b/bundle/artifacts/build.go
@@ -0,0 +1,41 @@
+package artifacts
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/databricks/bricks/bundle"
+	"github.com/databricks/bricks/bundle/artifacts/notebook"
+)
+
+func BuildAll() bundle.Mutator {
+	return &all{
+		name: "Build",
+		fn:   buildArtifactByName,
+	}
+}
+
+type build struct {
+	name string
+}
+
+func buildArtifactByName(name string) (bundle.Mutator, error) {
+	return &build{name}, nil
+}
+
+func (m *build) Name() string {
+	return fmt.Sprintf("artifacts.Build(%s)", m.name)
+}
+
+func (m *build) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
+	artifact, ok := b.Config.Artifacts[m.name]
+	if !ok {
+		return nil, fmt.Errorf("artifact doesn't exist: %s", m.name)
+	}
+
+	if artifact.Notebook != nil {
+		return []bundle.Mutator{notebook.Build(m.name)}, nil
+	}
+
+	return nil, nil
+}
diff --git a/bundle/artifacts/notebook/build.go b/bundle/artifacts/notebook/build.go
new file mode 100644
index 00000000..e7c0e7d5
--- /dev/null
+++ b/bundle/artifacts/notebook/build.go
@@ -0,0 +1,80 @@
+package notebook
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+
+	"github.com/databricks/bricks/bundle"
+	"github.com/databricks/databricks-sdk-go/service/workspace"
+)
+
+type build struct {
+	name string
+}
+
+func Build(name string) bundle.Mutator {
+	return &build{
+		name: name,
+	}
+}
+
+func (m *build) Name() string {
+	return fmt.Sprintf("notebook.Build(%s)", m.name)
+}
+
+func (m *build) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
+	a, ok := b.Config.Artifacts[m.name]
+	if !ok {
+		return nil, fmt.Errorf("artifact doesn't exist: %s", m.name)
+	}
+
+	artifact := a.Notebook
+
+	// Check if the filetype is supported.
+	switch ext := strings.ToLower(filepath.Ext(artifact.Path)); ext {
+	case ".py":
+		artifact.Language = workspace.LanguagePython
+	case ".scala":
+		artifact.Language = workspace.LanguageScala
+	case ".sql":
+		artifact.Language = workspace.LanguageSql
+	default:
+		return nil, fmt.Errorf("invalid notebook extension: %s", ext)
+	}
+
+	// Open underlying file.
+	f, err := os.Open(filepath.Join(b.Config.Path, artifact.Path))
+	if err != nil {
+		return nil, fmt.Errorf("unable to open artifact file %s: %w", artifact.Path, err)
+	}
+	defer f.Close()
+
+	// Check that the file contains the notebook marker on its first line.
+	ok, err = hasMarker(artifact.Language, f)
+	if err != nil {
+		return nil, fmt.Errorf("unable to read artifact file %s: %w", artifact.Path, err)
+	}
+	if !ok {
+		return nil, fmt.Errorf("notebook marker not found in %s", artifact.Path)
+	}
+
+	// Check that an artifact path is defined.
+	remotePath := b.Config.Workspace.ArtifactPath.Workspace
+	if remotePath == nil {
+		return nil, fmt.Errorf("remote artifact path not configured")
+	}
+
+	// Store absolute paths.
+	artifact.LocalPath = filepath.Join(b.Config.Path, artifact.Path)
+	artifact.RemotePath = path.Join(*remotePath, stripExtension(artifact.Path))
+	return nil, nil
+}
+
+func stripExtension(path string) string {
+	ext := filepath.Ext(path)
+	return path[0 : len(path)-len(ext)]
+}
diff --git a/bundle/artifacts/notebook/marker.go b/bundle/artifacts/notebook/marker.go
new file mode 100644
index 00000000..a04ca989
--- /dev/null
+++ b/bundle/artifacts/notebook/marker.go
@@ -0,0 +1,29 @@
+package notebook
+
+import (
+	"bufio"
+	"io"
+	"strings"
+
+	"github.com/databricks/databricks-sdk-go/service/workspace"
+)
+
+func hasMarker(l workspace.Language, r io.Reader) (bool, error) {
+	scanner := bufio.NewScanner(r)
+	ok := scanner.Scan()
+	if !ok {
+		return false, scanner.Err()
+	}
+
+	line := strings.TrimSpace(scanner.Text())
+	switch l {
+	case workspace.LanguagePython:
+		return line == "# Databricks notebook source", nil
+	case workspace.LanguageScala:
+		return line == "// Databricks notebook source", nil
+	case workspace.LanguageSql:
+		return line == "-- Databricks notebook source", nil
+	default:
+		panic("language not handled: " + l)
+	}
+}
diff --git a/bundle/artifacts/notebook/upload.go b/bundle/artifacts/notebook/upload.go
new file mode 100644
index 00000000..c9cf1356
--- /dev/null
+++ b/bundle/artifacts/notebook/upload.go
@@ -0,0 +1,59 @@
+package notebook
+
+import (
+	"context"
+	"encoding/base64"
+	"fmt"
+	"os"
+	"path"
+
+	"github.com/databricks/bricks/bundle"
+	"github.com/databricks/databricks-sdk-go/service/workspace"
+)
+
+type upload struct {
+	name string
+}
+
+func Upload(name string) bundle.Mutator {
+	return &upload{
+		name: name,
+	}
+}
+
+func (m *upload) Name() string {
+	return fmt.Sprintf("notebook.Upload(%s)", m.name)
+}
+
+func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
+	a, ok := b.Config.Artifacts[m.name]
+	if !ok {
+		return nil, fmt.Errorf("artifact doesn't exist: %s", m.name)
+	}
+
+	artifact := a.Notebook
+	raw, err := os.ReadFile(artifact.LocalPath)
+	if err != nil {
+		return nil, fmt.Errorf("unable to read %s: %w", m.name, err)
+	}
+
+	// Make sure target directory exists.
+	err = b.WorkspaceClient().Workspace.MkdirsByPath(ctx, path.Dir(artifact.RemotePath))
+	if err != nil {
+		return nil, fmt.Errorf("unable to create directory for %s: %w", m.name, err)
+	}
+
+	// Import to workspace.
+	err = b.WorkspaceClient().Workspace.Import(ctx, workspace.Import{
+		Path:      artifact.RemotePath,
+		Overwrite: true,
+		Format:    workspace.ExportFormatSource,
+		Language:  artifact.Language,
+		Content:   base64.StdEncoding.EncodeToString(raw),
+	})
+	if err != nil {
+		return nil, fmt.Errorf("unable to import %s: %w", m.name, err)
+	}
+
+	return nil, nil
+}
diff --git a/bundle/artifacts/upload.go b/bundle/artifacts/upload.go
new file mode 100644
index 00000000..45b70ed0
--- /dev/null
+++ b/bundle/artifacts/upload.go
@@ -0,0 +1,41 @@
+package artifacts
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/databricks/bricks/bundle"
+	"github.com/databricks/bricks/bundle/artifacts/notebook"
+)
+
+func UploadAll() bundle.Mutator {
+	return &all{
+		name: "Upload",
+		fn:   uploadArtifactByName,
+	}
+}
+
+type upload struct {
+	name string
+}
+
+func uploadArtifactByName(name string) (bundle.Mutator, error) {
+	return &upload{name}, nil
+}
+
+func (m *upload) Name() string {
+	return fmt.Sprintf("artifacts.Upload(%s)", m.name)
+}
+
+func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
+	artifact, ok := b.Config.Artifacts[m.name]
+	if !ok {
+		return nil, fmt.Errorf("artifact doesn't exist: %s", m.name)
+	}
+
+	if artifact.Notebook != nil {
+		return []bundle.Mutator{notebook.Upload(m.name)}, nil
+	}
+
+	return nil, nil
+}
diff --git a/bundle/config/artifact.go b/bundle/config/artifact.go
new file mode 100644
index 00000000..2ff6b122
--- /dev/null
+++ b/bundle/config/artifact.go
@@ -0,0 +1,20 @@
+package config
+
+import "github.com/databricks/databricks-sdk-go/service/workspace"
+
+// Artifact defines a single local code artifact that can be
+// built/uploaded/referenced in the context of this bundle.
+type Artifact struct {
+	Notebook *NotebookArtifact `json:"notebook,omitempty"`
+}
+
+type NotebookArtifact struct {
+	Path string `json:"path"`
+
+	// Language is detected during build step.
+	Language workspace.Language `json:"language,omitempty"`
+
+	// Paths are synthesized during build step.
+	LocalPath  string `json:"local_path,omitempty"`
+	RemotePath string `json:"remote_path,omitempty"`
+}
diff --git a/bundle/config/environment.go b/bundle/config/environment.go
index f9ea40b4..131c1442 100644
--- a/bundle/config/environment.go
+++ b/bundle/config/environment.go
@@ -7,5 +7,7 @@ type Environment struct {
 
 	Workspace *Workspace `json:"workspace,omitempty"`
 
+	Artifacts map[string]*Artifact `json:"artifacts,omitempty"`
+
 	Resources *Resources `json:"resources,omitempty"`
 }
diff --git a/bundle/config/root.go b/bundle/config/root.go
index 620e3e3f..a8f673ad 100644
--- a/bundle/config/root.go
+++ b/bundle/config/root.go
@@ -32,6 +32,9 @@ type Root struct {
 	// and paths in the workspace tree to use for this bundle.
 	Workspace Workspace `json:"workspace"`
 
+	// Artifacts contains a description of all code artifacts in this bundle.
+	Artifacts map[string]*Artifact `json:"artifacts,omitempty"`
+
 	// Resources contains a description of all Databricks resources
 	// to deploy in this bundle (e.g. jobs, pipelines, etc.).
 	Resources Resources `json:"resources"`
@@ -98,6 +101,13 @@ func (r *Root) MergeEnvironment(env *Environment) error {
 		}
 	}
 
+	if env.Artifacts != nil {
+		err = mergo.Merge(&r.Artifacts, env.Artifacts, mergo.WithAppendSlice)
+		if err != nil {
+			return err
+		}
+	}
+
 	if env.Resources != nil {
 		err = mergo.Merge(&r.Resources, env.Resources, mergo.WithAppendSlice)
 		if err != nil {
diff --git a/bundle/config/workspace.go b/bundle/config/workspace.go
index 45b244d9..1b637176 100644
--- a/bundle/config/workspace.go
+++ b/bundle/config/workspace.go
@@ -4,6 +4,14 @@ import (
 	"github.com/databricks/databricks-sdk-go"
 )
 
+type PathLike struct {
+	// Workspace contains a WSFS path.
+	Workspace *string `json:"workspace,omitempty"`
+
+	// DBFS contains a DBFS path.
+	DBFS *string `json:"dbfs,omitempty"`
+}
+
 // Workspace defines configurables at the workspace level.
 type Workspace struct {
 	// Unified authentication attributes.
@@ -28,6 +36,12 @@ type Workspace struct {
 	AzureTenantID    string `json:"azure_tenant_id,omitempty"`
 	AzureEnvironment string `json:"azure_environment,omitempty"`
 	AzureLoginAppID  string `json:"azure_login_app_id,omitempty"`
+
+	// Remote path for artifacts.
+	// This can specify a workspace path, a DBFS path, or both.
+	// Some artifacts must be stored in the workspace (e.g. notebooks).
+	// Some artifacts must be stored on DBFS (e.g. wheels, JARs).
+	ArtifactPath PathLike `json:"artifact_path"`
 }
 
 func (w *Workspace) Client() (*databricks.WorkspaceClient, error) {