mirror of https://github.com/databricks/cli.git
Compare commits
39 Commits
Author | SHA1 | Date |
029c292cf3 | |
24ac8d8d59 | |
5d392acbef | |
272ce61302 | |
878fa80322 | |
8d849fe868 | |
e9b0afb337 | |
ee9499bc68 | |
f4623ebbb9 | |
ac37ca0d98 | |
7ab9fb7cec | |
9552131a2a | |
583637aed6 | |
6991dea00b | |
1e2545eedf | |
f70c47253e | |
890b48f70d | |
8ec1e0746d | |
95f41b1f30 | |
92e97ad413 | |
69fdd9736b | |
ee8017357e | |
7084392a0f | |
be62ead7be | |
cf51636faa | |
09bf4fa90c | |
9d8ba099ba | |
932aeee349 | |
63e599ccb2 | |
0ce50fadf0 | |
2717aca239 | |
78b9788bc6 | |
da46c142e0 | |
f690f0a342 | |
04cae2f8cf | |
4b484fdcdc | |
06af01c8f6 | |
91a2dfa0ed | |
c4cea1aeff |
@ -114,3 +114,7 @@ dario.cat/mergo
Copyright (c) 2013 Dario Castañé. All rights reserved.
Copyright (c) 2012 The Go Authors. All rights reserved.
Copyright (c) 2023 The Gorilla Authors. All rights reserved.
@ -11,6 +11,7 @@ import (
@ -26,6 +27,7 @@ import (
@ -72,7 +74,8 @@ func TestInprocessMode(t *testing.T) {
if InprocessMode {
t.Skip("Already tested by TestAccept")
require.Equal(t, 1, testAccept(t, true, "selftest"))
require.Equal(t, 1, testAccept(t, true, "selftest/basic"))
require.Equal(t, 1, testAccept(t, true, "selftest/server"))
func testAccept(t *testing.T, InprocessMode bool, singleTest string) int {
@ -118,14 +121,12 @@ func testAccept(t *testing.T, InprocessMode bool, singleTest string) int {
uvCache := getUVDefaultCacheDir(t)
t.Setenv("UV_CACHE_DIR", uvCache)
ctx := context.Background()
cloudEnv := os.Getenv("CLOUD_ENV")
if cloudEnv == "" {
defaultServer := testserver.New(t)
// Redirect API access to local server:
t.Setenv("DATABRICKS_HOST", defaultServer.URL)
t.Setenv("DATABRICKS_DEFAULT_HOST", defaultServer.URL)
homeDir := t.TempDir()
// Do not read user's ~/.databrickscfg
@ -148,27 +149,12 @@ func testAccept(t *testing.T, InprocessMode bool, singleTest string) int {
// do it last so that full paths match first:
repls.SetPath(buildDir, "[BUILD_DIR]")
var config databricks.Config
if cloudEnv == "" {
// use fake token for local tests
config = databricks.Config{Token: "dbapi1234"}
} else {
// non-local tests rely on environment variables
config = databricks.Config{}
workspaceClient, err := databricks.NewWorkspaceClient(&config)
require.NoError(t, err)
user, err := workspaceClient.CurrentUser.Me(ctx)
require.NoError(t, err)
require.NotNil(t, user)
testdiff.PrepareReplacementsUser(t, &repls, *user)
testdiff.PrepareReplacementsWorkspaceClient(t, &repls, workspaceClient)
testdiff.PrepareReplacementsUUID(t, &repls)
testdiff.PrepareReplacementsDevVersion(t, &repls)
testdiff.PrepareReplacementSdkVersion(t, &repls)
testdiff.PrepareReplacementsGoVersion(t, &repls)
repls.Repls = append(repls.Repls, testdiff.Replacement{Old: regexp.MustCompile("dbapi[0-9a-f]+"), New: "[DATABRICKS_TOKEN]"})
testDirs := getTests(t)
require.NotEmpty(t, testDirs)
@ -180,8 +166,7 @@ func testAccept(t *testing.T, InprocessMode bool, singleTest string) int {
for _, dir := range testDirs {
testName := strings.ReplaceAll(dir, "\\", "/")
t.Run(testName, func(t *testing.T) {
t.Run(dir, func(t *testing.T) {
if !InprocessMode {
@ -203,7 +188,8 @@ func getTests(t *testing.T) []string {
name := filepath.Base(path)
if name == EntryPointScript {
// Presence of 'script' marks a test case in this directory
testDirs = append(testDirs, filepath.Dir(path))
testName := filepath.ToSlash(filepath.Dir(path))
testDirs = append(testDirs, testName)
return nil
@ -239,7 +225,6 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
repls.SetPathWithParents(tmpDir, "[TMPDIR]")
repls.Repls = append(repls.Repls, config.Repls...)
scriptContents := readMergedScriptContents(t, dir)
testutil.WriteFile(t, filepath.Join(tmpDir, EntryPointScript), scriptContents)
@ -253,38 +238,79 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
cmd := exec.Command(args[0], args[1:]...)
cmd.Env = os.Environ()
var workspaceClient *databricks.WorkspaceClient
var user iam.User
// Start a new server with a custom configuration if the acceptance test
// specifies a custom server stubs.
var server *testserver.Server
// Start a new server for this test if either:
// 1. A custom server spec is defined in the test configuration.
// 2. The test is configured to record requests and assert on them. We need
// a duplicate of the default server to record requests because the default
// server otherwise is a shared resource.
if len(config.Server) > 0 || config.RecordRequests {
server = testserver.New(t)
server.RecordRequests = config.RecordRequests
server.IncludeRequestHeaders = config.IncludeRequestHeaders
if cloudEnv == "" {
// Start a new server for this test if either:
// 1. A custom server spec is defined in the test configuration.
// 2. The test is configured to record requests and assert on them. We need
// a duplicate of the default server to record requests because the default
// server otherwise is a shared resource.
// If no custom server stubs are defined, add the default handlers.
if len(config.Server) == 0 {
databricksLocalHost := os.Getenv("DATABRICKS_DEFAULT_HOST")
if len(config.Server) > 0 || config.RecordRequests {
server = testserver.New(t)
server.RecordRequests = config.RecordRequests
server.IncludeRequestHeaders = config.IncludeRequestHeaders
for _, stub := range config.Server {
require.NotEmpty(t, stub.Pattern)
items := strings.Split(stub.Pattern, " ")
require.Len(t, items, 2)
server.Handle(items[0], items[1], func(fakeWorkspace *testserver.FakeWorkspace, req *http.Request) (any, int) {
statusCode := http.StatusOK
if stub.Response.StatusCode != 0 {
statusCode = stub.Response.StatusCode
return stub.Response.Body, statusCode
// The earliest handlers take precedence, add default handlers last
databricksLocalHost = server.URL
for _, stub := range config.Server {
require.NotEmpty(t, stub.Pattern)
server.Handle(stub.Pattern, func(fakeWorkspace *testserver.FakeWorkspace, req *http.Request) (any, int) {
statusCode := http.StatusOK
if stub.Response.StatusCode != 0 {
statusCode = stub.Response.StatusCode
return stub.Response.Body, statusCode
// Each local test should use a new token that will result into a new fake workspace,
// so that test don't interfere with each other.
tokenSuffix := strings.ReplaceAll(uuid.NewString(), "-", "")
config := databricks.Config{
Host: databricksLocalHost,
Token: "dbapi" + tokenSuffix,
cmd.Env = append(cmd.Env, "DATABRICKS_HOST="+server.URL)
workspaceClient, err = databricks.NewWorkspaceClient(&config)
require.NoError(t, err)
cmd.Env = append(cmd.Env, "DATABRICKS_HOST="+config.Host)
cmd.Env = append(cmd.Env, "DATABRICKS_TOKEN="+config.Token)
// For the purposes of replacements, use testUser.
// Note, users might have overriden /api/2.0/preview/scim/v2/Me but that should not affect the replacement:
user = testUser
} else {
// Use whatever authentication mechanism is configured by the test runner.
workspaceClient, err = databricks.NewWorkspaceClient(&databricks.Config{})
require.NoError(t, err)
pUser, err := workspaceClient.CurrentUser.Me(context.Background())
require.NoError(t, err, "Failed to get current user")
user = *pUser
testdiff.PrepareReplacementsUser(t, &repls, user)
testdiff.PrepareReplacementsWorkspaceClient(t, &repls, workspaceClient)
// Must be added PrepareReplacementsUser, otherwise conflicts with [USERNAME]
testdiff.PrepareReplacementsUUID(t, &repls)
// User replacements come last:
repls.Repls = append(repls.Repls, config.Repls...)
if coverDir != "" {
// Creating individual coverage directory for each test, because writing to the same one
// results in sporadic failures like this one (only if tests are running in parallel):
@ -295,15 +321,6 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
cmd.Env = append(cmd.Env, "GOCOVERDIR="+coverDir)
// Each local test should use a new token that will result into a new fake workspace,
// so that test don't interfere with each other.
if cloudEnv == "" {
tokenSuffix := strings.ReplaceAll(uuid.NewString(), "-", "")
token := "dbapi" + tokenSuffix
cmd.Env = append(cmd.Env, "DATABRICKS_TOKEN="+token)
repls.Set(token, "[DATABRICKS_TOKEN]")
// Write combined output to a file
out, err := os.Create(filepath.Join(tmpDir, "output.txt"))
require.NoError(t, err)
@ -320,7 +337,7 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
for _, req := range server.Requests {
reqJson, err := json.MarshalIndent(req, "", " ")
require.NoError(t, err)
require.NoErrorf(t, err, "Failed to indent: %#v", req)
reqJsonWithRepls := repls.Replace(string(reqJson))
_, err = f.WriteString(reqJsonWithRepls + "\n")
@ -13,13 +13,13 @@
=== Inside the bundle, profile flag not matching bundle host. Badness: should use profile from flag instead and not fail
>>> errcode [CLI] current-user me -p profile_name
Error: cannot resolve bundle auth configuration: config host mismatch: profile uses host https://non-existing-subdomain.databricks.com, but CLI configured to use [DATABRICKS_URL]
Error: cannot resolve bundle auth configuration: config host mismatch: profile uses host https://non-existing-subdomain.databricks.com, but CLI configured to use [DATABRICKS_TARGET]
Exit code: 1
=== Inside the bundle, target and not matching profile
>>> errcode [CLI] current-user me -t dev -p profile_name
Error: cannot resolve bundle auth configuration: config host mismatch: profile uses host https://non-existing-subdomain.databricks.com, but CLI configured to use [DATABRICKS_URL]
Error: cannot resolve bundle auth configuration: config host mismatch: profile uses host https://non-existing-subdomain.databricks.com, but CLI configured to use [DATABRICKS_TARGET]
Exit code: 1
@ -5,4 +5,8 @@ Badness = "When -p flag is used inside the bundle folder for any CLI commands, C
# This is a workaround to replace DATABRICKS_URL with DATABRICKS_HOST
@ -0,0 +1,12 @@
"headers": {
"Authorization": [
"User-Agent": [
"cli/[DEV_VERSION] databricks-sdk-go/[SDK_VERSION] go/[GO_VERSION] os/[OS] cmd/current-user_me cmd-exec-id/[UUID] auth/basic"
"method": "GET",
"path": "/api/2.0/preview/scim/v2/Me"
@ -0,0 +1,4 @@
@ -0,0 +1,8 @@
# Unset the token which is configured by default
# in acceptance tests
$CLI current-user me
@ -0,0 +1,4 @@
# "username:password" in base64 is dXNlcm5hbWU6cGFzc3dvcmQ=, expect to see this in Authorization header
Old = "dXNlcm5hbWU6cGFzc3dvcmQ="
@ -0,0 +1,34 @@
"headers": {
"User-Agent": [
"cli/[DEV_VERSION] databricks-sdk-go/[SDK_VERSION] go/[GO_VERSION] os/[OS]"
"method": "GET",
"path": "/oidc/.well-known/oauth-authorization-server"
"headers": {
"Authorization": [
"User-Agent": [
"cli/[DEV_VERSION] databricks-sdk-go/[SDK_VERSION] go/[GO_VERSION] os/[OS]"
"method": "POST",
"path": "/oidc/v1/token",
"raw_body": "grant_type=client_credentials\u0026scope=all-apis"
"headers": {
"Authorization": [
"Bearer oauth-token"
"User-Agent": [
"cli/[DEV_VERSION] databricks-sdk-go/[SDK_VERSION] go/[GO_VERSION] os/[OS] cmd/current-user_me cmd-exec-id/[UUID] auth/oauth-m2m"
"method": "GET",
"path": "/api/2.0/preview/scim/v2/Me"
@ -0,0 +1,4 @@
@ -0,0 +1,8 @@
# Unset the token which is configured by default
# in acceptance tests
export DATABRICKS_CLIENT_ID=client_id
export DATABRICKS_CLIENT_SECRET=client_secret
$CLI current-user me
@ -0,0 +1,5 @@
# "client_id:client_secret" in base64 is Y2xpZW50X2lkOmNsaWVudF9zZWNyZXQ=, expect to
# see this in Authorization header
Old = "Y2xpZW50X2lkOmNsaWVudF9zZWNyZXQ="
@ -0,0 +1,12 @@
"headers": {
"Authorization": [
"Bearer dapi1234"
"User-Agent": [
"cli/[DEV_VERSION] databricks-sdk-go/[SDK_VERSION] go/[GO_VERSION] os/[OS] cmd/current-user_me cmd-exec-id/[UUID] auth/pat"
"method": "GET",
"path": "/api/2.0/preview/scim/v2/Me"
@ -0,0 +1,4 @@
@ -0,0 +1,3 @@
export DATABRICKS_TOKEN=dapi1234
$CLI current-user me
@ -0,0 +1,20 @@
LocalOnly = true
RecordRequests = true
IncludeRequestHeaders = ["Authorization", "User-Agent"]
Old = '(linux|darwin|windows)'
New = '[OS]'
Old = " upstream/[A-Za-z0-9.-]+"
New = ""
Old = " upstream-version/[A-Za-z0-9.-]+"
New = ""
Old = " cicd/[A-Za-z0-9.-]+"
New = ""
@ -14,11 +14,7 @@ import (
func StartCmdServer(t *testing.T) *testserver.Server {
server := testserver.New(t)
// {$} is a wildcard that only matches the end of the URL. We explicitly use
// /{$} to disambiguate it from the generic handler for '/' which is used to
// identify unhandled API endpoints in the test server.
server.Handle("/{$}", func(w *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/", func(_ *testserver.FakeWorkspace, r *http.Request) (any, int) {
q := r.URL.Query()
args := strings.Split(q.Get("args"), " ")
@ -0,0 +1,8 @@
"method": "GET",
"path": "/api/2.0/preview/scim/v2/Me"
"method": "GET",
"path": "/custom/endpoint"
@ -0,0 +1,15 @@
>>> curl -s [DATABRICKS_URL]/api/2.0/preview/scim/v2/Me
"id": "[USERID]",
"userName": "[USERNAME]"
>>> curl -sD - [DATABRICKS_URL]/custom/endpoint?query=param
HTTP/1.1 201 Created
Content-Type: application/json
Date: (redacted)
Content-Length: (redacted)
@ -0,0 +1,2 @@
trace curl -s $DATABRICKS_HOST/api/2.0/preview/scim/v2/Me
trace curl -sD - $DATABRICKS_HOST/custom/endpoint?query=param
@ -0,0 +1,18 @@
LocalOnly = true
RecordRequests = true
Pattern = "GET /custom/endpoint"
Response.Body = '''custom
Response.StatusCode = 201
Old = 'Date: .*'
New = 'Date: (redacted)'
Old = 'Content-Length: [0-9]*'
New = 'Content-Length: (redacted)'
@ -8,6 +8,7 @@ import (
@ -16,8 +17,13 @@ import (
var testUser = iam.User{
Id: "1000012345",
UserName: "tester@databricks.com",
func AddHandlers(server *testserver.Server) {
server.Handle("GET /api/2.0/policies/clusters/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/api/2.0/policies/clusters/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return compute.ListPoliciesResponse{
Policies: []compute.Policy{
@ -32,7 +38,7 @@ func AddHandlers(server *testserver.Server) {
}, http.StatusOK
server.Handle("GET /api/2.0/instance-pools/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/api/2.0/instance-pools/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return compute.ListInstancePools{
InstancePools: []compute.InstancePoolAndStats{
@ -43,7 +49,7 @@ func AddHandlers(server *testserver.Server) {
}, http.StatusOK
server.Handle("GET /api/2.1/clusters/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/api/2.1/clusters/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return compute.ListClustersResponse{
Clusters: []compute.ClusterDetails{
@ -58,20 +64,17 @@ func AddHandlers(server *testserver.Server) {
}, http.StatusOK
server.Handle("GET /api/2.0/preview/scim/v2/Me", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return iam.User{
Id: "1000012345",
UserName: "tester@databricks.com",
}, http.StatusOK
server.Handle("GET", "/api/2.0/preview/scim/v2/Me", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return testUser, http.StatusOK
server.Handle("GET /api/2.0/workspace/get-status", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/api/2.0/workspace/get-status", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
path := r.URL.Query().Get("path")
return fakeWorkspace.WorkspaceGetStatus(path)
server.Handle("POST /api/2.0/workspace/mkdirs", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("POST", "/api/2.0/workspace/mkdirs", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
request := workspace.Mkdirs{}
decoder := json.NewDecoder(r.Body)
@ -83,13 +86,13 @@ func AddHandlers(server *testserver.Server) {
return fakeWorkspace.WorkspaceMkdirs(request)
server.Handle("GET /api/2.0/workspace/export", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/api/2.0/workspace/export", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
path := r.URL.Query().Get("path")
return fakeWorkspace.WorkspaceExport(path)
server.Handle("POST /api/2.0/workspace/delete", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("POST", "/api/2.0/workspace/delete", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
path := r.URL.Query().Get("path")
recursiveStr := r.URL.Query().Get("recursive")
var recursive bool
@ -103,8 +106,9 @@ func AddHandlers(server *testserver.Server) {
return fakeWorkspace.WorkspaceDelete(path, recursive)
server.Handle("POST /api/2.0/workspace-files/import-file/{path}", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
path := r.PathValue("path")
server.Handle("POST", "/api/2.0/workspace-files/import-file/{path:.*}", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
vars := mux.Vars(r)
path := vars["path"]
body := new(bytes.Buffer)
_, err := body.ReadFrom(r.Body)
@ -115,14 +119,15 @@ func AddHandlers(server *testserver.Server) {
return fakeWorkspace.WorkspaceFilesImportFile(path, body.Bytes())
server.Handle("GET /api/2.1/unity-catalog/current-metastore-assignment", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/api/2.1/unity-catalog/current-metastore-assignment", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return catalog.MetastoreAssignment{
DefaultCatalogName: "main",
}, http.StatusOK
server.Handle("GET /api/2.0/permissions/directories/{objectId}", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
objectId := r.PathValue("objectId")
server.Handle("GET", "/api/2.0/permissions/directories/{objectId}", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
vars := mux.Vars(r)
objectId := vars["objectId"]
return workspace.WorkspaceObjectPermissions{
ObjectId: objectId,
@ -140,7 +145,7 @@ func AddHandlers(server *testserver.Server) {
}, http.StatusOK
server.Handle("POST /api/2.1/jobs/create", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("POST", "/api/2.1/jobs/create", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
request := jobs.CreateJob{}
decoder := json.NewDecoder(r.Body)
@ -152,15 +157,31 @@ func AddHandlers(server *testserver.Server) {
return fakeWorkspace.JobsCreate(request)
server.Handle("GET /api/2.1/jobs/get", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/api/2.1/jobs/get", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
jobId := r.URL.Query().Get("job_id")
return fakeWorkspace.JobsGet(jobId)
server.Handle("GET /api/2.1/jobs/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
server.Handle("GET", "/api/2.1/jobs/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return fakeWorkspace.JobsList()
server.Handle("GET", "/oidc/.well-known/oauth-authorization-server", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return map[string]string{
"authorization_endpoint": server.URL + "oidc/v1/authorize",
"token_endpoint": server.URL + "/oidc/v1/token",
}, http.StatusOK
server.Handle("POST", "/oidc/v1/token", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return map[string]string{
"access_token": "oauth-token",
"expires_in": "3600",
"scope": "all-apis",
"token_type": "Bearer",
}, http.StatusOK
func internalError(err error) (any, int) {
@ -12,6 +12,7 @@ require (
github.com/databricks/databricks-sdk-go v0.57.0 // Apache 2.0
github.com/fatih/color v1.18.0 // MIT
github.com/google/uuid v1.6.0 // BSD-3-Clause
github.com/gorilla/mux v1.8.1 // BSD 3-Clause
github.com/hashicorp/go-version v1.7.0 // MPL 2.0
github.com/hashicorp/hc-install v0.9.1 // MPL 2.0
github.com/hashicorp/terraform-exec v0.22.0 // MPL 2.0
@ -97,6 +97,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.4 h1:9gWcmF85Wvq4ryPFvGFaOgPIs1AQX0d0bcbGw4Z96qg=
github.com/googleapis/gax-go/v2 v2.12.4/go.mod h1:KYEYLorsnIGDi/rPC8b5TdlB9kbKoFubselGIoBMCwI=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
@ -6,7 +6,9 @@ import (
@ -887,3 +889,80 @@ func TestWorkspaceFilesExtensions_ExportFormatIsPreserved(t *testing.T) {
func TestDbfsFilerForStreamingUploads(t *testing.T) {
ctx := context.Background()
f, _ := setupDbfsFiler(t)
// Set MaxDbfsPutFileSize to 1 to force streaming uploads
prevV := filer.MaxDbfsPutFileSize
filer.MaxDbfsPutFileSize = 1
t.Cleanup(func() {
filer.MaxDbfsPutFileSize = prevV
// Write a file to local disk.
tmpDir := t.TempDir()
testutil.WriteFile(t, filepath.Join(tmpDir, "foo.txt"), "foobar")
fd, err := os.Open(filepath.Join(tmpDir, "foo.txt"))
require.NoError(t, err)
defer fd.Close()
// Write a file with streaming upload
err = f.Write(ctx, "foo.txt", fd)
require.NoError(t, err)
// Assert contents
filerTest{t, f}.assertContents(ctx, "foo.txt", "foobar")
// Overwrite the file with streaming upload, and fail
err = f.Write(ctx, "foo.txt", strings.NewReader("barfoo"))
require.ErrorIs(t, err, fs.ErrExist)
// Overwrite the file with streaming upload, and succeed
err = f.Write(ctx, "foo.txt", strings.NewReader("barfoo"), filer.OverwriteIfExists)
require.NoError(t, err)
// Assert contents
filerTest{t, f}.assertContents(ctx, "foo.txt", "barfoo")
func TestDbfsFilerForPutUploads(t *testing.T) {
ctx := context.Background()
f, _ := setupDbfsFiler(t)
// Write a file to local disk.
tmpDir := t.TempDir()
testutil.WriteFile(t, filepath.Join(tmpDir, "foo.txt"), "foobar")
testutil.WriteFile(t, filepath.Join(tmpDir, "bar.txt"), "barfoo")
fdFoo, err := os.Open(filepath.Join(tmpDir, "foo.txt"))
require.NoError(t, err)
defer fdFoo.Close()
fdBar, err := os.Open(filepath.Join(tmpDir, "bar.txt"))
require.NoError(t, err)
defer fdBar.Close()
// Write a file with PUT upload
err = f.Write(ctx, "foo.txt", fdFoo)
require.NoError(t, err)
// Assert contents
filerTest{t, f}.assertContents(ctx, "foo.txt", "foobar")
// Try to overwrite the file, and fail.
err = f.Write(ctx, "foo.txt", fdBar)
require.ErrorIs(t, err, fs.ErrExist)
// Reset the file descriptor.
_, err = fdBar.Seek(0, io.SeekStart)
require.NoError(t, err)
// Overwrite the file with OverwriteIfExists flag
err = f.Write(ctx, "foo.txt", fdBar, filer.OverwriteIfExists)
require.NoError(t, err)
// Assert contents
filerTest{t, f}.assertContents(ctx, "foo.txt", "barfoo")
@ -1,11 +1,15 @@
package filer
import (
@ -14,6 +18,7 @@ import (
@ -63,33 +68,142 @@ func (info dbfsFileInfo) Sys() any {
return info.fi
// Interface to allow mocking of the Databricks API client.
type databricksClient interface {
Do(ctx context.Context, method, path string, headers map[string]string,
requestBody, responseBody any, visitors ...func(*http.Request) error) error
// DbfsClient implements the [Filer] interface for the DBFS backend.
type DbfsClient struct {
workspaceClient *databricks.WorkspaceClient
apiClient databricksClient
// File operations will be relative to this path.
root WorkspaceRootPath
func NewDbfsClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
apiClient, err := client.New(w.Config)
if err != nil {
return nil, fmt.Errorf("failed to create API client: %w", err)
return &DbfsClient{
workspaceClient: w,
apiClient: apiClient,
root: NewWorkspaceRootPath(root),
}, nil
// The PUT API for DBFS requires setting the content length header beforehand in the HTTP
// request.
func contentLength(path, overwriteField string, file *os.File) (int64, error) {
buf := &bytes.Buffer{}
writer := multipart.NewWriter(buf)
err := writer.WriteField("path", path)
if err != nil {
return 0, fmt.Errorf("failed to write field path field in multipart form: %w", err)
err = writer.WriteField("overwrite", overwriteField)
if err != nil {
return 0, fmt.Errorf("failed to write field overwrite field in multipart form: %w", err)
_, err = writer.CreateFormFile("contents", "")
if err != nil {
return 0, fmt.Errorf("failed to write contents field in multipart form: %w", err)
err = writer.Close()
if err != nil {
return 0, fmt.Errorf("failed to close multipart form writer: %w", err)
stat, err := file.Stat()
if err != nil {
return 0, fmt.Errorf("failed to stat file %s: %w", path, err)
return int64(buf.Len()) + stat.Size(), nil
func contentLengthVisitor(path, overwriteField string, file *os.File) func(*http.Request) error {
return func(r *http.Request) error {
cl, err := contentLength(path, overwriteField, file)
if err != nil {
return fmt.Errorf("failed to calculate content length: %w", err)
r.ContentLength = cl
return nil
func (w *DbfsClient) putFile(ctx context.Context, path string, overwrite bool, file *os.File) error {
overwriteField := "False"
if overwrite {
overwriteField = "True"
pr, pw := io.Pipe()
writer := multipart.NewWriter(pw)
go func() {
defer pw.Close()
err := writer.WriteField("path", path)
if err != nil {
pw.CloseWithError(fmt.Errorf("failed to write field path field in multipart form: %w", err))
err = writer.WriteField("overwrite", overwriteField)
if err != nil {
pw.CloseWithError(fmt.Errorf("failed to write field overwrite field in multipart form: %w", err))
contents, err := writer.CreateFormFile("contents", "")
if err != nil {
pw.CloseWithError(fmt.Errorf("failed to write contents field in multipart form: %w", err))
_, err = io.Copy(contents, file)
if err != nil {
pw.CloseWithError(fmt.Errorf("error while streaming file to dbfs: %w", err))
err = writer.Close()
if err != nil {
pw.CloseWithError(fmt.Errorf("failed to close multipart form writer: %w", err))
// Request bodies of Content-Type multipart/form-data are not supported by
// the Go SDK directly for DBFS. So we use the Do method directly.
err := w.apiClient.Do(ctx,
map[string]string{"Content-Type": writer.FormDataContentType()},
contentLengthVisitor(path, overwriteField, file))
var aerr *apierr.APIError
if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
return FileAlreadyExistsError{path}
return err
// MaxUploadLimitForPutApi is the maximum size in bytes of a file that can be uploaded
// using the /dbfs/put API. If the file is larger than this limit, the streaming
// API (/dbfs/create and /dbfs/add-block) will be used instead.
var MaxDbfsPutFileSize int64 = 2 * 1024 * 1024 * 1024
func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
absPath, err := w.root.Join(name)
if err != nil {
return err
fileMode := files.FileModeWrite
if slices.Contains(mode, OverwriteIfExists) {
fileMode |= files.FileModeOverwrite
// Issue info call before write because it automatically creates parent directories.
// For discussion: we could decide this is actually convenient, remove the call below,
@ -114,7 +228,36 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m
handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, fileMode)
localFile, ok := reader.(*os.File)
// If the source is not a local file, we'll always use the streaming API endpoint.
if !ok {
return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), reader)
stat, err := localFile.Stat()
if err != nil {
return fmt.Errorf("failed to stat file: %w", err)
// If the source is a local file, but is too large then we'll use the streaming API endpoint.
if stat.Size() > MaxDbfsPutFileSize {
return w.streamFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
// Use the /dbfs/put API when the file is on the local filesystem
// and is small enough. This is the most common case when users use the
// `databricks fs cp` command.
return w.putFile(ctx, absPath, slices.Contains(mode, OverwriteIfExists), localFile)
func (w *DbfsClient) streamFile(ctx context.Context, path string, overwrite bool, reader io.Reader) error {
fileMode := files.FileModeWrite
if overwrite {
fileMode |= files.FileModeOverwrite
handle, err := w.workspaceClient.Dbfs.Open(ctx, path, fileMode)
if err != nil {
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
@ -124,7 +267,7 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m
// This API returns a 400 if the file already exists.
if aerr.StatusCode == http.StatusBadRequest {
if aerr.ErrorCode == "RESOURCE_ALREADY_EXISTS" {
return FileAlreadyExistsError{absPath}
return FileAlreadyExistsError{path}
@ -136,7 +279,6 @@ func (w *DbfsClient) Write(ctx context.Context, name string, reader io.Reader, m
if err == nil {
err = cerr
return err
@ -0,0 +1,155 @@
package filer
import (
type mockDbfsApiClient struct {
t testutil.TestingT
isCalled bool
func (m *mockDbfsApiClient) Do(ctx context.Context, method, path string,
headers map[string]string, request, response any,
visitors ...func(*http.Request) error,
) error {
m.isCalled = true
require.Equal(m.t, "POST", method)
require.Equal(m.t, "/api/2.0/dbfs/put", path)
require.Contains(m.t, headers["Content-Type"], "multipart/form-data; boundary=")
contents, err := io.ReadAll(request.(io.Reader))
require.NoError(m.t, err)
require.Contains(m.t, string(contents), "hello world")
return nil
func TestDbfsClientForSmallFiles(t *testing.T) {
// write file to local disk
tmp := t.TempDir()
localPath := filepath.Join(tmp, "hello.txt")
testutil.WriteFile(t, localPath, "hello world")
// setup DBFS client with mocks
m := mocks.NewMockWorkspaceClient(t)
mockApiClient := &mockDbfsApiClient{t: t}
dbfsClient := DbfsClient{
apiClient: mockApiClient,
workspaceClient: m.WorkspaceClient,
root: NewWorkspaceRootPath("dbfs:/a/b/c"),
m.GetMockDbfsAPI().EXPECT().GetStatusByPath(mock.Anything, "dbfs:/a/b/c").Return(nil, nil)
// write file to DBFS
fd, err := os.Open(localPath)
require.NoError(t, err)
defer fd.Close()
err = dbfsClient.Write(context.Background(), "hello.txt", fd)
require.NoError(t, err)
// verify mock API client is called
require.True(t, mockApiClient.isCalled)
type mockDbfsHandle struct {
builder strings.Builder
func (h *mockDbfsHandle) Read(data []byte) (n int, err error) { return 0, nil }
func (h *mockDbfsHandle) Close() error { return nil }
func (h *mockDbfsHandle) WriteTo(w io.Writer) (n int64, err error) { return 0, nil }
func (h *mockDbfsHandle) ReadFrom(r io.Reader) (n int64, err error) {
b, err := io.ReadAll(r)
if err != nil {
return 0, err
num, err := h.builder.Write(b)
return int64(num), err
func (h *mockDbfsHandle) Write(data []byte) (n int, err error) {
return h.builder.Write(data)
func TestDbfsClientForLargerFiles(t *testing.T) {
// write file to local disk
tmp := t.TempDir()
localPath := filepath.Join(tmp, "hello.txt")
testutil.WriteFile(t, localPath, "hello world")
// Modify the max file size to 1 byte to simulate
// a large file that needs to be uploaded in chunks.
oldV := MaxDbfsPutFileSize
MaxDbfsPutFileSize = 1
t.Cleanup(func() {
MaxDbfsPutFileSize = oldV
// setup DBFS client with mocks
m := mocks.NewMockWorkspaceClient(t)
mockApiClient := &mockDbfsApiClient{t: t}
dbfsClient := DbfsClient{
apiClient: mockApiClient,
workspaceClient: m.WorkspaceClient,
root: NewWorkspaceRootPath("dbfs:/a/b/c"),
h := &mockDbfsHandle{}
m.GetMockDbfsAPI().EXPECT().GetStatusByPath(mock.Anything, "dbfs:/a/b/c").Return(nil, nil)
m.GetMockDbfsAPI().EXPECT().Open(mock.Anything, "dbfs:/a/b/c/hello.txt", files.FileModeWrite).Return(h, nil)
// write file to DBFS
fd, err := os.Open(localPath)
require.NoError(t, err)
defer fd.Close()
err = dbfsClient.Write(context.Background(), "hello.txt", fd)
require.NoError(t, err)
// verify mock API client is NOT called
require.False(t, mockApiClient.isCalled)
// verify the file content was written to the mock handle
assert.Equal(t, "hello world", h.builder.String())
func TestDbfsClientForNonLocalFiles(t *testing.T) {
// setup DBFS client with mocks
m := mocks.NewMockWorkspaceClient(t)
mockApiClient := &mockDbfsApiClient{t: t}
dbfsClient := DbfsClient{
apiClient: mockApiClient,
workspaceClient: m.WorkspaceClient,
root: NewWorkspaceRootPath("dbfs:/a/b/c"),
h := &mockDbfsHandle{}
m.GetMockDbfsAPI().EXPECT().GetStatusByPath(mock.Anything, "dbfs:/a/b/c").Return(nil, nil)
m.GetMockDbfsAPI().EXPECT().Open(mock.Anything, "dbfs:/a/b/c/hello.txt", files.FileModeWrite).Return(h, nil)
// write file to DBFS
err := dbfsClient.Write(context.Background(), "hello.txt", strings.NewReader("hello world"))
require.NoError(t, err)
// verify mock API client is NOT called
require.False(t, mockApiClient.isCalled)
// verify the file content was written to the mock handle
assert.Equal(t, "hello world", h.builder.String())
@ -9,6 +9,8 @@ import (
@ -17,7 +19,7 @@ import (
type Server struct {
Mux *http.ServeMux
Router *mux.Router
t testutil.TestingT
@ -34,26 +36,25 @@ type Request struct {
Headers http.Header `json:"headers,omitempty"`
Method string `json:"method"`
Path string `json:"path"`
Body any `json:"body"`
Body any `json:"body,omitempty"`
RawBody string `json:"raw_body,omitempty"`
func New(t testutil.TestingT) *Server {
mux := http.NewServeMux()
server := httptest.NewServer(mux)
router := mux.NewRouter()
server := httptest.NewServer(router)
s := &Server{
Server: server,
Mux: mux,
Router: router,
t: t,
mu: &sync.Mutex{},
fakeWorkspaces: map[string]*FakeWorkspace{},
// The server resolves conflicting handlers by using the one with higher
// specificity. This handler is the least specific, so it will be used as a
// fallback when no other handlers match.
s.Handle("/", func(fakeWorkspace *FakeWorkspace, r *http.Request) (any, int) {
// Set up the not found handler as fallback
router.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pattern := r.Method + " " + r.URL.Path
@ -74,9 +75,22 @@ Response.StatusCode = <response status-code here>
`, pattern, pattern)
return apierr.APIError{
w.Header().Set("Content-Type", "application/json")
resp := apierr.APIError{
Message: "No stub found for pattern: " + pattern,
}, http.StatusNotImplemented
respBytes, err := json.Marshal(resp)
if err != nil {
t.Errorf("JSON encoding error: %s", err)
respBytes = []byte("{\"message\": \"JSON encoding error\"}")
if _, err := w.Write(respBytes); err != nil {
t.Errorf("Response write error: %s", err)
return s
@ -84,8 +98,8 @@ Response.StatusCode = <response status-code here>
type HandlerFunc func(fakeWorkspace *FakeWorkspace, req *http.Request) (resp any, statusCode int)
func (s *Server) Handle(pattern string, handler HandlerFunc) {
s.Mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
func (s *Server) Handle(method, path string, handler HandlerFunc) {
s.Router.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
// For simplicity we process requests sequentially. It's fast enough because
// we don't do any IO except reading and writing request/response bodies.
@ -119,13 +133,19 @@ func (s *Server) Handle(pattern string, handler HandlerFunc) {
s.Requests = append(s.Requests, Request{
req := Request{
Headers: headers,
Method: r.Method,
Path: r.URL.Path,
Body: json.RawMessage(body),
if json.Valid(body) {
req.Body = json.RawMessage(body)
} else {
req.RawBody = string(body)
s.Requests = append(s.Requests, req)
w.Header().Set("Content-Type", "application/json")
@ -149,7 +169,7 @@ func (s *Server) Handle(pattern string, handler HandlerFunc) {
http.Error(w, err.Error(), http.StatusInternalServerError)
func getToken(r *http.Request) string {
Reference in New Issue