2023-09-27 08:26:59 +00:00
package python
import (
"context"
"strings"
"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
2024-03-25 14:18:47 +00:00
"github.com/databricks/cli/libs/diag"
2023-09-29 12:19:05 +00:00
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go"
2023-09-27 08:26:59 +00:00
"golang.org/x/mod/semver"
)
// wrapperWarning is a stateless bundle mutator; its behavior lives entirely
// in its Apply and Name methods.
type wrapperWarning struct{}
// WrapperWarning returns a new wrapperWarning mutator as a bundle.Mutator.
func WrapperWarning() bundle.Mutator {
	return new(wrapperWarning)
}
2024-03-25 14:18:47 +00:00
func ( m * wrapperWarning ) Apply ( ctx context . Context , b * bundle . Bundle ) diag . Diagnostics {
2023-10-20 12:32:04 +00:00
if isPythonWheelWrapperOn ( b ) {
return nil
}
2023-09-27 08:26:59 +00:00
if hasIncompatibleWheelTasks ( ctx , b ) {
2024-03-25 14:18:47 +00:00
return diag . Errorf ( "python wheel tasks with local libraries require compute with DBR 13.1+. Please change your cluster configuration or set experimental 'python_wheel_wrapper' setting to 'true'" )
2023-09-27 08:26:59 +00:00
}
return nil
}
2023-10-20 12:32:04 +00:00
// isPythonWheelWrapperOn reports whether the bundle configuration has an
// experimental section with PythonWheelWrapper set to true.
func isPythonWheelWrapperOn(b *bundle.Bundle) bool {
	experimental := b.Config.Experimental
	if experimental == nil {
		return false
	}
	return experimental.PythonWheelWrapper
}
2023-09-27 08:26:59 +00:00
func hasIncompatibleWheelTasks ( ctx context . Context , b * bundle . Bundle ) bool {
tasks := libraries . FindAllWheelTasksWithLocalLibraries ( b )
for _ , task := range tasks {
if task . NewCluster != nil {
if lowerThanExpectedVersion ( ctx , task . NewCluster . SparkVersion ) {
return true
}
}
if task . JobClusterKey != "" {
for _ , job := range b . Config . Resources . Jobs {
for _ , cluster := range job . JobClusters {
2024-04-03 10:39:53 +00:00
if task . JobClusterKey == cluster . JobClusterKey && cluster . NewCluster . SparkVersion != "" {
2023-09-27 08:26:59 +00:00
if lowerThanExpectedVersion ( ctx , cluster . NewCluster . SparkVersion ) {
return true
}
}
}
}
}
2023-09-29 12:19:05 +00:00
if task . ExistingClusterId != "" {
version , err := getSparkVersionForCluster ( ctx , b . WorkspaceClient ( ) , task . ExistingClusterId )
// If there's error getting spark version for cluster, do not mark it as incompatible
if err != nil {
log . Warnf ( ctx , "unable to get spark version for cluster %s, err: %s" , task . ExistingClusterId , err . Error ( ) )
return false
}
if lowerThanExpectedVersion ( ctx , version ) {
return true
}
}
2023-09-27 08:26:59 +00:00
}
return false
}
func lowerThanExpectedVersion ( ctx context . Context , sparkVersion string ) bool {
parts := strings . Split ( sparkVersion , "." )
if len ( parts ) < 2 {
return false
}
2023-10-23 08:19:26 +00:00
if parts [ 1 ] [ 0 ] == 'x' { // treat versions like 13.x as the very latest minor (13.99)
parts [ 1 ] = "99"
}
2023-09-27 08:26:59 +00:00
v := "v" + parts [ 0 ] + "." + parts [ 1 ]
return semver . Compare ( v , "v13.1" ) < 0
}
// Name implements bundle.Mutator and returns the identifier of this mutator.
func (m *wrapperWarning) Name() string {
	const mutatorName = "PythonWrapperWarning"
	return mutatorName
}
2023-09-29 12:19:05 +00:00
// getSparkVersionForCluster looks up the cluster with the given ID through
// the workspace client and returns its SparkVersion field.
//
// If the lookup fails, the error is returned unchanged together with an
// empty version string.
func getSparkVersionForCluster(ctx context.Context, w *databricks.WorkspaceClient, clusterId string) (string, error) {
	clusterInfo, lookupErr := w.Clusters.GetByClusterId(ctx, clusterId)
	if lookupErr != nil {
		return "", lookupErr
	}

	return clusterInfo.SparkVersion, nil
}