import { SquidRegion } from './public-types';
import { assertTruthy } from 'assertic';
import {
  BehaviorSubject,
  debounceTime,
  distinctUntilChanged,
  filter,
  firstValueFrom,
  Observable,
  race,
  skip,
  Subject,
  switchMap,
  tap,
  timer,
} from 'rxjs';
import { AuthManager } from './auth.manager';
import { ClientIdService } from './client-id.service';
import { DestructManager } from './destruct.manager';
import { NOOP_FN } from './types';
import { createWebSocketWrapper, WebSocketWrapper } from '../../internal-common/src/websocket.impl';
import { DebugLogger } from '../../internal-common/src/utils/global.utils';
import { deserializeObj, serializeObj } from '../../internal-common/src/utils/serialization';
import { getApplicationUrl } from '../../internal-common/src/utils/http';
import {
  ClientInfoMessageToClient,
  KillMessage,
  MessageFromClient,
  MessageId,
  MessageToClient,
  SOCKET_RECONNECT_TIMEOUT,
} from '../../internal-common/src/types/socket.types';

/**
 * Owns the client's WebSocket connection to the application's `ws/general` endpoint.
 *
 * Responsibilities visible in this class:
 * - opens the socket, and re-opens it when the client is refreshed;
 * - tracks readiness: the connection counts as ready only after the server sends the literal
 *   `connectionReady` message, and stops being ready on close/reconnect events;
 * - holds outgoing messages until the connection is ready and attaches the current auth token;
 * - de-duplicates incoming messages by `messageId` and acknowledges them in batches;
 * - marks the client as "too old" when it stays disconnected (or stops ticking) for longer
 *   than `clientTooOldThreshold`.
 *
 * @internal
 */
export class SocketManager {
  /** De-duplicated incoming messages; exposed through `observeNotifications()`. */
  private readonly webSocketObserver = new Subject<MessageToClient>();
  /** Every incoming message, including duplicates; drives acknowledgments. */
  private readonly allMessagesObserver = new Subject<MessageToClient>();
  /** `true` between a `connectionReady` message and the next close/reconnect/disconnect. */
  private readonly connectionReady = new BehaviorSubject(false);
  /** Ids of messages already forwarded to `webSocketObserver`. Never pruned by this class. */
  private readonly seenMessageIds = new Set<MessageId>();
  private socket: WebSocketWrapper | undefined;
  /** Emits once at the end of `destruct()`. */
  private readonly destructSubject = new Subject<void>();
  /** Time of the most recent `tick()` call (or of construction). */
  private lastTick: Date;
  /** Becomes `true` after the first `connectionReady` message; later ones trigger a catch-up. */
  private connectedAtLeastOnce = false;

  /**
   * On a client disconnecting, we wait for a bit to see if the client reconnects.
   * If no reconnect happens within this timeout, we consider the client to be too old.
   * The value is in milliseconds: 30_000 means we wait for 30 seconds.
   * Note: the field is intentionally not `readonly` - presumably so that tests can override it.
   */
  private clientTooOldThreshold = 30_000;
  /** Handle of the 5-second interval that drives `tick()`; cleared in `destruct()`. */
  private readonly tickInterval: ReturnType<typeof setInterval>;

  /**
   * Connects immediately and registers all internal subscriptions.
   *
   * @param clientIdService Source of the current client id and of the "too old" flag.
   * @param region Passed to `getApplicationUrl` to build the endpoint.
   * @param appId Passed to `getApplicationUrl` to build the endpoint.
   * @param messageNotificationWrapper Called with a callback that publishes one incoming message
   *   to `observeNotifications()` subscribers. The message is published only if the wrapper
   *   invokes that callback. NOTE(review): the default is `NOOP_FN`; if that is a no-op,
   *   notifications are never published with the default - confirm callers always pass a wrapper.
   * @param destructManager `destruct()` is registered with it via `onDestruct`.
   * @param authManager Provides the auth token attached to every outgoing message.
   */
  constructor(
    private readonly clientIdService: ClientIdService,
    private readonly region: SquidRegion,
    private readonly appId: string,
    private readonly messageNotificationWrapper: (fn: () => any) => any = NOOP_FN,
    private readonly destructManager: DestructManager,
    private readonly authManager: AuthManager,
  ) {
    this.destructManager.onDestruct(() => this.destruct());
    this.setupMessageAcknowledgments();
    this.connect();

    this.lastTick = new Date();
    this.tickInterval = setInterval(() => this.tick(), 5000);

    this.refreshClientAfterLongDisconnect();
    this.clearTooOldFlagOnReconnect();
    this.logClientInfoMessages();
  }

  /**
   * Converts an `http(s)://` URL to the matching `ws(s)://` URL.
   * Only the leading scheme is rewritten, so an `http` substring inside the host or path
   * (for example in an application id) is left untouched.
   */
  private static toWebSocketUrl(httpUrl: string): string {
    return httpUrl.replace(/^http/, 'ws');
  }

  /**
   * Marks the client as too old and opens a fresh socket.
   * Does nothing while destructing or when the client is already marked as too old.
   */
  refreshClient(): void {
    if (this.destructManager.isDestructing) {
      DebugLogger.debug(this.clientIdService.getClientId(), `Client too old but is destructed. Ignoring...`);
      return;
    }
    if (this.clientIdService.isClientTooOld()) {
      DebugLogger.debug(this.clientIdService.getClientId(), `Client is already marked as too old. Ignoring...`);
      return;
    }
    DebugLogger.debug(this.clientIdService.getClientId(), `Notifying client too old`);
    this.clientIdService.notifyClientTooOld();

    DebugLogger.debug(this.clientIdService.getClientId(), 'Client too old. Reconnecting...');
    this.connect();
  }

  /**
   * Sends a `getClientInfo` request. Any `clientInfo` message that reaches
   * `observeNotifications()` is printed to the console by a subscription set up in the constructor.
   */
  getClientInfo(): void {
    this.sendMessage({ type: 'getClientInfo' });
  }

  /** Stream of incoming messages, each `messageId` delivered at most once. The cast to `T` is unchecked. */
  observeNotifications<T extends MessageToClient>(): Observable<T> {
    return this.webSocketObserver.asObservable() as Observable<T>;
  }

  /** Stream of the readiness flag; emits the current value first and then only on changes. */
  observeConnectionReady(): Observable<boolean> {
    return this.connectionReady.asObservable().pipe(distinctUntilChanged());
  }

  /**
   * Fire-and-forget send. The message is held until the connection is ready.
   * A failure (for example a rejected auth token request) is logged instead of surfacing
   * as an unhandled promise rejection.
   */
  sendMessage(message: MessageFromClient): void {
    void this.sendMessageAsync(message).catch((e: unknown) => {
      console.error('Failed to send websocket message', e);
    });
  }

  /** Test hook: flags the connection as not ready and closes the socket with code 4998. */
  disconnectForTest(): void {
    this.connectionReady.next(false);
    this.socket?.close(4998);
  }

  /**
   * When the connection stops being ready, waits for whichever comes first: the too-old timeout,
   * a reconnect, or an emission from the destruct manager. Refreshes the client only if it is
   * still disconnected and not destructing at that point.
   */
  private refreshClientAfterLongDisconnect(): void {
    this.observeConnectionReady()
      .pipe(
        // Skip the initial value replayed by the BehaviorSubject.
        skip(1),
        filter(isReady => !isReady),
        // A newer disconnect replaces the wait started by an older one.
        switchMap(() =>
          race(
            timer(this.clientTooOldThreshold),
            this.connectionReady.pipe(filter(Boolean)),
            this.destructManager.observeIsDestructing(),
          ),
        ),
      )
      .subscribe(() => {
        if (this.connectionReady.value) {
          DebugLogger.debug(
            this.clientIdService.getClientId(),
            `Client reconnected before becoming too old. Ignoring...`,
          );
          return;
        }
        if (!this.destructManager.isDestructing) {
          DebugLogger.debug(this.clientIdService.getClientId(), `Client disconnected for a long period - refreshing`);
          this.refreshClient();
        }
      });
  }

  /** Clears the "too old" flag as soon as the connection becomes ready again. */
  private clearTooOldFlagOnReconnect(): void {
    this.observeConnectionReady()
      .pipe(filter(Boolean))
      .subscribe(() => {
        if (this.clientIdService.isClientTooOld()) {
          this.clientIdService.notifyClientNotTooOld();
        }
      });
  }

  /** Prints every `clientInfo` message to the console (see `getClientInfo()`). */
  private logClientInfoMessages(): void {
    this.observeNotifications<ClientInfoMessageToClient>()
      .pipe(filter(m => m.type === 'clientInfo'))
      .subscribe(clientInfo => {
        console.log('Client info message received', clientInfo);
      });
  }

  /**
   * Runs every 5 seconds. If the time since the previous tick exceeds `clientTooOldThreshold`,
   * the interval did not fire for a long time (presumably because the process or tab was
   * suspended), so the client is refreshed.
   */
  private tick(): void {
    const elapsedMillis = Math.abs(Date.now() - this.lastTick.getTime());
    if (elapsedMillis > this.clientTooOldThreshold) {
      DebugLogger.debug(
        this.clientIdService.getClientId(),
        'Tick: Client not responding for a long time. Refreshing...',
      );
      this.refreshClient();
    }
    this.lastTick = new Date();
  }

  /**
   * Waits for a ready connection, fetches the auth token and sends the message.
   * Starts over when the connection drops while the token is being fetched, or when the send
   * fails because the socket is no longer connected. A failure on a connected socket is logged
   * and the message is dropped.
   */
  private async sendMessageAsync(message: MessageFromClient): Promise<void> {
    for (;;) {
      await firstValueFrom(this.connectionReady.pipe(filter(Boolean)));
      const authToken = await this.authManager.getToken();
      if (!this.connectionReady.value) {
        // Connection state was changed during `await this.authManager.getToken()` call. Retry.
        continue;
      }
      try {
        assertTruthy(this.socket, 'Socket is undefined in sendMessageAsync');
        const serializedMessage = serializeObj({ message, authToken });
        DebugLogger.debug(this.clientIdService.getClientId(), 'Sending message to socket: ', serializedMessage);
        this.socket.send(serializedMessage);
        return;
      } catch (e) {
        if (this.socket?.connected) {
          console.error('Websocket message is ignored due to a non-recoverable error', e);
          return;
        }
        // Message sending is failed due to the websocket IO error. Wait for the next connection and retry.
        this.connectionReady.next(false);
      }
    }
  }

  /**  Sends 'kill' message ignoring 'connectionReady' observable. */
  private sendKillMessage(): void {
    if (!this.socket?.connected) return;
    const message: KillMessage = { type: 'kill' };
    this.socket.send(serializeObj({ message }));
  }

  /**
   * Calls socket.close() and ignores errors.
   * Used when all we want is to close the websocket regardless of its current state.
   */
  private closeCurrentSocketSilently(): void {
    if (!this.socket) return;
    try {
      this.socket.close();
    } catch (_) {
      // Deliberately ignored: the socket is being discarded anyway.
    }
  }

  /** Flips the readiness flag to `false` unless it already is `false`. */
  private markConnectionNotReady(): void {
    if (this.connectionReady.value) {
      this.connectionReady.next(false);
    }
  }

  /**
   * Closes the current socket (if any) and opens a new one for the current client id.
   * Close/reconnect events of a socket that was opened for a different (older) client id
   * are ignored, so they cannot reset the readiness of a newer connection.
   */
  private connect(): void {
    this.closeCurrentSocketSilently();
    this.markConnectionNotReady();
    const endpoint = SocketManager.toWebSocketUrl(getApplicationUrl(this.region, this.appId, 'ws/general'));

    // Captured once: identifies the client id this particular socket was opened for.
    const clientId = this.clientIdService.getClientId();
    DebugLogger.debug(clientId, 'Connecting to socket at:', endpoint);

    const onSocketDown = (eventName: 'reconnect' | 'onclose'): void => {
      DebugLogger.debug(clientId, `WebSocket ${eventName} event triggered`);
      const currentClientId = this.clientIdService.getClientId();
      if (currentClientId !== clientId) {
        DebugLogger.debug(
          clientId,
          `WebSocket ${eventName} event triggered - ignored because the client id changed. new: ${currentClientId}`,
        );
        return;
      }
      this.markConnectionNotReady();
    };

    const socketUri = `${endpoint}?clientId=${clientId}`;
    this.socket = createWebSocketWrapper(socketUri, {
      timeoutMillis: SOCKET_RECONNECT_TIMEOUT,
      onmessage: (e: MessageEvent): void => this.onMessage(e.data),
      onopen: (): void => {
        DebugLogger.debug(
          this.clientIdService.getClientId(),
          `Connection to socket established. Endpoint: ${endpoint}`,
        );
      },
      onreconnect: (): void => onSocketDown('reconnect'),
      onclose: (): void => onSocketDown('onclose'),
      onerror: (e: Event) => console.error('WebSocket error:', e),
    });
  }

  /**
   * Marks the connection as ready. On every ready event except the very first one,
   * a `catchup` message is sent as well.
   */
  private onConnectionReady(): void {
    this.connectionReady.next(true);
    if (!this.connectedAtLeastOnce) {
      this.connectedAtLeastOnce = true;
      return;
    }
    this.sendMessage({ type: 'catchup' });
  }

  /**
   * Handles one raw socket payload: either the literal `connectionReady` marker or a serialized
   * array of messages. Every message is offered for acknowledgment; only messages with a
   * not-yet-seen `messageId` are forwarded to notification subscribers.
   */
  private onMessage(messagesStr: string): void {
    if (messagesStr === 'connectionReady') {
      DebugLogger.debug(this.clientIdService.getClientId(), `Got socket message: connectionReady`);
      this.onConnectionReady();
      return;
    }
    const messages: Array<MessageToClient> = deserializeObj(messagesStr);
    for (const message of messages) {
      // Duplicates are acknowledged too, so this runs before the de-duplication check.
      this.allMessagesObserver.next(message);
      if (this.seenMessageIds.has(message.messageId)) {
        continue;
      }
      this.seenMessageIds.add(message.messageId);
      DebugLogger.debug(
        this.clientIdService.getClientId(),
        new Date(),
        `Got socket message`,
        JSON.stringify(message, null, 2),
      );
      this.messageNotificationWrapper(() => {
        this.webSocketObserver.next(message);
      });
    }
  }

  /**
   * Batches acknowledgments: ids of incoming messages are collected, and once no new message
   * has arrived for 100 ms, a single `acknowledge` message with all collected ids is sent.
   */
  private setupMessageAcknowledgments(): void {
    const ackSubject = new Subject<string>();
    this.allMessagesObserver.subscribe((message: MessageToClient) => {
      if (!message?.messageId) return;
      ackSubject.next(message.messageId);
    });

    const collectedMessageIds: string[] = [];
    ackSubject
      .pipe(
        tap(messageId => collectedMessageIds.push(messageId)),
        debounceTime(100),
      )
      .subscribe(() => {
        // splice(0) empties the buffer and returns its previous content as a new array.
        const messageIds = collectedMessageIds.splice(0);
        this.sendMessage({ type: 'acknowledge', payload: messageIds });
      });
  }

  /**
   * Shuts the manager down: stops the tick timer, sends `kill`, marks the connection as not ready,
   * closes the socket and completes the message streams.
   */
  private async destruct(): Promise<void> {
    // Stop the watchdog first so the timer is released even if a later step throws.
    clearInterval(this.tickInterval);
    this.sendKillMessage();

    // Wait until VM processes socket message before closing the socket.
    // Without the sleep below, there is a race condition between socket closing
    // and 'kill' message sending, so 'kill' may not be delivered at all.
    await firstValueFrom(timer(0));

    this.connectionReady.next(false);
    // Allow the message to be sent by waiting on a 0 timer.
    await firstValueFrom(timer(0));
    this.closeCurrentSocketSilently();
    this.webSocketObserver.complete();
    this.allMessagesObserver.complete();
    this.destructSubject.next();
  }
}
