Model code artifacts (#107)

This adds:
* Top level "artifacts" configuration key
* Support for notebooks (does language detection and upload)
* Merge of per-environment artifacts (or artifact overrides) into top level
This commit is contained in:
Pieter Noordhuis 2022-11-30 14:15:22 +01:00 committed by GitHub
parent c6b3b35e98
commit e1669b0352
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 339 additions and 0 deletions

41
bundle/artifacts/all.go Normal file
View File

@ -0,0 +1,41 @@
package artifacts
import (
"context"
"fmt"
"github.com/databricks/bricks/bundle"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
)
// all is an internal proxy for producing a list of mutators for all artifacts.
// It is used to produce the [BuildAll] and [UploadAll] mutators.
type all struct {
	// name is the phase name ("Build" or "Upload"); it is interpolated
	// into the mutator name below.
	name string
	// fn produces the phase mutator for a single named artifact. It may
	// return a nil mutator to indicate no action is needed (see Apply).
	fn func(name string) (bundle.Mutator, error)
}

// Name returns the mutator's name, e.g. "artifacts.BuildAll".
func (m *all) Name() string {
	return fmt.Sprintf("artifacts.%sAll", m.name)
}
// Apply expands into one mutator per configured artifact, visiting
// artifacts in sorted name order so the result is deterministic.
// Artifacts for which fn yields a nil mutator are skipped.
func (m *all) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	// Sort artifact names for stable iteration order.
	names := maps.Keys(b.Config.Artifacts)
	slices.Sort(names)

	var mutators []bundle.Mutator
	for _, name := range names {
		mutator, err := m.fn(name)
		if err != nil {
			return nil, err
		}
		// A nil mutator means this artifact needs no action for this phase.
		if mutator == nil {
			continue
		}
		mutators = append(mutators, mutator)
	}
	return mutators, nil
}

41
bundle/artifacts/build.go Normal file
View File

@ -0,0 +1,41 @@
package artifacts
import (
"context"
"fmt"
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/artifacts/notebook"
)
// BuildAll returns a mutator that runs the build phase for every artifact
// declared in the bundle configuration.
func BuildAll() bundle.Mutator {
	return &all{name: "Build", fn: buildArtifactByName}
}
// build dispatches the build phase for a single named artifact.
type build struct {
	name string
}

// buildArtifactByName constructs the build mutator for the given artifact name.
func buildArtifactByName(name string) (bundle.Mutator, error) {
	return &build{name: name}, nil
}

// Name implements bundle.Mutator.
func (m *build) Name() string {
	return fmt.Sprintf("artifacts.Build(%s)", m.name)
}
// Apply returns the build mutator for this artifact, if any. Notebook
// artifacts are the only type with a build step in this configuration.
func (m *build) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	artifact, ok := b.Config.Artifacts[m.name]
	if !ok {
		return nil, fmt.Errorf("artifact doesn't exist: %s", m.name)
	}
	// Nothing to build for artifact types without a build step.
	if artifact.Notebook == nil {
		return nil, nil
	}
	return []bundle.Mutator{notebook.Build(m.name)}, nil
}

View File

@ -0,0 +1,81 @@
package notebook
import (
"context"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"github.com/databricks/bricks/bundle"
"github.com/databricks/databricks-sdk-go/service/workspace"
)
// build locates and validates the notebook backing a single artifact.
type build struct {
	name string
}

// Build returns a mutator that validates the notebook artifact with the
// given name and fills in its language and absolute paths.
func Build(name string) bundle.Mutator {
	return &build{name: name}
}

// Name implements bundle.Mutator.
func (m *build) Name() string {
	return fmt.Sprintf("notebook.Build(%s)", m.name)
}
// Apply validates the notebook artifact: it detects the language from the
// file extension, verifies the file starts with the Databricks notebook
// marker, and computes the absolute local and remote paths.
// It mutates the artifact in place and returns no additional mutators.
func (m *build) Apply(_ context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	a, ok := b.Config.Artifacts[m.name]
	if !ok {
		return nil, fmt.Errorf("artifact doesn't exist: %s", m.name)
	}

	artifact := a.Notebook

	// Check if the filetype is supported.
	switch ext := strings.ToLower(filepath.Ext(artifact.Path)); ext {
	case ".py":
		artifact.Language = workspace.LanguagePython
	case ".scala":
		artifact.Language = workspace.LanguageScala
	case ".sql":
		artifact.Language = workspace.LanguageSql
	default:
		return nil, fmt.Errorf("invalid notebook extension: %s", ext)
	}

	// Open underlying file.
	// NOTE(review): errors.Unwrap drops the *os.PathError context here; the
	// path is already in the message, so only the underlying cause is kept.
	f, err := os.Open(filepath.Join(b.Config.Path, artifact.Path))
	if err != nil {
		return nil, fmt.Errorf("unable to open artifact file %s: %w", artifact.Path, errors.Unwrap(err))
	}
	defer f.Close()

	// Check that the file contains the notebook marker on its first line.
	// Wrap err directly with %w: the scanner's error need not implement
	// Unwrap (errors.Unwrap would return nil), and the original %s verb
	// prevented callers from inspecting the cause with errors.Is/As.
	ok, err = hasMarker(artifact.Language, f)
	if err != nil {
		return nil, fmt.Errorf("unable to read artifact file %s: %w", artifact.Path, err)
	}
	if !ok {
		return nil, fmt.Errorf("notebook marker not found in %s", artifact.Path)
	}

	// Check that an artifact path is defined.
	remotePath := b.Config.Workspace.ArtifactPath.Workspace
	if remotePath == nil {
		return nil, fmt.Errorf("remote artifact path not configured")
	}

	// Store absolute paths.
	artifact.LocalPath = filepath.Join(b.Config.Path, artifact.Path)
	artifact.RemotePath = path.Join(*remotePath, stripExtension(artifact.Path))
	return nil, nil
}
// stripExtension returns path with its final extension (as determined by
// filepath.Ext) removed. A path without an extension is returned unchanged.
func stripExtension(path string) string {
	return strings.TrimSuffix(path, filepath.Ext(path))
}

View File

@ -0,0 +1,29 @@
package notebook
import (
"bufio"
"io"
"strings"
"github.com/databricks/databricks-sdk-go/service/workspace"
)
// hasMarker reports whether the first line of r (after trimming whitespace)
// equals the Databricks notebook source marker for language l. If no line
// can be read it returns false and the scanner's error, which is nil at
// plain EOF. It panics on a language it does not recognize.
func hasMarker(l workspace.Language, r io.Reader) (bool, error) {
	scanner := bufio.NewScanner(r)
	if !scanner.Scan() {
		return false, scanner.Err()
	}
	line := strings.TrimSpace(scanner.Text())

	// The marker is the language's line-comment prefix plus a fixed phrase.
	markers := map[workspace.Language]string{
		workspace.LanguagePython: "# Databricks notebook source",
		workspace.LanguageScala:  "// Databricks notebook source",
		workspace.LanguageSql:    "-- Databricks notebook source",
	}
	marker, ok := markers[l]
	if !ok {
		panic("language not handled: " + l)
	}
	return line == marker, nil
}

View File

@ -0,0 +1,60 @@
package notebook
import (
"context"
"encoding/base64"
"errors"
"fmt"
"os"
"path"
"github.com/databricks/bricks/bundle"
"github.com/databricks/databricks-sdk-go/service/workspace"
)
// upload imports a single built notebook artifact into the workspace.
type upload struct {
	name string
}

// Upload returns a mutator that uploads the notebook artifact with the
// given name to its computed remote workspace path.
func Upload(name string) bundle.Mutator {
	return &upload{name: name}
}

// Name implements bundle.Mutator.
func (m *upload) Name() string {
	return fmt.Sprintf("notebook.Upload(%s)", m.name)
}
// Apply reads the previously built notebook from disk and imports it into
// the workspace at its remote path, creating parent directories first and
// overwriting any existing object.
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	a, ok := b.Config.Artifacts[m.name]
	if !ok {
		return nil, fmt.Errorf("artifact doesn't exist: %s", m.name)
	}
	artifact := a.Notebook

	content, err := os.ReadFile(artifact.LocalPath)
	if err != nil {
		return nil, fmt.Errorf("unable to read %s: %w", m.name, errors.Unwrap(err))
	}

	// Make sure target directory exists.
	if err = b.WorkspaceClient().Workspace.MkdirsByPath(ctx, path.Dir(artifact.RemotePath)); err != nil {
		return nil, fmt.Errorf("unable to create directory for %s: %w", m.name, err)
	}

	// Import to workspace. Content is transferred base64-encoded in source format.
	request := workspace.Import{
		Path:      artifact.RemotePath,
		Overwrite: true,
		Format:    workspace.ExportFormatSource,
		Language:  artifact.Language,
		Content:   base64.StdEncoding.EncodeToString(content),
	}
	if err = b.WorkspaceClient().Workspace.Import(ctx, request); err != nil {
		return nil, fmt.Errorf("unable to import %s: %w", m.name, err)
	}
	return nil, nil
}

View File

@ -0,0 +1,41 @@
package artifacts
import (
"context"
"fmt"
"github.com/databricks/bricks/bundle"
"github.com/databricks/bricks/bundle/artifacts/notebook"
)
// UploadAll returns a mutator that runs the upload phase for every artifact
// declared in the bundle configuration.
func UploadAll() bundle.Mutator {
	return &all{name: "Upload", fn: uploadArtifactByName}
}
// upload dispatches the upload phase for a single named artifact.
type upload struct {
	name string
}

// uploadArtifactByName constructs the upload mutator for the given artifact name.
func uploadArtifactByName(name string) (bundle.Mutator, error) {
	return &upload{name: name}, nil
}

// Name implements bundle.Mutator.
func (m *upload) Name() string {
	return fmt.Sprintf("artifacts.Upload(%s)", m.name)
}
// Apply returns the upload mutator for this artifact, if any. Notebook
// artifacts are the only type with an upload step in this configuration.
func (m *upload) Apply(ctx context.Context, b *bundle.Bundle) ([]bundle.Mutator, error) {
	artifact, ok := b.Config.Artifacts[m.name]
	if !ok {
		return nil, fmt.Errorf("artifact doesn't exist: %s", m.name)
	}
	// Nothing to upload for artifact types without an upload step.
	if artifact.Notebook == nil {
		return nil, nil
	}
	return []bundle.Mutator{notebook.Upload(m.name)}, nil
}

20
bundle/config/artifact.go Normal file
View File

@ -0,0 +1,20 @@
package config
import "github.com/databricks/databricks-sdk-go/service/workspace"
// Artifact defines a single local code artifact that can be
// built/uploaded/referenced in the context of this bundle.
type Artifact struct {
	// Notebook is set for notebook artifacts; it is the only artifact
	// type defined by this struct.
	Notebook *NotebookArtifact `json:"notebook,omitempty"`
}

// NotebookArtifact describes a notebook backed by a local source file.
type NotebookArtifact struct {
	// Path is the notebook's source file path; the build step joins it
	// with the bundle's config path, so it is relative to the bundle root.
	Path string `json:"path"`

	// Language is detected during build step.
	Language workspace.Language `json:"language,omitempty"`

	// Paths are synthesized during build step.
	LocalPath  string `json:"local_path,omitempty"`
	RemotePath string `json:"remote_path,omitempty"`
}

View File

@ -7,5 +7,7 @@ type Environment struct {
Workspace *Workspace `json:"workspace,omitempty"`
Artifacts map[string]*Artifact `json:"artifacts,omitempty"`
Resources *Resources `json:"resources,omitempty"`
}

View File

@ -32,6 +32,9 @@ type Root struct {
// and paths in the workspace tree to use for this bundle.
Workspace Workspace `json:"workspace"`
// Artifacts contains a description of all code artifacts in this bundle.
Artifacts map[string]*Artifact `json:"artifacts,omitempty"`
// Resources contains a description of all Databricks resources
// to deploy in this bundle (e.g. jobs, pipelines, etc.).
Resources Resources `json:"resources"`
@ -98,6 +101,13 @@ func (r *Root) MergeEnvironment(env *Environment) error {
}
}
if env.Artifacts != nil {
err = mergo.Merge(&r.Artifacts, env.Artifacts, mergo.WithAppendSlice)
if err != nil {
return err
}
}
if env.Resources != nil {
err = mergo.Merge(&r.Resources, env.Resources, mergo.WithAppendSlice)
if err != nil {

View File

@ -4,6 +4,14 @@ import (
"github.com/databricks/databricks-sdk-go"
)
// PathLike describes a remote location that may be addressed as a
// workspace (WSFS) path, a DBFS path, or both.
type PathLike struct {
	// Workspace contains a WSFS path.
	Workspace *string `json:"workspace,omitempty"`

	// DBFS contains a DBFS path.
	DBFS *string `json:"dbfs,omitempty"`
}
// Workspace defines configurables at the workspace level.
type Workspace struct {
// Unified authentication attributes.
@ -28,6 +36,12 @@ type Workspace struct {
AzureTenantID string `json:"azure_tenant_id,omitempty"`
AzureEnvironment string `json:"azure_environment,omitempty"`
AzureLoginAppID string `json:"azure_login_app_id,omitempty"`
// Remote path for artifacts.
// This can specify a workspace path, a DBFS path, or both.
// Some artifacts must be stored in the workspace (e.g. notebooks).
// Some artifacts must be stored on DBFS (e.g. wheels, JARs).
ArtifactPath PathLike `json:"artifact_path"`
}
func (w *Workspace) Client() (*databricks.WorkspaceClient, error) {