diff --git a/bundle/config/resources/job.go b/bundle/config/resources/job.go index edda8a921..bf29106a0 100644 --- a/bundle/config/resources/job.go +++ b/bundle/config/resources/job.go @@ -41,7 +41,7 @@ func (j *Job) MergeJobClusters() error { ref, ok := keys[key] if !ok { output = append(output, j.JobClusters[i]) - keys[key] = &j.JobClusters[i] + keys[key] = &output[len(output)-1] continue } @@ -74,7 +74,7 @@ func (j *Job) MergeTasks() error { ref, ok := keys[key] if !ok { tasks = append(tasks, j.Tasks[i]) - keys[key] = &j.Tasks[i] + keys[key] = &tasks[len(tasks)-1] continue } diff --git a/bundle/config/resources/job_test.go b/bundle/config/resources/job_test.go index 818d2ac21..24b82fabb 100644 --- a/bundle/config/resources/job_test.go +++ b/bundle/config/resources/job_test.go @@ -67,6 +67,9 @@ func TestJobMergeTasks(t *testing.T) { NodeTypeId: "i3.xlarge", NumWorkers: 2, }, + Libraries: []compute.Library{ + {Whl: "package1"}, + }, }, { TaskKey: "bar", @@ -80,6 +83,11 @@ func TestJobMergeTasks(t *testing.T) { NodeTypeId: "i3.2xlarge", NumWorkers: 4, }, + Libraries: []compute.Library{ + {Pypi: &compute.PythonPyPiLibrary{ + Package: "package2", + }}, + }, }, }, }, @@ -93,10 +101,14 @@ func TestJobMergeTasks(t *testing.T) { assert.Equal(t, "bar", j.Tasks[1].TaskKey) // This task was merged with a subsequent one. - task0 := j.Tasks[0].NewCluster - assert.Equal(t, "13.3.x-scala2.12", task0.SparkVersion) - assert.Equal(t, "i3.2xlarge", task0.NodeTypeId) - assert.Equal(t, 4, task0.NumWorkers) + task0 := j.Tasks[0] + cluster := task0.NewCluster + assert.Equal(t, "13.3.x-scala2.12", cluster.SparkVersion) + assert.Equal(t, "i3.2xlarge", cluster.NodeTypeId) + assert.Equal(t, 4, cluster.NumWorkers) + assert.Len(t, task0.Libraries, 2) + assert.Equal(t, task0.Libraries[0].Whl, "package1") + assert.Equal(t, task0.Libraries[1].Pypi.Package, "package2") // This task was left untouched. task1 := j.Tasks[1].NewCluster