package run

import (
	"bytes"
	"context"
	"testing"
	"time"

	"github.com/databricks/cli/bundle"
	"github.com/databricks/cli/bundle/config"
	"github.com/databricks/cli/bundle/config/resources"
	"github.com/databricks/cli/libs/cmdio"
	"github.com/databricks/cli/libs/flags"
	"github.com/databricks/databricks-sdk-go/experimental/mocks"
	"github.com/databricks/databricks-sdk-go/service/jobs"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

func TestConvertPythonParams(t *testing.T) {
	job := &resources.Job{
		JobSettings: &jobs.JobSettings{
			Tasks: []jobs.Task{
				{PythonWheelTask: &jobs.PythonWheelTask{
					PackageName: "my_test_code",
					EntryPoint:  "run",
				}},
			},
		},
	}
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"test_job": job,
				},
			},
		},
	}
	runner := jobRunner{key: "test", bundle: b, job: job}

	opts := &Options{
		Job: JobOptions{},
	}
	runner.convertPythonParams(opts)
	require.NotContains(t, opts.Job.notebookParams, "__python_params")

	opts = &Options{
		Job: JobOptions{
			pythonParams: []string{"param1", "param2", "param3"},
		},
	}
	runner.convertPythonParams(opts)
	require.Contains(t, opts.Job.notebookParams, "__python_params")
	require.Equal(t, opts.Job.notebookParams["__python_params"], `["param1","param2","param3"]`)
}

func TestJobRunnerCancel(t *testing.T) {
	job := &resources.Job{
		ID: "123",
	}
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"test_job": job,
				},
			},
		},
	}

	runner := jobRunner{key: "test", bundle: b, job: job}

	m := mocks.NewMockWorkspaceClient(t)
	b.SetWorkpaceClient(m.WorkspaceClient)

	jobApi := m.GetMockJobsAPI()
	jobApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{
		ActiveOnly: true,
		JobId:      123,
	}).Return([]jobs.BaseRun{
		{RunId: 1},
		{RunId: 2},
	}, nil)

	mockWait := &jobs.WaitGetRunJobTerminatedOrSkipped[struct{}]{
		Poll: func(time time.Duration, f func(j *jobs.Run)) (*jobs.Run, error) {
			return nil, nil
		},
	}
	jobApi.EXPECT().CancelRun(mock.Anything, jobs.CancelRun{
		RunId: 1,
	}).Return(mockWait, nil)
	jobApi.EXPECT().CancelRun(mock.Anything, jobs.CancelRun{
		RunId: 2,
	}).Return(mockWait, nil)

	err := runner.Cancel(context.Background())
	require.NoError(t, err)
}

func TestJobRunnerCancelWithNoActiveRuns(t *testing.T) {
	job := &resources.Job{
		ID: "123",
	}
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"test_job": job,
				},
			},
		},
	}

	runner := jobRunner{key: "test", bundle: b, job: job}

	m := mocks.NewMockWorkspaceClient(t)
	b.SetWorkpaceClient(m.WorkspaceClient)

	jobApi := m.GetMockJobsAPI()
	jobApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{
		ActiveOnly: true,
		JobId:      123,
	}).Return([]jobs.BaseRun{}, nil)

	jobApi.AssertNotCalled(t, "CancelRun")

	err := runner.Cancel(context.Background())
	require.NoError(t, err)
}

func TestJobRunnerRestart(t *testing.T) {
	for _, jobSettings := range []*jobs.JobSettings{
		{},
		{
			Continuous: &jobs.Continuous{
				PauseStatus: jobs.PauseStatusPaused,
			},
		},
	} {
		job := &resources.Job{
			ID:          "123",
			JobSettings: jobSettings,
		}
		b := &bundle.Bundle{
			Config: config.Root{
				Resources: config.Resources{
					Jobs: map[string]*resources.Job{
						"test_job": job,
					},
				},
			},
		}

		runner := jobRunner{key: "test", bundle: b, job: job}

		m := mocks.NewMockWorkspaceClient(t)
		b.SetWorkpaceClient(m.WorkspaceClient)
		ctx := context.Background()
		ctx = cmdio.InContext(ctx, cmdio.NewIO(flags.OutputText, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, "", ""))
		ctx = cmdio.NewContext(ctx, cmdio.NewLogger(flags.ModeAppend))

		jobApi := m.GetMockJobsAPI()
		jobApi.EXPECT().ListRunsAll(mock.Anything, jobs.ListRunsRequest{
			ActiveOnly: true,
			JobId:      123,
		}).Return([]jobs.BaseRun{
			{RunId: 1},
			{RunId: 2},
		}, nil)

		// Mock the runner cancelling existing job runs.
		mockWait := &jobs.WaitGetRunJobTerminatedOrSkipped[struct{}]{
			Poll: func(time time.Duration, f func(j *jobs.Run)) (*jobs.Run, error) {
				return nil, nil
			},
		}
		jobApi.EXPECT().CancelRun(mock.Anything, jobs.CancelRun{
			RunId: 1,
		}).Return(mockWait, nil)
		jobApi.EXPECT().CancelRun(mock.Anything, jobs.CancelRun{
			RunId: 2,
		}).Return(mockWait, nil)

		// Mock the runner triggering a job run
		mockWaitForRun := &jobs.WaitGetRunJobTerminatedOrSkipped[jobs.RunNowResponse]{
			Poll: func(d time.Duration, f func(*jobs.Run)) (*jobs.Run, error) {
				return &jobs.Run{
					State: &jobs.RunState{
						ResultState: jobs.RunResultStateSuccess,
					},
				}, nil
			},
		}
		jobApi.EXPECT().RunNow(mock.Anything, jobs.RunNow{
			JobId: 123,
		}).Return(mockWaitForRun, nil)

		// Mock the runner getting the job output
		jobApi.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{}).Return(&jobs.Run{}, nil)

		_, err := runner.Restart(ctx, &Options{})
		require.NoError(t, err)
	}
}

func TestJobRunnerRestartForContinuousUnpausedJobs(t *testing.T) {
	job := &resources.Job{
		ID: "123",
		JobSettings: &jobs.JobSettings{
			Continuous: &jobs.Continuous{
				PauseStatus: jobs.PauseStatusUnpaused,
			},
		},
	}
	b := &bundle.Bundle{
		Config: config.Root{
			Resources: config.Resources{
				Jobs: map[string]*resources.Job{
					"test_job": job,
				},
			},
		},
	}

	runner := jobRunner{key: "test", bundle: b, job: job}

	m := mocks.NewMockWorkspaceClient(t)
	b.SetWorkpaceClient(m.WorkspaceClient)
	ctx := context.Background()
	ctx = cmdio.InContext(ctx, cmdio.NewIO(flags.OutputText, &bytes.Buffer{}, &bytes.Buffer{}, &bytes.Buffer{}, "", "..."))
	ctx = cmdio.NewContext(ctx, cmdio.NewLogger(flags.ModeAppend))

	jobApi := m.GetMockJobsAPI()

	// The runner should not try and cancel existing job runs for unpaused continuous jobs.
	jobApi.AssertNotCalled(t, "ListRunsAll")
	jobApi.AssertNotCalled(t, "CancelRun")

	// Mock the runner triggering a job run
	mockWaitForRun := &jobs.WaitGetRunJobTerminatedOrSkipped[jobs.RunNowResponse]{
		Poll: func(d time.Duration, f func(*jobs.Run)) (*jobs.Run, error) {
			return &jobs.Run{
				State: &jobs.RunState{
					ResultState: jobs.RunResultStateSuccess,
				},
			}, nil
		},
	}
	jobApi.EXPECT().RunNow(mock.Anything, jobs.RunNow{
		JobId: 123,
	}).Return(mockWaitForRun, nil)

	// Mock the runner getting the job output
	jobApi.EXPECT().GetRun(mock.Anything, jobs.GetRunRequest{}).Return(&jobs.Run{}, nil)

	_, err := runner.Restart(ctx, &Options{})
	require.NoError(t, err)
}