mirror of https://github.com/databricks/cli.git
429 lines
12 KiB
Go
429 lines
12 KiB
Go
|
package filer
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"io/fs"
|
||
|
"path"
|
||
|
"slices"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/databricks/cli/libs/log"
|
||
|
"github.com/databricks/databricks-sdk-go/service/workspace"
|
||
|
)
|
||
|
|
||
|
// This readahead cache is designed to optimize file system operations by caching the results of
|
||
|
// directory reads (ReadDir) and file/directory metadata reads (Stat). This cache aims to eliminate
|
||
|
// redundant operations and improve performance by storing the results of these operations and
|
||
|
// reusing them when possible. Additionally, the cache performs readahead on ReadDir calls,
|
||
|
// proactively caching information about files and subdirectories to speed up future access.
|
||
|
//
|
||
|
// The cache maintains two primary maps: one for ReadDir results and another for Stat results.
|
||
|
// When a directory read or a stat operation is requested, the cache first checks if the result
|
||
|
// is already available. If it is, the cached result is returned immediately. If not, the
|
||
|
// operation is queued for execution, and the result is stored in the cache once the operation
|
||
|
// completes. In cases where the result is not immediately available, the caller may need to wait
|
||
|
// for the cache entry to be populated. However, because the queue is processed in order by a
|
||
|
// fixed number of worker goroutines, we are guaranteed that the required cache entry will be
|
||
|
// populated and available once the queue processes the corresponding task.
|
||
|
//
|
||
|
// The cache uses a worker pool to process the queued operations concurrently. This is
|
||
|
// implemented using a fixed number of worker goroutines that continually pull tasks from a
|
||
|
// queue and execute them. The queue itself is logically unbounded in the sense that it needs to
|
||
|
// accommodate all the new tasks that may be generated dynamically during the execution of ReadDir
|
||
|
// calls. Specifically, a single ReadDir call can add an unknown number of new Stat and ReadDir
|
||
|
// tasks to the queue because each directory entry may represent a file or subdirectory that
|
||
|
// requires further processing.
|
||
|
//
|
||
|
// For practical reasons, we are not using an unbounded queue but a channel with a maximum size
|
||
|
// of 10,000. This helps prevent excessive memory usage and ensures that the system remains
|
||
|
// responsive under load. If we encounter real examples of subtrees with more than 10,000
|
||
|
// elements, we can consider addressing this limitation in the future. For now, this approach
|
||
|
// balances the need for readahead efficiency with practical constraints.
|
||
|
//
|
||
|
// It is crucial to note that each ReadDir and Stat call is executed only once. The result of a
|
||
|
// Stat call can be served from the cache if the information was already returned by an earlier
|
||
|
// ReadDir call. This helps to avoid redundant operations and ensures that the system remains
|
||
|
// efficient even under high load.
|
||
|
|
||
|
// kMaxQueueSize is the capacity of the work queue. Attempts to schedule more
// pending operations than this fail with a [queueFullError].
const kMaxQueueSize = 10000

// kNumCacheWorkers is the number of worker goroutines that process the queue.
// These workers share the same HTTP client and therefore the same rate limiter.
// If this number is increased, the rate limiter should be modified as well.
const kNumCacheWorkers = 1

// queueFullError reports that an operation could not be scheduled because the
// work queue already holds kMaxQueueSize pending operations.
type queueFullError struct {
	// name is the path of the file or directory whose operation was rejected.
	name string
}

// Error implements the error interface.
func (q queueFullError) Error() string {
	const msg = "queue is at capacity (%d); cannot enqueue work for %q"
	return fmt.Sprintf(msg, kMaxQueueSize, q.name)
}
|
||
|
|
||
|
// cacheEntry holds the state shared by every cacheable operation: the
// completion signal, the target path, and the resulting error.
type cacheEntry struct {
	// done is closed when the operation has finished (or has been marked as
	// failed). Result fields must only be read after done is closed.
	done chan struct{}

	// name is the (cleaned) path of the file or directory being operated on.
	name string

	// err is the error produced by the operation, if any.
	err error
}

// String returns the path of the file or directory this entry refers to.
func (e *cacheEntry) String() string {
	return e.name
}

// markError completes the entry with the given error without running the
// operation. The error is stored before done is closed so waiters observe it.
func (e *cacheEntry) markError(cause error) {
	e.err = cause
	close(e.done)
}
|
||
|
|
||
|
// readDirEntry is the cache entry for a [ReadDir] call.
|
||
|
type readDirEntry struct {
|
||
|
cacheEntry
|
||
|
|
||
|
// Return values of a [ReadDir] call.
|
||
|
entries []fs.DirEntry
|
||
|
}
|
||
|
|
||
|
// Create a new readDirEntry.
|
||
|
func newReadDirEntry(name string) *readDirEntry {
|
||
|
return &readDirEntry{cacheEntry: cacheEntry{done: make(chan struct{}), name: name}}
|
||
|
}
|
||
|
|
||
|
// Execute the operation and signal completion.
|
||
|
func (e *readDirEntry) execute(ctx context.Context, c *cache) {
|
||
|
t1 := time.Now()
|
||
|
e.entries, e.err = c.f.ReadDir(ctx, e.name)
|
||
|
t2 := time.Now()
|
||
|
log.Tracef(ctx, "readdir for %s took %f", e.name, t2.Sub(t1).Seconds())
|
||
|
|
||
|
// Finalize the read call by adding all directory entries to the stat cache.
|
||
|
c.completeReadDir(e.name, e.entries)
|
||
|
|
||
|
// Signal that the operation has completed.
|
||
|
// The return value can now be used by routines waiting on it.
|
||
|
close(e.done)
|
||
|
}
|
||
|
|
||
|
// Wait for the operation to complete and return the result.
|
||
|
func (e *readDirEntry) wait(ctx context.Context) ([]fs.DirEntry, error) {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return nil, ctx.Err()
|
||
|
case <-e.done:
|
||
|
// Note: return a copy of the slice to prevent the caller from modifying the cache.
|
||
|
// The underlying elements are values (see [wsfsDirEntry]) so a shallow copy is sufficient.
|
||
|
return slices.Clone(e.entries), e.err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// statEntry is the cache entry for a [Stat] call.
|
||
|
type statEntry struct {
|
||
|
cacheEntry
|
||
|
|
||
|
// Return values of a [Stat] call.
|
||
|
info fs.FileInfo
|
||
|
}
|
||
|
|
||
|
// Create a new stat entry.
|
||
|
func newStatEntry(name string) *statEntry {
|
||
|
return &statEntry{cacheEntry: cacheEntry{done: make(chan struct{}), name: name}}
|
||
|
}
|
||
|
|
||
|
// Execute the operation and signal completion.
|
||
|
func (e *statEntry) execute(ctx context.Context, c *cache) {
|
||
|
t1 := time.Now()
|
||
|
e.info, e.err = c.f.Stat(ctx, e.name)
|
||
|
t2 := time.Now()
|
||
|
log.Tracef(ctx, "stat for %s took %f", e.name, t2.Sub(t1).Seconds())
|
||
|
|
||
|
// Signal that the operation has completed.
|
||
|
// The return value can now be used by routines waiting on it.
|
||
|
close(e.done)
|
||
|
}
|
||
|
|
||
|
// Wait for the operation to complete and return the result.
|
||
|
func (e *statEntry) wait(ctx context.Context) (fs.FileInfo, error) {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return nil, ctx.Err()
|
||
|
case <-e.done:
|
||
|
return e.info, e.err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Mark the stat entry as done.
|
||
|
func (e *statEntry) markDone(info fs.FileInfo, err error) {
|
||
|
e.info = info
|
||
|
e.err = err
|
||
|
close(e.done)
|
||
|
}
|
||
|
|
||
|
// executable is the interface all cacheable calls must implement.
|
||
|
type executable interface {
|
||
|
fmt.Stringer
|
||
|
|
||
|
execute(ctx context.Context, c *cache)
|
||
|
}
|
||
|
|
||
|
// cache stores all entries for cacheable Workspace File System calls.
|
||
|
// We care about caching only [ReadDir] and [Stat] calls.
|
||
|
type cache struct {
|
||
|
f Filer
|
||
|
m sync.Mutex
|
||
|
|
||
|
readDir map[string]*readDirEntry
|
||
|
stat map[string]*statEntry
|
||
|
|
||
|
// Queue of operations to execute.
|
||
|
queue chan executable
|
||
|
|
||
|
// For tracking the number of active goroutines.
|
||
|
wg sync.WaitGroup
|
||
|
}
|
||
|
|
||
|
func newWorkspaceFilesReadaheadCache(f Filer) *cache {
|
||
|
c := &cache{
|
||
|
f: f,
|
||
|
|
||
|
readDir: make(map[string]*readDirEntry),
|
||
|
stat: make(map[string]*statEntry),
|
||
|
|
||
|
queue: make(chan executable, kMaxQueueSize),
|
||
|
}
|
||
|
|
||
|
ctx := context.Background()
|
||
|
for range kNumCacheWorkers {
|
||
|
c.wg.Add(1)
|
||
|
go c.work(ctx)
|
||
|
}
|
||
|
|
||
|
return c
|
||
|
}
|
||
|
|
||
|
// work until the queue is closed.
|
||
|
func (c *cache) work(ctx context.Context) {
|
||
|
defer c.wg.Done()
|
||
|
|
||
|
for e := range c.queue {
|
||
|
e.execute(ctx, c)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// enqueue adds an operation to the queue.
|
||
|
// If the context is canceled, an error is returned.
|
||
|
// If the queue is full, an error is returned.
|
||
|
//
|
||
|
// Its caller is holding the lock so it cannot block.
|
||
|
func (c *cache) enqueue(ctx context.Context, e executable) error {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
case c.queue <- e:
|
||
|
return nil
|
||
|
default:
|
||
|
return queueFullError{e.String()}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *cache) completeReadDirForDir(name string, dirEntry fs.DirEntry) {
|
||
|
// Add to the stat cache if not already present.
|
||
|
if _, ok := c.stat[name]; !ok {
|
||
|
e := newStatEntry(name)
|
||
|
e.markDone(dirEntry.Info())
|
||
|
c.stat[name] = e
|
||
|
}
|
||
|
|
||
|
// Queue a [ReadDir] call for the directory if not already present.
|
||
|
if _, ok := c.readDir[name]; !ok {
|
||
|
// Create a new cache entry and queue the operation.
|
||
|
e := newReadDirEntry(name)
|
||
|
err := c.enqueue(context.Background(), e)
|
||
|
if err != nil {
|
||
|
e.markError(err)
|
||
|
}
|
||
|
|
||
|
// Add the entry to the cache, even if has an error.
|
||
|
c.readDir[name] = e
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (c *cache) completeReadDirForFile(name string, dirEntry fs.DirEntry) {
|
||
|
// Skip if this entry is already in the cache.
|
||
|
if _, ok := c.stat[name]; ok {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Create a new cache entry.
|
||
|
e := newStatEntry(name)
|
||
|
|
||
|
// Depending on the object type, we either have to perform a real
|
||
|
// stat call, or we can use the [fs.DirEntry] info directly.
|
||
|
switch dirEntry.(wsfsDirEntry).ObjectType {
|
||
|
case workspace.ObjectTypeNotebook:
|
||
|
// Note: once the list API returns `repos_export_format` we can avoid this additional stat call.
|
||
|
// This is the only (?) case where this implementation is tied to the workspace filer.
|
||
|
|
||
|
// Queue a [Stat] call for the file.
|
||
|
err := c.enqueue(context.Background(), e)
|
||
|
if err != nil {
|
||
|
e.markError(err)
|
||
|
}
|
||
|
default:
|
||
|
// Use the [fs.DirEntry] info directly.
|
||
|
e.markDone(dirEntry.Info())
|
||
|
}
|
||
|
|
||
|
// Add the entry to the cache, even if has an error.
|
||
|
c.stat[name] = e
|
||
|
}
|
||
|
|
||
|
func (c *cache) completeReadDir(dir string, entries []fs.DirEntry) {
|
||
|
c.m.Lock()
|
||
|
defer c.m.Unlock()
|
||
|
|
||
|
for _, e := range entries {
|
||
|
name := path.Join(dir, e.Name())
|
||
|
|
||
|
if e.IsDir() {
|
||
|
c.completeReadDirForDir(name, e)
|
||
|
} else {
|
||
|
c.completeReadDirForFile(name, e)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Cleanup closes the queue and waits for all goroutines to exit.
|
||
|
func (c *cache) Cleanup() {
|
||
|
close(c.queue)
|
||
|
c.wg.Wait()
|
||
|
}
|
||
|
|
||
|
// Write passes through to the underlying Filer.
|
||
|
func (c *cache) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
|
||
|
return c.f.Write(ctx, name, reader, mode...)
|
||
|
}
|
||
|
|
||
|
// Read passes through to the underlying Filer.
|
||
|
func (c *cache) Read(ctx context.Context, name string) (io.ReadCloser, error) {
|
||
|
return c.f.Read(ctx, name)
|
||
|
}
|
||
|
|
||
|
// Delete passes through to the underlying Filer.
|
||
|
func (c *cache) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
|
||
|
return c.f.Delete(ctx, name, mode...)
|
||
|
}
|
||
|
|
||
|
// Mkdir passes through to the underlying Filer.
|
||
|
func (c *cache) Mkdir(ctx context.Context, name string) error {
|
||
|
return c.f.Mkdir(ctx, name)
|
||
|
}
|
||
|
|
||
|
// ReadDir returns the entries in a directory.
|
||
|
// If the directory is already in the cache, the cached value is returned.
|
||
|
func (c *cache) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
|
||
|
name = path.Clean(name)
|
||
|
|
||
|
// Lock before R/W access to the cache.
|
||
|
c.m.Lock()
|
||
|
|
||
|
// If the directory is already in the cache, wait for and return the cached value.
|
||
|
if e, ok := c.readDir[name]; ok {
|
||
|
c.m.Unlock()
|
||
|
return e.wait(ctx)
|
||
|
}
|
||
|
|
||
|
// Otherwise, create a new cache entry and queue the operation.
|
||
|
e := newReadDirEntry(name)
|
||
|
err := c.enqueue(ctx, e)
|
||
|
if err != nil {
|
||
|
c.m.Unlock()
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
c.readDir[name] = e
|
||
|
c.m.Unlock()
|
||
|
|
||
|
// Wait for the operation to complete.
|
||
|
return e.wait(ctx)
|
||
|
}
|
||
|
|
||
|
// statFromReadDir returns the file info for a file or directory.
|
||
|
// If the file info is already in the cache, the cached value is returned.
|
||
|
func (c *cache) statFromReadDir(ctx context.Context, name string, entry *readDirEntry) (fs.FileInfo, error) {
|
||
|
_, err := entry.wait(ctx)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// Upon completion of a [ReadDir] call, all directory entries are added to the stat cache and
|
||
|
// enqueue a [Stat] call if necessary (entries for notebooks are incomplete and require a
|
||
|
// real stat call).
|
||
|
//
|
||
|
// This means that the file or directory we're trying to stat, either
|
||
|
//
|
||
|
// - is present in the stat cache
|
||
|
// - doesn't exist.
|
||
|
//
|
||
|
c.m.Lock()
|
||
|
e, ok := c.stat[name]
|
||
|
c.m.Unlock()
|
||
|
if ok {
|
||
|
return e.wait(ctx)
|
||
|
}
|
||
|
|
||
|
return nil, FileDoesNotExistError{name}
|
||
|
}
|
||
|
|
||
|
// Stat returns the file info for a file or directory.
|
||
|
// If the file info is already in the cache, the cached value is returned.
|
||
|
func (c *cache) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
|
||
|
name = path.Clean(name)
|
||
|
|
||
|
// Lock before R/W access to the cache.
|
||
|
c.m.Lock()
|
||
|
|
||
|
// If the file info is already in the cache, wait for and return the cached value.
|
||
|
if e, ok := c.stat[name]; ok {
|
||
|
c.m.Unlock()
|
||
|
return e.wait(ctx)
|
||
|
}
|
||
|
|
||
|
// If the parent directory is in the cache (or queued to be read),
|
||
|
// wait for it to complete to avoid redundant stat calls.
|
||
|
dir := path.Dir(name)
|
||
|
if dir != name {
|
||
|
if e, ok := c.readDir[dir]; ok {
|
||
|
c.m.Unlock()
|
||
|
return c.statFromReadDir(ctx, name, e)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Otherwise, create a new cache entry and queue the operation.
|
||
|
e := newStatEntry(name)
|
||
|
err := c.enqueue(ctx, e)
|
||
|
if err != nil {
|
||
|
c.m.Unlock()
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
c.stat[name] = e
|
||
|
c.m.Unlock()
|
||
|
|
||
|
// Wait for the operation to complete.
|
||
|
return e.wait(ctx)
|
||
|
}
|