Correctly transform libraries in for_each_task block (#1340)

## Changes
Now DABs correctly transforms and deploys libraries in for_each_task
block

```
tasks:
  - task_key: my_loop
     for_each_task:
       inputs: "[1,2,3]"
         task:
           task_key: my_loop_iteration 
           libraries:
             - pypi:
                  package: my_package
```

## Tests
Added regression test
This commit is contained in:
Andrew Nester 2024-04-05 17:52:39 +02:00 committed by GitHub
parent 7d1bab7cf0
commit 77ff994d1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 66 additions and 1 deletions

View File

@ -100,6 +100,16 @@ func BundleToTerraform(config *config.Root) *schema.Root {
t.Library = append(t.Library, l) t.Library = append(t.Library, l)
} }
// Convert for_each_task libraries
if v.ForEachTask != nil {
for _, v_ := range v.ForEachTask.Task.Libraries {
var l schema.ResourceJobTaskForEachTaskTaskLibrary
conv(v_, &l)
t.ForEachTask.Task.Library = append(t.ForEachTask.Task.Library, l)
}
}
dst.Task = append(dst.Task, t) dst.Task = append(dst.Task, t)
} }

View File

@ -140,6 +140,50 @@ func TestBundleToTerraformJobTaskLibraries(t *testing.T) {
bundleToTerraformEquivalenceTest(t, &config) bundleToTerraformEquivalenceTest(t, &config)
} }
func TestBundleToTerraformForEachTaskLibraries(t *testing.T) {
var src = resources.Job{
JobSettings: &jobs.JobSettings{
Name: "my job",
Tasks: []jobs.Task{
{
TaskKey: "key",
ForEachTask: &jobs.ForEachTask{
Inputs: "[1,2,3]",
Task: jobs.Task{
TaskKey: "iteration",
Libraries: []compute.Library{
{
Pypi: &compute.PythonPyPiLibrary{
Package: "mlflow",
},
},
},
},
},
},
},
},
}
var config = config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"my_job": &src,
},
},
}
out := BundleToTerraform(&config)
resource := out.Resource.Job["my_job"].(*schema.ResourceJob)
assert.Equal(t, "my job", resource.Name)
require.Len(t, resource.Task, 1)
require.Len(t, resource.Task[0].ForEachTask.Task.Library, 1)
assert.Equal(t, "mlflow", resource.Task[0].ForEachTask.Task.Library[0].Pypi.Package)
bundleToTerraformEquivalenceTest(t, &config)
}
func TestBundleToTerraformPipeline(t *testing.T) { func TestBundleToTerraformPipeline(t *testing.T) {
var src = resources.Pipeline{ var src = resources.Pipeline{
PipelineSpec: &pipelines.PipelineSpec{ PipelineSpec: &pipelines.PipelineSpec{

View File

@ -45,9 +45,20 @@ func convertJobResource(ctx context.Context, vin dyn.Value) (dyn.Value, error) {
// Modify keys in the "task" blocks // Modify keys in the "task" blocks
vout, err = dyn.Map(vout, "task", dyn.Foreach(func(_ dyn.Path, v dyn.Value) (dyn.Value, error) { vout, err = dyn.Map(vout, "task", dyn.Foreach(func(_ dyn.Path, v dyn.Value) (dyn.Value, error) {
// Modify "library" blocks for for_each_task
vout, err = dyn.Map(v, "for_each_task.task", func(_ dyn.Path, v dyn.Value) (dyn.Value, error) {
return renameKeys(v, map[string]string{ return renameKeys(v, map[string]string{
"libraries": "library", "libraries": "library",
}) })
})
if err != nil {
return dyn.InvalidValue, err
}
return renameKeys(vout, map[string]string{
"libraries": "library",
})
})) }))
if err != nil { if err != nil {
return dyn.InvalidValue, err return dyn.InvalidValue, err