/* eslint-disable jsdoc/require-jsdoc */
import { defer, finalize, Observable, Subject } from 'rxjs';
import { IntegrationId } from '../../internal-common/src/public-types/communication.public-types';
import { QueueMessageToClient } from '../../internal-common/src/types/socket.types';
import { LockManager } from '../../internal-common/src/utils/lock.manager';
import { DestructManager } from './destruct.manager';
import { RpcManager } from './rpc.manager';
import { SocketManager } from './socket.manager';
import { TopicName } from '../../internal-common/src/public-types/backend.public-types';

/** @internal */
export class QueueManagerFactory {
  private readonly queueManagers: Map<IntegrationId, Map<TopicName, QueueManagerImpl>> = new Map();

  constructor(
    private readonly rpcManager: RpcManager,
    private readonly socketManager: SocketManager,
    destructManager: DestructManager,
  ) {
    this.socketManager.observeNotifications<QueueMessageToClient>().subscribe(message => {
      const queueManager = this.getOrUndefined(message.integrationId, message.topicName);
      if (!queueManager) {
        return;
      }
      queueManager.onMessages(message.payload);
    });

    destructManager.onPreDestruct(() => {
      for (const queueManagersForIntegration of this.queueManagers.values()) {
        for (const queueManager of queueManagersForIntegration.values()) {
          queueManager.destruct();
        }
      }
    });
  }

  get<T>(integrationId: IntegrationId, topicName: string): QueueManager<T> {
    let integrationQueueManagers = this.queueManagers.get(integrationId);
    if (!integrationQueueManagers) {
      integrationQueueManagers = new Map();
      this.queueManagers.set(integrationId, integrationQueueManagers);
    }

    let queueManager = integrationQueueManagers.get(topicName);
    if (!queueManager) {
      queueManager = new QueueManagerImpl(integrationId, topicName, this.rpcManager, this.socketManager);
      integrationQueueManagers.set(topicName, queueManager);
    }

    return queueManager;
  }

  private getOrUndefined(integrationId: IntegrationId, topicName: string): QueueManagerImpl | undefined {
    return this.queueManagers.get(integrationId)?.get(topicName);
  }
}

/**
 * Interface defining the operations available for a queue manager.
 * Allows producing and consuming messages from a queue.
 * @category Queue
 */
export interface QueueManager<T> {
  /** Publish messages to the queue */
  produce(messages: T[]): Promise<void>;

  /** Consume messages from the queue */
  consume(): Observable<T>;
}

const SUBSCRIPTION_MUTEX = 'subscriptionMutex';

/** @internal */
export class QueueManagerImpl<T = any> implements QueueManager<T> {
  private messagesSubject = new Subject<unknown>();
  private subscriberCount = 0;
  private readonly lockManager = new LockManager();

  constructor(
    private readonly integrationId: IntegrationId,
    private readonly topicName: string,
    private readonly rpcManager: RpcManager,
    private readonly socketManager: SocketManager,
  ) {}

  async produce(messages: T[]): Promise<void> {
    await this.lockManager.lock(SUBSCRIPTION_MUTEX);
    try {
      await this.rpcManager.post('queue/produceMessages', {
        integrationId: this.integrationId,
        topicName: this.topicName,
        messages,
      });
    } finally {
      this.lockManager.release(SUBSCRIPTION_MUTEX);
    }
  }

  consume(): Observable<T> {
    this.socketManager.notifyWebSocketIsNeeded();
    return defer(() => {
      this.subscriberCount++;
      if (this.subscriberCount === 1) {
        void this.performSubscribe();
      }

      return this.messagesSubject.asObservable().pipe(
        finalize(() => {
          this.subscriberCount--;
          if (this.subscriberCount === 0) {
            void this.performUnsubscribe();
          }
        }),
      );
    }) as Observable<T>;
  }

  private async performSubscribe(): Promise<void> {
    await this.lockManager.lock(SUBSCRIPTION_MUTEX);

    try {
      await this.rpcManager.post('queue/subscribe', {
        integrationId: this.integrationId,
        topicName: this.topicName,
      });
    } catch (e) {
      this.messagesSubject.error(e);
      this.messagesSubject.complete();
      this.subscriberCount = 0;
      this.messagesSubject = new Subject<unknown>();
    } finally {
      this.lockManager.release(SUBSCRIPTION_MUTEX);
    }
  }

  private async performUnsubscribe(): Promise<void> {
    await this.lockManager.lock(SUBSCRIPTION_MUTEX);

    try {
      await this.rpcManager.post('queue/unsubscribe', {
        integrationId: this.integrationId,
        topicName: this.topicName,
      });
    } finally {
      this.lockManager.release(SUBSCRIPTION_MUTEX);
    }
  }

  onMessages(payload: string[]): void {
    for (const message of payload) {
      this.messagesSubject.next(message);
    }
  }

  destruct(): void {
    this.messagesSubject.complete();
  }
}
