package filer

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"net/http"
	"net/url"
	"path"
	"slices"
	"sort"
	"strings"
	"time"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/apierr"
	"github.com/databricks/databricks-sdk-go/client"
	"github.com/databricks/databricks-sdk-go/listing"
	"github.com/databricks/databricks-sdk-go/service/files"
	"golang.org/x/sync/errgroup"
)

// As of 19th Feb 2024, the Files API backend has a rate limit of 10 concurrent
// requests and 100 QPS. We limit the number of concurrent requests to 5 to
// avoid hitting the rate limit.
const maxFilesRequestsInFlight = 5

// Type that implements fs.FileInfo for the Files API.
// This is required for the filer.Stat() method.
type filesApiFileInfo struct {
	absPath      string
	isDir        bool
	fileSize     int64
	lastModified int64
}

func (info filesApiFileInfo) Name() string {
	return path.Base(info.absPath)
}

func (info filesApiFileInfo) Size() int64 {
	return info.fileSize
}

func (info filesApiFileInfo) Mode() fs.FileMode {
	mode := fs.ModePerm
	if info.isDir {
		mode |= fs.ModeDir
	}
	return mode
}

func (info filesApiFileInfo) ModTime() time.Time {
	return time.UnixMilli(info.lastModified)
}

func (info filesApiFileInfo) IsDir() bool {
	return info.isDir
}

func (info filesApiFileInfo) Sys() any {
	return nil
}

// Type that implements fs.DirEntry for the Files API.
// This is required for the filer.ReadDir() method.
type filesApiDirEntry struct {
	i filesApiFileInfo
}

func (e filesApiDirEntry) Name() string {
	return e.i.Name()
}

func (e filesApiDirEntry) IsDir() bool {
	return e.i.IsDir()
}

func (e filesApiDirEntry) Type() fs.FileMode {
	return e.i.Mode()
}

func (e filesApiDirEntry) Info() (fs.FileInfo, error) {
	return e.i, nil
}

// FilesClient implements the [Filer] interface for the Files API backend.
type FilesClient struct {
	workspaceClient *databricks.WorkspaceClient
	apiClient       *client.DatabricksClient

	// File operations will be relative to this path.
	root WorkspaceRootPath
}

func NewFilesClient(w *databricks.WorkspaceClient, root string) (Filer, error) {
	apiClient, err := client.New(w.Config)
	if err != nil {
		return nil, err
	}

	return &FilesClient{
		workspaceClient: w,
		apiClient:       apiClient,

		root: NewWorkspaceRootPath(root),
	}, nil
}

func (w *FilesClient) urlPath(name string) (string, string, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return "", "", err
	}

	// The user specified part of the path must be escaped.
	urlPath := fmt.Sprintf(
		"/api/2.0/fs/files/%s",
		url.PathEscape(strings.TrimLeft(absPath, "/")),
	)

	return absPath, urlPath, nil
}

func (w *FilesClient) Write(ctx context.Context, name string, reader io.Reader, mode ...WriteMode) error {
	absPath, urlPath, err := w.urlPath(name)
	if err != nil {
		return err
	}

	// Check that target path exists if CreateParentDirectories mode is not set
	if !slices.Contains(mode, CreateParentDirectories) {
		err := w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, path.Dir(absPath))
		if err != nil {
			var aerr *apierr.APIError
			if !errors.As(err, &aerr) {
				return err
			}

			// This API returns a 404 if the file doesn't exist.
			if aerr.StatusCode == http.StatusNotFound {
				return NoSuchDirectoryError{path.Dir(absPath)}
			}

			return err
		}
	}

	overwrite := slices.Contains(mode, OverwriteIfExists)
	urlPath = fmt.Sprintf("%s?overwrite=%t", urlPath, overwrite)
	headers := map[string]string{"Content-Type": "application/octet-stream"}
	err = w.apiClient.Do(ctx, http.MethodPut, urlPath, headers, reader, nil)

	// Return early on success.
	if err == nil {
		return nil
	}

	// Special handling of this error only if it is an API error.
	var aerr *apierr.APIError
	if !errors.As(err, &aerr) {
		return err
	}

	// This API returns 409 if the file already exists, when the object type is file
	if aerr.StatusCode == http.StatusConflict && aerr.ErrorCode == "ALREADY_EXISTS" {
		return FileAlreadyExistsError{absPath}
	}

	return err
}

func (w *FilesClient) Read(ctx context.Context, name string) (io.ReadCloser, error) {
	absPath, urlPath, err := w.urlPath(name)
	if err != nil {
		return nil, err
	}

	var reader io.ReadCloser
	err = w.apiClient.Do(ctx, http.MethodGet, urlPath, nil, nil, &reader)

	// Return early on success.
	if err == nil {
		return reader, nil
	}

	// Special handling of this error only if it is an API error.
	var aerr *apierr.APIError
	if !errors.As(err, &aerr) {
		return nil, err
	}

	// This API returns a 404 if the specified path does not exist.
	if aerr.StatusCode == http.StatusNotFound {
		// Check if the path is a directory. If so, return not a file error.
		if _, err := w.statDir(ctx, name); err == nil {
			return nil, NotAFile{absPath}
		}

		// No file or directory exists at the specified path. Return no such file error.
		return nil, FileDoesNotExistError{absPath}
	}

	return nil, err
}

func (w *FilesClient) deleteFile(ctx context.Context, name string) error {
	absPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	// Illegal to delete the root path.
	if absPath == w.root.rootPath {
		return CannotDeleteRootError{}
	}

	err = w.workspaceClient.Files.DeleteByFilePath(ctx, absPath)

	// Return early on success.
	if err == nil {
		return nil
	}

	var aerr *apierr.APIError
	// Special handling of this error only if it is an API error.
	if !errors.As(err, &aerr) {
		return err
	}

	// This files delete API returns a 404 if the specified path does not exist.
	if aerr.StatusCode == http.StatusNotFound {
		return FileDoesNotExistError{absPath}
	}

	return err
}

func (w *FilesClient) deleteDirectory(ctx context.Context, name string) error {
	absPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	// Illegal to delete the root path.
	if absPath == w.root.rootPath {
		return CannotDeleteRootError{}
	}

	err = w.workspaceClient.Files.DeleteDirectoryByDirectoryPath(ctx, absPath)

	var aerr *apierr.APIError
	// Special handling of this error only if it is an API error.
	if !errors.As(err, &aerr) {
		return err
	}

	// The directory delete API returns a 400 if the directory is not empty
	if aerr.StatusCode == http.StatusBadRequest {
		reasons := []string{}
		for _, detail := range aerr.Details {
			reasons = append(reasons, detail.Reason)
		}
		// Error code 400 is generic and can be returned for other reasons. Make
		// sure one of the reasons for the error is that the directory is not empty.
		if !slices.Contains(reasons, "FILES_API_DIRECTORY_IS_NOT_EMPTY") {
			return err
		}
		return DirectoryNotEmptyError{absPath}
	}

	return err
}

func (w *FilesClient) recursiveDelete(ctx context.Context, name string) error {
	filerFS := NewFS(ctx, w)
	dirsToDelete := make([]string, 0)
	filesToDelete := make([]string, 0)
	callback := func(path string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}

		// Files API does not allowing deleting non-empty directories. We instead
		// collect the directories to delete and delete them once all the files have
		// been deleted.
		if d.IsDir() {
			dirsToDelete = append(dirsToDelete, path)
			return nil
		}

		filesToDelete = append(filesToDelete, path)
		return nil
	}

	// Walk the directory and accumulate the files and directories to delete.
	err := fs.WalkDir(filerFS, name, callback)
	if err != nil {
		return err
	}

	// Delete the files in parallel.
	group, groupCtx := errgroup.WithContext(ctx)
	group.SetLimit(maxFilesRequestsInFlight)

	for _, file := range filesToDelete {
		file := file

		// Skip the file if the context has already been cancelled.
		select {
		case <-groupCtx.Done():
			continue
		default:
			// Proceed.
		}

		group.Go(func() error {
			return w.deleteFile(groupCtx, file)
		})
	}

	// Wait for the files to be deleted and return the first non-nil error.
	err = group.Wait()
	if err != nil {
		return err
	}

	// Delete the directories in reverse order to ensure that the parent
	// directories are deleted after the children. This is possible because
	// fs.WalkDir walks the directories in lexicographical order.
	for i := len(dirsToDelete) - 1; i >= 0; i-- {
		err := w.deleteDirectory(ctx, dirsToDelete[i])
		if err != nil {
			return err
		}
	}
	return nil
}

func (w *FilesClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
	if slices.Contains(mode, DeleteRecursively) {
		return w.recursiveDelete(ctx, name)
	}

	// Issue a stat call to determine if the path is a file or directory.
	info, err := w.Stat(ctx, name)
	if err != nil {
		return err
	}

	// Issue the delete call for a directory
	if info.IsDir() {
		return w.deleteDirectory(ctx, name)
	}

	return w.deleteFile(ctx, name)
}

func (w *FilesClient) ReadDir(ctx context.Context, name string) ([]fs.DirEntry, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return nil, err
	}

	iter := w.workspaceClient.Files.ListDirectoryContents(ctx, files.ListDirectoryContentsRequest{
		DirectoryPath: absPath,
	})

	files, err := listing.ToSlice(ctx, iter)

	// Return early on success.
	if err == nil {
		entries := make([]fs.DirEntry, len(files))
		for i, file := range files {
			entries[i] = filesApiDirEntry{
				i: filesApiFileInfo{
					absPath:      file.Path,
					isDir:        file.IsDirectory,
					fileSize:     file.FileSize,
					lastModified: file.LastModified,
				},
			}
		}

		// Sort by name for parity with os.ReadDir.
		sort.Slice(entries, func(i, j int) bool { return entries[i].Name() < entries[j].Name() })
		return entries, nil
	}

	// Special handling of this error only if it is an API error.
	var apierr *apierr.APIError
	if !errors.As(err, &apierr) {
		return nil, err
	}

	// This API returns a 404 if the specified path does not exist.
	if apierr.StatusCode == http.StatusNotFound {
		// Check if the path is a file. If so, return not a directory error.
		if _, err := w.statFile(ctx, name); err == nil {
			return nil, NotADirectory{absPath}
		}

		// No file or directory exists at the specified path. Return no such directory error.
		return nil, NoSuchDirectoryError{absPath}
	}
	return nil, err
}

func (w *FilesClient) Mkdir(ctx context.Context, name string) error {
	absPath, err := w.root.Join(name)
	if err != nil {
		return err
	}

	err = w.workspaceClient.Files.CreateDirectory(ctx, files.CreateDirectoryRequest{
		DirectoryPath: absPath,
	})

	// Special handling of this error only if it is an API error.
	var aerr *apierr.APIError
	if errors.As(err, &aerr) && aerr.StatusCode == http.StatusConflict {
		return FileAlreadyExistsError{absPath}
	}

	return err
}

// Get file metadata for a file using the Files API.
func (w *FilesClient) statFile(ctx context.Context, name string) (fs.FileInfo, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return nil, err
	}

	fileInfo, err := w.workspaceClient.Files.GetMetadataByFilePath(ctx, absPath)

	// If the HEAD requests succeeds, the file exists.
	if err == nil {
		return filesApiFileInfo{
			absPath:  absPath,
			isDir:    false,
			fileSize: fileInfo.ContentLength,
		}, nil
	}

	// Special handling of this error only if it is an API error.
	var aerr *apierr.APIError
	if !errors.As(err, &aerr) {
		return nil, err
	}

	// This API returns a 404 if the specified path does not exist.
	if aerr.StatusCode == http.StatusNotFound {
		return nil, FileDoesNotExistError{absPath}
	}

	return nil, err
}

// Get file metadata for a directory using the Files API.
func (w *FilesClient) statDir(ctx context.Context, name string) (fs.FileInfo, error) {
	absPath, err := w.root.Join(name)
	if err != nil {
		return nil, err
	}

	err = w.workspaceClient.Files.GetDirectoryMetadataByDirectoryPath(ctx, absPath)

	// If the HEAD requests succeeds, the directory exists.
	if err == nil {
		return filesApiFileInfo{absPath: absPath, isDir: true}, nil
	}

	// Special handling of this error only if it is an API error.
	var aerr *apierr.APIError
	if !errors.As(err, &aerr) {
		return nil, err
	}

	// The directory metadata API returns a 404 if the specified path does not exist.
	if aerr.StatusCode == http.StatusNotFound {
		return nil, NoSuchDirectoryError{absPath}
	}

	return nil, err
}

func (w *FilesClient) Stat(ctx context.Context, name string) (fs.FileInfo, error) {
	// Assume that the path is a directory and issue a stat call.
	dirInfo, err := w.statDir(ctx, name)

	// If the file exists, return early.
	if err == nil {
		return dirInfo, nil
	}

	// Return early if the error is not a NoSuchDirectoryError.
	if !errors.As(err, &NoSuchDirectoryError{}) {
		return nil, err
	}

	// Since the path is not a directory, assume that it is a file and issue a stat call.
	return w.statFile(ctx, name)
}