Added `bundle generate pipeline` command (#1139)

## Changes
Added `bundle generate pipeline` command

Usage as the following

```
databricks bundle generate pipeline --existing-pipeline-id f3b8c580-0a88-4b55-xxxx-yyyyyyyyyy
```

## Tests
Manually + added E2E test
This commit is contained in:
Andrew Nester 2024-01-25 11:35:14 +00:00 committed by GitHub
parent 9c3e4fda7c
commit f269f8015d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 379 additions and 18 deletions

View File

@ -0,0 +1,19 @@
package generate
import (
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/yamlsaver"
"github.com/databricks/databricks-sdk-go/service/pipelines"
)
var pipelineOrder = yamlsaver.NewOrder([]string{"name", "clusters", "configuration", "libraries"})
func ConvertPipelineToValue(pipeline *pipelines.PipelineSpec) (dyn.Value, error) {
value := make(map[string]dyn.Value)
// We ignore the following fields:
// - id: this is a read-only field
// - storage: changes to this field are rare because changing the storage recreates pipeline-related resources
// - edition: this field is rarely changed
return yamlsaver.ConvertToMapValue(pipeline, pipelineOrder, []string{"id", "storage", "edition"}, value)
}

View File

@ -14,5 +14,6 @@ func newGenerateCommand() *cobra.Command {
} }
cmd.AddCommand(generate.NewGenerateJobCommand()) cmd.AddCommand(generate.NewGenerateJobCommand())
cmd.AddCommand(generate.NewGeneratePipelineCommand())
return cmd return cmd
} }

View File

@ -0,0 +1,98 @@
package generate
import (
"bytes"
"context"
"fmt"
"io"
"os"
"path/filepath"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/databricks-sdk-go/experimental/mocks"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
func TestGeneratePipelineCommand(t *testing.T) {
cmd := NewGeneratePipelineCommand()
root := t.TempDir()
b := &bundle.Bundle{
Config: config.Root{
Path: root,
},
}
m := mocks.NewMockWorkspaceClient(t)
b.SetWorkpaceClient(m.WorkspaceClient)
pipelineApi := m.GetMockPipelinesAPI()
pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{PipelineId: "test-pipeline"}).Return(&pipelines.GetPipelineResponse{
PipelineId: "test-pipeline",
Name: "test-pipeline",
Spec: &pipelines.PipelineSpec{
Name: "test-pipeline",
Libraries: []pipelines.PipelineLibrary{
{Notebook: &pipelines.NotebookLibrary{
Path: "/test/notebook",
}},
{File: &pipelines.FileLibrary{
Path: "/test/file.py",
}},
},
},
}, nil)
workspaceApi := m.GetMockWorkspaceAPI()
workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/notebook").Return(&workspace.ObjectInfo{
ObjectType: workspace.ObjectTypeNotebook,
Language: workspace.LanguagePython,
Path: "/test/notebook",
}, nil)
workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/file.py").Return(&workspace.ObjectInfo{
ObjectType: workspace.ObjectTypeFile,
Path: "/test/file.py",
}, nil)
notebookContent := io.NopCloser(bytes.NewBufferString("# Databricks notebook source\nNotebook content"))
pyContent := io.NopCloser(bytes.NewBufferString("Py content"))
workspaceApi.EXPECT().Download(mock.Anything, "/test/notebook", mock.Anything).Return(notebookContent, nil)
workspaceApi.EXPECT().Download(mock.Anything, "/test/file.py", mock.Anything).Return(pyContent, nil)
cmd.SetContext(bundle.Context(context.Background(), b))
cmd.Flag("existing-pipeline-id").Value.Set("test-pipeline")
configDir := filepath.Join(root, "resources")
cmd.Flag("config-dir").Value.Set(configDir)
srcDir := filepath.Join(root, "src")
cmd.Flag("source-dir").Value.Set(srcDir)
err := cmd.RunE(cmd, []string{})
require.NoError(t, err)
data, err := os.ReadFile(filepath.Join(configDir, "pipeline_test_pipeline.yml"))
require.NoError(t, err)
require.Equal(t, fmt.Sprintf(`resources:
pipelines:
pipeline_test_pipeline:
name: test-pipeline
libraries:
- notebook:
path: %s
- file:
path: %s
`, filepath.Join("..", "src", "notebook.py"), filepath.Join("..", "src", "file.py")), string(data))
data, err = os.ReadFile(filepath.Join(srcDir, "notebook.py"))
require.NoError(t, err)
require.Equal(t, "# Databricks notebook source\nNotebook content", string(data))
data, err = os.ReadFile(filepath.Join(srcDir, "file.py"))
require.NoError(t, err)
require.Equal(t, "Py content", string(data))
}

View File

@ -50,9 +50,9 @@ func NewGenerateJobCommand() *cobra.Command {
return err return err
} }
downloader := newNotebookDownloader(w, sourceDir, configDir) downloader := newDownloader(w, sourceDir, configDir)
for _, task := range job.Settings.Tasks { for _, task := range job.Settings.Tasks {
err := downloader.MarkForDownload(ctx, &task) err := downloader.MarkTaskForDownload(ctx, &task)
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,91 @@
package generate
import (
"fmt"
"os"
"path/filepath"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/generate"
"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/dyn"
"github.com/databricks/cli/libs/dyn/yamlsaver"
"github.com/databricks/cli/libs/textutil"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/spf13/cobra"
)
func NewGeneratePipelineCommand() *cobra.Command {
var configDir string
var sourceDir string
var pipelineId string
var force bool
cmd := &cobra.Command{
Use: "pipeline",
Short: "Generate bundle configuration for a pipeline",
PreRunE: root.MustConfigureBundle,
}
cmd.Flags().StringVar(&pipelineId, "existing-pipeline-id", "", `ID of the pipeline to generate config for`)
cmd.MarkFlagRequired("existing-pipeline-id")
wd, err := os.Getwd()
if err != nil {
wd = "."
}
cmd.Flags().StringVarP(&configDir, "config-dir", "d", filepath.Join(wd, "resources"), `Dir path where the output config will be stored`)
cmd.Flags().StringVarP(&sourceDir, "source-dir", "s", filepath.Join(wd, "src"), `Dir path where the downloaded files will be stored`)
cmd.Flags().BoolVarP(&force, "force", "f", false, `Force overwrite existing files in the output directory`)
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
b := bundle.Get(ctx)
w := b.WorkspaceClient()
pipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{PipelineId: pipelineId})
if err != nil {
return err
}
downloader := newDownloader(w, sourceDir, configDir)
for _, lib := range pipeline.Spec.Libraries {
err := downloader.MarkPipelineLibraryForDownload(ctx, &lib)
if err != nil {
return err
}
}
v, err := generate.ConvertPipelineToValue(pipeline.Spec)
if err != nil {
return err
}
jobKey := fmt.Sprintf("pipeline_%s", textutil.NormalizeString(pipeline.Name))
result := map[string]dyn.Value{
"resources": dyn.V(map[string]dyn.Value{
"pipelines": dyn.V(map[string]dyn.Value{
jobKey: v,
}),
}),
}
err = downloader.FlushToDisk(ctx, force)
if err != nil {
return err
}
filename := filepath.Join(configDir, fmt.Sprintf("%s.yml", jobKey))
err = yamlsaver.SaveAsYAML(result, filename, force)
if err != nil {
return err
}
cmdio.LogString(ctx, fmt.Sprintf("Pipeline configuration successfully saved to %s", filename))
return nil
}
return cmd
}

View File

@ -12,32 +12,69 @@ import (
"github.com/databricks/cli/libs/notebook" "github.com/databricks/cli/libs/notebook"
"github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
type notebookDownloader struct { type downloader struct {
notebooks map[string]string files map[string]string
w *databricks.WorkspaceClient w *databricks.WorkspaceClient
sourceDir string sourceDir string
configDir string configDir string
} }
func (n *notebookDownloader) MarkForDownload(ctx context.Context, task *jobs.Task) error { func (n *downloader) MarkTaskForDownload(ctx context.Context, task *jobs.Task) error {
if task.NotebookTask == nil { if task.NotebookTask == nil {
return nil return nil
} }
info, err := n.w.Workspace.GetStatusByPath(ctx, task.NotebookTask.NotebookPath) return n.markNotebookForDownload(ctx, &task.NotebookTask.NotebookPath)
}
func (n *downloader) MarkPipelineLibraryForDownload(ctx context.Context, lib *pipelines.PipelineLibrary) error {
if lib.Notebook != nil {
return n.markNotebookForDownload(ctx, &lib.Notebook.Path)
}
if lib.File != nil {
return n.markFileForDownload(ctx, &lib.File.Path)
}
return nil
}
func (n *downloader) markFileForDownload(ctx context.Context, filePath *string) error {
_, err := n.w.Workspace.GetStatusByPath(ctx, *filePath)
if err != nil {
return err
}
filename := path.Base(*filePath)
targetPath := filepath.Join(n.sourceDir, filename)
n.files[targetPath] = *filePath
rel, err := filepath.Rel(n.configDir, targetPath)
if err != nil {
return err
}
*filePath = rel
return nil
}
func (n *downloader) markNotebookForDownload(ctx context.Context, notebookPath *string) error {
info, err := n.w.Workspace.GetStatusByPath(ctx, *notebookPath)
if err != nil { if err != nil {
return err return err
} }
ext := notebook.GetExtensionByLanguage(info) ext := notebook.GetExtensionByLanguage(info)
filename := path.Base(task.NotebookTask.NotebookPath) + ext filename := path.Base(*notebookPath) + ext
targetPath := filepath.Join(n.sourceDir, filename) targetPath := filepath.Join(n.sourceDir, filename)
n.notebooks[targetPath] = task.NotebookTask.NotebookPath n.files[targetPath] = *notebookPath
// Update the notebook path to be relative to the config dir // Update the notebook path to be relative to the config dir
rel, err := filepath.Rel(n.configDir, targetPath) rel, err := filepath.Rel(n.configDir, targetPath)
@ -45,18 +82,18 @@ func (n *notebookDownloader) MarkForDownload(ctx context.Context, task *jobs.Tas
return err return err
} }
task.NotebookTask.NotebookPath = rel *notebookPath = rel
return nil return nil
} }
func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error { func (n *downloader) FlushToDisk(ctx context.Context, force bool) error {
err := os.MkdirAll(n.sourceDir, 0755) err := os.MkdirAll(n.sourceDir, 0755)
if err != nil { if err != nil {
return err return err
} }
// First check that all files can be written // First check that all files can be written
for targetPath := range n.notebooks { for targetPath := range n.files {
info, err := os.Stat(targetPath) info, err := os.Stat(targetPath)
if err == nil { if err == nil {
if info.IsDir() { if info.IsDir() {
@ -69,11 +106,11 @@ func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error
} }
errs, errCtx := errgroup.WithContext(ctx) errs, errCtx := errgroup.WithContext(ctx)
for k, v := range n.notebooks { for k, v := range n.files {
targetPath := k targetPath := k
notebookPath := v filePath := v
errs.Go(func() error { errs.Go(func() error {
reader, err := n.w.Workspace.Download(errCtx, notebookPath) reader, err := n.w.Workspace.Download(errCtx, filePath)
if err != nil { if err != nil {
return err return err
} }
@ -89,7 +126,7 @@ func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error
return err return err
} }
cmdio.LogString(errCtx, fmt.Sprintf("Notebook successfully saved to %s", targetPath)) cmdio.LogString(errCtx, fmt.Sprintf("File successfully saved to %s", targetPath))
return reader.Close() return reader.Close()
}) })
} }
@ -97,9 +134,9 @@ func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error
return errs.Wait() return errs.Wait()
} }
func newNotebookDownloader(w *databricks.WorkspaceClient, sourceDir string, configDir string) *notebookDownloader { func newDownloader(w *databricks.WorkspaceClient, sourceDir string, configDir string) *downloader {
return &notebookDownloader{ return &downloader{
notebooks: make(map[string]string), files: make(map[string]string),
w: w, w: w,
sourceDir: sourceDir, sourceDir: sourceDir,
configDir: configDir, configDir: configDir,

View File

@ -0,0 +1,115 @@
package bundle
import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"testing"
"github.com/databricks/cli/internal"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)
func TestAccGenerateFromExistingPipelineAndDeploy(t *testing.T) {
env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
t.Log(env)
uniqueId := uuid.New().String()
bundleRoot, err := initTestTemplate(t, "with_includes", map[string]any{
"unique_id": uniqueId,
})
require.NoError(t, err)
pipelineId := createTestPipeline(t)
t.Cleanup(func() {
destroyPipeline(t, pipelineId)
require.NoError(t, err)
})
t.Setenv("BUNDLE_ROOT", bundleRoot)
c := internal.NewCobraTestRunner(t, "bundle", "generate", "pipeline",
"--existing-pipeline-id", fmt.Sprint(pipelineId),
"--config-dir", filepath.Join(bundleRoot, "resources"),
"--source-dir", filepath.Join(bundleRoot, "src"))
_, _, err = c.Run()
require.NoError(t, err)
_, err = os.Stat(filepath.Join(bundleRoot, "src", "notebook.py"))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(bundleRoot, "src", "test.py"))
require.NoError(t, err)
matches, err := filepath.Glob(filepath.Join(bundleRoot, "resources", "pipeline_generated_pipeline_*.yml"))
require.NoError(t, err)
require.Len(t, matches, 1)
// check the content of generated yaml
data, err := os.ReadFile(matches[0])
require.NoError(t, err)
generatedYaml := string(data)
require.Contains(t, generatedYaml, "libraries:")
require.Contains(t, generatedYaml, "- notebook:")
require.Contains(t, generatedYaml, fmt.Sprintf("path: %s", filepath.Join("..", "src", "notebook.py")))
require.Contains(t, generatedYaml, "- file:")
require.Contains(t, generatedYaml, fmt.Sprintf("path: %s", filepath.Join("..", "src", "test.py")))
err = deployBundle(t, bundleRoot)
require.NoError(t, err)
err = destroyBundle(t, bundleRoot)
require.NoError(t, err)
}
func createTestPipeline(t *testing.T) string {
w, err := databricks.NewWorkspaceClient()
require.NoError(t, err)
ctx := context.Background()
tmpdir := internal.TemporaryWorkspaceDir(t, w)
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
require.NoError(t, err)
err = f.Write(ctx, "notebook.py", strings.NewReader("# Databricks notebook source\nprint('Hello world!'))"))
require.NoError(t, err)
err = f.Write(ctx, "test.py", strings.NewReader("print('Hello!')"))
require.NoError(t, err)
resp, err := w.Pipelines.Create(ctx, pipelines.CreatePipeline{
Name: internal.RandomName("generated-pipeline-"),
Libraries: []pipelines.PipelineLibrary{
{
Notebook: &pipelines.NotebookLibrary{
Path: path.Join(tmpdir, "notebook"),
},
},
{
File: &pipelines.FileLibrary{
Path: path.Join(tmpdir, "test.py"),
},
},
},
})
require.NoError(t, err)
return resp.PipelineId
}
func destroyPipeline(t *testing.T, pipelineId string) {
w, err := databricks.NewWorkspaceClient()
require.NoError(t, err)
ctx := context.Background()
err = w.Pipelines.Delete(ctx, pipelines.DeletePipelineRequest{
PipelineId: pipelineId,
})
require.NoError(t, err)
}