package testcli

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/databricks/cli/cmd"
	"github.com/databricks/cli/cmd/root"
	"github.com/databricks/cli/internal/testutil"
	"github.com/databricks/cli/libs/cmdio"
	"github.com/databricks/cli/libs/flags"
)

// Runner runs the CLI root command on behalf of a test.
//
// The command can be executed synchronously with [Runner.Run] or in a
// background goroutine with [Runner.RunBackground]. A background run is
// cancelled and awaited through the test's cleanup hook, so its goroutine
// does not outlive the test.
type Runner struct {
	testutil.TestingT

	// Arguments handed to the root command.
	args []string

	// Output captured while a background run is in progress.
	// [Runner.Run] does not use these; it returns its own buffers.
	stdout, stderr bytes.Buffer

	// Both ends of the stdin pipe. They are nil until WithStdin is called.
	stdinR *io.PipeReader
	stdinW *io.PipeWriter

	// Parent context for every command execution.
	ctx context.Context

	// Line-by-line output of a background run.
	// RunBackground starts goroutines that fill these channels from the
	// command's stdout/stderr pipes; they are closed when those goroutines exit.
	StdoutLines, StderrLines <-chan string

	// Receives the result of a background run once the command has returned.
	errch <-chan error
}

// consumeLines starts a goroutine that scans r line by line and forwards every
// line into the returned channel, which is closed when the goroutine exits.
//
// The goroutine registers itself on wg so callers can wait for it. Forwarding
// stops when ctx is cancelled or when the scanner gives up (EOF, a read error,
// or a line exceeding the scanner's token limit). The goroutine then keeps
// reading r until EOF or a read error and discards what it reads, so that a
// writer on the other end of a pipe never blocks on output nobody consumes.
//
// The channel has a large fixed buffer. If it ever fills up the goroutine
// panics, because that indicates a problem in the test or the code under test.
func consumeLines(ctx context.Context, wg *sync.WaitGroup, r io.Reader) <-chan string {
	ch := make(chan string, 30000)
	wg.Add(1)
	go func() {
		defer close(ch)
		defer wg.Done()

		scanner := bufio.NewScanner(r)
	forward:
		for scanner.Scan() {
			// We expect to be able to always send these lines into the channel.
			// If we can't, it means the channel is full and likely there is a problem
			// in either the test or the code under test.
			select {
			case <-ctx.Done():
				break forward
			case ch <- scanner.Text():
			default:
				panic("line buffer is full")
			}
		}

		// Drain whatever is left. Without this, a pipe writer would block forever
		// once we stop scanning (cancellation or an over-long line), and anything
		// waiting for that writer to finish would deadlock. The error is ignored
		// on purpose: there is nobody to report it to and the goal is only to
		// unblock the writer.
		_, _ = io.Copy(io.Discard, r)
	}()
	return ch
}

// WaitForTextPrinted waits until text shows up in the stdout captured from a
// background run. It polls every 50ms through [Runner.Eventually], so the test
// fails if timeout elapses first or if the background command returns first.
func (r *Runner) WaitForTextPrinted(text string, timeout time.Duration) {
	printed := func() bool {
		return strings.Contains(r.stdout.String(), text)
	}
	r.Eventually(printed, timeout, 50*time.Millisecond)
}

// WaitForOutput waits until text shows up in either the stdout or the stderr
// captured from a background run, polling every 50ms. The test fails if
// timeout elapses first.
//
// Unlike [Runner.WaitForTextPrinted] this goes through [require.Eventually]
// directly and therefore does not look at the command's error channel.
func (r *Runner) WaitForOutput(text string, timeout time.Duration) {
	seen := func() bool {
		if strings.Contains(r.stdout.String(), text) {
			return true
		}
		return strings.Contains(r.stderr.String(), text)
	}
	require.Eventually(r, seen, timeout, 50*time.Millisecond)
}

// WithStdin creates an in-memory pipe and stores both ends on the runner.
// [Runner.RunBackground] hands the read end to the command as its standard
// input; [Runner.SendText] and [Runner.CloseStdin] operate on the write end.
func (r *Runner) WithStdin() {
	r.stdinR, r.stdinW = io.Pipe()
}

// CloseStdin closes the write end of the stdin pipe.
// It panics if no stdin pipe has been set up with [Runner.WithStdin].
func (r *Runner) CloseStdin() {
	w := r.stdinW
	if w == nil {
		panic("no standard input configured")
	}
	w.Close()
}

// SendText writes text followed by a newline to the write end of the stdin pipe.
//
// It panics if no stdin pipe has been set up with [Runner.WithStdin], or if
// the write fails (for example because the pipe has already been closed).
// The panic message includes the underlying write error.
func (r *Runner) SendText(text string) {
	if r.stdinW == nil {
		panic("no standard input configured")
	}
	if _, err := r.stdinW.Write([]byte(text + "\n")); err != nil {
		panic("failed to write to stdin pipe: " + err.Error())
	}
}

// RunBackground starts the root command in a new goroutine and returns
// immediately.
//
// The command's stdout and stderr are written to in-memory pipes. Everything
// read from those pipes is mirrored into r.stdout / r.stderr and also offered
// line by line on r.StdoutLines / r.StderrLines. When the command returns,
// the goroutine closes the pipes, waits for the line consumers, logs the
// captured output, and finally publishes the command's error on r.errch.
//
// A cleanup hook cancels the command context and waits for the goroutine, so
// the command is stopped when the test completes.
func (r *Runner) RunBackground() {
	outPipeR, outPipeW := io.Pipe()
	errPipeR, errPipeW := io.Pipe()

	ctx := cmdio.NewContext(r.ctx, &cmdio.Logger{
		Mode:   flags.ModeAppend,
		Reader: bufio.Reader{},
		Writer: errPipeW,
	})

	cli := cmd.New(ctx)
	cli.SetOut(outPipeW)
	cli.SetErr(errPipeW)
	cli.SetArgs(r.args)
	if r.stdinW != nil {
		cli.SetIn(r.stdinR)
	}

	done := make(chan error)
	ctx, cancel := context.WithCancel(ctx)

	// Mirror everything read from the pipes into the runner's buffers and
	// consume it line by line.
	var wg sync.WaitGroup
	r.StdoutLines = consumeLines(ctx, &wg, io.TeeReader(outPipeR, &r.stdout))
	r.StderrLines = consumeLines(ctx, &wg, io.TeeReader(errPipeR, &r.stderr))

	// Execute the command in the background.
	go func() {
		err := root.Execute(ctx, cli)
		if err != nil {
			r.Logf("Error running command: %s", err)
		}

		// Closing the write ends signals EOF to the line consumers.
		outPipeW.Close()
		errPipeW.Close()

		// With the pipes closed the [consumeLines] goroutines run to
		// completion; wait for them before touching the buffers.
		wg.Wait()

		captured := []struct {
			format string
			buf    *bytes.Buffer
		}{
			{"[databricks stdout]: %s", &r.stdout},
			{"[databricks stderr]: %s", &r.stderr},
		}
		for _, c := range captured {
			if c.buf.Len() == 0 {
				continue
			}
			// Scan through a separate buffer over the same bytes so that the
			// runner's own buffer remains "unread".
			scanner := bufio.NewScanner(bytes.NewBuffer(c.buf.Bytes()))
			for scanner.Scan() {
				r.Logf(c.format, scanner.Text())
			}
		}

		// Hand the result to whoever is waiting, then close the channel.
		done <- err
		close(done)
	}()

	// Stop the command when the test completes (success or failure).
	r.Cleanup(func() {
		// Ask the command to terminate ...
		cancel()
		// ... and wait until the goroutine above is done.
		<-done
	})

	r.errch = done
}

// Run executes the root command synchronously with the runner's arguments.
//
// It logs the arguments, any error returned by the command, and every line the
// command wrote to stdout and stderr. It returns the captured stdout, the
// captured stderr, and the command's error.
func (r *Runner) Run() (bytes.Buffer, bytes.Buffer, error) {
	r.Helper()

	var stdout, stderr bytes.Buffer
	logger := &cmdio.Logger{
		Mode:   flags.ModeAppend,
		Reader: bufio.Reader{},
		Writer: &stderr,
	}
	ctx := cmdio.NewContext(r.ctx, logger)

	cli := cmd.New(ctx)
	cli.SetOut(&stdout)
	cli.SetErr(&stderr)
	cli.SetArgs(r.args)

	r.Logf("  args: %s", strings.Join(r.args, ", "))

	err := root.Execute(ctx, cli)
	if err != nil {
		r.Logf(" error: %s", err)
	}

	streams := []struct {
		format string
		buf    *bytes.Buffer
	}{
		{"stdout: %s", &stdout},
		{"stderr: %s", &stderr},
	}
	for _, s := range streams {
		if s.buf.Len() == 0 {
			continue
		}
		// Scan through a separate buffer over the same bytes so that the
		// returned buffer remains "unread".
		scanner := bufio.NewScanner(bytes.NewBuffer(s.buf.Bytes()))
		for scanner.Scan() {
			r.Logf(s.format, scanner.Text())
		}
	}

	return stdout, stderr, err
}

// Eventually evaluates condition repeatedly, like [require.Eventually], and
// returns once it reports true. The test is failed when waitFor elapses first
// ("Condition never satisfied") or when a value is received from the
// background command's error channel ("Command failed").
//
// The condition is evaluated once right away and then again on every tick,
// each time in its own goroutine.
func (r *Runner) Eventually(condition func() bool, waitFor, tick time.Duration, msgAndArgs ...any) {
	r.Helper()

	// Buffered so that an evaluation still running when this method returns
	// can deliver its result without blocking.
	results := make(chan bool, 1)
	evaluate := func() { results <- condition() }

	deadline := time.NewTimer(waitFor)
	defer deadline.Stop()

	poll := time.NewTicker(tick)
	defer poll.Stop()

	// Kick off condition check immediately.
	go evaluate()

	// nextPoll is set to nil once a tick has started an evaluation, which
	// disables that select case, and is re-armed when a false result arrives.
	nextPoll := poll.C
	for {
		select {
		case err := <-r.errch:
			require.Fail(r, "Command failed", err)
			return
		case <-deadline.C:
			require.Fail(r, "Condition never satisfied", msgAndArgs...)
			return
		case <-nextPoll:
			nextPoll = nil
			go evaluate()
		case satisfied := <-results:
			if satisfied {
				return
			}
			nextPoll = poll.C
		}
	}
}

// RunAndExpectOutput runs the command synchronously, requires that it
// succeeds, and requires that its whitespace-trimmed stdout equals heredoc
// after heredoc has been passed through cmdio.Heredoc.
func (r *Runner) RunAndExpectOutput(heredoc string) {
	r.Helper()

	out, _, err := r.Run()
	require.NoError(r, err)

	want := cmdio.Heredoc(heredoc)
	got := strings.TrimSpace(out.String())
	require.Equal(r, want, got)
}

// RunAndParseJSON runs the command synchronously, requires that it succeeds,
// and decodes its stdout as JSON into v.
//
// v must be a non-nil pointer to the value to populate. Anything else makes
// the JSON decoder return an error, which fails the test.
func (r *Runner) RunAndParseJSON(v any) {
	r.Helper()
	stdout, _, err := r.Run()
	require.NoError(r, err)
	// Pass v itself rather than &v: with a pointer to the local interface the
	// decoder would accept a non-pointer or nil v and store the result in that
	// local interface only, so the caller's value would silently stay untouched.
	err = json.Unmarshal(stdout.Bytes(), v)
	require.NoError(r, err)
}

// NewRunner returns a Runner bound to test t that will execute the root
// command with the given arguments, using ctx as the parent context.
// No stdin pipe is configured; call [Runner.WithStdin] to add one.
func NewRunner(t testutil.TestingT, ctx context.Context, args ...string) *Runner {
	runner := &Runner{TestingT: t}
	runner.ctx = ctx
	runner.args = args
	return runner
}

// RequireSuccessfulRun runs the root command synchronously with the given
// arguments and fails the test if it returns an error. It returns the
// command's captured stdout and stderr.
func RequireSuccessfulRun(t testutil.TestingT, ctx context.Context, args ...string) (bytes.Buffer, bytes.Buffer) {
	t.Helper()
	out, errOut, runErr := NewRunner(t, ctx, args...).Run()
	require.NoError(t, runErr)
	return out, errOut
}

// RequireErrorRun runs the root command synchronously with the given
// arguments and fails the test if it does NOT return an error. It returns the
// command's captured stdout and stderr together with that error.
func RequireErrorRun(t testutil.TestingT, ctx context.Context, args ...string) (bytes.Buffer, bytes.Buffer, error) {
	t.Helper()
	out, errOut, runErr := NewRunner(t, ctx, args...).Run()
	require.Error(t, runErr)
	return out, errOut, runErr
}