Enable job queueing by default (#1385)

## Changes

This enable queueing for jobs by default, following the behavior from
API 2.2+. Queing is a best practice and will be the default in API 2.2.
Since we're still using API 2.1 which has queueing disabled by default,
this PR enables queuing using a mutator.

Customers can manually turn off queueing for any job by adding the
following to their job spec:

```
queue:
  enabled: false
```

## Tests

Unit tests, manual confirmation of property after deployment.

---------

Co-authored-by: Pieter Noordhuis <pcnoordhuis@gmail.com>
This commit is contained in:
Lennart Kats (databricks) 2024-04-22 12:36:39 +02:00 committed by GitHub
parent cd675ded9a
commit 000a7fef8c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 134 additions and 0 deletions

View File

@ -0,0 +1,38 @@
package mutator
import (
"context"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/libs/diag"
"github.com/databricks/databricks-sdk-go/service/jobs"
)
type defaultQueueing struct{}
func DefaultQueueing() bundle.Mutator {
return &defaultQueueing{}
}
func (m *defaultQueueing) Name() string {
return "DefaultQueueing"
}
// Enable queueing for jobs by default, following the behavior from API 2.2+.
// As of 2024-04, we're still using API 2.1 which has queueing disabled by default.
// This mutator makes sure queueing is enabled by default before we can adopt API 2.2.
func (m *defaultQueueing) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics {
r := b.Config.Resources
for i := range r.Jobs {
if r.Jobs[i].JobSettings == nil {
r.Jobs[i].JobSettings = &jobs.JobSettings{}
}
if r.Jobs[i].Queue != nil {
continue
}
r.Jobs[i].Queue = &jobs.QueueSettings{
Enabled: true,
}
}
return nil
}

View File

@ -0,0 +1,95 @@
package mutator
import (
"context"
"testing"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
)
func TestDefaultQueueing(t *testing.T) {
m := DefaultQueueing()
assert.IsType(t, &defaultQueueing{}, m)
}
func TestDefaultQueueingName(t *testing.T) {
m := DefaultQueueing()
assert.Equal(t, "DefaultQueueing", m.Name())
}
func TestDefaultQueueingApplyNoJobs(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{},
},
}
d := bundle.Apply(context.Background(), b, DefaultQueueing())
assert.Len(t, d, 0)
assert.Len(t, b.Config.Resources.Jobs, 0)
}
func TestDefaultQueueingApplyJobsAlreadyEnabled(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {
JobSettings: &jobs.JobSettings{
Queue: &jobs.QueueSettings{Enabled: true},
},
},
},
},
},
}
d := bundle.Apply(context.Background(), b, DefaultQueueing())
assert.Len(t, d, 0)
assert.True(t, b.Config.Resources.Jobs["job"].Queue.Enabled)
}
func TestDefaultQueueingApplyEnableQueueing(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job": {},
},
},
},
}
d := bundle.Apply(context.Background(), b, DefaultQueueing())
assert.Len(t, d, 0)
assert.NotNil(t, b.Config.Resources.Jobs["job"].Queue)
assert.True(t, b.Config.Resources.Jobs["job"].Queue.Enabled)
}
func TestDefaultQueueingApplyWithMultipleJobs(t *testing.T) {
b := &bundle.Bundle{
Config: config.Root{
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {
JobSettings: &jobs.JobSettings{
Queue: &jobs.QueueSettings{Enabled: false},
},
},
"job2": {},
"job3": {
JobSettings: &jobs.JobSettings{
Queue: &jobs.QueueSettings{Enabled: true},
},
},
},
},
},
}
d := bundle.Apply(context.Background(), b, DefaultQueueing())
assert.Len(t, d, 0)
assert.False(t, b.Config.Resources.Jobs["job1"].Queue.Enabled)
assert.True(t, b.Config.Resources.Jobs["job2"].Queue.Enabled)
assert.True(t, b.Config.Resources.Jobs["job3"].Queue.Enabled)
}

View File

@ -38,6 +38,7 @@ func Initialize() bundle.Mutator {
mutator.SetRunAs(),
mutator.OverrideCompute(),
mutator.ProcessTargetMode(),
mutator.DefaultQueueing(),
mutator.ExpandPipelineGlobPaths(),
mutator.TranslatePaths(),
python.WrapperWarning(),