package progress

import (
	"context"
	"fmt"
	"strings"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/pipelines"
)

// The dlt backend computes events for pipeline runs which are accessable through
// the 2.0/pipelines/{pipeline_id}/events API
//
// There are 4 levels for these events: ("ERROR", "WARN", "INFO", "METRICS")
//
// Here's short introduction to a few important events we display on the console:
//
// 1. `update_progress`: A state transition occured for the entire pipeline update
// 2. `flow_progress`: A state transition occured for a single flow in the pipeine
type ProgressEvent pipelines.PipelineEvent

func (event *ProgressEvent) String() string {
	result := strings.Builder{}
	result.WriteString(event.Timestamp + " ")

	// Print event type with some padding to make output more pretty
	result.WriteString(fmt.Sprintf("%-15s", event.EventType) + " ")

	result.WriteString(event.Level.String() + " ")
	result.WriteString(fmt.Sprintf(`"%s"`, event.Message))

	// construct error string if level=`Error`
	if event.Level == pipelines.EventLevelError && event.Error != nil {
		for _, exception := range event.Error.Exceptions {
			result.WriteString(fmt.Sprintf("\n%s", exception.Message))
		}
	}
	return result.String()
}

func (event *ProgressEvent) IsInplaceSupported() bool {
	return false
}

// TODO: Add inplace logging to pipelines. https://github.com/databricks/cli/issues/280
type UpdateTracker struct {
	UpdateId             string
	PipelineId           string
	LatestEventTimestamp string
	w                    *databricks.WorkspaceClient
}

func NewUpdateTracker(pipelineId string, updateId string, w *databricks.WorkspaceClient) *UpdateTracker {
	return &UpdateTracker{
		w:                    w,
		PipelineId:           pipelineId,
		UpdateId:             updateId,
		LatestEventTimestamp: "",
	}
}

// To keep the logic simple we do not use pagination. This means that if there are
// more than 100 new events since the last query then we will miss out on progress events.
//
// This is fine because:
// 1. This should happen fairly rarely if ever
// 2. There is no expectation of the console progress logs being a complete representation
//
// # If a user needs the complete logs, they can always visit the run URL
//
// NOTE: Incase we want inplace logging, then we will need to implement pagination
func (l *UpdateTracker) Events(ctx context.Context) ([]ProgressEvent, error) {
	// create filter to fetch only new events
	filter := fmt.Sprintf(`update_id = '%s'`, l.UpdateId)
	if l.LatestEventTimestamp != "" {
		filter = filter + fmt.Sprintf(" AND timestamp > '%s'", l.LatestEventTimestamp)
	}

	// we only check the most recent 100 events for progress
	events, err := l.w.Pipelines.ListPipelineEventsAll(ctx, pipelines.ListPipelineEventsRequest{
		PipelineId: l.PipelineId,
		MaxResults: 100,
		Filter:     filter,
	})
	if err != nil {
		return nil, err
	}

	result := make([]ProgressEvent, 0)
	// we iterate in reverse to return events in chronological order
	for i := len(events) - 1; i >= 0; i-- {
		event := events[i]
		// filter to only include update_progress and flow_progress events
		if event.EventType == "flow_progress" || event.EventType == "update_progress" {
			result = append(result, ProgressEvent(event))
		}
	}

	// update latest event timestamp for next time
	if len(result) > 0 {
		l.LatestEventTimestamp = result[len(result)-1].Timestamp
	}

	return result, nil
}