Implement readahead cache for Workspace API calls (#1582)

## Changes

The reason this readahead cache exists is that we frequently need to
recursively find all files in the bundle root directory, especially for
sync include and exclude processing. By caching the response for every
file/directory and frontloading the latency cost of these calls, we
significantly improve performance and eliminate redundant operations.

## Tests

* [ ] Working on unit tests
3 changed files with 434 additions and 1 deletion

@@ -0,0 +1,428 @@
package filer
import (
"context"
"fmt"
"io"
"io/fs"
"path"
"slices"
"sync"
"time"
"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/service/workspace"
)
// This readahead cache is designed to optimize file system operations by caching the results of
// directory reads (ReadDir) and file/directory metadata reads (Stat). This cache aims to eliminate
// redundant operations and improve performance by storing the results of these operations and
// reusing them when possible. Additionally, the cache performs readahead on ReadDir calls,
// proactively caching information about files and subdirectories to speed up future access.
//
// The cache maintains two primary maps: one for ReadDir results and another for Stat results.
// When a directory read or a stat operation is requested, the cache first checks if the result
// is already available. If it is, the cached result is returned immediately. If not, the
// operation is queued for execution, and the result is stored in the cache once the operation
// completes. In cases where the result is not immediately available, the caller may need to wait
// for the cache entry to be populated. However, because the queue is processed in order by a
// fixed number of worker goroutines, we are guaranteed that the required cache entry will be
// populated and available once the queue processes the corresponding task.
//
// The cache uses a worker pool to process the queued operations concurrently. This is
// implemented using a fixed number of worker goroutines that continually pull tasks from a
// queue and execute them. The queue itself is logically unbounded in the sense that it needs to
// accommodate all the new tasks that may be generated dynamically during the execution of ReadDir
// calls. Specifically, a single ReadDir call can add an unknown number of new Stat and ReadDir
// tasks to the queue because each directory entry may represent a file or subdirectory that
// requires further processing.
//
// For practical reasons, we are not using an unbounded queue but a channel with a maximum size
// of 10,000. This helps prevent excessive memory usage and ensures that the system remains
// responsive under load. If we encounter real examples of subtrees with more than 10,000
// elements, we can consider addressing this limitation in the future. For now, this approach
// balances the need for readahead efficiency with practical constraints.
//
// It is crucial to note that each ReadDir and Stat call is executed only once. The result of a
// Stat call can be served from the cache if the information was already returned by an earlier
// ReadDir call. This helps to avoid redundant operations and ensures that the system remains
// efficient even under high load.
const (
kMaxQueueSize = 10_000
// Number of worker goroutines to process the queue.
// These workers share the same HTTP client and therefore the same rate limiter.
// If this number is increased, the rate limiter should be modified as well.
kNumCacheWorkers = 1
)
// queueFullError is returned when the queue is at capacity.
type queueFullError struct {
name string
}
// Error returns the error message.
func (e queueFullError) Error() string {
return fmt.Sprintf("queue is at capacity (%d); cannot enqueue work for %q", kMaxQueueSize, e.name)
}
// Common type for all cacheable calls.
type cacheEntry struct {
// Channel to signal that the operation has completed.
done chan struct{}
// The (cleaned) name of the file or directory being operated on.
name string
// Return values of the operation.
err error
}
// String returns the path of the file or directory being operated on.
func (e *cacheEntry) String() string {
return e.name
}
// Mark this entry as errored.
func (e *cacheEntry) markError(err error) {
e.err = err
close(e.done)
}
// readDirEntry is the cache entry for a [ReadDir] call.
type readDirEntry struct {
cacheEntry
// Return values of a [ReadDir] call.
entries []fs.DirEntry
}
// Create a new readDirEntry.
func newReadDirEntry(name string) *readDirEntry {
return &readDirEntry{cacheEntry: cacheEntry{done: make(chan struct{}), name: name}}
}
// Execute the operation and signal completion.
func (e *readDirEntry) execute(ctx context.Context, c *cache) {
t1 := time.Now()
e.entries, e.err = c.f.ReadDir(ctx, e.name)
t2 := time.Now()
log.Tracef(ctx, "readdir for %s took %f", e.name, t2.Sub(t1).Seconds())
// Finalize the read call by adding all directory entries to the stat cache.
c.completeReadDir(e.name, e.entries)
// Signal that the operation has completed.
// The return value can now be used by routines waiting on it.
close(e.done)
}
// Wait for the operation to complete and return the result.
func (e *readDirEntry) wait(ctx context.Context) ([]fs.DirEntry, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.done:
// Note: return a copy of the slice to prevent the caller from modifying the cache.
// The underlying elements are values (see [wsfsDirEntry]) so a shallow copy is sufficient.
return slices.Clone(e.entries), e.err
}
}
// statEntry is the cache entry for a [Stat] call.
type statEntry struct {
cacheEntry
// Return values of a [Stat] call.
info fs.FileInfo
}
// Create a new stat entry.
func newStatEntry(name string) *statEntry {
return &statEntry{cacheEntry: cacheEntry{done: make(chan struct{}), name: name}}
}
// Execute the operation and signal completion.
func (e *statEntry) execute(ctx context.Context, c *cache) {
t1 := time.Now()
e.info, e.err = c.f.Stat(ctx, e.name)
t2 := time.Now()
log.Tracef(ctx, "stat for %s took %f", e.name, t2.Sub(t1).Seconds())
// Signal that the operation has completed.
// The return value can now be used by routines waiting on it.
close(e.done)
}
// Wait for the operation to complete and return the result.
func (e *statEntry) wait(ctx context.Context) (fs.FileInfo, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.done:
return e.info, e.err
}
}
// Mark the stat entry as done.
func (e *statEntry) markDone(info fs.FileInfo, err error) {
e.info = info
e.err = err
close(e.done)
}
// executable is the interface all cacheable calls must implement.
type executable interface {
fmt.Stringer
execute(ctx context.Context, c *cache)
}
// cache stores all entries for cacheable Workspace File System calls.
// We care about caching only [ReadDir] and [Stat] calls.
type cache struct {
f Filer
m sync.Mutex
readDir map[string]*readDirEntry
stat map[string]*statEntry
// Queue of operations to execute.
queue chan executable
// For tracking the number of active goroutines.
wg sync.WaitGroup
}
func newWorkspaceFilesReadaheadCache(f Filer) *cache {
c := &cache{
f: f,
readDir: make(map[string]*readDirEntry),
stat: make(map[string]*statEntry),
queue: make(chan executable, kMaxQueueSize),
}
ctx := context.Background()
for range kNumCacheWorkers {
c.wg.Add(1)
go c.work(ctx)
}
return c
}
// work until the queue is closed.
func (c *cache) work(ctx context.Context) {
defer c.wg.Done()
for e := range c.queue {
e.execute(ctx, c)
}
}
// enqueue adds an operation to the queue.
// If the context is canceled, an error is returned.
// If the queue is full, an error is returned.
//
// Callers hold the lock, so this function must not block.
func (c *cache) enqueue(ctx context.Context, e executable) error {
select {
case <-ctx.Done():
return ctx.Err()
case c.queue <- e:
return nil
default:
return queueFullError{e.String()}
}
}
func (c *cache) completeReadDirForDir(name string, dirEntry fs.DirEntry) {
// Add to the stat cache if not already present.
if _, ok := c.stat[name]; !ok {
e := newStatEntry(name)
e.markDone(dirEntry.Info())
c.stat[name] = e
}
// Queue a [ReadDir] call for the directory if not already present.
if _, ok := c.readDir[name]; !ok {
// Create a new cache entry and queue the operation.
e := newReadDirEntry(name)
err := c.enqueue(context.Background(), e)
if err != nil {
e.markError(err)
}
// Add the entry to the cache, even if it has an error.
c.readDir[name] = e
}
}
func (c *cache) completeReadDirForFile(name string, dirEntry fs.DirEntry) {
// Skip if this entry is already in the cache.
if _, ok := c.stat[name]; ok {
return
}
// Create a new cache entry.
e := newStatEntry(name)
// Depending on the object type, we either have to perform a real
// stat call, or we can use the [fs.DirEntry] info directly.
switch dirEntry.(wsfsDirEntry).ObjectType {
case workspace.ObjectTypeNotebook:
// Note: once the list API returns `repos_export_format` we can avoid this additional stat call.
// This is the only (?) case where this implementation is tied to the workspace filer.
// Queue a [Stat] call for the file.
err := c.enqueue(context.Background(), e)
if err != nil {
e.markError(err)
}
default:
// Use the [fs.DirEntry] info directly.
e.markDone(dirEntry.Info())
}
// Add the entry to the cache, even if it has an error.
c.stat[name] = e
}
func (c *cache) completeReadDir(dir string, entries []fs.DirEntry) {
c.m.Lock()
defer c.m.Unlock()
for _, e := range entries {
name := path.Join(dir, e.Name())
if e.IsDir() {
c.completeReadDirForDir(name, e)
} else {
c.completeReadDirForFile(name, e)
}
}
}
// Cleanup closes the queue and waits for all goroutines to exit.
func (c *cache) Cleanup() {
close(c.queue)
c.wg.Wait()
}
// Write passes through to the underlying Filer.
func (c *cache) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
return c.f.Write(ctx, name, reader, mode...)
}
// Read passes through to the underlying Filer.
func (c *cache) Read(ctx context.Context, name string) (io.ReadCloser, error) {
return c.f.Read(ctx, name)
}
// Delete passes through to the underlying Filer.
func (c *cache) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
return c.f.Delete(ctx, name, mode...)
}
// Mkdir passes through to the underlying Filer.
func (c *cache) Mkdir(ctx context.Context, name string) error {
return c.f.Mkdir(ctx, name)
}
// ReadDir returns the entries in a directory.
// If the directory is already in the cache, the cached value is returned.
func (c *cache) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
name = path.Clean(name)
// Lock before R/W access to the cache.
c.m.Lock()
// If the directory is already in the cache, wait for and return the cached value.
if e, ok := c.readDir[name]; ok {
c.m.Unlock()
return e.wait(ctx)
}
// Otherwise, create a new cache entry and queue the operation.
e := newReadDirEntry(name)
err := c.enqueue(ctx, e)
if err != nil {
c.m.Unlock()
return nil, err
}
c.readDir[name] = e
c.m.Unlock()
// Wait for the operation to complete.
return e.wait(ctx)
}
// statFromReadDir returns the file info for a file or directory.
// If the file info is already in the cache, the cached value is returned.
func (c *cache) statFromReadDir(ctx context.Context, name string, entry *readDirEntry) (fs.FileInfo, error) {
_, err := entry.wait(ctx)
if err != nil {
return nil, err
}
// Upon completion of a [ReadDir] call, all directory entries are added to the stat cache and
// enqueue a [Stat] call if necessary (entries for notebooks are incomplete and require a
// real stat call).
//
// This means that the file or directory we're trying to stat, either
//
// - is present in the stat cache
// - doesn't exist.
//
c.m.Lock()
e, ok := c.stat[name]
c.m.Unlock()
if ok {
return e.wait(ctx)
}
return nil, FileDoesNotExistError{name}
}
// Stat returns the file info for a file or directory.
// If the file info is already in the cache, the cached value is returned.
func (c *cache) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
name = path.Clean(name)
// Lock before R/W access to the cache.
c.m.Lock()
// If the file info is already in the cache, wait for and return the cached value.
if e, ok := c.stat[name]; ok {
c.m.Unlock()
return e.wait(ctx)
}
// If the parent directory is in the cache (or queued to be read),
// wait for it to complete to avoid redundant stat calls.
dir := path.Dir(name)
if dir != name {
if e, ok := c.readDir[dir]; ok {
c.m.Unlock()
return c.statFromReadDir(ctx, name, e)
}
}
// Otherwise, create a new cache entry and queue the operation.
e := newStatEntry(name)
err := c.enqueue(ctx, e)
if err != nil {
c.m.Unlock()
return nil, err
}
c.stat[name] = e
c.m.Unlock()
// Wait for the operation to complete.
return e.wait(ctx)
}

@@ -84,6 +84,10 @@ func (info wsfsFileInfo) Sys() any {
	return info.ObjectInfo
}

func (info wsfsFileInfo) WorkspaceObjectInfo() workspace.ObjectInfo {
	return info.ObjectInfo
}

// UnmarshalJSON is a custom unmarshaller for the wsfsFileInfo struct.
// It must be defined for this type because otherwise the implementation
// of the embedded ObjectInfo type will be used.

@@ -162,10 +162,11 @@ func NewWorkspaceFilesExtensionsClient(w *databricks.WorkspaceClient, root strin
		return nil, err
	}

	cache := newWorkspaceFilesReadaheadCache(filer)

	return &workspaceFilesExtensionsClient{
		workspaceClient: w,
		wsfs:            cache,
		root:            root,
	}, nil
}