import {
  YDocSyncMessage,
  YDocSyncMessageType,
  YDocWSMessageType,
  deserializeUint8Array,
  serializeUint8Array,
} from '@rmvw/x-common';
import { Observable } from 'lib0/observable';
import debounce from 'lodash.debounce';
import { Awareness, applyAwarenessUpdate, encodeAwarenessUpdate, removeAwarenessStates } from 'y-protocols/awareness';
import * as Y from 'yjs';

import Logger from '../observability/Logger';

import YDocBCTransport from './YDocBCTransport';
import YDocWSTransport from './YDocWSTransport';

type DocUpdateMiddlewareHandler = (doc: Y.Doc, data: Uint8Array, origin: any, next: () => void) => void;

const DISPATCH_WS_DOC_UPDATES_DELAY_MS = 100;

/**
 * Responsible for synchronizing YDoc state across browser instances and with back-end
 */
export default class YDocSynchronizer extends Observable<'synced'> {
  private _awareness: Awareness;
  private _bcTransport?: YDocBCTransport;
  private _doc: Y.Doc;
  private _docId: string;
  private _docUpdateMiddleware: DocUpdateMiddlewareHandler[];
  private _queuedWSDocUpdates: Uint8Array[] = [];
  private _wsTransport?: YDocWSTransport;

  private _bcEventListener = (msg: YDocSyncMessage) => {
    switch (msg.type) {
      case YDocSyncMessageType.AWARENESS_SYNC_REQUEST: {
        this._broadcastMessageBC({
          type: YDocSyncMessageType.AWARENESS_UPDATE,
          docId: this._docId,
          update: serializeUint8Array(encodeAwarenessUpdate(this._awareness, [...this._awareness.getStates().keys()])),
        });
        break;
      }

      case YDocSyncMessageType.AWARENESS_UPDATE: {
        applyAwarenessUpdate(this._awareness, deserializeUint8Array(msg.update), this);
        break;
      }

      case YDocSyncMessageType.DOC_SYNC_REQUEST: {
        // Reply to peer SYNC request with UPDATE diff
        this._broadcastMessageBC({
          type: YDocSyncMessageType.DOC_UPDATE,
          docId: this._docId,
          diff: serializeUint8Array(Y.encodeStateAsUpdate(this._doc, deserializeUint8Array(msg.state))),
          syncInProgress: true,
        });
        break;
      }

      // Apply UPDATE diff
      case YDocSyncMessageType.DOC_UPDATE: {
        try {
          // Execute the middleware stack
          const _getNextMiddleware = (i = 0): (() => void) =>
            this._docUpdateMiddleware[i]
              ? () =>
                  this._docUpdateMiddleware[i](
                    this._doc,
                    deserializeUint8Array(msg.diff),
                    this,
                    _getNextMiddleware(i + 1)
                  )
              : () => null;
          _getNextMiddleware()();
        } catch (e) {
          Logger.error(e as Error, '[YDocSynchronizer] DOC_UPDATE error');
        }
        break;
      }

      default: {
        Logger.error(`[YDocSynchronizer] Unknown message: ${msg}`);
        break;
      }
    }
  };

  private _wsEventListener = (msg: YDocSyncMessage) => {
    switch (msg.type) {
      case YDocSyncMessageType.AWARENESS_SYNC_REQUEST: {
        this._broadcastMessageWS({
          type: YDocSyncMessageType.AWARENESS_UPDATE,
          docId: this._docId,
          update: serializeUint8Array(encodeAwarenessUpdate(this._awareness, [...this._awareness.getStates().keys()])),
        });
        break;
      }

      case YDocSyncMessageType.AWARENESS_UPDATE: {
        applyAwarenessUpdate(this._awareness, deserializeUint8Array(msg.update), this);
        break;
      }

      case YDocSyncMessageType.DOC_SYNC_REQUEST: {
        // Reply to server SYNC request with UPDATE diff
        this._broadcastMessageWS({
          type: YDocSyncMessageType.DOC_UPDATE,
          docId: this._docId,
          diff: serializeUint8Array(Y.encodeStateAsUpdate(this._doc, deserializeUint8Array(msg.state))),
          syncInProgress: true,
        });
        break;
      }

      // Apply UPDATE diff from server
      case YDocSyncMessageType.DOC_UPDATE: {
        try {
          // Execute the middleware stack
          const _getNextMiddleware = (i = 0): (() => void) => {
            return this._docUpdateMiddleware[i]
              ? () =>
                  this._docUpdateMiddleware[i](
                    this._doc,
                    deserializeUint8Array(msg.diff),
                    this,
                    _getNextMiddleware(i + 1)
                  )
              : () => {
                  // As final step, fire a synced event when we've received a sync completion message
                  if (msg.syncInProgress) {
                    this.emit('synced', []);
                  }
                };
          };
          _getNextMiddleware()();
        } catch (e) {
          Logger.error(e as Error, '[YDocSynchronizer] DOC_UPDATE error');
        }
        break;
      }

      default: {
        Logger.error(`[YDocSynchronizer] Unknown message: ${msg}`);
        break;
      }
    }
  };

  private _dispatchWSDocUpdatesImmediately = () => {
    // Merge queued updates and forward to server
    this._broadcastMessageWS({
      type: YDocSyncMessageType.DOC_UPDATE,
      docId: this._docId,
      diff: serializeUint8Array(Y.mergeUpdates(this._queuedWSDocUpdates)),
    });

    this._queuedWSDocUpdates = [];
  };

  private _dispatchWSDocUpdatesDebounced = debounce(
    this._dispatchWSDocUpdatesImmediately,
    DISPATCH_WS_DOC_UPDATES_DELAY_MS,
    { maxWait: DISPATCH_WS_DOC_UPDATES_DELAY_MS }
  );

  private _ydocUpdateHandler = (update: Uint8Array, origin: any) => {
    if (origin !== this) {
      // Queue WS updates to avoid overloading server
      this._queuedWSDocUpdates.push(update);
      this._dispatchWSDocUpdatesDebounced();

      // But forward BC updates to other tabs/windows immediately
      this._broadcastMessageBC({
        type: YDocSyncMessageType.DOC_UPDATE,
        docId: this._docId,
        diff: serializeUint8Array(update),
      });
    }
  };

  private _awarenessUpdateHandler = (
    { added, updated, removed }: { added: number[]; updated: number[]; removed: number[] },
    origin: any
  ) => {
    if (origin !== this) {
      const modifiedClients = [...added, ...updated, ...removed];
      this._broadcastMessage({
        type: YDocSyncMessageType.AWARENESS_UPDATE,
        docId: this._docId,
        update: serializeUint8Array(encodeAwarenessUpdate(this._awareness, modifiedClients)),
      });
    }
  };

  constructor(
    docId: string,
    yDoc: Y.Doc,
    transports: {
      bcTransport: YDocBCTransport;
      wsTransport?: YDocWSTransport;
    },
    options?: {
      autoConnect?: boolean;
      docUpdateMiddleware?: DocUpdateMiddlewareHandler[];
    }
  ) {
    super();

    this._docId = docId;
    this._doc = yDoc;

    this._bcTransport = transports.bcTransport;
    this._wsTransport = transports.wsTransport;

    this._awareness = new Awareness(this._doc);
    this._docUpdateMiddleware = [
      ...(options?.docUpdateMiddleware ?? []),

      // Default middleware
      (doc, data, origin, next) => {
        Y.applyUpdate(doc, data, origin);
        next();
      },
    ];

    if (options?.autoConnect) {
      this.connect();
    }
  }

  private _broadcastMessage(msg: YDocSyncMessage) {
    this._broadcastMessageBC(msg);
    this._broadcastMessageWS(msg);
  }

  private _broadcastMessageBC(msg: YDocSyncMessage) {
    this._bcTransport?.broadcast(msg);
  }

  private _broadcastMessageWS(msg: YDocSyncMessage) {
    this._wsTransport?.send({ type: YDocWSMessageType.SYNC_DOC, docId: msg.docId, msg });
  }

  connect(): this {
    this._doc.on('update', this._ydocUpdateHandler);
    this._awareness.on('update', this._awarenessUpdateHandler);
    this._connectBC();
    this._connectWS();

    // Solicit peers + server for latest state
    this._broadcastMessage({
      type: YDocSyncMessageType.DOC_SYNC_REQUEST,
      docId: this._docId,
      state: serializeUint8Array(Y.encodeStateVector(this._doc)),
    });

    // Send client awareness state to peers + server
    this._broadcastMessage({
      type: YDocSyncMessageType.AWARENESS_UPDATE,
      docId: this._docId,
      update: serializeUint8Array(encodeAwarenessUpdate(this._awareness, [this._doc.clientID])),
    });

    return this;
  }

  private _connectBC() {
    this._bcTransport?.addEventListener(this._docId, this._bcEventListener);

    // Proactively broadcast our latest state to BC peers
    this._broadcastMessageBC({
      type: YDocSyncMessageType.DOC_UPDATE,
      docId: this._docId,
      diff: serializeUint8Array(Y.encodeStateAsUpdate(this._doc)),
    });
  }

  private _connectWS() {
    this._wsTransport?.addEventListener(this._docId, this._wsEventListener);
  }

  disconnect() {
    this.destroy();
  }

  private _disconnectBC() {
    this._bcTransport?.removeEventListener(this._docId, this._bcEventListener);
  }

  private _disconnectWS() {
    this._wsTransport?.removeEventListener(this._docId, this._wsEventListener);
  }

  override destroy(): void {
    // Destroy self awareness state
    removeAwarenessStates(this._awareness, [this._doc.clientID], 'destroy');

    // Flush any pending updates
    this._dispatchWSDocUpdatesDebounced.flush();

    super.destroy();
    this._disconnectWS();
    this._disconnectBC();
    this._awareness.off('update', this._awarenessUpdateHandler);
    this._doc.off('update', this._ydocUpdateHandler);
  }

  disableLocalAwarenessState(): this {
    this._awareness.setLocalState(null);
    return this;
  }

  enableLocalAwarenessState(): this {
    this._awareness.setLocalState({});
    return this;
  }

  getAwareness(): Awareness {
    return this._awareness;
  }

  getDoc(): Y.Doc {
    return this._doc;
  }

  getDocId(): string {
    return this._docId;
  }

  addDocUpdateMiddleware(middleware: DocUpdateMiddlewareHandler[]): this {
    this._docUpdateMiddleware = [...middleware, ...this._docUpdateMiddleware];
    return this;
  }
}
