Fixed spark version check for clusters defined in the same bundle (#2374)

## Changes
Previously, using Python wheel tasks with compute referring to an interactive cluster defined in the same bundle would produce a warning like the one below:

```
GET /api/2.1/clusters/get?cluster_id=${resources.clusters.development_cluster.id}
< HTTP/2.0 400 Bad Request
< {
<   "error_code": "INVALID_PARAMETER_VALUE",
<   "message": "Cluster ${resources.clusters.development_cluster.id} does not exist"
< } pid=14465 mutator=seq mutator=initialize mutator=seq mutator=PythonWrapperWarning sdk=true
```

This PR fixes the issue by resolving the Spark version for such clusters from the bundle configuration itself instead of making an API call.
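
In other words, when `existing_cluster_id` is an unresolved `${resources.clusters.<key>.id}` reference, the Spark version can be read straight from the cluster definition in the bundle. Below is a minimal sketch of that idea; it uses a plain map in place of the bundle's typed config and the CLI's `dynvar` reference parser, so the function and names are illustrative only.

```go
package main

import (
	"fmt"
	"strings"
)

// sparkVersionFromBundle is a simplified sketch: the real code walks the
// bundle's typed cluster resources via the dynvar reference parser rather
// than a plain map of cluster key -> spark_version.
func sparkVersionFromBundle(existingClusterID string, clusters map[string]string) (string, bool) {
	if !strings.HasPrefix(existingClusterID, "${") {
		return "", false // a literal cluster id still requires the API lookup
	}
	ref := strings.TrimSuffix(strings.TrimPrefix(existingClusterID, "${"), "}")
	parts := strings.Split(ref, ".")
	// Expect a reference of the form resources.clusters.<cluster_key>.id
	if len(parts) < 3 || parts[0] != "resources" || parts[1] != "clusters" {
		return "", false
	}
	version, ok := clusters[parts[2]]
	return version, ok
}

func main() {
	clusters := map[string]string{"development_cluster": "12.2.x-cpu-ml-scala2.12"}
	fmt.Println(sparkVersionFromBundle("${resources.clusters.development_cluster.id}", clusters))
	// Output: 12.2.x-cpu-ml-scala2.12 true
}
```

The actual mutator keeps the API lookup as a fallback for literal cluster ids, as the diff below shows.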

## Tests
Added acceptance tests.
Andrew Nester 2025-02-26 12:04:45 +00:00 committed by GitHub
parent 9bacf221d7
commit cdea775bd2
13 changed files with 191 additions and 6 deletions

View File

@@ -0,0 +1,37 @@
bundle:
  name: trampoline_warning_message

targets:
  dev:
    mode: development
    default: true
  prod:
    resources:
      clusters:
        interactive_cluster:
          spark_version: 14.2.x-cpu-ml-scala2.12

resources:
  clusters:
    interactive_cluster:
      cluster_name: jobs-as-code-all-purpose-cluster
      spark_version: 12.2.x-cpu-ml-scala2.12
      node_type_id: r5d.8xlarge
      autotermination_minutes: 30
      autoscale:
        min_workers: 1
        max_workers: 1
      driver_node_type_id: r5d.8xlarge
  jobs:
    whl:
      name: "wheel-job"
      tasks:
        - task_key: test_task
          python_wheel_task:
            package_name: my_package
            entry_point: my_module.my_function
          existing_cluster_id: ${resources.clusters.interactive_cluster.id}
          libraries:
            - whl: ./dist/*.whl

View File

@@ -0,0 +1,22 @@
>>> errcode [CLI] bundle validate -t dev
Error: Python wheel tasks require compute with DBR 13.3+ to include local libraries. Please change your cluster configuration or use the experimental 'python_wheel_wrapper' setting. See https://docs.databricks.com/dev-tools/bundles/python-wheel.html for more information.

Name: trampoline_warning_message
Target: dev
Workspace:
  User: [USERNAME]
  Path: /Workspace/Users/[USERNAME]/.bundle/trampoline_warning_message/dev

Found 1 error

Exit code: 1

>>> errcode [CLI] bundle validate -t prod
Name: trampoline_warning_message
Target: prod
Workspace:
  User: [USERNAME]
  Path: /Workspace/Users/[USERNAME]/.bundle/trampoline_warning_message/prod

Validation OK!

View File

@@ -0,0 +1,2 @@
trace errcode $CLI bundle validate -t dev
trace errcode $CLI bundle validate -t prod

View File

@@ -0,0 +1,20 @@
bundle:
  name: trampoline_warning_message_with_new_spark

targets:
  dev:
    mode: development
    default: true

resources:
  jobs:
    whl:
      name: "wheel-job"
      tasks:
        - task_key: test_task
          python_wheel_task:
            package_name: my_package
            entry_point: my_module.my_function
          existing_cluster_id: "some-test-cluster-id"
          libraries:
            - whl: ./dist/*.whl

View File

@@ -0,0 +1,9 @@
>>> errcode [CLI] bundle validate
Name: trampoline_warning_message_with_new_spark
Target: dev
Workspace:
  User: [USERNAME]
  Path: /Workspace/Users/[USERNAME]/.bundle/trampoline_warning_message_with_new_spark/dev

Validation OK!

View File

@@ -0,0 +1 @@
trace errcode $CLI bundle validate

View File

@@ -0,0 +1,16 @@
# Since we use existing cluster id value which is not available in cloud envs, we need to stub the request
# and run this test only locally
LocalOnly = true

[[Server]]
Pattern = "GET /api/2.1/clusters/get"
Response.Body = '''
{
  "cluster_id": "some-cluster-id",
  "state": "RUNNING",
  "spark_version": "13.3.x-scala2.12",
  "node_type_id": "Standard_DS3_v2",
  "driver_node_type_id": "Standard_DS3_v2",
  "cluster_name": "some-cluster-name",
  "spark_context_id": 12345
}'''

View File

@@ -0,0 +1,20 @@
bundle:
  name: trampoline_warning_message_with_old_spark

targets:
  dev:
    mode: development
    default: true

resources:
  jobs:
    whl:
      name: "wheel-job"
      tasks:
        - task_key: test_task
          python_wheel_task:
            package_name: my_package
            entry_point: my_module.my_function
          existing_cluster_id: "some-test-cluster-id"
          libraries:
            - whl: ./dist/*.whl

View File

@@ -0,0 +1,13 @@
>>> errcode [CLI] bundle validate
Error: Python wheel tasks require compute with DBR 13.3+ to include local libraries. Please change your cluster configuration or use the experimental 'python_wheel_wrapper' setting. See https://docs.databricks.com/dev-tools/bundles/python-wheel.html for more information.

Name: trampoline_warning_message_with_old_spark
Target: dev
Workspace:
  User: [USERNAME]
  Path: /Workspace/Users/[USERNAME]/.bundle/trampoline_warning_message_with_old_spark/dev

Found 1 error

Exit code: 1

View File

@@ -0,0 +1 @@
trace errcode $CLI bundle validate

View File

@@ -0,0 +1,16 @@
# Since we use existing cluster id value which is not available in cloud envs, we need to stub the request
# and run this test only locally
LocalOnly = true

[[Server]]
Pattern = "GET /api/2.1/clusters/get"
Response.Body = '''
{
  "cluster_id": "some-cluster-id",
  "state": "RUNNING",
  "spark_version": "7.3.x-scala2.12",
  "node_type_id": "Standard_DS3_v2",
  "driver_node_type_id": "Standard_DS3_v2",
  "cluster_name": "some-cluster-name",
  "spark_context_id": 12345
}'''

View File

@@ -9,6 +9,7 @@ import (
     "github.com/databricks/cli/bundle/config"
     "github.com/databricks/cli/bundle/libraries"
     "github.com/databricks/cli/libs/diag"
+    "github.com/databricks/cli/libs/dyn/dynvar"
     "github.com/databricks/cli/libs/log"
     "github.com/databricks/databricks-sdk-go"
     "golang.org/x/mod/semver"
@@ -60,11 +61,37 @@ func hasIncompatibleWheelTasks(ctx context.Context, b *bundle.Bundle) bool {
         }
         if task.ExistingClusterId != "" {
-            version, err := getSparkVersionForCluster(ctx, b.WorkspaceClient(), task.ExistingClusterId)
-            // If there's error getting spark version for cluster, do not mark it as incompatible
-            if err != nil {
-                log.Warnf(ctx, "unable to get spark version for cluster %s, err: %s", task.ExistingClusterId, err.Error())
-                return false
+            var version string
+            var err error
+            // If the cluster id is a variable and it's not resolved, it means it references a cluster defined in the same bundle.
+            // So we can get the version from the cluster definition.
+            // It's defined in a form of resources.clusters.<cluster_key>.id
+            if strings.HasPrefix(task.ExistingClusterId, "${") {
+                p, ok := dynvar.PureReferenceToPath(task.ExistingClusterId)
+                if !ok || len(p) < 3 {
+                    log.Warnf(ctx, "unable to parse cluster key from %s", task.ExistingClusterId)
+                    return false
+                }
+                if p[0].Key() != "resources" || p[1].Key() != "clusters" {
+                    log.Warnf(ctx, "incorrect variable reference for cluster id %s", task.ExistingClusterId)
+                    return false
+                }
+                clusterKey := p[2].Key()
+                cluster, ok := b.Config.Resources.Clusters[clusterKey]
+                if !ok {
+                    log.Warnf(ctx, "unable to find cluster with key %s", clusterKey)
+                    return false
+                }
+                version = cluster.SparkVersion
+            } else {
+                version, err = getSparkVersionForCluster(ctx, b.WorkspaceClient(), task.ExistingClusterId)
+                // If there's error getting spark version for cluster, do not mark it as incompatible
+                if err != nil {
+                    log.Warnf(ctx, "unable to get spark version for cluster %s, err: %s", task.ExistingClusterId, err.Error())
+                    return false
+                }
             }
             if lowerThanExpectedVersion(version) {
@@ -82,7 +109,7 @@ func lowerThanExpectedVersion(sparkVersion string) bool {
         return false
     }
-    if parts[1][0] == 'x' { // treat versions like 13.x as the very latest minor (13.99)
+    if len(parts[1]) > 0 && parts[1][0] == 'x' { // treat versions like 13.x as the very latest minor (13.99)
         parts[1] = "99"
     }

View File

@@ -346,6 +346,7 @@ func TestSparkVersionLowerThanExpected(t *testing.T) {
         "13.x-rc-scala-2.12": false,
         "client.1.10-scala2.12": false,
         "latest-stable-gpu-scala2.11": false,
+        "1.": false,
         "10.4.x-aarch64-photon-scala2.12": true,
         "10.4.x-scala2.12": true,
         "13.0.x-scala2.12": true,