From 000a7fef8c902a01d0ae577d0140070f1ba39f3c Mon Sep 17 00:00:00 2001 From: "Lennart Kats (databricks)" Date: Mon, 22 Apr 2024 12:36:39 +0200 Subject: [PATCH] Enable job queueing by default (#1385) ## Changes This enable queueing for jobs by default, following the behavior from API 2.2+. Queing is a best practice and will be the default in API 2.2. Since we're still using API 2.1 which has queueing disabled by default, this PR enables queuing using a mutator. Customers can manually turn off queueing for any job by adding the following to their job spec: ``` queue: enabled: false ``` ## Tests Unit tests, manual confirmation of property after deployment. --------- Co-authored-by: Pieter Noordhuis --- bundle/config/mutator/default_queueing.go | 38 ++++++++ .../config/mutator/default_queueing_test.go | 95 +++++++++++++++++++ bundle/phases/initialize.go | 1 + 3 files changed, 134 insertions(+) create mode 100644 bundle/config/mutator/default_queueing.go create mode 100644 bundle/config/mutator/default_queueing_test.go diff --git a/bundle/config/mutator/default_queueing.go b/bundle/config/mutator/default_queueing.go new file mode 100644 index 00000000..ead77c7a --- /dev/null +++ b/bundle/config/mutator/default_queueing.go @@ -0,0 +1,38 @@ +package mutator + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/diag" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +type defaultQueueing struct{} + +func DefaultQueueing() bundle.Mutator { + return &defaultQueueing{} +} + +func (m *defaultQueueing) Name() string { + return "DefaultQueueing" +} + +// Enable queueing for jobs by default, following the behavior from API 2.2+. +// As of 2024-04, we're still using API 2.1 which has queueing disabled by default. +// This mutator makes sure queueing is enabled by default before we can adopt API 2.2. +func (m *defaultQueueing) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { + r := b.Config.Resources + for i := range r.Jobs { + if r.Jobs[i].JobSettings == nil { + r.Jobs[i].JobSettings = &jobs.JobSettings{} + } + if r.Jobs[i].Queue != nil { + continue + } + r.Jobs[i].Queue = &jobs.QueueSettings{ + Enabled: true, + } + } + return nil +} diff --git a/bundle/config/mutator/default_queueing_test.go b/bundle/config/mutator/default_queueing_test.go new file mode 100644 index 00000000..ea60daf7 --- /dev/null +++ b/bundle/config/mutator/default_queueing_test.go @@ -0,0 +1,95 @@ +package mutator + +import ( + "context" + "testing" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/config" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" +) + +func TestDefaultQueueing(t *testing.T) { + m := DefaultQueueing() + assert.IsType(t, &defaultQueueing{}, m) +} + +func TestDefaultQueueingName(t *testing.T) { + m := DefaultQueueing() + assert.Equal(t, "DefaultQueueing", m.Name()) +} + +func TestDefaultQueueingApplyNoJobs(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{}, + }, + } + d := bundle.Apply(context.Background(), b, DefaultQueueing()) + assert.Len(t, d, 0) + assert.Len(t, b.Config.Resources.Jobs, 0) +} + +func TestDefaultQueueingApplyJobsAlreadyEnabled(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": { + JobSettings: &jobs.JobSettings{ + Queue: &jobs.QueueSettings{Enabled: true}, + }, + }, + }, + }, + }, + } + d := bundle.Apply(context.Background(), b, DefaultQueueing()) + assert.Len(t, d, 0) + assert.True(t, b.Config.Resources.Jobs["job"].Queue.Enabled) +} + +func TestDefaultQueueingApplyEnableQueueing(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job": {}, + }, + }, + }, + } + d := bundle.Apply(context.Background(), b, DefaultQueueing()) + assert.Len(t, d, 0) + assert.NotNil(t, b.Config.Resources.Jobs["job"].Queue) + assert.True(t, b.Config.Resources.Jobs["job"].Queue.Enabled) +} + +func TestDefaultQueueingApplyWithMultipleJobs(t *testing.T) { + b := &bundle.Bundle{ + Config: config.Root{ + Resources: config.Resources{ + Jobs: map[string]*resources.Job{ + "job1": { + JobSettings: &jobs.JobSettings{ + Queue: &jobs.QueueSettings{Enabled: false}, + }, + }, + "job2": {}, + "job3": { + JobSettings: &jobs.JobSettings{ + Queue: &jobs.QueueSettings{Enabled: true}, + }, + }, + }, + }, + }, + } + d := bundle.Apply(context.Background(), b, DefaultQueueing()) + assert.Len(t, d, 0) + assert.False(t, b.Config.Resources.Jobs["job1"].Queue.Enabled) + assert.True(t, b.Config.Resources.Jobs["job2"].Queue.Enabled) + assert.True(t, b.Config.Resources.Jobs["job3"].Queue.Enabled) +} diff --git a/bundle/phases/initialize.go b/bundle/phases/initialize.go index d6a1b95d..2f5eab30 100644 --- a/bundle/phases/initialize.go +++ b/bundle/phases/initialize.go @@ -38,6 +38,7 @@ func Initialize() bundle.Mutator { mutator.SetRunAs(), mutator.OverrideCompute(), mutator.ProcessTargetMode(), + mutator.DefaultQueueing(), mutator.ExpandPipelineGlobPaths(), mutator.TranslatePaths(), python.WrapperWarning(),