import type { MasonClient } from './client';
import {
  isAnyMason,
  Mason,
  QueryId,
  QueryIdentifiable,
  QueryMason
} from './definition';
import {
  isInvalidMason,
  isSuccess,
  isFailure,
  InvalidMason,
  Success,
  Failure
} from './protocolMessages';
import { Deferred, createDeferred } from '../../commons/deferred';

import {
  Subject,
  Observable,
  EMPTY as EMPTY_OBSERVABLE,
  of as observableOf
} from 'rxjs';
import { flatMap } from 'rxjs/operators';
import { INT32_MAX } from '../../commons/numerics/limits';

export class GenericMasonClient implements MasonClient {
  private readonly _queries = new Map<QueryId, Deferred<Success | Failure>>();
  private readonly _nonQueryMasons$ = new Subject<Mason>();

  constructor(
    incoming$: Observable<unknown>,
    private _outgoing$: Subject<Mason>
  ) {
    const receiveInvalid = (_: InvalidMason): Observable<never> => {
      // TODO: deal with invalid
      console.warn('Invalid mason received!');
      return EMPTY_OBSERVABLE;
    };
    const receiveSuccessOrFailure = (
      mason: Success | Failure
    ): Observable<never> => {
      // TODO: deal with unknown query ID
      const queryId = mason.queryId;
      const deferred = this._queries.get(queryId);
      if (deferred) {
        deferred.resolve(mason);
      } else {
        console.warn(`Unknown message query ID ${queryId}`);
      }
      return EMPTY_OBSERVABLE;
    };
    const receive = (message: unknown): Observable<Mason> => {
      if (!isAnyMason(message)) {
        // TODO: deal with non-message
        console.log('Received non-mason message');
        return EMPTY_OBSERVABLE;
      }
      if (isInvalidMason(message)) {
        return receiveInvalid(message);
      }
      if (isSuccess(message) || isFailure(message)) {
        // Success and Failure are responses to previously sent query masons from our client.
        // We handle these different - e.g. resolve the Promise returned by `query()`.
        return receiveSuccessOrFailure(message);
      }
      return observableOf(message);
    };

    incoming$.pipe(flatMap(receive)).subscribe(this._nonQueryMasons$);
  }

  public get masons$(): Subject<Mason> {
    return this._nonQueryMasons$;
  }

  public query<TMason extends QueryMason>(
    mason: Omit<TMason, keyof QueryIdentifiable>
  ): Promise<Success | Failure> {
    let queryId: QueryId | undefined;
    do {
      // this will terminate eventually unless we have used all 2^32 possible IDs
      // ... which is _extremely_ unlikely to happen
      queryId = randomIntInclusive(1, INT32_MAX);
    } while (this._queries.has(queryId));

    const deferred = createDeferred<Success | Failure>();
    this._queries.set(queryId, deferred);
    const queryMason: QueryMason = {
      ...mason,
      queryId
    };
    this._outgoing$.next(queryMason);
    return deferred.promise;
  }
}

function randomIntInclusive(min: number, max: number) {
  min = Math.ceil(min);
  max = Math.floor(max);
  return Math.floor(Math.random() * (max - min + 1)) + min;
}
