package filer

import (
	"context"
	"fmt"
	"io"
	"io/fs"
	"path"
	"slices"
	"sync"
	"time"

	"github.com/databricks/cli/libs/log"
	"github.com/databricks/databricks-sdk-go/service/workspace"
)

// This readahead cache is designed to optimize file system operations by caching the results of
// directory reads (ReadDir) and file/directory metadata reads (Stat). This cache aims to eliminate
// redundant operations and improve performance by storing the results of these operations and
// reusing them when possible. Additionally, the cache performs readahead on ReadDir calls,
// proactively caching information about files and subdirectories to speed up future access.
//
// The cache maintains two primary maps: one for ReadDir results and another for Stat results.
// When a directory read or a stat operation is requested, the cache first checks if the result
// is already available. If it is, the cached result is returned immediately. If not, the
// operation is queued for execution, and the result is stored in the cache once the operation
// completes. In cases where the result is not immediately available, the caller may need to wait
// for the cache entry to be populated. However, because the queue is processed in order by a
// fixed number of worker goroutines, we are guaranteed that the required cache entry will be
// populated and available once the queue processes the corresponding task.
//
// The cache uses a worker pool to process the queued operations concurrently. This is
// implemented using a fixed number of worker goroutines that continually pull tasks from a
// queue and execute them. The queue itself is logically unbounded in the sense that it needs to
// accommodate all the new tasks that may be generated dynamically during the execution of ReadDir
// calls. Specifically, a single ReadDir call can add an unknown number of new Stat and ReadDir
// tasks to the queue because each directory entry may represent a file or subdirectory that
// requires further processing.
//
// For practical reasons, we are not using an unbounded queue but a channel with a maximum size
// of 10,000. This helps prevent excessive memory usage and ensures that the system remains
// responsive under load. If we encounter real examples of subtrees with more than 10,000
// elements, we can consider addressing this limitation in the future. For now, this approach
// balances the need for readahead efficiency with practical constraints.
//
// It is crucial to note that each ReadDir and Stat call is executed only once. The result of a
// Stat call can be served from the cache if the information was already returned by an earlier
// ReadDir call. This helps to avoid redundant operations and ensures that the system remains
// efficient even under high load.

const (
	kMaxQueueSize = 10_000

	// Number of worker goroutines to process the queue.
	// These workers share the same HTTP client and therefore the same rate limiter.
	// If this number is increased, the rate limiter should be modified as well.
	kNumCacheWorkers = 1
)

// queueFullError is returned when the queue is at capacity.
type queueFullError struct {
	name string
}

// Error returns the error message.
func (e queueFullError) Error() string {
	return fmt.Sprintf("queue is at capacity (%d); cannot enqueue work for %q", kMaxQueueSize, e.name)
}

// Common type for all cacheable calls.
type cacheEntry struct {
	// Channel to signal that the operation has completed.
	done chan struct{}

	// The (cleaned) name of the file or directory being operated on.
	name string

	// Return values of the operation.
	err error
}

// String returns the path of the file or directory being operated on.
func (e *cacheEntry) String() string {
	return e.name
}

// Mark this entry as errored.
func (e *cacheEntry) markError(err error) {
	e.err = err
	close(e.done)
}

// readDirEntry is the cache entry for a [ReadDir] call.
type readDirEntry struct {
	cacheEntry

	// Return values of a [ReadDir] call.
	entries []fs.DirEntry
}

// Create a new readDirEntry.
func newReadDirEntry(name string) *readDirEntry {
	return &readDirEntry{cacheEntry: cacheEntry{done: make(chan struct{}), name: name}}
}

// Execute the operation and signal completion.
func (e *readDirEntry) execute(ctx context.Context, c *cache) {
	t1 := time.Now()
	e.entries, e.err = c.f.ReadDir(ctx, e.name)
	t2 := time.Now()
	log.Tracef(ctx, "readdir for %s took %f", e.name, t2.Sub(t1).Seconds())

	// Finalize the read call by adding all directory entries to the stat cache.
	c.completeReadDir(e.name, e.entries)

	// Signal that the operation has completed.
	// The return value can now be used by routines waiting on it.
	close(e.done)
}

// Wait for the operation to complete and return the result.
func (e *readDirEntry) wait(ctx context.Context) ([]fs.DirEntry, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-e.done:
		// Note: return a copy of the slice to prevent the caller from modifying the cache.
		// The underlying elements are values (see [wsfsDirEntry]) so a shallow copy is sufficient.
		return slices.Clone(e.entries), e.err
	}
}

// statEntry is the cache entry for a [Stat] call.
type statEntry struct {
	cacheEntry

	// Return values of a [Stat] call.
	info fs.FileInfo
}

// Create a new stat entry.
func newStatEntry(name string) *statEntry {
	return &statEntry{cacheEntry: cacheEntry{done: make(chan struct{}), name: name}}
}

// Execute the operation and signal completion.
func (e *statEntry) execute(ctx context.Context, c *cache) {
	t1 := time.Now()
	e.info, e.err = c.f.Stat(ctx, e.name)
	t2 := time.Now()
	log.Tracef(ctx, "stat for %s took %f", e.name, t2.Sub(t1).Seconds())

	// Signal that the operation has completed.
	// The return value can now be used by routines waiting on it.
	close(e.done)
}

// Wait for the operation to complete and return the result.
func (e *statEntry) wait(ctx context.Context) (fs.FileInfo, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-e.done:
		return e.info, e.err
	}
}

// Mark the stat entry as done.
func (e *statEntry) markDone(info fs.FileInfo, err error) {
	e.info = info
	e.err = err
	close(e.done)
}

// executable is the interface all cacheable calls must implement.
type executable interface {
	fmt.Stringer

	execute(ctx context.Context, c *cache)
}

// cache stores all entries for cacheable Workspace File System calls.
// We care about caching only [ReadDir] and [Stat] calls.
type cache struct {
	f Filer
	m sync.Mutex

	readDir map[string]*readDirEntry
	stat    map[string]*statEntry

	// Queue of operations to execute.
	queue chan executable

	// For tracking the number of active goroutines.
	wg sync.WaitGroup
}

func newWorkspaceFilesReadaheadCache(f Filer) *cache {
	c := &cache{
		f: f,

		readDir: make(map[string]*readDirEntry),
		stat:    make(map[string]*statEntry),

		queue: make(chan executable, kMaxQueueSize),
	}

	ctx := context.Background()
	for range kNumCacheWorkers {
		c.wg.Add(1)
		go c.work(ctx)
	}

	return c
}

// work until the queue is closed.
func (c *cache) work(ctx context.Context) {
	defer c.wg.Done()

	for e := range c.queue {
		e.execute(ctx, c)
	}
}

// enqueue adds an operation to the queue.
// If the context is canceled, an error is returned.
// If the queue is full, an error is returned.
//
// Its caller is holding the lock so it cannot block.
func (c *cache) enqueue(ctx context.Context, e executable) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case c.queue <- e:
		return nil
	default:
		return queueFullError{e.String()}
	}
}

func (c *cache) completeReadDirForDir(name string, dirEntry fs.DirEntry) {
	// Add to the stat cache if not already present.
	if _, ok := c.stat[name]; !ok {
		e := newStatEntry(name)
		e.markDone(dirEntry.Info())
		c.stat[name] = e
	}

	// Queue a [ReadDir] call for the directory if not already present.
	if _, ok := c.readDir[name]; !ok {
		// Create a new cache entry and queue the operation.
		e := newReadDirEntry(name)
		err := c.enqueue(context.Background(), e)
		if err != nil {
			e.markError(err)
		}

		// Add the entry to the cache, even if has an error.
		c.readDir[name] = e
	}
}

func (c *cache) completeReadDirForFile(name string, dirEntry fs.DirEntry) {
	// Skip if this entry is already in the cache.
	if _, ok := c.stat[name]; ok {
		return
	}

	// Create a new cache entry.
	e := newStatEntry(name)

	// Depending on the object type, we either have to perform a real
	// stat call, or we can use the [fs.DirEntry] info directly.
	switch dirEntry.(wsfsDirEntry).ObjectType {
	case workspace.ObjectTypeNotebook:
		// Note: once the list API returns `repos_export_format` we can avoid this additional stat call.
		// This is the only (?) case where this implementation is tied to the workspace filer.

		// Queue a [Stat] call for the file.
		err := c.enqueue(context.Background(), e)
		if err != nil {
			e.markError(err)
		}
	default:
		// Use the [fs.DirEntry] info directly.
		e.markDone(dirEntry.Info())
	}

	// Add the entry to the cache, even if has an error.
	c.stat[name] = e
}

func (c *cache) completeReadDir(dir string, entries []fs.DirEntry) {
	c.m.Lock()
	defer c.m.Unlock()

	for _, e := range entries {
		name := path.Join(dir, e.Name())

		if e.IsDir() {
			c.completeReadDirForDir(name, e)
		} else {
			c.completeReadDirForFile(name, e)
		}
	}
}

// Cleanup closes the queue and waits for all goroutines to exit.
func (c *cache) Cleanup() {
	close(c.queue)
	c.wg.Wait()
}

// Write passes through to the underlying Filer.
func (c *cache) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
	return c.f.Write(ctx, name, reader, mode...)
}

// Read passes through to the underlying Filer.
func (c *cache) Read(ctx context.Context, name string) (io.ReadCloser, error) {
	return c.f.Read(ctx, name)
}

// Delete passes through to the underlying Filer.
func (c *cache) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
	return c.f.Delete(ctx, name, mode...)
}

// Mkdir passes through to the underlying Filer.
func (c *cache) Mkdir(ctx context.Context, name string) error {
	return c.f.Mkdir(ctx, name)
}

// ReadDir returns the entries in a directory.
// If the directory is already in the cache, the cached value is returned.
func (c *cache) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
	name = path.Clean(name)

	// Lock before R/W access to the cache.
	c.m.Lock()

	// If the directory is already in the cache, wait for and return the cached value.
	if e, ok := c.readDir[name]; ok {
		c.m.Unlock()
		return e.wait(ctx)
	}

	// Otherwise, create a new cache entry and queue the operation.
	e := newReadDirEntry(name)
	err := c.enqueue(ctx, e)
	if err != nil {
		c.m.Unlock()
		return nil, err
	}

	c.readDir[name] = e
	c.m.Unlock()

	// Wait for the operation to complete.
	return e.wait(ctx)
}

// statFromReadDir returns the file info for a file or directory.
// If the file info is already in the cache, the cached value is returned.
func (c *cache) statFromReadDir(ctx context.Context, name string, entry *readDirEntry) (fs.FileInfo, error) {
	_, err := entry.wait(ctx)
	if err != nil {
		return nil, err
	}

	// Upon completion of a [ReadDir] call, all directory entries are added to the stat cache and
	// enqueue a [Stat] call if necessary (entries for notebooks are incomplete and require a
	// real stat call).
	//
	// This means that the file or directory we're trying to stat, either
	//
	//   - is present in the stat cache
	//   - doesn't exist.
	//
	c.m.Lock()
	e, ok := c.stat[name]
	c.m.Unlock()
	if ok {
		return e.wait(ctx)
	}

	return nil, FileDoesNotExistError{name}
}

// Stat returns the file info for a file or directory.
// If the file info is already in the cache, the cached value is returned.
func (c *cache) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
	name = path.Clean(name)

	// Lock before R/W access to the cache.
	c.m.Lock()

	// If the file info is already in the cache, wait for and return the cached value.
	if e, ok := c.stat[name]; ok {
		c.m.Unlock()
		return e.wait(ctx)
	}

	// If the parent directory is in the cache (or queued to be read),
	// wait for it to complete to avoid redundant stat calls.
	dir := path.Dir(name)
	if dir != name {
		if e, ok := c.readDir[dir]; ok {
			c.m.Unlock()
			return c.statFromReadDir(ctx, name, e)
		}
	}

	// Otherwise, create a new cache entry and queue the operation.
	e := newStatEntry(name)
	err := c.enqueue(ctx, e)
	if err != nil {
		c.m.Unlock()
		return nil, err
	}

	c.stat[name] = e
	c.m.Unlock()

	// Wait for the operation to complete.
	return e.wait(ctx)
}