import { PayloadAction } from '@reduxjs/toolkit';
import { all, put, select, takeLatest } from 'redux-saga/effects';

import { TransactionTag } from '@relationalai/console-state';
import {
  SdkError,
  TransactionAsyncResult,
} from '@relationalai/rai-sdk-javascript/web';

import { handleCellResponse, takeLatestPerKey } from '../common';
import { getRaiContext, RaiContext } from '../raiContext';
import {
  BasePayload,
  CellStatus,
  CellType,
  getAllQueryCells,
  getCell,
} from './notebookSlice';

// Run a query transaction against Rel.
function* queryCell(action: PayloadAction<BasePayload>) {
  const cellId = action.payload.id;
  const cell = getCell(cellId)(yield select());

  // We need to inspect the global Redux state to know if this indeed is a
  // `query` or `update` cell. If not don't do anything.
  if (cell.type !== CellType.QUERY && cell.type !== CellType.UPDATE) {
    return;
  }

  const {
    databaseId,
    sdkClient,
    engineName,
  }: RaiContext = yield getRaiContext();

  // If we don't have a `cellSource` set yet we won't run the transaction. Short
  // circuit the generator.
  if (!cell.source) {
    return;
  }

  const readOnly = cell.type === CellType.QUERY;

  const queryInputs = (cell.inputs || [])
    .filter(ci => ci.file?.content)
    .map(ci => ({ name: ci.relation, value: ci.file?.content || '' }));

  let response: TransactionAsyncResult | undefined;
  let err: SdkError | undefined;

  try {
    yield put({
      type: 'notebook/setTransactionId',
      payload: { id: cellId, transactionId: undefined },
    });

    yield put({
      type: 'notebook/changeStatus',
      payload: { id: cellId, status: CellStatus.COMPUTING },
    });

    response = yield sdkClient.execAsync(
      databaseId,
      engineName,
      cell.source,
      queryInputs,
      readOnly,
      [TransactionTag.CONSOLE_USER],
    );

    if (response && 'transaction' in response) {
      const txnId = response.transaction.id;

      if (!('results' in response)) {
        yield put({
          type: 'notebook/setTransactionId',
          payload: { id: cellId, transactionId: txnId },
        });

        response = yield sdkClient.pollTransaction(txnId);
      }
    }
  } catch (error_: any) {
    err = error_;
  }

  yield handleCellResponse(response, err, cellId, undefined, cell.type, true);

  if (cell.type === CellType.UPDATE) {
    // In case of an `update` transaction we want all query cells to reflect the
    // potential changes. So we rerun them all.
    yield put({
      type: 'notebook/queryAll',
    });
  }

  yield put({
    type: 'notebook/changeStatus',
    payload: { id: cellId, status: CellStatus.DONE },
  });
}

function* queryAllCells() {
  const queryCells = getAllQueryCells(yield select());

  for (const cell of queryCells) {
    yield put({
      type: 'notebook/run',
      payload: {
        id: cell.id,
      },
    });
  }
}

function* cancelTransaction(action: PayloadAction<BasePayload>) {
  const { sdkClient }: RaiContext = yield getRaiContext();

  const cellId = action.payload.id;
  const cell = getCell(cellId)(yield select());

  if (cell && cell.status === CellStatus.COMPUTING && cell.transactionId) {
    try {
      yield put({
        type: 'notebook/changeStatus',
        payload: {
          id: cell.id,
          status: CellStatus.CANCELLING,
        },
      });

      yield sdkClient.cancelTransaction(cell.transactionId);
    } catch (error_: any) {
      yield put({
        type: 'notebook/changeStatus',
        payload: {
          id: cell.id,
          status: CellStatus.COMPUTING,
        },
      });

      if (error_.status === 501) {
        yield put({
          type: 'notebook/setCellError',
          payload: {
            id: cell.id,
            error: {
              message:
                'Transaction cancellation is not implemented on this engine. Try to create a fresh engine.',
            },
          },
        });
      } else {
        yield put({
          type: 'notebook/setCellError',
          payload: {
            id: cell.id,
            error: {
              message: 'Internal error while cancelling transaction.',
            },
          },
        });
      }
    }
  }
}

function* cancelOnDelete(action: PayloadAction<BasePayload>) {
  yield put({
    type: 'notebook/cancelTransaction',
    payload: {
      id: action.payload.id,
    },
  });
}

function* querySaga() {
  // We want to cancel, at least the saga we cannot cancel the raiTransaction
  // yet, running the middleware for a dispatched action as soon as a newer one
  // arrived. But we want to do this on the level of each cell.
  yield all([
    takeLatestPerKey(['notebook/run'], queryCell, 'id'),
    takeLatestPerKey(['notebook/requestSourceChange'], queryCell, 'id'),
    takeLatestPerKey(['notebook/cancelTransaction'], cancelTransaction, 'id'),
    takeLatestPerKey(['notebook/requestDelete'], cancelOnDelete, 'id'),
    takeLatest(['notebook/queryAll'], queryAllCells),
  ]);
}

export default querySaga;
