import { SocketManager } from './socket.manager';
import { DestructManager } from './destruct.manager';
import { filter, firstValueFrom, Observable, race, Subject, take, timer } from 'rxjs';
import { map } from 'rxjs/operators';
import { generateId } from '../../internal-common/src/public-utils/id-utils';
import {
  AcquireLockMessage,
  LockAcquiredResponseMessageToClient,
  LockReleasedResponseMessageToClient,
  ReleaseLockMessage,
} from '../../internal-common/src/types/socket.types';

import { ClientRequestId } from '../../internal-common/src/public-types/communication.public-types';

/**
 * A handler for a distributed lock that can be released.
 * @category Platform
 */
export interface DistributedLock {
  /** Releases the lock. */
  release(): void;

  /**
   * Whether the lock has been released.
   * @returns True if the lock has been released.
   */
  isReleased(): boolean;

  /**
   * Observes when the lock is released (It may be released due to a connection issue)
   * @returns An observable that emits when the lock is released.
   */
  observeRelease(): Observable<void>;
}

/**
 * @internal
 */
export class DistributedLockManager {
  private readonly ongoingLocks: Record<string, DistributedLock> = {};
  private readonly acquireLockMessagesFromServer = this.socketManager
    .observeNotifications()
    .pipe(filter(message => message.type === 'lockAcquired')) as Observable<LockAcquiredResponseMessageToClient>;
  private readonly releaseLockMessagesFromServer = this.socketManager
    .observeNotifications()
    .pipe(filter(message => message.type === 'lockReleased')) as Observable<LockReleasedResponseMessageToClient>;

  /** @internal. */
  constructor(
    private readonly socketManager: SocketManager,
    private readonly destructManager: DestructManager,
  ) {
    destructManager.onPreDestruct(() => {
      this.releaseAllLocks();
    });

    this.socketManager.observeConnectionReady().subscribe(ready => {
      if (ready) return;
      this.releaseAllLocks();
    });

    this.releaseLockMessagesFromServer.subscribe(message => {
      const lock = this.ongoingLocks[message.payload.clientRequestId];
      if (lock === undefined) return;
      lock.release();
    });
  }

  // we may override this value in tests
  private lockWaitForConnectionThreshold = 2000;

  /**
   * Acquires a lock.
   * @param mutex The mutex to lock.
   * @param timeoutMillis The maximum time to wait for the lock to be acquired on the server.
   */
  async lock(mutex: string, timeoutMillis = 1000): Promise<DistributedLock> {
    /**
     * Wait up to 2 seconds for the connection to be ready and if it is not, consider the client to be not connected.
     * This is useful because right after the Squid client is created, the socket is still not connected but the user
     * may want to acquire a lock; it is a valid situation. Also, there may be short disconnect and in this case we
     * still want to wait for the connection to be ready.
     */
    this.socketManager.notifyWebSocketIsNeeded();
    const isConnected = await firstValueFrom(
      race(
        timer(this.lockWaitForConnectionThreshold).pipe(map(() => false)),
        this.socketManager.observeConnectionReady().pipe(filter(Boolean)),
        this.destructManager.observeIsDestructing(),
      ),
    );

    if (!isConnected) {
      return Promise.reject('CLIENT_NOT_CONNECTED');
    }
    const clientRequestId = generateId();
    const acquireLockMessage: AcquireLockMessage = {
      type: 'acquireLock',
      payload: {
        mutex,
        timeoutMillis,
        clientRequestId,
      },
    };
    this.socketManager.sendMessage(acquireLockMessage);
    const result = await firstValueFrom(
      race(
        timer(timeoutMillis + 4000).pipe(
          take(1),
          map(() => {
            return {
              payload: {
                error: 'TIMEOUT_GETTING_LOCK',
                lockId: undefined,
              },
            };
          }),
        ),
        this.acquireLockMessagesFromServer.pipe(filter(message => message.payload.clientRequestId === clientRequestId)),
      ),
    );
    if (this.destructManager.isDestructing) {
      throw new Error('Destructing');
    }
    if (!result.payload.lockId) {
      throw new Error(`Failed to acquire lock: ${result.payload.error}`);
    }
    const lockId = result.payload.lockId;
    const lock = new DistributedLockImpl(lockId, clientRequestId, this.ongoingLocks, this.socketManager);
    this.ongoingLocks[lockId] = lock;
    return lock;
  }

  private releaseAllLocks(): void {
    for (const [lockId, lock] of Object.entries(this.ongoingLocks)) {
      lock.release();
      delete this.ongoingLocks[lockId];
    }
  }
}

/**
 * @internal
 */
export class DistributedLockImpl implements DistributedLock {
  private released = false;
  private readonly onReleaseSubject = new Subject<void>();

  /** @internal. */
  constructor(
    private readonly lockId: string,
    private readonly clientRequestId: ClientRequestId,
    private readonly ongoingLocks: Record<string, DistributedLock>,
    private readonly socketManager: SocketManager,
  ) {}

  /**
   * Releases the lock if it hasn't been released already.
   * Sends a release message to the server and notifies observers of the release event.
   */
  release(): void {
    if (this.released) return;
    this.released = true;
    delete this.ongoingLocks[this.lockId];
    const releaseLockMessage: ReleaseLockMessage = {
      type: 'releaseLock',
      payload: {
        lockId: this.lockId,
        clientRequestId: this.clientRequestId,
      },
    };
    this.socketManager.sendMessage(releaseLockMessage);
    this.onReleaseSubject.next();
  }

  /**
   * Observes the release event for this lock.
   * This observable emits when the lock is released either explicitly or due to a server-side event.
   *
   * @returns An observable that emits once when the lock is released.
   */
  observeRelease(): Observable<void> {
    return this.onReleaseSubject.asObservable();
  }

  /**
   * Indicates whether the lock has been released.
   *
   * @returns `true` if the lock has already been released, otherwise `false`.
   */
  isReleased(): boolean {
    return this.released;
  }
}
