import {
  all,
  call,
  cancel,
  fork,
  put,
  select,
  take,
  takeEvery,
  takeLeading,
} from 'typed-redux-saga/macro';

import { resetState as audioInputResetState } from 'features/AudioInput/slice';

import { addConversationChunk, markConversationChunk } from './slice';
import actions from 'features/Chat/actions';
import { startListeningWithStream } from 'app/audio/Input/sagas/startListeningWithStream';
import audioInputActions from 'app/audio/Input/actions';
import {
  controlledTranscriptionSocket,
  OnMarkCallback,
  OnTranscriptCallback,
  uncontrolledTranscriptionSocket,
} from 'app/api/stt/transcription';

import { watchReview } from './Review/sagas';
import { OnCloseBeforeMarkCallback } from 'app/api/stt/sttSocket';
import {
  getConversationMethod,
  getConversationSpeed,
} from 'features/ConversationControl/selectors';
import selectors, { getSequenceMessage } from 'features/Chat/selectors';
import { CONVERSATION_SPEED } from 'features/ConversationControl/constants';
import { isIOS } from 'utils/constants';

/**
 * Mirrors audio-input state changes onto the chat state of the current
 * conversation method.
 *
 * For every `audioInputActions.setState` action the conversation method is
 * re-read from the store:
 * - `'pending'` is forwarded as `'pending'`;
 * - `'active'` is forwarded as `'listening'`, after which the saga finishes.
 *
 * The hands-free state is updated when the method is `'hands_free'`, the
 * push-to-talk state otherwise. Any other audio-input state is ignored.
 */
function* watchForAudioInput() {
  for (;;) {
    const { payload: audioState } = yield* take(audioInputActions.setState);
    const method = yield* select(getConversationMethod);

    // Only 'pending' and 'active' are reflected in the chat state.
    if (audioState !== 'pending' && audioState !== 'active') {
      continue;
    }

    const chatState = audioState === 'active' ? 'listening' : 'pending';

    if (method === 'hands_free') {
      yield* put(actions.setHandsFreeState(chatState));
    } else {
      yield* put(actions.setPushToTalkState(chatState));
    }

    // Once the microphone is active there is nothing left to watch for.
    if (audioState === 'active') {
      return;
    }
  }
}

/**
 * Runs one "controlled" (push-to-talk) listening session.
 *
 * Steps, in order:
 * 1. Reads the conversation id and sequence number from the store.
 * 2. Starts a watcher that reacts to `actions.stopListening` by stopping the
 *    audio input and moving the push-to-talk state to `'transcribing'`.
 * 3. Obtains the audio stream and forks `watchForAudioInput`.
 * 4. Opens the controlled transcription socket and registers the transcript,
 *    mark and close-before-mark callbacks.
 * 5. Pumps the audio stream into the socket until the stream is done, then
 *    sends the mark.
 * 6. Cancels the helper tasks it started, so nothing from this session stays
 *    subscribed once the stream has been drained.
 *
 * Note: the helper tasks are attached forks, so redux-saga already cancels
 * them when this worker throws or is cancelled; the explicit cancels at the
 * end cover the normal-completion path, where a `takeEvery` watcher would
 * otherwise live forever and pile up one extra handler per session.
 */
function* startControlledListeningWorker() {
  const id = yield* select(selectors.getConversationId);
  const sequenceNumber = yield* select(selectors.getSequenceNumber);

  // Keep the task handle: this watcher never ends on its own.
  const stopListeningWatcherTask = yield* takeEvery(actions.stopListening, function* () {
    yield* put(audioInputActions.stopListening());
    yield* put(actions.setPushToTalkState('transcribing'));
  });

  const stream = yield* call(startListeningWithStream);

  const audioInputWatcherTask = yield* fork(watchForAudioInput);

  const { sendAudio, sendMark, onTranscript, onMark, onCloseBeforeMark, close } = yield* call(
    controlledTranscriptionSocket,
    {
      id: id!,
      sequenceNumber: sequenceNumber!,
    } as const
  );

  const closeSocketOnChatResetTask = yield* fork(function* closeSocketOnChatReset() {
    yield* take(actions.resetState);
    yield* call(close);
  });

  /**
   * Decides the push-to-talk state once the user's turn is over: a non-blank
   * message disables the control, a blank one returns it to idle and resets
   * the user's conversation entity for this sequence.
   */
  function* settleUserTurn() {
    const fullMessage = yield* select(
      getSequenceMessage({ id: id!, sequenceNumber: sequenceNumber!, entity: 'user' })
    );

    if (fullMessage.trim()) {
      yield* put(actions.setPushToTalkState('disabled'));
    } else {
      yield* put(actions.setPushToTalkState('idle'));
      yield* put(
        actions.resetConversationEntity({
          id: id!,
          sequenceNumber: sequenceNumber!,
          entity: 'user',
        })
      );
    }
  }

  yield* fork(onTranscript, function* ({ chunkNumber, text }) {
    yield* put(
      addConversationChunk({
        id: id!,
        sequenceNumber: sequenceNumber!,
        chunkNumber,
        text,
        entity: 'user',
        isLast: false,
      })
    );
  } as OnTranscriptCallback);

  yield* fork(onMark, function* ({ chunkNumber }) {
    yield* put(
      markConversationChunk({
        id: id!,
        sequenceNumber: sequenceNumber!,
        chunkNumber,
        entity: 'user',
      })
    );

    yield* settleUserTurn();
  } as OnMarkCallback);

  yield* fork(onCloseBeforeMark, settleUserTurn as OnCloseBeforeMarkCallback);

  yield* call(function* pumpAudioToSocket() {
    const reader = stream.getReader();
    try {
      while (true) {
        const { done, value: audio } = yield* call([reader, reader.read]);

        if (done) {
          break;
        } else {
          yield* call(sendAudio, audio);
        }
      }
    } finally {
      // Always release the reader lock, even if sending the mark fails.
      try {
        yield* call(sendMark);
      } finally {
        reader.releaseLock();
      }
    }
  });

  // Cancelling an already-finished task is a no-op, so these are safe
  // regardless of how far each helper got.
  yield* cancel(closeSocketOnChatResetTask);
  yield* cancel(audioInputWatcherTask);
  yield* cancel(stopListeningWatcherTask);
}

/**
 * Runs one "uncontrolled" (hands-free) listening session.
 *
 * Steps, in order:
 * 1. Reads the conversation id, sequence number and conversation speed.
 * 2. Maps the speed to `detectionBufferMs` (SLOW 2500, FAST 500, otherwise
 *    1250) — presumably the end-of-speech detection window used by the
 *    transcription service; confirm against the socket API.
 * 3. Obtains the audio stream and forks `watchForAudioInput`.
 * 4. Opens the uncontrolled transcription socket and registers the
 *    transcript, mark and close-before-mark callbacks. Both the mark and the
 *    close-before-mark callbacks stop the audio input.
 * 5. Pumps the audio stream into the socket until the stream is done.
 * 6. Cancels the helper tasks it started.
 *
 * Note: `watchForAudioInput` only finishes once it sees the `'active'` audio
 * state. It is an attached fork, so if that state never arrives it would keep
 * this worker from completing; the explicit cancel at the end prevents a
 * finished session from holding the worker open.
 */
function* startUncontrolledListeningWorker() {
  const id = yield* select(selectors.getConversationId);
  const sequenceNumber = yield* select(selectors.getSequenceNumber);
  const conversationSpeed = yield* select(getConversationSpeed);

  const detectionBufferMs = (() => {
    switch (conversationSpeed) {
      case CONVERSATION_SPEED.SLOW: {
        return 2500;
      }
      case CONVERSATION_SPEED.FAST: {
        return 500;
      }
      case CONVERSATION_SPEED.NORMAL:
      default: {
        return 1250;
      }
    }
  })();

  const stream = yield* call(startListeningWithStream);

  const audioInputWatcherTask = yield* fork(watchForAudioInput);

  const { sendAudio, onTranscript, onMark, onCloseBeforeMark, close } = yield* call(
    uncontrolledTranscriptionSocket,
    {
      id: id!,
      sequenceNumber: sequenceNumber!,
      detectionBufferMs,
    } as const
  );

  const closeSocketOnChatResetTask = yield* fork(function* closeSocketOnChatReset() {
    yield* take(actions.resetState);
    yield* call(close);
  });

  /**
   * Stops the audio input and decides the hands-free state once the user's
   * turn is over: a non-blank message disables the control, a blank one
   * returns it to idle and resets the user's conversation entity.
   */
  function* stopAndSettleUserTurn() {
    yield* put(audioInputActions.stopListening());

    const fullMessage = yield* select(
      getSequenceMessage({ id: id!, sequenceNumber: sequenceNumber!, entity: 'user' })
    );

    if (fullMessage.trim()) {
      yield* put(actions.setHandsFreeState('disabled'));
    } else {
      yield* put(actions.setHandsFreeState('idle'));
      yield* put(
        actions.resetConversationEntity({
          id: id!,
          sequenceNumber: sequenceNumber!,
          entity: 'user',
        })
      );
    }
  }

  yield* fork(onTranscript, function* ({ chunkNumber, text }) {
    yield* put(
      addConversationChunk({
        id: id!,
        sequenceNumber: sequenceNumber!,
        chunkNumber,
        text,
        entity: 'user',
        isLast: false,
      })
    );
  } as OnTranscriptCallback);

  yield* fork(onMark, function* ({ chunkNumber }) {
    yield* put(
      markConversationChunk({
        id: id!,
        sequenceNumber: sequenceNumber!,
        chunkNumber,
        entity: 'user',
      })
    );

    yield* stopAndSettleUserTurn();
  } as OnMarkCallback);

  yield* fork(onCloseBeforeMark, stopAndSettleUserTurn as OnCloseBeforeMarkCallback);

  yield* call(function* pumpAudioToSocket() {
    const reader = stream.getReader();
    try {
      while (true) {
        const { done, value: audio } = yield* call([reader, reader.read]);

        if (done) {
          break;
        } else {
          yield* call(sendAudio, audio);
        }
      }
    } finally {
      reader.releaseLock();
    }
  });

  // Cancelling an already-finished task is a no-op, so both are safe here.
  yield* cancel(closeSocketOnChatResetTask);
  yield* cancel(audioInputWatcherTask);
}

/**
 * Prepares the listening controls for a new conversation.
 *
 * The control matching the current conversation method is set to
 * `'disabled'` and the other one to `null`. The saga then waits until a
 * conversation id is available (either already in the store or delivered by
 * `actions.setConversationId`) and finally switches the matching control to
 * `'idle'`.
 *
 * The conversation method is read once at the start and reused for the final
 * update.
 */
function* startConversation() {
  const method = yield* select(getConversationMethod);
  const isHandsFree = method === 'hands_free';

  // Push-to-talk is always updated first, then hands-free.
  yield* put(actions.setPushToTalkState(isHandsFree ? null : 'disabled'));
  yield* put(actions.setHandsFreeState(isHandsFree ? 'disabled' : null));

  let activeConversationId = yield* select(selectors.getConversationId);

  // Keep waiting while the id is still falsy, including after a falsy update.
  while (!activeConversationId) {
    const idAction = yield* take(actions.setConversationId);

    activeConversationId = idAction.payload;
  }

  if (isHandsFree) {
    yield* put(actions.setHandsFreeState('idle'));
  } else {
    yield* put(actions.setPushToTalkState('idle'));
  }
}

/**
 * Returns the active listening control to `'idle'` whenever a conversation
 * chunk belonging to the bot is marked.
 *
 * Runs forever. Marks for any other entity are ignored. The conversation
 * method is re-read for each bot mark, so the control that gets updated
 * follows the method in effect at that moment.
 */
function* watchConversationIdleStateTriggerFromBotMark() {
  for (;;) {
    const { payload: mark } = yield* take(actions.markConversationChunk);

    if (mark.entity !== 'bot') {
      continue;
    }

    const method = yield* select(getConversationMethod);

    if (method === 'hands_free') {
      yield* put(actions.setHandsFreeState('idle'));
    } else {
      yield* put(actions.setPushToTalkState('idle'));
    }
  }
}

/**
 * Waits for the first `actions.setChunkPlayedState` action and returns a
 * value derived from it; later actions are not observed.
 *
 * - Truthy payload: reads the conversation id and the sequence message, and
 *   returns the message when an id exists, otherwise `null`.
 * - Falsy payload: reads the sequence chunks and returns them on iOS,
 *   otherwise `null`.
 *
 * NOTE(review): this saga is started with `fork` in `watchChat`, which does
 * not use the return value — confirm whether anything else consumes it.
 */
function* watchConversationSequenceMarksTriggerOnPlayedState() {
  const { payload: played } = yield* take(actions.setChunkPlayedState);

  if (played) {
    const conversationId = yield* select(selectors.getConversationId);
    const message = yield* select(selectors.getSequenceMessage);

    return conversationId ? message : null;
  }

  const chunks = yield* select(selectors.getSequenceChunks);

  return isIOS ? chunks : null;
}

/**
 * Root saga of the chat feature.
 *
 * Starts, in parallel:
 * - the review watcher;
 * - the watcher that idles the listening control on bot marks;
 * - the played-state watcher;
 * - a controlled-listening worker for every `startControlledListening`;
 * - an uncontrolled-listening worker for `startUncontrolledListening`
 *   (actions arriving while a worker is still running are dropped);
 * - the conversation start-up saga for `startConversation` (same dropping
 *   behaviour);
 * - a handler that resets the audio-input slice on every chat `resetState`.
 */
export function* watchChat() {
  // Named so the reset handler is identifiable in saga monitors and traces.
  function* resetAudioInputOnChatReset() {
    yield* put(audioInputResetState());
  }

  yield* all([
    fork(watchReview),
    fork(watchConversationIdleStateTriggerFromBotMark),
    fork(watchConversationSequenceMarksTriggerOnPlayedState),
    takeEvery(actions.startControlledListening, startControlledListeningWorker),
    takeLeading(actions.startUncontrolledListening, startUncontrolledListeningWorker),
    takeLeading(actions.startConversation, startConversation),
    takeEvery(actions.resetState, resetAudioInputOnChatReset),
  ]);
}
