package sync

import (
	"context"
	"fmt"
	"time"

	"github.com/databricks/cli/libs/filer"
	"github.com/databricks/cli/libs/fileset"
	"github.com/databricks/cli/libs/git"
	"github.com/databricks/cli/libs/log"
	"github.com/databricks/cli/libs/set"
	"github.com/databricks/cli/libs/vfs"
	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/iam"
)

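// SyncOptions holds the configuration for a [Sync] instance.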
type SyncOptions struct {
	// LocalRoot is the root of the local file tree to synchronize.
	LocalRoot vfs.Path
	// Paths within LocalRoot to restrict the synchronized file set to.
	Paths []string
	// Include lists glob patterns, relative to LocalRoot, of files to include
	// in addition to the files tracked by git.
	Include []string
	// Exclude lists glob patterns, relative to LocalRoot, of files to exclude
	// from synchronization.
	Exclude []string

	// RemotePath is the workspace path to synchronize to.
	RemotePath string

	// Full forces a full synchronization by starting from an empty snapshot
	// instead of loading an existing one.
	Full bool

	// SnapshotBasePath is the base path under which sync snapshot state is stored.
	SnapshotBasePath string

	// PollInterval is the interval between sync iterations in continuous mode.
	PollInterval time.Duration

	// WorkspaceClient is the client used to talk to the Databricks workspace.
	WorkspaceClient *databricks.WorkspaceClient

	// CurrentUser is the authenticated user; used to verify that the remote path is usable.
	CurrentUser *iam.User

	// Host is the workspace host; it is resolved from the workspace client in New.
	Host string
}

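// Sync synchronizes a local file tree to a remote workspace path and tracks
// synchronization state in a snapshot.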
type Sync struct {
	*SyncOptions

	fileSet        *git.FileSet
	includeFileSet *fileset.FileSet
	excludeFileSet *fileset.FileSet

	snapshot *Snapshot
	filer    filer.Filer

	// Synchronization progress events are sent to this event notifier.
	notifier EventNotifier
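	// seq is the sequence number of the current sync iteration; it is
	// incremented after each completed iteration.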
	seq      int
}

// New initializes and returns a new [Sync] instance.
func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
	fileSet, err := git.NewFileSet(opts.LocalRoot, opts.Paths)
	if err != nil {
		return nil, err
	}

	err = fileSet.EnsureValidGitIgnoreExists()
	if err != nil {
		return nil, err
	}

	includeFileSet, err := fileset.NewGlobSet(opts.LocalRoot, opts.Include)
	if err != nil {
		return nil, err
	}

	excludeFileSet, err := fileset.NewGlobSet(opts.LocalRoot, opts.Exclude)
	if err != nil {
		return nil, err
	}

	// Verify that the remote path we're about to synchronize to is valid and allowed.
	err = EnsureRemotePathIsUsable(ctx, opts.WorkspaceClient, opts.RemotePath, opts.CurrentUser)
	if err != nil {
		return nil, err
	}

	// TODO: The host may be late-initialized in certain Azure setups where we
	// specify the workspace by its resource ID.
	// Tracked in https://databricks.atlassian.net/browse/DECO-194.
	opts.Host = opts.WorkspaceClient.Config.Host
	if opts.Host == "" {
		return nil, fmt.Errorf("failed to resolve host for snapshot")
	}

	// For full sync, we start with an empty snapshot.
	// For incremental sync, we try to load an existing snapshot to start from.
	var snapshot *Snapshot
	if opts.Full {
		snapshot, err = newSnapshot(ctx, &opts)
		if err != nil {
			return nil, fmt.Errorf("unable to instantiate new sync snapshot: %w", err)
		}
	} else {
		snapshot, err = loadOrNewSnapshot(ctx, &opts)
		if err != nil {
			return nil, fmt.Errorf("unable to load sync snapshot: %w", err)
		}
	}

	filer, err := filer.NewWorkspaceFilesClient(opts.WorkspaceClient, opts.RemotePath)
	if err != nil {
		return nil, err
	}

	return &Sync{
		SyncOptions: &opts,

		fileSet:        fileSet,
		includeFileSet: includeFileSet,
		excludeFileSet: excludeFileSet,
		snapshot:       snapshot,
		filer:          filer,
		notifier:       &NopNotifier{},
		seq:            0,
	}, nil
}

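// Events returns a buffered channel on which synchronization progress events
// are published for subsequent sync iterations. Calling Events replaces any
// previously registered notifier.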
func (s *Sync) Events() <-chan Event {
	ch := make(chan Event, MaxRequestsInFlight)
	s.notifier = &ChannelNotifier{ch}
	return ch
}

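// Close closes the event notifier; no further events are delivered after it returns.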
func (s *Sync) Close() {
	if s.notifier == nil {
		return
	}
	s.notifier.Close()
	s.notifier = nil
}

func (s *Sync) notifyStart(ctx context.Context, d diff) {
	// If this is not the initial iteration, we can ignore no-ops.
	if s.seq > 0 && d.IsEmpty() {
		return
	}
	s.notifier.Notify(ctx, newEventStart(s.seq, d.put, d.delete))
}

func (s *Sync) notifyProgress(ctx context.Context, action EventAction, path string, progress float32) {
	s.notifier.Notify(ctx, newEventProgress(s.seq, action, path, progress))
}

func (s *Sync) notifyComplete(ctx context.Context, d diff) {
	// If this is not the initial iteration, we can ignore no-ops.
	if s.seq > 0 && d.IsEmpty() {
		return
	}
	s.notifier.Notify(ctx, newEventComplete(s.seq, d.put, d.delete))
	s.seq++
}

// RunOnce runs a single synchronization iteration: it lists the file tree
// rooted at the local path configured in the SyncOptions, computes the diff
// against the current snapshot, and applies it to the remote path configured
// in the SyncOptions.
//
// It returns the list of files tracked (and synchronized) by the syncer during
// the run, and an error if any occurred.
func (s *Sync) RunOnce(ctx context.Context) ([]fileset.File, error) {
	files, err := s.GetFileList(ctx)
	if err != nil {
		return files, err
	}

	change, err := s.snapshot.diff(ctx, files)
	if err != nil {
		return files, err
	}

	s.notifyStart(ctx, change)
	if change.IsEmpty() {
		s.notifyComplete(ctx, change)
		return files, nil
	}

	err = s.applyDiff(ctx, change)
	if err != nil {
		return files, err
	}

	err = s.snapshot.Save(ctx)
	if err != nil {
		log.Errorf(ctx, "cannot store snapshot: %s", err)
		return files, err
	}

	s.notifyComplete(ctx, change)
	return files, nil
}

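// GetFileList returns the full set of files to synchronize: files tracked by
// the git file set, plus files matching the include patterns, minus files
// matching the exclude patterns.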
func (s *Sync) GetFileList(ctx context.Context) ([]fileset.File, error) {
	// Tradeoff: we rely on portable polling-based monitoring instead of native
	// file system events, because kqueue on macOS requires manually raising the
	// open file descriptor limit (ulimit) to watch large trees.
	// See https://github.com/gorakhargosh/watchdog/blob/master/src/watchdog/observers/kqueue.py#L394-L418
	all := set.NewSetF(func(f fileset.File) string {
		return f.Relative
	})
	gitFiles, err := s.fileSet.Files()
	if err != nil {
		log.Errorf(ctx, "cannot list files: %s", err)
		return nil, err
	}
	all.Add(gitFiles...)

	include, err := s.includeFileSet.Files()
	if err != nil {
		log.Errorf(ctx, "cannot list include files: %s", err)
		return nil, err
	}

	all.Add(include...)

	exclude, err := s.excludeFileSet.Files()
	if err != nil {
		log.Errorf(ctx, "cannot list exclude files: %s", err)
		return nil, err
	}

	for _, f := range exclude {
		all.Remove(f)
	}

	return all.Iter(), nil
}

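// RunContinuous runs RunOnce at every tick of the configured poll interval
// until the context is cancelled or an iteration returns an error.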
func (s *Sync) RunContinuous(ctx context.Context) error {
	ticker := time.NewTicker(s.PollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			_, err := s.RunOnce(ctx)
			if err != nil {
				return err
			}
		}
	}
}