Extend testserver for deployment (#2299)

## Changes
Extend testserver for bundle deployment:

- Allocate a new workspace per test case to isolate test cases from each
other
- Support jobs get/list/create
- Support creation and listing of workspace files

## Tests
Using existing acceptance tests
This commit is contained in:
Gleb Kanterov 2025-02-07 11:26:20 +01:00 committed by GitHub
parent 27eb0c4072
commit 75127fe42e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 332 additions and 40 deletions

View File

@ -19,6 +19,8 @@ import (
"time" "time"
"unicode/utf8" "unicode/utf8"
"github.com/google/uuid"
"github.com/databricks/cli/internal/testutil" "github.com/databricks/cli/internal/testutil"
"github.com/databricks/cli/libs/env" "github.com/databricks/cli/libs/env"
"github.com/databricks/cli/libs/testdiff" "github.com/databricks/cli/libs/testdiff"
@ -123,7 +125,6 @@ func testAccept(t *testing.T, InprocessMode bool, singleTest string) int {
AddHandlers(defaultServer) AddHandlers(defaultServer)
// Redirect API access to local server: // Redirect API access to local server:
t.Setenv("DATABRICKS_HOST", defaultServer.URL) t.Setenv("DATABRICKS_HOST", defaultServer.URL)
t.Setenv("DATABRICKS_TOKEN", "dapi1234")
homeDir := t.TempDir() homeDir := t.TempDir()
// Do not read user's ~/.databrickscfg // Do not read user's ~/.databrickscfg
@ -146,7 +147,15 @@ func testAccept(t *testing.T, InprocessMode bool, singleTest string) int {
// do it last so that full paths match first: // do it last so that full paths match first:
repls.SetPath(buildDir, "[BUILD_DIR]") repls.SetPath(buildDir, "[BUILD_DIR]")
workspaceClient, err := databricks.NewWorkspaceClient() var config databricks.Config
if cloudEnv == "" {
// use fake token for local tests
config = databricks.Config{Token: "dbapi1234"}
} else {
// non-local tests rely on environment variables
config = databricks.Config{}
}
workspaceClient, err := databricks.NewWorkspaceClient(&config)
require.NoError(t, err) require.NoError(t, err)
user, err := workspaceClient.CurrentUser.Me(ctx) user, err := workspaceClient.CurrentUser.Me(ctx)
@ -264,7 +273,7 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
for _, stub := range config.Server { for _, stub := range config.Server {
require.NotEmpty(t, stub.Pattern) require.NotEmpty(t, stub.Pattern)
server.Handle(stub.Pattern, func(req *http.Request) (any, int) { server.Handle(stub.Pattern, func(fakeWorkspace *testserver.FakeWorkspace, req *http.Request) (any, int) {
statusCode := http.StatusOK statusCode := http.StatusOK
if stub.Response.StatusCode != 0 { if stub.Response.StatusCode != 0 {
statusCode = stub.Response.StatusCode statusCode = stub.Response.StatusCode
@ -285,6 +294,15 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
cmd.Env = append(cmd.Env, "GOCOVERDIR="+coverDir) cmd.Env = append(cmd.Env, "GOCOVERDIR="+coverDir)
} }
// Each local test should use a new token that will result into a new fake workspace,
// so that tests don't interfere with each other.
if cloudEnv == "" {
tokenSuffix := strings.ReplaceAll(uuid.NewString(), "-", "")
token := "dbapi" + tokenSuffix
cmd.Env = append(cmd.Env, "DATABRICKS_TOKEN="+token)
repls.Set(token, "[DATABRICKS_TOKEN]")
}
// Write combined output to a file // Write combined output to a file
out, err := os.Create(filepath.Join(tmpDir, "output.txt")) out, err := os.Create(filepath.Join(tmpDir, "output.txt"))
require.NoError(t, err) require.NoError(t, err)
@ -303,8 +321,8 @@ func runTest(t *testing.T, dir, coverDir string, repls testdiff.ReplacementsCont
reqJson, err := json.Marshal(req) reqJson, err := json.Marshal(req)
require.NoError(t, err) require.NoError(t, err)
line := fmt.Sprintf("%s\n", reqJson) reqJsonWithRepls := repls.Replace(string(reqJson))
_, err = f.WriteString(line) _, err = f.WriteString(reqJsonWithRepls + "\n")
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -42,11 +42,9 @@ from myscript.py 0 postbuild: hello stderr!
Executing 'predeploy' script Executing 'predeploy' script
from myscript.py 0 predeploy: hello stdout! from myscript.py 0 predeploy: hello stdout!
from myscript.py 0 predeploy: hello stderr! from myscript.py 0 predeploy: hello stderr!
Error: unable to deploy to /Workspace/Users/[USERNAME]/.bundle/scripts/default/state as [USERNAME]. Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/scripts/default/files...
Please make sure the current user or one of their groups is listed under the permissions of this bundle. Deploying resources...
For assistance, contact the owners of this project. Deployment complete!
They may need to redeploy the bundle to apply the new permissions. Executing 'postdeploy' script
Please refer to https://docs.databricks.com/dev-tools/bundles/permissions.html for more on managing permissions. from myscript.py 0 postdeploy: hello stdout!
from myscript.py 0 postdeploy: hello stderr!
Exit code: 1

View File

@ -15,7 +15,7 @@ import (
func StartCmdServer(t *testing.T) *testserver.Server { func StartCmdServer(t *testing.T) *testserver.Server {
server := testserver.New(t) server := testserver.New(t)
server.Handle("/", func(r *http.Request) (any, int) { server.Handle("/", func(w *testserver.FakeWorkspace, r *http.Request) (any, int) {
q := r.URL.Query() q := r.URL.Query()
args := strings.Split(q.Get("args"), " ") args := strings.Split(q.Get("args"), " ")

View File

@ -1,17 +1,23 @@
package acceptance_test package acceptance_test
import ( import (
"bytes"
"encoding/json"
"fmt"
"net/http" "net/http"
"github.com/databricks/cli/libs/testserver"
"github.com/databricks/databricks-sdk-go/service/catalog" "github.com/databricks/databricks-sdk-go/service/catalog"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/iam" "github.com/databricks/databricks-sdk-go/service/iam"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/cli/libs/testserver"
"github.com/databricks/databricks-sdk-go/service/workspace" "github.com/databricks/databricks-sdk-go/service/workspace"
) )
func AddHandlers(server *testserver.Server) { func AddHandlers(server *testserver.Server) {
server.Handle("GET /api/2.0/policies/clusters/list", func(r *http.Request) (any, int) { server.Handle("GET /api/2.0/policies/clusters/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return compute.ListPoliciesResponse{ return compute.ListPoliciesResponse{
Policies: []compute.Policy{ Policies: []compute.Policy{
{ {
@ -26,7 +32,7 @@ func AddHandlers(server *testserver.Server) {
}, http.StatusOK }, http.StatusOK
}) })
server.Handle("GET /api/2.0/instance-pools/list", func(r *http.Request) (any, int) { server.Handle("GET /api/2.0/instance-pools/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return compute.ListInstancePools{ return compute.ListInstancePools{
InstancePools: []compute.InstancePoolAndStats{ InstancePools: []compute.InstancePoolAndStats{
{ {
@ -37,7 +43,7 @@ func AddHandlers(server *testserver.Server) {
}, http.StatusOK }, http.StatusOK
}) })
server.Handle("GET /api/2.1/clusters/list", func(r *http.Request) (any, int) { server.Handle("GET /api/2.1/clusters/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return compute.ListClustersResponse{ return compute.ListClustersResponse{
Clusters: []compute.ClusterDetails{ Clusters: []compute.ClusterDetails{
{ {
@ -52,31 +58,74 @@ func AddHandlers(server *testserver.Server) {
}, http.StatusOK }, http.StatusOK
}) })
server.Handle("GET /api/2.0/preview/scim/v2/Me", func(r *http.Request) (any, int) { server.Handle("GET /api/2.0/preview/scim/v2/Me", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return iam.User{ return iam.User{
Id: "1000012345", Id: "1000012345",
UserName: "tester@databricks.com", UserName: "tester@databricks.com",
}, http.StatusOK }, http.StatusOK
}) })
server.Handle("GET /api/2.0/workspace/get-status", func(r *http.Request) (any, int) { server.Handle("GET /api/2.0/workspace/get-status", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return workspace.ObjectInfo{ path := r.URL.Query().Get("path")
ObjectId: 1001,
ObjectType: "DIRECTORY", return fakeWorkspace.WorkspaceGetStatus(path)
Path: "",
ResourceId: "1001",
}, http.StatusOK
}) })
server.Handle("GET /api/2.1/unity-catalog/current-metastore-assignment", func(r *http.Request) (any, int) { server.Handle("POST /api/2.0/workspace/mkdirs", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
request := workspace.Mkdirs{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&request)
if err != nil {
return internalError(err)
}
return fakeWorkspace.WorkspaceMkdirs(request)
})
server.Handle("GET /api/2.0/workspace/export", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
path := r.URL.Query().Get("path")
return fakeWorkspace.WorkspaceExport(path)
})
server.Handle("POST /api/2.0/workspace/delete", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
path := r.URL.Query().Get("path")
recursiveStr := r.URL.Query().Get("recursive")
var recursive bool
if recursiveStr == "true" {
recursive = true
} else {
recursive = false
}
return fakeWorkspace.WorkspaceDelete(path, recursive)
})
server.Handle("POST /api/2.0/workspace-files/import-file/{path}", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
path := r.PathValue("path")
body := new(bytes.Buffer)
_, err := body.ReadFrom(r.Body)
if err != nil {
return internalError(err)
}
return fakeWorkspace.WorkspaceFilesImportFile(path, body.Bytes())
})
server.Handle("GET /api/2.1/unity-catalog/current-metastore-assignment", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return catalog.MetastoreAssignment{ return catalog.MetastoreAssignment{
DefaultCatalogName: "main", DefaultCatalogName: "main",
}, http.StatusOK }, http.StatusOK
}) })
server.Handle("GET /api/2.0/permissions/directories/1001", func(r *http.Request) (any, int) { server.Handle("GET /api/2.0/permissions/directories/{objectId}", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
objectId := r.PathValue("objectId")
return workspace.WorkspaceObjectPermissions{ return workspace.WorkspaceObjectPermissions{
ObjectId: "1001", ObjectId: objectId,
ObjectType: "DIRECTORY", ObjectType: "DIRECTORY",
AccessControlList: []workspace.WorkspaceObjectAccessControlResponse{ AccessControlList: []workspace.WorkspaceObjectAccessControlResponse{
{ {
@ -91,7 +140,29 @@ func AddHandlers(server *testserver.Server) {
}, http.StatusOK }, http.StatusOK
}) })
server.Handle("POST /api/2.0/workspace/mkdirs", func(r *http.Request) (any, int) { server.Handle("POST /api/2.1/jobs/create", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return "{}", http.StatusOK request := jobs.CreateJob{}
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&request)
if err != nil {
return internalError(err)
}
return fakeWorkspace.JobsCreate(request)
})
server.Handle("GET /api/2.1/jobs/get", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
jobId := r.URL.Query().Get("job_id")
return fakeWorkspace.JobsGet(jobId)
})
server.Handle("GET /api/2.1/jobs/list", func(fakeWorkspace *testserver.FakeWorkspace, r *http.Request) (any, int) {
return fakeWorkspace.JobsList()
}) })
} }
func internalError(err error) (any, int) {
return fmt.Errorf("internal error: %w", err), http.StatusInternalServerError
}

View File

@ -1 +1 @@
{"headers":{"Authorization":"Bearer dapi1234","User-Agent":"cli/[DEV_VERSION] databricks-sdk-go/[SDK_VERSION] go/[GO_VERSION] os/[OS] cmd/jobs_create cmd-exec-id/[UUID] auth/pat"},"method":"POST","path":"/api/2.1/jobs/create","body":{"name":"abc"}} {"headers":{"Authorization":"Bearer [DATABRICKS_TOKEN]","User-Agent":"cli/[DEV_VERSION] databricks-sdk-go/[SDK_VERSION] go/[GO_VERSION] os/[OS] cmd/jobs_create cmd-exec-id/[UUID] auth/pat"},"method":"POST","path":"/api/2.1/jobs/create","body":{"name":"abc"}}

View File

@ -0,0 +1,169 @@
package testserver
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/workspace"
)
// FakeWorkspace holds a state of a workspace for acceptance tests.
// One instance is created per DATABRICKS_TOKEN, so each test case gets
// an isolated workspace state.
type FakeWorkspace struct {
	// directories marks which workspace directory paths exist.
	directories map[string]bool
	// files maps absolute workspace file paths to their raw contents.
	files map[string][]byte
	// normally, ids are not sequential, but we make them sequential for deterministic diff
	nextJobId int64
	// jobs stores created jobs keyed by job id.
	jobs map[int64]jobs.Job
}
// NewFakeWorkspace creates an empty fake workspace that contains only the
// /Workspace root directory and no files or jobs. Job ids start at 1 so
// that test output is deterministic.
func NewFakeWorkspace() *FakeWorkspace {
	w := &FakeWorkspace{
		directories: map[string]bool{"/Workspace": true},
		files:       map[string][]byte{},
		jobs:        map[int64]jobs.Job{},
		nextJobId:   1,
	}
	return w
}
// WorkspaceGetStatus mirrors the workspace/get-status API: it reports a
// DIRECTORY for known directories, a FILE for known files, and 404
// otherwise. The SCALA language value is a fixed placeholder.
func (s *FakeWorkspace) WorkspaceGetStatus(path string) (workspace.ObjectInfo, int) {
	if s.directories[path] {
		return workspace.ObjectInfo{
			ObjectType: "DIRECTORY",
			Path:       path,
		}, http.StatusOK
	}
	if _, ok := s.files[path]; ok {
		return workspace.ObjectInfo{
			ObjectType: "FILE",
			Path:       path,
			Language:   "SCALA",
		}, http.StatusOK
	}
	return workspace.ObjectInfo{}, http.StatusNotFound
}
// WorkspaceMkdirs records the requested path as an existing directory and
// returns an empty JSON object, mirroring the workspace/mkdirs API.
// NOTE(review): only the leaf path is recorded, not its parents — confirm
// callers never get-status intermediate directories.
func (s *FakeWorkspace) WorkspaceMkdirs(request workspace.Mkdirs) (string, int) {
	s.directories[request.Path] = true

	return "{}", http.StatusOK
}
// WorkspaceExport returns the raw contents of the file at path, or 404 if
// no such file has been imported.
func (s *FakeWorkspace) WorkspaceExport(path string) ([]byte, int) {
	content, ok := s.files[path]
	if !ok || content == nil {
		return nil, http.StatusNotFound
	}
	return content, http.StatusOK
}
// WorkspaceDelete removes the file at path. When recursive is true, it
// removes every file whose path starts with the given path (prefix match).
// Returns an empty JSON object and 200, mirroring the workspace/delete API.
func (s *FakeWorkspace) WorkspaceDelete(path string, recursive bool) (string, int) {
	if !recursive {
		// Remove the map entry entirely. Assigning nil (as before) kept the
		// key present, so WorkspaceGetStatus — which checks key presence —
		// would keep reporting the file as existing after deletion.
		delete(s.files, path)
	} else {
		// Deleting entries while ranging over a map is safe in Go.
		for key := range s.files {
			if strings.HasPrefix(key, path) {
				delete(s.files, key)
			}
		}
	}
	return "{}", http.StatusOK
}
// WorkspaceFilesImportFile stores body as the contents of the file at
// path, normalizing the path to an absolute one first.
func (s *FakeWorkspace) WorkspaceFilesImportFile(path string, body []byte) (any, int) {
	// Stored paths are always absolute; the path placeholder from the URL
	// pattern arrives without a leading slash.
	if len(path) == 0 || path[0] != '/' {
		path = "/" + path
	}
	s.files[path] = body
	return "{}", http.StatusOK
}
// JobsCreate allocates the next sequential job id, stores the job built
// from the request settings, and returns the id in a CreateResponse.
func (s *FakeWorkspace) JobsCreate(request jobs.CreateJob) (any, int) {
	id := s.nextJobId
	s.nextJobId++

	// CreateJob and JobSettings share their JSON shape; convert via JSON.
	var settings jobs.JobSettings
	if err := jsonConvert(request, &settings); err != nil {
		return internalError(err)
	}

	s.jobs[id] = jobs.Job{
		JobId:    id,
		Settings: &settings,
	}

	return jobs.CreateResponse{JobId: id}, http.StatusOK
}
// JobsGet parses the string job id and returns the stored job, 404 if it
// does not exist, or 500 if the id is not a valid integer.
func (s *FakeWorkspace) JobsGet(jobId string) (any, int) {
	parsed, err := strconv.ParseInt(jobId, 10, 64)
	if err != nil {
		return internalError(fmt.Errorf("failed to parse job id: %s", err))
	}

	if job, ok := s.jobs[parsed]; ok {
		return job, http.StatusOK
	}
	return jobs.Job{}, http.StatusNotFound
}
// JobsList returns all stored jobs as BaseJob entries, sorted by job id.
func (s *FakeWorkspace) JobsList() (any, int) {
	all := make([]jobs.BaseJob, 0, len(s.jobs))

	for _, job := range s.jobs {
		// Job and BaseJob share their JSON shape; convert via JSON.
		var base jobs.BaseJob
		if err := jsonConvert(job, &base); err != nil {
			return internalError(fmt.Errorf("failed to convert job to base job: %w", err))
		}
		all = append(all, base)
	}

	// Map iteration order is random; sort for deterministic test output.
	sort.Slice(all, func(i, j int) bool {
		return all[i].JobId < all[j].JobId
	})

	return jobs.ListJobsResponse{
		Jobs: all,
	}, http.StatusOK
}
// jsonConvert copies input into the value pointed to by output by
// round-tripping it through an in-memory JSON buffer. Useful for
// converting between types that share the same JSON shape.
func jsonConvert(input, output any) error {
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(input); err != nil {
		return fmt.Errorf("failed to encode: %w", err)
	}
	if err := json.NewDecoder(&buf).Decode(output); err != nil {
		return fmt.Errorf("failed to decode: %w", err)
	}
	return nil
}
func internalError(err error) (string, int) {
return fmt.Sprintf("internal error: %s", err), http.StatusInternalServerError
}

View File

@ -6,6 +6,8 @@ import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"slices" "slices"
"strings"
"sync"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -18,6 +20,9 @@ type Server struct {
t testutil.TestingT t testutil.TestingT
fakeWorkspaces map[string]*FakeWorkspace
mu *sync.Mutex
RecordRequests bool RecordRequests bool
IncludeRequestHeaders []string IncludeRequestHeaders []string
@ -37,17 +42,36 @@ func New(t testutil.TestingT) *Server {
t.Cleanup(server.Close) t.Cleanup(server.Close)
return &Server{ return &Server{
Server: server, Server: server,
Mux: mux, Mux: mux,
t: t, t: t,
mu: &sync.Mutex{},
fakeWorkspaces: map[string]*FakeWorkspace{},
} }
} }
type HandlerFunc func(req *http.Request) (resp any, statusCode int) type HandlerFunc func(fakeWorkspace *FakeWorkspace, req *http.Request) (resp any, statusCode int)
func (s *Server) Handle(pattern string, handler HandlerFunc) { func (s *Server) Handle(pattern string, handler HandlerFunc) {
s.Mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) { s.Mux.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
resp, statusCode := handler(r) // For simplicity we process requests sequentially. It's fast enough because
// we don't do any IO except reading and writing request/response bodies.
s.mu.Lock()
defer s.mu.Unlock()
// Each test uses a unique DATABRICKS_TOKEN; we simulate each token having
// its own fake workspace to avoid interference between tests.
var fakeWorkspace *FakeWorkspace = nil
token := getToken(r)
if token != "" {
if _, ok := s.fakeWorkspaces[token]; !ok {
s.fakeWorkspaces[token] = NewFakeWorkspace()
}
fakeWorkspace = s.fakeWorkspaces[token]
}
resp, statusCode := handler(fakeWorkspace, r)
if s.RecordRequests { if s.RecordRequests {
body, err := io.ReadAll(r.Body) body, err := io.ReadAll(r.Body)
@ -75,9 +99,10 @@ func (s *Server) Handle(pattern string, handler HandlerFunc) {
var respBytes []byte var respBytes []byte
var err error var err error
respString, ok := resp.(string) if respString, ok := resp.(string); ok {
if ok {
respBytes = []byte(respString) respBytes = []byte(respString)
} else if respBytes0, ok := resp.([]byte); ok {
respBytes = respBytes0
} else { } else {
respBytes, err = json.MarshalIndent(resp, "", " ") respBytes, err = json.MarshalIndent(resp, "", " ")
if err != nil { if err != nil {
@ -92,3 +117,14 @@ func (s *Server) Handle(pattern string, handler HandlerFunc) {
} }
}) })
} }
func getToken(r *http.Request) string {
header := r.Header.Get("Authorization")
prefix := "Bearer "
if !strings.HasPrefix(header, prefix) {
return ""
}
return header[len(prefix):]
}