package generate import ( "bytes" "context" "fmt" "io" "os" "path/filepath" "testing" "github.com/databricks/cli/bundle" "github.com/databricks/databricks-sdk-go/experimental/mocks" "github.com/databricks/databricks-sdk-go/service/compute" "github.com/databricks/databricks-sdk-go/service/jobs" "github.com/databricks/databricks-sdk-go/service/pipelines" "github.com/databricks/databricks-sdk-go/service/workspace" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) func TestGeneratePipelineCommand(t *testing.T) { cmd := NewGeneratePipelineCommand() root := t.TempDir() b := &bundle.Bundle{ RootPath: root, } m := mocks.NewMockWorkspaceClient(t) b.SetWorkpaceClient(m.WorkspaceClient) pipelineApi := m.GetMockPipelinesAPI() pipelineApi.EXPECT().Get(mock.Anything, pipelines.GetPipelineRequest{PipelineId: "test-pipeline"}).Return(&pipelines.GetPipelineResponse{ PipelineId: "test-pipeline", Name: "test-pipeline", Spec: &pipelines.PipelineSpec{ Name: "test-pipeline", Clusters: []pipelines.PipelineCluster{ { CustomTags: map[string]string{ "Tag1": "24X7-1234", }, }, { SparkConf: map[string]string{ "spark.databricks.delta.preview.enabled": "true", }, }, }, Libraries: []pipelines.PipelineLibrary{ {Notebook: &pipelines.NotebookLibrary{ Path: "/test/notebook", }}, {File: &pipelines.FileLibrary{ Path: "/test/file.py", }}, }, }, }, nil) workspaceApi := m.GetMockWorkspaceAPI() workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/notebook").Return(&workspace.ObjectInfo{ ObjectType: workspace.ObjectTypeNotebook, Language: workspace.LanguagePython, Path: "/test/notebook", }, nil) workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/file.py").Return(&workspace.ObjectInfo{ ObjectType: workspace.ObjectTypeFile, Path: "/test/file.py", }, nil) notebookContent := io.NopCloser(bytes.NewBufferString("# Databricks notebook source\nNotebook content")) pyContent := io.NopCloser(bytes.NewBufferString("Py content")) workspaceApi.EXPECT().Download(mock.Anything, "/test/notebook", mock.Anything).Return(notebookContent, nil) workspaceApi.EXPECT().Download(mock.Anything, "/test/file.py", mock.Anything).Return(pyContent, nil) cmd.SetContext(bundle.Context(context.Background(), b)) cmd.Flag("existing-pipeline-id").Value.Set("test-pipeline") configDir := filepath.Join(root, "resources") cmd.Flag("config-dir").Value.Set(configDir) srcDir := filepath.Join(root, "src") cmd.Flag("source-dir").Value.Set(srcDir) var key string cmd.Flags().StringVar(&key, "key", "test_pipeline", "") err := cmd.RunE(cmd, []string{}) require.NoError(t, err) data, err := os.ReadFile(filepath.Join(configDir, "test_pipeline.yml")) require.NoError(t, err) require.Equal(t, fmt.Sprintf(`resources: pipelines: test_pipeline: name: test-pipeline clusters: - custom_tags: "Tag1": "24X7-1234" - spark_conf: "spark.databricks.delta.preview.enabled": "true" libraries: - notebook: path: %s - file: path: %s `, filepath.Join("..", "src", "notebook.py"), filepath.Join("..", "src", "file.py")), string(data)) data, err = os.ReadFile(filepath.Join(srcDir, "notebook.py")) require.NoError(t, err) require.Equal(t, "# Databricks notebook source\nNotebook content", string(data)) data, err = os.ReadFile(filepath.Join(srcDir, "file.py")) require.NoError(t, err) require.Equal(t, "Py content", string(data)) } func TestGenerateJobCommand(t *testing.T) { cmd := NewGenerateJobCommand() root := t.TempDir() b := &bundle.Bundle{ RootPath: root, } m := mocks.NewMockWorkspaceClient(t) b.SetWorkpaceClient(m.WorkspaceClient) jobsApi := m.GetMockJobsAPI() jobsApi.EXPECT().Get(mock.Anything, jobs.GetJobRequest{JobId: 1234}).Return(&jobs.Job{ Settings: &jobs.JobSettings{ Name: "test-job", JobClusters: []jobs.JobCluster{ {NewCluster: compute.ClusterSpec{ CustomTags: map[string]string{ "Tag1": "24X7-1234", }, }}, {NewCluster: compute.ClusterSpec{ SparkConf: map[string]string{ "spark.databricks.delta.preview.enabled": "true", }, }}, }, Tasks: []jobs.Task{ { TaskKey: "notebook_task", NotebookTask: &jobs.NotebookTask{ NotebookPath: "/test/notebook", }, }, }, }, }, nil) workspaceApi := m.GetMockWorkspaceAPI() workspaceApi.EXPECT().GetStatusByPath(mock.Anything, "/test/notebook").Return(&workspace.ObjectInfo{ ObjectType: workspace.ObjectTypeNotebook, Language: workspace.LanguagePython, Path: "/test/notebook", }, nil) notebookContent := io.NopCloser(bytes.NewBufferString("# Databricks notebook source\nNotebook content")) workspaceApi.EXPECT().Download(mock.Anything, "/test/notebook", mock.Anything).Return(notebookContent, nil) cmd.SetContext(bundle.Context(context.Background(), b)) cmd.Flag("existing-job-id").Value.Set("1234") configDir := filepath.Join(root, "resources") cmd.Flag("config-dir").Value.Set(configDir) srcDir := filepath.Join(root, "src") cmd.Flag("source-dir").Value.Set(srcDir) var key string cmd.Flags().StringVar(&key, "key", "test_job", "") err := cmd.RunE(cmd, []string{}) require.NoError(t, err) data, err := os.ReadFile(filepath.Join(configDir, "test_job.yml")) require.NoError(t, err) require.Equal(t, fmt.Sprintf(`resources: jobs: test_job: name: test-job job_clusters: - new_cluster: custom_tags: "Tag1": "24X7-1234" - new_cluster: spark_conf: "spark.databricks.delta.preview.enabled": "true" tasks: - task_key: notebook_task notebook_task: notebook_path: %s `, filepath.Join("..", "src", "notebook.py")), string(data)) data, err = os.ReadFile(filepath.Join(srcDir, "notebook.py")) require.NoError(t, err) require.Equal(t, "# Databricks notebook source\nNotebook content", string(data)) }