import {
  TransactionAsync,
  TransactionAsyncState,
} from '@relationalai/rai-sdk-javascript/web';
import { sleep } from '@relationalai/utils';

import {
  TransactionEvent,
  TransactionEventsResponse,
} from '../utils/profiler/types';

const POLL_INTERVAL_MS = 1500;

export type PollerConnector = {
  // it is necessary to resume the poller if it was paused
  initialContinuationToken: string;
  getEvents: (
    txnID: string,
    continuationToken: string,
  ) => Promise<TransactionEventsResponse>;
  getTransaction: () => Promise<TransactionAsync>;
  getPaused: () => boolean;
  addEvents: (
    continuationToken: string,
    version: string | undefined,
    events: TransactionEvent[],
    lastDate: Date,
  ) => void;
  // it is necessary to check timing in the tests
  getTime?: () => Date;
};

export async function pollProfileEvents(connector: PollerConnector) {
  let continuationToken = connector.initialContinuationToken || '';
  let transaction = await connector.getTransaction();

  // eslint-disable-next-line no-constant-condition
  while (true) {
    let response;

    try {
      response = await connector.getEvents(transaction.id, continuationToken);
    } catch (error: any) {
      // If it's a backup error, just wait
      if (error.status === 429) {
        await sleep(POLL_INTERVAL_MS);
        continue;
      }

      // Otherwise, rethrow
      throw error;
    }

    const lastPollFinished = connector.getTime
      ? connector.getTime()
      : new Date();

    // Get latest transaction state
    transaction = await connector.getTransaction();

    // Process events
    const lastTime = getLastTime(
      transaction,
      response.events,
      lastPollFinished,
    );

    connector.addEvents(
      response.continuation_token,
      response.version,
      response.events,
      lastTime,
    );

    // Set continuation token for next poll
    continuationToken = response.continuation_token;

    // Break out if we're done
    if (transaction.state === TransactionAsyncState.ABORTED) {
      break;
    }

    if (continuationToken === '') {
      break;
    }

    if (connector.getPaused()) {
      break;
    }

    // Wait to poll again
    await sleep(POLL_INTERVAL_MS);
  }
}

// export only for testing
export function getLastTime(
  transaction: TransactionAsync,
  latestEvents: TransactionEvent[],
  lastPollFinished: Date,
): Date {
  // if the transaction is completed, we use the timestamp of the last event
  // if the transaction is aborted, we use the finished_at timestamp
  // if the transaction is running, we use lastPollFinished or the current date
  if (transaction.state === TransactionAsyncState.COMPLETED) {
    if (latestEvents.length > 0) {
      return new Date(latestEvents[latestEvents.length - 1].timestamp);
    }

    // We should always have a finished_at if it's completed, but
    // putting this here to appease the typechecker.
    return transaction.finished_at
      ? new Date(transaction.finished_at)
      : new Date();
  } else if (
    transaction.state === TransactionAsyncState.ABORTED &&
    transaction.finished_at
  ) {
    return new Date(transaction.finished_at);
  } else {
    return lastPollFinished;
  }
}
