Fixed merging task libraries from targets (#868)

## Changes
Previous we (erroneously) kept the reference and merged into the
original tasks and not the copies which we later used to replace
existing tasks. Thus the merging of slices and references was incorrect.

Fixes #864 

## Tests
Added a regression test
This commit is contained in:
Andrew Nester 2023-10-16 10:48:32 +02:00 committed by GitHub
parent b2cb691988
commit 30c4d2e8a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 6 deletions

View File

@ -41,7 +41,7 @@ func (j *Job) MergeJobClusters() error {
ref, ok := keys[key] ref, ok := keys[key]
if !ok { if !ok {
output = append(output, j.JobClusters[i]) output = append(output, j.JobClusters[i])
keys[key] = &j.JobClusters[i] keys[key] = &output[len(output)-1]
continue continue
} }
@ -74,7 +74,7 @@ func (j *Job) MergeTasks() error {
ref, ok := keys[key] ref, ok := keys[key]
if !ok { if !ok {
tasks = append(tasks, j.Tasks[i]) tasks = append(tasks, j.Tasks[i])
keys[key] = &j.Tasks[i] keys[key] = &tasks[len(tasks)-1]
continue continue
} }

View File

@ -67,6 +67,9 @@ func TestJobMergeTasks(t *testing.T) {
NodeTypeId: "i3.xlarge", NodeTypeId: "i3.xlarge",
NumWorkers: 2, NumWorkers: 2,
}, },
Libraries: []compute.Library{
{Whl: "package1"},
},
}, },
{ {
TaskKey: "bar", TaskKey: "bar",
@ -80,6 +83,11 @@ func TestJobMergeTasks(t *testing.T) {
NodeTypeId: "i3.2xlarge", NodeTypeId: "i3.2xlarge",
NumWorkers: 4, NumWorkers: 4,
}, },
Libraries: []compute.Library{
{Pypi: &compute.PythonPyPiLibrary{
Package: "package2",
}},
},
}, },
}, },
}, },
@ -93,10 +101,14 @@ func TestJobMergeTasks(t *testing.T) {
assert.Equal(t, "bar", j.Tasks[1].TaskKey) assert.Equal(t, "bar", j.Tasks[1].TaskKey)
// This task was merged with a subsequent one. // This task was merged with a subsequent one.
task0 := j.Tasks[0].NewCluster task0 := j.Tasks[0]
assert.Equal(t, "13.3.x-scala2.12", task0.SparkVersion) cluster := task0.NewCluster
assert.Equal(t, "i3.2xlarge", task0.NodeTypeId) assert.Equal(t, "13.3.x-scala2.12", cluster.SparkVersion)
assert.Equal(t, 4, task0.NumWorkers) assert.Equal(t, "i3.2xlarge", cluster.NodeTypeId)
assert.Equal(t, 4, cluster.NumWorkers)
assert.Len(t, task0.Libraries, 2)
assert.Equal(t, task0.Libraries[0].Whl, "package1")
assert.Equal(t, task0.Libraries[1].Pypi.Package, "package2")
// This task was left untouched. // This task was left untouched.
task1 := j.Tasks[1].NewCluster task1 := j.Tasks[1].NewCluster