diff --git a/bundle/config/generate/pipeline.go b/bundle/config/generate/pipeline.go
new file mode 100644
index 00000000..ba4aedfa
--- /dev/null
+++ b/bundle/config/generate/pipeline.go
@@ -0,0 +1,21 @@
+package generate
+
+import (
+	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/cli/libs/dyn/yamlsaver"
+	"github.com/databricks/databricks-sdk-go/service/pipelines"
+)
+
+var pipelineOrder = yamlsaver.NewOrder([]string{"name", "clusters", "configuration", "libraries"})
+
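+// ConvertPipelineToValue converts a pipeline spec into a dyn.Value suitable for
+// saving as bundle configuration, applying pipelineOrder to the top-level keys.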
+func ConvertPipelineToValue(pipeline *pipelines.PipelineSpec) (dyn.Value, error) {
+	value := make(map[string]dyn.Value)
+
+	// We ignore the following fields:
+	// - id: this is a read-only field
+	// - storage: changes to this field are rare because changing the storage recreates pipeline-related resources
+	// - edition: this field is rarely changed
+	return yamlsaver.ConvertToMapValue(pipeline, pipelineOrder, []string{"id", "storage", "edition"}, value)
+}
diff --git a/cmd/bundle/generate.go b/cmd/bundle/generate.go
index a593f52f..ca2f8cce 100644
--- a/cmd/bundle/generate.go
+++ b/cmd/bundle/generate.go
@@ -14,5 +14,6 @@ func newGenerateCommand() *cobra.Command {
 	}
 
 	cmd.AddCommand(generate.NewGenerateJobCommand())
+	cmd.AddCommand(generate.NewGeneratePipelineCommand())
 	return cmd
 }
diff --git a/cmd/bundle/generate/generate_test.go b/cmd/bundle/generate/generate_test.go
new file mode 100644
index 00000000..f0693646
--- /dev/null
+++ b/cmd/bundle/generate/generate_test.go
@@ -0,0 +1,99 @@
+package generate
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/databricks/cli/bundle"
+	"github.com/databricks/cli/bundle/config"
+	"github.com/databricks/databricks-sdk-go/experimental/mocks"
+	"github.com/databricks/databricks-sdk-go/service/pipelines"
+	"github.com/databricks/databricks-sdk-go/service/workspace"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+)
+
+func TestGeneratePipelineCommand(t *testing.T) {
+	cmd := NewGeneratePipelineCommand()
+
+	root := t.TempDir()
+	b := &bundle.Bundle{
+		Config: config.Root{
+			Path: root,
+		},
+	}
+
+	m := mocks.NewMockWorkspaceClient(t)
+	b.SetWorkpaceClient(m.WorkspaceClient)
+	pipelineApi := m.GetMockPipelinesAPI()
+	pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{PipelineId: "test-pipeline"}).Return(&pipelines.GetPipelineResponse{
+		PipelineId: "test-pipeline",
+		Name:       "test-pipeline",
+		Spec: &pipelines.PipelineSpec{
+			Name: "test-pipeline",
+			Libraries: []pipelines.PipelineLibrary{
+				{Notebook: &pipelines.NotebookLibrary{
+					Path: "/test/notebook",
+				}},
+				{File: &pipelines.FileLibrary{
+					Path: "/test/file.py",
+				}},
+			},
+		},
+	}, nil)
+
+	workspaceApi := m.GetMockWorkspaceAPI()
+	workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/notebook").Return(&workspace.ObjectInfo{
+		ObjectType: workspace.ObjectTypeNotebook,
+		Language:   workspace.LanguagePython,
+		Path:       "/test/notebook",
+	}, nil)
+
+	workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/file.py").Return(&workspace.ObjectInfo{
+		ObjectType: workspace.ObjectTypeFile,
+		Path:       "/test/file.py",
+	}, nil)
+
+	notebookContent := io.NopCloser(bytes.NewBufferString("# Databricks notebook source\nNotebook content"))
+	pyContent := io.NopCloser(bytes.NewBufferString("Py content"))
+	workspaceApi.EXPECT().Download(mock.Anything, "/test/notebook", mock.Anything).Return(notebookContent, nil)
+	workspaceApi.EXPECT().Download(mock.Anything, "/test/file.py", mock.Anything).Return(pyContent, nil)
+
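+	// Point the command at the mocked pipeline and write its output under the temp bundle root.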
+	cmd.SetContext(bundle.Context(context.Background(), b))
+	cmd.Flag("existing-pipeline-id").Value.Set("test-pipeline")
+
+	configDir := filepath.Join(root, "resources")
+	cmd.Flag("config-dir").Value.Set(configDir)
+
+	srcDir := filepath.Join(root, "src")
+	cmd.Flag("source-dir").Value.Set(srcDir)
+	err := cmd.RunE(cmd, []string{})
+	require.NoError(t, err)
+
+	data, err := os.ReadFile(filepath.Join(configDir, "pipeline_test_pipeline.yml"))
+	require.NoError(t, err)
+	require.Equal(t, fmt.Sprintf(`resources:
+  pipelines:
+    pipeline_test_pipeline:
+      name: test-pipeline
+      libraries:
+        - notebook:
+            path: %s
+        - file:
+            path: %s
+`, filepath.Join("..", "src", "notebook.py"), filepath.Join("..", "src", "file.py")), string(data))
+
+	data, err = os.ReadFile(filepath.Join(srcDir, "notebook.py"))
+	require.NoError(t, err)
+	require.Equal(t, "# Databricks notebook source\nNotebook content", string(data))
+
+	data, err = os.ReadFile(filepath.Join(srcDir, "file.py"))
+	require.NoError(t, err)
+	require.Equal(t, "Py content", string(data))
+}
diff --git a/cmd/bundle/generate/job.go b/cmd/bundle/generate/job.go
index 8e186cc3..51e4fbae 100644
--- a/cmd/bundle/generate/job.go
+++ b/cmd/bundle/generate/job.go
@@ -50,9 +50,9 @@ func NewGenerateJobCommand() *cobra.Command {
 			return err
 		}
 
-		downloader := newNotebookDownloader(w, sourceDir, configDir)
+		downloader := newDownloader(w, sourceDir, configDir)
 		for _, task := range job.Settings.Tasks {
-			err := downloader.MarkForDownload(ctx, &task)
+			err := downloader.MarkTaskForDownload(ctx, &task)
 			if err != nil {
 				return err
 			}
diff --git a/cmd/bundle/generate/pipeline.go b/cmd/bundle/generate/pipeline.go
new file mode 100644
index 00000000..efe9cc62
--- /dev/null
+++ b/cmd/bundle/generate/pipeline.go
@@ -0,0 +1,93 @@
+package generate
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+
+	"github.com/databricks/cli/bundle"
+	"github.com/databricks/cli/bundle/config/generate"
+	"github.com/databricks/cli/cmd/root"
+	"github.com/databricks/cli/libs/cmdio"
+	"github.com/databricks/cli/libs/dyn"
+	"github.com/databricks/cli/libs/dyn/yamlsaver"
+	"github.com/databricks/cli/libs/textutil"
+	"github.com/databricks/databricks-sdk-go/service/pipelines"
+	"github.com/spf13/cobra"
+)
+
+func NewGeneratePipelineCommand() *cobra.Command {
+	var configDir string
+	var sourceDir string
+	var pipelineId string
+	var force bool
+
+	cmd := &cobra.Command{
+		Use:     "pipeline",
+		Short:   "Generate bundle configuration for a pipeline",
+		PreRunE: root.MustConfigureBundle,
+	}
+
+	cmd.Flags().StringVar(&pipelineId, "existing-pipeline-id", "", `ID of the pipeline to generate config for`)
+	cmd.MarkFlagRequired("existing-pipeline-id")
+
+	wd, err := os.Getwd()
+	if err != nil {
+		wd = "."
+	}
+
+	cmd.Flags().StringVarP(&configDir, "config-dir", "d", filepath.Join(wd, "resources"), `Dir path where the output config will be stored`)
+	cmd.Flags().StringVarP(&sourceDir, "source-dir", "s", filepath.Join(wd, "src"), `Dir path where the downloaded files will be stored`)
+	cmd.Flags().BoolVarP(&force, "force", "f", false, `Force overwrite existing files in the output directory`)
+
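+	// RunE fetches the pipeline spec, downloads the referenced notebooks and
+	// files into sourceDir, and writes the generated YAML into configDir.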
+	cmd.RunE = func(cmd *cobra.Command, args []string) error {
+		ctx := cmd.Context()
+		b := bundle.Get(ctx)
+		w := b.WorkspaceClient()
+
+		pipeline, err := w.Pipelines.Get(ctx, pipelines.GetPipelineRequest{PipelineId: pipelineId})
+		if err != nil {
+			return err
+		}
+
+		downloader := newDownloader(w, sourceDir, configDir)
+		for _, lib := range pipeline.Spec.Libraries {
+			err := downloader.MarkPipelineLibraryForDownload(ctx, &lib)
+			if err != nil {
+				return err
+			}
+		}
+
+		v, err := generate.ConvertPipelineToValue(pipeline.Spec)
+		if err != nil {
+			return err
+		}
+
+		pipelineKey := fmt.Sprintf("pipeline_%s", textutil.NormalizeString(pipeline.Name))
+		result := map[string]dyn.Value{
+			"resources": dyn.V(map[string]dyn.Value{
+				"pipelines": dyn.V(map[string]dyn.Value{
+					pipelineKey: v,
+				}),
+			}),
+		}
+
+		err = downloader.FlushToDisk(ctx, force)
+		if err != nil {
+			return err
+		}
+
+		filename := filepath.Join(configDir, fmt.Sprintf("%s.yml", pipelineKey))
+		err = yamlsaver.SaveAsYAML(result, filename, force)
+		if err != nil {
+			return err
+		}
+
+		cmdio.LogString(ctx, fmt.Sprintf("Pipeline configuration successfully saved to %s", filename))
+		return nil
+	}
+
+	return cmd
+}
diff --git a/cmd/bundle/generate/utils.go b/cmd/bundle/generate/utils.go
index 8d450088..65f69241 100644
--- a/cmd/bundle/generate/utils.go
+++ b/cmd/bundle/generate/utils.go
@@ -12,32 +12,69 @@ import (
 	"github.com/databricks/cli/libs/notebook"
 	"github.com/databricks/databricks-sdk-go"
 	"github.com/databricks/databricks-sdk-go/service/jobs"
+	"github.com/databricks/databricks-sdk-go/service/pipelines"
 	"golang.org/x/sync/errgroup"
 )
 
-type notebookDownloader struct {
-	notebooks map[string]string
+type downloader struct {
+	files     map[string]string
 	w         *databricks.WorkspaceClient
 	sourceDir string
 	configDir string
 }
 
-func (n *notebookDownloader) MarkForDownload(ctx context.Context, task *jobs.Task) error {
+func (n *downloader) MarkTaskForDownload(ctx context.Context, task *jobs.Task) error {
 	if task.NotebookTask == nil {
 		return nil
 	}
 
-	info, err := n.w.Workspace.GetStatusByPath(ctx, task.NotebookTask.NotebookPath)
+	return n.markNotebookForDownload(ctx, &task.NotebookTask.NotebookPath)
+}
+
+func (n *downloader) MarkPipelineLibraryForDownload(ctx context.Context, lib *pipelines.PipelineLibrary) error {
+	if lib.Notebook != nil {
+		return n.markNotebookForDownload(ctx, &lib.Notebook.Path)
+	}
+
+	if lib.File != nil {
+		return n.markFileForDownload(ctx, &lib.File.Path)
+	}
+
+	return nil
+}
+
+func (n *downloader) markFileForDownload(ctx context.Context, filePath *string) error {
+	_, err := n.w.Workspace.GetStatusByPath(ctx, *filePath)
+	if err != nil {
+		return err
+	}
+
+	filename := path.Base(*filePath)
+	targetPath := filepath.Join(n.sourceDir, filename)
+
+	n.files[targetPath] = *filePath
+
+	rel, err := filepath.Rel(n.configDir, targetPath)
+	if err != nil {
+		return err
+	}
+
+	*filePath = rel
+	return nil
+}
+
+func (n *downloader) markNotebookForDownload(ctx context.Context, notebookPath *string) error {
+	info, err := n.w.Workspace.GetStatusByPath(ctx, *notebookPath)
 	if err != nil {
 		return err
 	}
 
 	ext := notebook.GetExtensionByLanguage(info)
-	filename := path.Base(task.NotebookTask.NotebookPath) + ext
+	filename := path.Base(*notebookPath) + ext
 	targetPath := filepath.Join(n.sourceDir, filename)
 
-	n.notebooks[targetPath] = task.NotebookTask.NotebookPath
+	n.files[targetPath] = *notebookPath
 
 	// Update the notebook path to be relative to the config dir
 	rel, err := filepath.Rel(n.configDir, targetPath)
@@ -45,18 +82,18 @@ func (n *notebookDownloader) MarkForDownload(ctx context.Context, task *jobs.Tas
 		return err
 	}
 
-	task.NotebookTask.NotebookPath = rel
+	*notebookPath = rel
 	return nil
 }
 
-func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error {
+func (n *downloader) FlushToDisk(ctx context.Context, force bool) error {
 	err := os.MkdirAll(n.sourceDir, 0755)
 	if err != nil {
 		return err
 	}
 
 	// First check that all files can be written
-	for targetPath := range n.notebooks {
+	for targetPath := range n.files {
 		info, err := os.Stat(targetPath)
 		if err == nil {
 			if info.IsDir() {
@@ -69,11 +106,11 @@ func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error
 	}
 
 	errs, errCtx := errgroup.WithContext(ctx)
-	for k, v := range n.notebooks {
+	for k, v := range n.files {
 		targetPath := k
-		notebookPath := v
+		filePath := v
 		errs.Go(func() error {
-			reader, err := n.w.Workspace.Download(errCtx, notebookPath)
+			reader, err := n.w.Workspace.Download(errCtx, filePath)
 			if err != nil {
 				return err
 			}
@@ -89,7 +126,7 @@ func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error
 				return err
 			}
 
-			cmdio.LogString(errCtx, fmt.Sprintf("Notebook successfully saved to %s", targetPath))
+			cmdio.LogString(errCtx, fmt.Sprintf("File successfully saved to %s", targetPath))
 			return reader.Close()
 		})
 	}
@@ -97,9 +134,9 @@ func (n *notebookDownloader) FlushToDisk(ctx context.Context, force bool) error
 	return errs.Wait()
 }
 
-func newNotebookDownloader(w *databricks.WorkspaceClient, sourceDir string, configDir string) *notebookDownloader {
-	return &notebookDownloader{
-		notebooks: make(map[string]string),
+func newDownloader(w *databricks.WorkspaceClient, sourceDir string, configDir string) *downloader {
+	return &downloader{
+		files:     make(map[string]string),
 		w:         w,
 		sourceDir: sourceDir,
 		configDir: configDir,
diff --git a/internal/bundle/generate_pipeline_test.go b/internal/bundle/generate_pipeline_test.go
new file mode 100644
index 00000000..85fbbb2f
--- /dev/null
+++ b/internal/bundle/generate_pipeline_test.go
@@ -0,0 +1,115 @@
+package bundle
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/databricks/cli/internal"
+	"github.com/databricks/cli/libs/filer"
+	"github.com/databricks/databricks-sdk-go"
+	"github.com/databricks/databricks-sdk-go/service/pipelines"
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/require"
+)
+
+func TestAccGenerateFromExistingPipelineAndDeploy(t *testing.T) {
+	env := internal.GetEnvOrSkipTest(t, "CLOUD_ENV")
+	t.Log(env)
+
+	uniqueId := uuid.New().String()
+	bundleRoot, err := initTestTemplate(t, "with_includes", map[string]any{
+		"unique_id": uniqueId,
+	})
+	require.NoError(t, err)
+
+	pipelineId := createTestPipeline(t)
+	t.Cleanup(func() {
+		destroyPipeline(t, pipelineId)
+	})
+
+	t.Setenv("BUNDLE_ROOT", bundleRoot)
+	c := internal.NewCobraTestRunner(t, "bundle", "generate", "pipeline",
+		"--existing-pipeline-id", fmt.Sprint(pipelineId),
+		"--config-dir", filepath.Join(bundleRoot, "resources"),
+		"--source-dir", filepath.Join(bundleRoot, "src"))
+	_, _, err = c.Run()
+	require.NoError(t, err)
+
+	_, err = os.Stat(filepath.Join(bundleRoot, "src", "notebook.py"))
+	require.NoError(t, err)
+
+	_, err = os.Stat(filepath.Join(bundleRoot, "src", "test.py"))
+	require.NoError(t, err)
+
+	matches, err := filepath.Glob(filepath.Join(bundleRoot, "resources", "pipeline_generated_pipeline_*.yml"))
+	require.NoError(t, err)
+	require.Len(t, matches, 1)
+
+	// check the content of generated yaml
+	data, err := os.ReadFile(matches[0])
+	require.NoError(t, err)
+	generatedYaml := string(data)
+	require.Contains(t, generatedYaml, "libraries:")
+	require.Contains(t, generatedYaml, "- notebook:")
+	require.Contains(t, generatedYaml, fmt.Sprintf("path: %s", filepath.Join("..", "src", "notebook.py")))
+	require.Contains(t, generatedYaml, "- file:")
+	require.Contains(t, generatedYaml, fmt.Sprintf("path: %s", filepath.Join("..", "src", "test.py")))
+
+	err = deployBundle(t, bundleRoot)
+	require.NoError(t, err)
+
+	err = destroyBundle(t, bundleRoot)
+	require.NoError(t, err)
+}
+
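+// createTestPipeline creates a pipeline backed by one notebook library and one file library in a temporary workspace directory.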
+func createTestPipeline(t *testing.T) string {
+	w, err := databricks.NewWorkspaceClient()
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	tmpdir := internal.TemporaryWorkspaceDir(t, w)
+	f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
+	require.NoError(t, err)
+
+	err = f.Write(ctx, "notebook.py", strings.NewReader("# Databricks notebook source\nprint('Hello world!')"))
+	require.NoError(t, err)
+
+	err = f.Write(ctx, "test.py", strings.NewReader("print('Hello!')"))
+	require.NoError(t, err)
+
+	resp, err := w.Pipelines.Create(ctx, pipelines.CreatePipeline{
+		Name: internal.RandomName("generated-pipeline-"),
+		Libraries: []pipelines.PipelineLibrary{
+			{
+				Notebook: &pipelines.NotebookLibrary{
+					Path: path.Join(tmpdir, "notebook"),
+				},
+			},
+			{
+				File: &pipelines.FileLibrary{
+					Path: path.Join(tmpdir, "test.py"),
+				},
+			},
+		},
+	})
+	require.NoError(t, err)
+
+	return resp.PipelineId
+}
+
+func destroyPipeline(t *testing.T, pipelineId string) {
+	w, err := databricks.NewWorkspaceClient()
+	require.NoError(t, err)
+
+	ctx := context.Background()
+	err = w.Pipelines.Delete(ctx, pipelines.DeletePipelineRequest{
+		PipelineId: pipelineId,
+	})
+	require.NoError(t, err)
+}
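For reference, a typical invocation of the new command would look something like this (the pipeline ID is a placeholder):

    databricks bundle generate pipeline \
        --existing-pipeline-id <pipeline-id> \
        --config-dir resources \
        --source-dir src

This writes resources/pipeline_<normalized_name>.yml and downloads the pipeline's notebook and file libraries into src/, rewriting their paths relative to the config dir.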