import { EventEmitter } from "@video/events-typed";
import type { Json } from "@video/log-node";
import { types } from "mediasoup-client";
import querystring from "qs";
import { Disposable, ErrorCode, PeerProducerOptions } from "../../../api";
import { device, Feature } from "../../../api/adapter";
import { RTCIceServer, RTCIceTransportPolicy } from "../../../api/adapter/features/media-device";
import { MediaStreamTrack } from "../../../api/adapter/features/media-stream";
import type { CallAPI, DominantSpeaker, SFUJoinOptions, SFUJoinUser, VideoClientStats } from "../../../api/call";
import type { AuthAPI } from "../../../api/auth";
import { createError } from "../../errors";
import {
  CallError,
  ConsumerNotFoundError,
  JoinCallError,
  MediasoupSetupError,
  wrapNativeError,
  WSRequestError,
} from "../../errors-deprecated";
import type { MediasoupSource } from "../../mediasoup-source";
import type { VcContext } from "../../utils/context/vc-context";
import { onceCanceled } from "../../utils/context/context";
import { extendContext } from "../../utils/logger";
import request, { RequestOptions, RequestResponse } from "../../utils/request/request";
import { Support } from "../../utils/support";
import type { CallEvents } from "./call.events";
import { AppData, PeerParameters, PERMISSIONS, WSConsumerSourcesResponse } from "./common";
import { disconnectedMessages, Message, MessageList, offlineMessages, TROUBLESHOOTING } from "./messageList";
import { Peer } from "./peer";
import Stream, { StreamOptions } from "./stream";
import Transport from "./transport";
import Authorization from "../../utils/auth/auth-core";
import type AuthorizationOptions from "../../utils/auth/auth-core";
import { contextId, instanceId } from "../../../utils/common";

const NUM_STUN_SERVERS = 5;

interface WSError {
  reason: string;
  errorMessage?: string;
  causes?: Array<{ name?: string }>;
}

interface ReadyData {
  ip: string;
  peerId: string;
  permissions: string[];
  behaviours: string[];
  version: string;
  scope: string;
  userId?: string;
  displayName?: string;
  rtpCapabilities: types.RtpCapabilities;
}

export interface PeerStats {
  id: string;
  mediaType: types.MediaKind;
  peerId: string;
  streamName: string;
  consumerBitrate?: number;
  consumerFractionLost: number;
  consumerFractionLostMil: number;
  consumerRTT: number;
  producerBitrate: number;
  producerFractionLost: number;
  producerFractionLostMil: number;
}

export interface SFUCallOptions {
  maxProducingViewers: number;
  support: Support;
  iceTransportPolicy: RTCIceTransportPolicy;
  noTurn: boolean;
  statsInterval: number;
  stats?: VideoClientStats;
  receiveVideo: boolean;
  receiveAudio: boolean;
  sendVideo: boolean;
  sendAudio: boolean;
  maxBitrate?: number;
  requireRecv: boolean;
  requireSend: boolean;
  test: boolean;
  retries: number;
  streams: { [key: string]: Partial<StreamOptions> };
  user: SFUJoinUser;
  clientName: string;
  displayName: string;
  clientVersion: string;
  auth: AuthAPI;
  authOptions: AuthorizationOptions;
  rsrc: string; // xcode host
  bpeerId: string; // broadcast peer ID
  xkey: string; // xcode private key
  suspendPlay: boolean;
  backendEndpoint: string;
  failoverUrls: string[];
  sfu?: SFUJoinOptions;
  wsReconnect?: boolean;
  wsReconnectTime?: number;
}

interface SFUCallBody {
  sfu: SFUJoinOptions;
}

export type SFUJoinCall = {
  id: string;
  callUri?: string;
  stun?: {
    servers: Array<{
      urls: string[];
    }>;
  };
  turn: {
    servers: Array<{
      urls: string[];
    }>;
  };
  sfu: {
    region: string;
    uri: string;
    httpUri?: string;
    version?: string;
  };
  support: string;
};

export type Join = {
  call?: SFUJoinCall;
  user: SFUJoinUser;
};

/**
 * @internal
 */
export class Call extends EventEmitter<CallEvents> implements Disposable {
  static readonly displayName = "PvcCall";

  private _permissions: string[];

  _localPeerId: string | null;

  private _behaviours: string[];

  private _messages: MessageList;

  private _peer?: Peer;

  private setPreferredEncodingProcessing = false;

  public get peer(): Peer | undefined {
    return this._peer;
  }

  private readonly _handleOffline: () => void;

  private readonly _handleOnline: () => void;

  private readonly _userOverrides?: SFUJoinUser;

  private readonly _stunServers: string[];

  private readonly _stunIndex: number;

  private readonly __iceServers: () => RTCIceServer[];

  private readonly ctx: VcContext;

  get _iceServers(): () => RTCIceServer[] {
    return this.__iceServers;
  }

  private readonly __edgeIceServers: () => RTCIceServer[];

  get _edgeIceServers(): () => RTCIceServer[] {
    return this.__edgeIceServers;
  }

  private readonly _joinCall: () => Promise<RequestResponse<Join>>;

  private readonly __iceTransportPolicy?: RTCIceTransportPolicy;

  public get _iceTransportPolicy(): "relay" | "all" | undefined {
    return this.__iceTransportPolicy;
  }

  private _maxBitrate: number | undefined;

  get maxBitrate(): number | undefined {
    return this._maxBitrate;
  }

  private readonly __requireRecv: boolean;

  get _requireRecv(): boolean {
    return this.__requireRecv;
  }

  private readonly __requireSend: boolean;

  get _requireSend(): boolean {
    return this.__requireSend;
  }

  private readonly _flushLogs: boolean;

  private readonly _retries: number;

  private __wsTransport: Transport | null = null;

  public get _wsTransport(): Transport | null {
    return this.__wsTransport;
  }

  private _ip: string | null;

  private _initialRun: boolean;

  private __closed: boolean;

  public get _closed(): boolean {
    return this.__closed;
  }

  private _startResolve: (() => void) | null;

  private _startPromise: Promise<void> | null = null;

  private _serverOptions: { call: { defaultMinBitrate: number } } | null;

  private readonly _stableTimeout?: number;

  private _stable: PeerStats | null;

  private readonly _streams: { [key: string]: Stream };

  private readonly _wsReconnect: boolean;

  private readonly _wsReconnectTime: number;

  get streams(): { [key: string]: Stream } {
    return this._streams;
  }

  public _iceTransportPolicyUsed: RTCIceTransportPolicy | null = null;

  public options: {
    call: Partial<SFUCallOptions>;
  };

  public call: SFUJoinCall;

  public user?: SFUJoinUser;

  get permissions(): string[] {
    return this._permissions;
  }

  get peerId(): string | null {
    return this._localPeerId;
  }

  get behaviours(): string[] {
    return this._behaviours;
  }

  get wsReady(): boolean {
    return this.__wsTransport?.connected ?? false;
  }

  // userShould checks the user's permission and behaviours to determine
  // if the user should perform an action
  userShould(permission: string): boolean {
    if (!this.hasPermission(permission)) {
      return false;
    }
    return this._behaviours.includes(permission);
  }

  constructor(
    ctx: VcContext,
    join: Join,
    joinCall: () => Promise<RequestResponse<Join>>,
    options: Partial<SFUCallOptions> = {},
  ) {
    super();

    this.ctx = ctx;
    onceCanceled(ctx).then((reason) => this.dispose(`VideoClient Context Cancelled: ${reason}`));

    ctx.logger.trace("constructor()", { options: options as any });

    // TODO: find all client options we want to send -
    // anything in the object will get sent to the MS3 SFU
    const { maxProducingViewers, stats } = options;
    this.options = {
      call: {
        maxProducingViewers,
        stats,
      },
    };

    this._maxBitrate = options.maxBitrate;

    this._messages = new MessageList();

    this._handleOffline = () => {
      this.emit("CALL_EMIT_MESSAGES", { messages: offlineMessages.messages });
    };
    this._handleOnline = () => {
      this.emit("CALL_EMIT_MESSAGES", { messages: offlineMessages.messages });
    };

    device.addEventListener("offline", this._handleOffline);
    device.addEventListener("online", this._handleOnline);

    if (join.call == null) {
      throw new Error("json.call is undefined");
    }

    this.call = join.call;
    this._userOverrides = options.user;
    this.user = {
      ...join.user,
      ...this._userOverrides,
    };

    this._stunServers = this.call.stun?.servers?.map((x) => `stun:${x}`) ?? [];
    this._stunIndex = NUM_STUN_SERVERS;
    this.__iceServers = () => {
      if (this._stunIndex === 0 || this._stunServers.length === 0) {
        if (this.call.turn?.servers?.length > 0) {
          return this.call.turn.servers;
        }
        return [];
      }

      const stun = [
        {
          urls: this._stunServers.slice(this._stunIndex - NUM_STUN_SERVERS, this._stunIndex),
        },
      ];
      if (this.call.turn?.servers?.length > 0) {
        return this.call.turn.servers.concat(stun);
      }
      return stun;
    };

    this.__edgeIceServers = () => {
      // edge only supports UDP turn
      const svrs: RTCIceServer[] = [];
      if (this.call.turn?.servers?.length > 0) {
        this.call.turn.servers.forEach((server) => {
          if (Array.isArray(server.urls)) {
            const urls = server.urls.filter((url) => url.startsWith("turn:"));
            if (urls?.length > 0) {
              svrs.push({
                ...server,
                urls: urls.map((url) => url.split("?")[0] ?? ""),
              });
            }
          }
        });
      }

      return svrs;
    };

    this._joinCall = joinCall;

    // The local peer's ID
    this._localPeerId = null;

    this.__iceTransportPolicy = options.iceTransportPolicy;
    this.__requireRecv = options.requireRecv === true;
    this.__requireSend = options.requireSend === true;
    this._flushLogs = !options.test;
    this._retries = options.retries ?? Infinity;

    // Tells the WS to reconnect or not after idling for set amount of time.
    this._wsReconnect = options.wsReconnect ?? false;

    // Determines the amount of time a ws should wait to try to reconnect after a given amount of time with no response
    this._wsReconnectTime = options.wsReconnectTime ?? 10;

    // Set when a media soup room is created
    this._iceTransportPolicyUsed = null;

    this._ip = null;
    this._permissions = [];
    this._behaviours = [];

    this._initialRun = true;

    this.__closed = false;
    this._startResolve = null;
    this._startPromise = null;

    this._serverOptions = null;
    this._stableTimeout = undefined;
    this._stable = null;

    this._streams = {};
    const { streams } = options;
    if (streams != null && Object.keys(streams).length > 0) {
      Object.keys(streams).forEach((streamName) => {
        const stream = streams[streamName];
        this.setStream(streamName, stream);
      });
    } else {
      this.ctx.logger.debug("Setting stream failed, there are no defined streams", { streams: streams as any });
    }

    this.setLogData();
  }

  isDisposed = false;

  dispose(reason?: string | undefined): void {
    this.close(`disposed: ${reason}`);
    this.isDisposed = true;
  }

  hasPermission(permission: string): boolean {
    return this._permissions?.includes(permission);
  }

  get streamNames(): string[] {
    return Object.keys(this._streams);
  }

  setStream(streamName: string, options?: Partial<StreamOptions>): void {
    this._streams[streamName] = new Stream(extendContext(this.ctx, Stream), streamName, this, options);
  }

  getStream(streamName: string): Stream | null {
    return this._streams[streamName];
  }

  removeStream(streamName: string): void {
    delete this._streams[streamName];
  }

  forEachStream(cb: (stream: Stream, streamName: string) => void): void {
    if (this._streams != null && Object.keys(this._streams).length > 0) {
      Object.keys(this._streams).forEach((streamName) => {
        cb(this._streams[streamName], streamName);
      });
    } else {
      this.ctx.logger.debug("Stream close() failed, there are no defined streams to close", {
        streams: this._streams as any,
      });
    }
  }

  *streamsIterator(): Generator<Stream> {
    for (const k of Object.keys(this._streams)) {
      yield this._streams[k];
    }
  }

  setLogData(): void {
    this.ctx.logger.setMessageAggregate("callId", this.call.id);
    this.ctx.logger.setMessageAggregate("userId", this.user?.userId ?? "no user id");
    this.ctx.logger.setMessageAggregate("scope", this.user?.scope ?? "no scope");
    this.ctx.logger.setMessageAggregate("displayName", this.user?.displayName ?? "no display name");
  }

  clearLogData(): void {
    this.ctx.logger.removeMessageAggregate("callId");
    this.ctx.logger.removeMessageAggregate("peerId");
    this.ctx.logger.removeMessageAggregate("userId");
    this.ctx.logger.removeMessageAggregate("scope");
    this.ctx.logger.removeMessageAggregate("displayName");
    this.ctx.logger.removeMessageAggregate("recvTransportId");
    this.ctx.logger.removeMessageAggregate("recvTransportState");
    this.ctx.logger.removeMessageAggregate("sendTransportId");
    this.ctx.logger.removeMessageAggregate("sendTransportState");
  }

  _addMessage(msg: Message): void {
    this._messages.addMessage(msg);
    this.emit("CALL_EMIT_MESSAGES", { messages: offlineMessages.messages });
  }

  _removeMessage(msg: Message): void {
    this._messages.removeMessage(msg);
    this.emit("CALL_EMIT_MESSAGES", { messages: offlineMessages.messages });
  }

  get messages(): Message[] {
    return this._messages.messages;
  }

  // should only be triggered externally and should be idempotent
  async close(
    debugString = "Implementer did not pass debugString",
    internal = false,
    hasError = false,
  ): Promise<unknown> {
    // used by the _run websocket disconnect handler to determine whether this was a user requested close
    if (this.__closed) {
      return null;
    }

    this.ctx.logger.info("Call Class, call closed()", {
      aggregates: { internal, reason: debugString },
    });
    if (this._flushLogs) {
      this.ctx.logger.flush();
    }

    if (this._peer != null) {
      await this._peer.close(debugString);
    }

    if (this.__wsTransport != null) {
      this.__wsTransport.setClosing();
    }

    device.removeEventListener("offline", this._handleOffline);
    device.removeEventListener("online", this._handleOnline);
    this.__closed = true;

    this.forEachStream((s) => s.close());
    device.clearTimeout(this._stableTimeout);
    this._stable = null;
    this._cleanup(hasError);
    return Promise.resolve();
  }

  private _cleanup(hasError = false): void {
    // if hasError is true then we don't want to resolve the start promise
    // so this part **really** needs to be refactored
    if (!hasError && this._startResolve != null) {
      this._startResolve();
    }

    if (this.__wsTransport != null) {
      this.__wsTransport.close();
      this.__wsTransport = null;
    }
    this.emit("CALL_SET_ROOM_DONE");

    this.clearLogData();
  }

  async updateMaxBitrate(bitrate: number): Promise<void> {
    if (this.__wsTransport != null) {
      const args = { bitrate };
      try {
        await this._request("updateMaxBitrate", args);
        this._maxBitrate = bitrate;
      } catch (err) {
        const inner = err instanceof Error ? err : null;
        this.emit(
          "error",
          new WSRequestError("ws request error", {
            inner,
            request: "updateMaxBitrate",
            args,
            internalCall: this,
          }) as any,
        );
      }
    }
  }

  setDisplayName(displayName: string): void {
    this.ctx.logger.info("updating displayName", {
      displayName,
    });

    if (this.user != null) {
      this.user.displayName = displayName;
    }

    if (this.__wsTransport != null) {
      this.__wsTransport.send("options", {
        peer: {
          displayName,
        },
      });
    }
  }

  setMaxProducingViewers(maxProducingViewers: number): void {
    this.ctx.logger.info("updating maxProducingViewers", {
      maxProducingViewers,
    });

    if (this.__wsTransport != null) {
      this.__wsTransport.send("options", {
        call: {
          maxProducingViewers,
        },
      });
    }
  }

  getViewers(): void {
    throw new Error("getViewers is deprecated");
  }

  public async promoteViewer(userId: string, permissions: unknown, webhook = false): Promise<void> {
    if (this.__wsTransport == null) {
      this.ctx.logger.warn("calling get viewers on a disconnected call");
      throw new Error("call not connected");
    }

    if (!this.hasPermission(PERMISSIONS.CHANGE_PEER_SCOPE)) {
      this.ctx.logger.warn("viewer does not have permission to promote other viewers");
      throw new Error("no admin change scope permissions");
    }

    let response: { viewers?: unknown[] };
    const args = {
      userId,
      permissions,
      webhook,
    };
    try {
      response = await this._request("promote", args);
    } catch (err) {
      const inner = err instanceof Error ? err : null;
      this.emit(
        "error",
        new WSRequestError("ws request error", { inner, request: "promote", args, internalCall: this }) as any,
      );
      return;
    }

    this.emit("CALL_VIEWERS", { viewers: response.viewers ?? [] });
  }

  public async demoteViewer(userId: string, permissions: unknown, webhook = false): Promise<void> {
    if (this.__wsTransport == null) {
      this.ctx.logger.warn("calling get viewers on a disconnected call");
      throw new Error("call not connected");
    }

    if (!this.hasPermission(PERMISSIONS.CHANGE_PEER_SCOPE)) {
      this.ctx.logger.warn("viewer does not have permission to demote other viewers");
      throw new Error("no admin change scope permissions");
    }

    let response: { viewers?: unknown[] };
    const args = {
      userId,
      webhook,
      permissions,
    };
    try {
      response = await this._request("demote", args);
    } catch (err) {
      const inner = err instanceof Error ? err : null;
      this.emit(
        "error",
        new WSRequestError("ws request error", { inner, request: "demote", args, internalCall: this }) as any,
      );
      return;
    }

    this.emit("CALL_VIEWERS", { viewers: response.viewers ?? [] });
  }

  // should really be called kickViewerProducers
  public async kickViewer(userId: string, streamOnly: boolean, webhook = false): Promise<void> {
    if (this.__wsTransport == null) {
      this.ctx.logger.warn("calling get viewers on a disconnected call");
      throw new Error("call not connected");
    }

    if (!this.hasPermission(PERMISSIONS.CHANGE_PEER_SCOPE)) {
      this.ctx.logger.warn("viewer does not have permission to kick other viewers");
      throw new Error("no admin change scope permissions");
    }

    let response: { viewers?: unknown[] };
    const args = { userId };
    try {
      response = await this._request("kick", args);
    } catch (err) {
      const inner = err instanceof Error ? err : null;
      this.emit(
        "error",
        new WSRequestError("ws request error", { inner, request: "kick", args, internalCall: this }) as any,
      );
      return;
    }

    this.emit("CALL_VIEWERS", { viewers: response.viewers ?? [] });
  }

  public async kickPeer(peerId: string, streamOnly: boolean, webhook = false): Promise<void> {
    if (this.__wsTransport == null) {
      this.ctx.logger.warn("calling get peers on a disconnected call");
      throw new Error("call not connected");
    }

    if (!this.hasPermission(PERMISSIONS.CHANGE_PEER_SCOPE)) {
      this.ctx.logger.warn("viewer does not have permission to kick other peers");
      throw new Error("no admin change scope permissions");
    }

    let response: { viewers?: unknown[] };
    const args = { peerId };
    try {
      response = await this._request("kick", args);
    } catch (err) {
      const inner = err instanceof Error ? err : null;
      this.emit(
        "error",
        new WSRequestError("ws request error", { inner, request: "kick", args, internalCall: this }) as any,
      );

      return;
    }

    this.emit("CALL_VIEWERS", { viewers: response.viewers ?? [] });
  }

  async connect(): Promise<void> {
    this.emit("CALL_CONNECTING", true);
    if (this.__wsTransport) {
      try {
        this.__wsTransport.close(true);
        // this.__wsTransport.dispose();
      } catch (error) {
        this.ctx.logger.info("Error closing websocket transport", { error: error as any });
      }
      this.__wsTransport = null;
    }

    this._peer?.peers.forEach((p: PeerParameters) => {
      this._peer?._onPeerClosed(p);
    });

    await this._peer?.dispose("Reconnecting");
    this._peer = undefined;

    return new Promise((resolve, reject) => {
      if (this.call.callUri == null) {
        reject(new Error("callUri is undefined"));
        this.emit("CALL_CONNECTING", false);
        return;
      }

      this.__wsTransport = new Transport(
        extendContext(this.ctx, Transport),
        this.call.callUri,
        this._joinCall,
        this._retries,
        this._wsReconnect,
        this._wsReconnectTime,
      );

      this.__wsTransport.on("websocket-reconnect", (webpageInactive: boolean) => {
        if (this._wsReconnect || webpageInactive) {
          this.ctx.logger.info("Attempting websocket reconnect");
          this.emit("CALL_WEBSOCKET_RECONNECT");
        }
      });

      this.__wsTransport.on("prepare-rejoin-call", () => {
        for (const stream of this.streamsIterator()) {
          stream.close(false);
        }
      });

      this.__wsTransport.on("callEnded", () => {
        // TODO: should probably emit a new event
        this.close("call already ended on websocket", true).catch(this.ctx.logger.error);
        resolve();
        this.ctx.logger.info("calls-core/call: end");
      });

      this.__wsTransport.on("callError", (data: WSError) => {
        reject(new CallError(data.errorMessage ?? "Unknown call error", { reason: data.reason }));
        if (data.reason === "sfu-busy") {
          this.emit("error", createError(ErrorCode.SFUBusy, data.errorMessage ?? "unknown reason", {}));
        }
        this.close("call error on websocket", true, true).catch(this.ctx.logger.error);
      });

      this._setup(resolve, reject);

      this.__wsTransport.once("connect", () => {
        this.emit("CALL_CONNECTING", false);
        this.emit("CALL_SET_CONNECTED", { connected: true });
      });
    });
  }

  start(): Promise<void> {
    this.ctx.logger.info("calls-core/call: start", {});

    if (this._startPromise != null) {
      this.ctx.logger.info("call: start has already been called");
      return this._startPromise;
    }

    this._startPromise = new Promise((resolve, reject) => {
      if (!this.ctx.support.any) {
        reject(new Error("Device is not supported"));
        return;
      }

      // will resolve when close is called
      this._startResolve = resolve;

      if (!this._initialRun) {
        this.emit("CALL_SET_CONNECTED", { connected: false });
        this.emit("CALL_EMIT_MESSAGES", {
          messages: disconnectedMessages.messages,
        });
      }

      if (this.call.callUri == null) {
        reject(new Error("Call URI is undefined"));
        return;
      }

      this.connect()
        .then(() => {
          this._messages = new MessageList();
          this.emit("CALL_EMIT_MESSAGES", {
            messages: this._messages.messages ?? [],
          });
          this._initialRun = false;
        })
        .catch(reject);
    });

    return this._startPromise;
  }

  private _setup(resolve: () => void, reject: (err?: Error) => void): void {
    this.ctx.logger.debug("call setup");
    if (this.__wsTransport == null) {
      const err = new Error("_wsTransport is null");
      this.ctx.logger.warn("call _setup", { err: err?.message });
      throw err;
    }

    this.__wsTransport.on("disconnect", () => {
      this.emit("CALL_CONNECTING", false);
      // this.__wsTransport = null; I don't think this should be here, we aren't properly disposing/closing the websocket.
    });

    this.__wsTransport.on("need-user", () => {
      this.__wsTransport?.send("user", this.user);
    });

    // update user and call on when a user re-joins a call.
    this.__wsTransport.on("join-refresh", (response: RequestResponse<Join>) => {
      if (response.body.call == null) {
        this.ctx.logger.error("response.body.call is undefined");
        return;
      }

      this.user = {
        ...response.body.user,
        ...this._userOverrides,
      };
      this.call = response.body.call;
    });

    this.__wsTransport.on("join-call-error", (payload: { error: Error }) => {
      const inner = payload.error instanceof Error ? payload.error : null;
      const err = new JoinCallError("WS: Join Call Error", { inner });
      this.emit("error", err as any);
      reject(err);
    });

    this.__wsTransport.on("call-rejected", () => {
      this.ctx.logger.info("call rejected");
      const err = new CallError("WS: Call Rejected", { reason: "rejected" });
      this.emit("error", err as any);
      reject(err);
      this.close("call was rejected by websocket", true).catch(this.ctx.logger.error);
    });

    this.__wsTransport.on("callRejected", () => {
      this.ctx.logger.info("call rejected");
      const err = new CallError("WS: Call Rejected", { reason: "rejected" });
      reject(err);
      this.close("call was rejected by websocket", true);
    });

    this.__wsTransport.on("call-ended", () => {
      this.ctx.logger.info("call ended");
      this.close("call already ended on websocket", true);
    });

    this.__wsTransport.on("Forbidden", () => {
      this.ctx.logger.info("call ended - kicked");
      this.emit("CALL_FORBIDDEN");
      this.close("call ended - kicked (Forbidden)", true);
    });

    this.__wsTransport.on("call-refresh", (call: SFUJoinCall) => {
      this.call = call;
      // TODO: see if we need to remit the call
    });

    this.__wsTransport.on("peerAtCapacity", (producersAtCapacity) => {
      const k = Object.keys(producersAtCapacity);
      this.emit("CALL_PEER_AT_CAPACITY", producersAtCapacity[k[0]]);
    });

    this.__wsTransport.on("producersKicked", () => {
      this.ctx.logger.info("Call ended - producers kicked");
      this.emit("CALL_PRODUCER_KICKED");
    });

    this.__wsTransport.on("dominantSpeaker", (dominantSpeaker: DominantSpeaker) => {
      this.ctx.logger.debug("websocket transport dominantSpeaker", {
        dominantSpeaker: dominantSpeaker as any,
      });

      this.emit("CALL_DOMINANT_SPEAKER", {
        userId: dominantSpeaker.userId,
        displayName: dominantSpeaker.displayName,
        peerId: dominantSpeaker.peerId,
        streamName: dominantSpeaker.streamName,
        producerId: dominantSpeaker.producerId,
      });
    });

    const networkCheck = (stats: PeerStats): void => {
      if (this._stable != null) {
        if (stats.mediaType === "video") {
          if ((this._serverOptions?.call?.defaultMinBitrate ?? 0) > 0) {
            if ((stats.consumerBitrate ?? 0) < (this._serverOptions?.call?.defaultMinBitrate ?? 0)) {
              if (this._stable.consumerBitrate === 0) {
                this._stable.consumerBitrate = Date.now();
              } else if (Date.now() > (this._stable?.consumerBitrate ?? 0) + 1000) {
                this._addMessage(TROUBLESHOOTING.UNSTABLE_NETWORK);
              }
            } else {
              delete this._stable.consumerBitrate;
            }
          }
        }
      }
    };

    this.__wsTransport.on("consumer-stats", (stats: PeerStats) => {
      networkCheck(stats);

      this.emit("CALL_SET_CONSUMER_STATS", {
        peerId: stats.peerId,
        streamName: stats.streamName,
        consumerId: stats.id,
        stats,
      });
    });

    this.__wsTransport.on("producer-stats", (stats: PeerStats) => {
      networkCheck(stats);

      this.emit("CALL_SET_PRODUCER_STATS", {
        streamName: stats.streamName,
        producerId: stats.id,
        stats,
      });
    });

    this.__wsTransport.on("permissions", (permissionsData: { permissions: string[] }) => {
      this._permissions = permissionsData.permissions;
      this.emit("CALL_PERMISSIONS", {
        permissions: permissionsData.permissions,
      });
    });

    this.__wsTransport.on("options", (options: { call: { defaultMinBitrate: number } }) => {
      this.ctx.logger.debug("websocket transport options", {
        options,
      });

      this._serverOptions = options;
      this.emit("CALL_OPTIONS", { options });
    });

    // resolve on ready state
    this.__wsTransport.on("ready", (readyData: ReadyData) => {
      if (process.env.NODE_ENV === "development") {
        // thar block should be removed in production mode by webpack/rollup/etc
        const sfuFailChance = device.globals.get("sfuFailChance");
        if (typeof sfuFailChance === "number" && Math.random() < sfuFailChance) {
          const err = new JoinCallError("DEBUG: Simulated SFU join error", {});
          reject(err);
        }
      }

      this.emit("CALL_SFU_CONNECTION");
      if (this.__closed && this.__wsTransport != null) {
        this.ctx.logger.debug("call: ready event fired after call was closed");
        this.__wsTransport.close();
        return;
      }

      this.ctx.logger.info("websocket transport ready", { readyData: readyData as any });

      this._ip = readyData.ip;
      this._localPeerId = readyData.peerId;
      this.ctx.logger.setMessageAggregate("peerId", this._localPeerId);
      if (readyData.permissions?.length > 0) {
        this._permissions = readyData.permissions;
        this.emit("CALL_PERMISSIONS", { permissions: readyData.permissions });
      }
      if (readyData.behaviours?.length > 0) {
        this._behaviours = readyData.behaviours;
      }

      if (readyData.version === "v3" && this.user != null) {
        this.user.scope = readyData.scope;

        if (readyData.userId != null) {
          this.user.userId = readyData.userId;
        }

        if (readyData.displayName != null) {
          this.user.displayName = readyData.displayName;
        }

        this.clearLogData();
        this.setLogData();
      }

      this.emit("CALL_ROOM_DATA", {
        scope: this.user?.scope ?? "",
        peerId: readyData.peerId,
      });

      // save if the call is closed/closing before entering try/catch block
      const closed = this._closed || this._peer?._closed;

      this._request("options", this.options).catch((err) => {
        const currentlyClosed = this._closed || this._peer?._closed;
        const isPeerCloseError = !closed && currentlyClosed;

        //if the call was closed during async request sallow error
        if (!isPeerCloseError) {
          const inner = err instanceof Error ? err : null;
          this.emit(
            "error",
            new WSRequestError("ws request error", {
              inner,
              request: "options",
              args: this.options,
              internalCall: this,
            }) as any,
          );
        }
      });

      this._peer?.dispose("Create a new peer after ws reconnection");
      this._peer = new Peer(extendContext(this.ctx, Peer), this);
      this._peer.statsCollector.on("webrtc-stats", (stats) => {
        this.emit("webrtc-stats", stats);
      });
      this._peer.statsCollector.on("webrtc-stats-raw", (stats) => {
        this.emit("webrtc-stats-raw", stats);
      });
      this._peer.on("error", (err: any) => {
        if (err.critical) {
          this.ctx.logger.error(`Peer error: ${err.code}; ${err.message}`, { err, call: this });
          this.close("peer error", true);
          reject(err);
        } else {
          this.ctx.logger.warn(`Peer error: ${err.code}; ${err.message}`, { err, call: this });
        }
      });

      this._peer._setup(readyData.rtpCapabilities).then(() => {
        this.ctx.logger.debug("websocket ready", { rtpCapabilities: readyData.rtpCapabilities });
        this.emit("CALL_READY");
      });
    });
  }

  async createProducer(
    track: MediaStreamTrack,
    options: PeerProducerOptions,
    appData: AppData,
  ): Promise<types.Producer> {
    if (this._peer == null) {
      const msg = "creating a producer on a room which has not been set up";
      this.ctx.logger.warn(msg);
      throw new Error(msg);
    }

    // save if the call is closed/closing before entering try/catch block
    const closed = this._closed || this._peer?._closed;

    try {
      return await this._peer.produce(track, options, appData);
    } catch (err) {
      const currentlyClosed = this._closed || this._peer?._closed;
      const isPeerCloseError = !closed && currentlyClosed;

      //if the call was closed during async request sallow error
      if (!isPeerCloseError) {
        if (err instanceof types.InvalidStateError && err.message === "track ended") {
          this.ctx.logger.warn("createProducer: track ended");
        } else {
          this.ctx.logger.error(
            new MediasoupSetupError("createProducer: error creating producer", { inner: wrapNativeError(err) }),
          );
        }
      }
      throw err;
    }
  }

  get _localSettings(): Partial<types.TransportOptions> {
    if (!device.isImplements(Feature.LOCAL_STORAGE)) {
      return {};
    }

    const data = device.localStorage.getItem(`pvc:${this._ip}:settings`);
    if (data == null) {
      return {};
    }

    let settings;
    try {
      settings = JSON.parse(data);
    } catch (err) {
      const msg = err instanceof Error ? err.message : "unknown error";
      this.ctx.logger.error("unable to parse settings", {
        data,
        err: msg,
      });
      return {};
    }
    return settings;
  }

  set _localSettings(settings: Partial<types.TransportOptions>) {
    if (!device.isImplements(Feature.LOCAL_STORAGE)) {
      return;
    }
    // Prevent overwriting local development debug preference
    if (device.localStorage.getItem("debug") === "false") {
      return;
    }
    device.localStorage.setItem(`pvc:${this._ip}:settings`, JSON.stringify(settings));
  }

  async _request<T>(type: string, data: unknown): Promise<T> {
    if (this.__wsTransport == null) {
      const err = new Error("_wsTransport is null");
      this.ctx.logger.warn("call _request", { err: err?.message });
      throw err;
    }

    const response = await this.__wsTransport.request<T>(type, data);
    if (response == null) {
      const err = new Error("response is null");
      this.ctx.logger.warn("call _request", { err: err?.message });
      throw err;
    }

    return response;
  }

  async requestSources(streamName: string, peerId: string): Promise<WSConsumerSourcesResponse> {
    const args = { streamName, peerId };
    return this._request<WSConsumerSourcesResponse>("consumerSources", args).catch((err) => {
      const inner = err instanceof Error ? err : null;
      if (err instanceof Error && err.message !== "request error: peer-closed") {
        this.emit(
          "error",
          new WSRequestError("ws request error", {
            inner,
            request: "consumerSources",
            args,
            internalCall: this,
          }) as any,
        );
      }
      return err;
    });
  }

  setPreferredEncoding(consumerId: string, id: string | number): void {
    if (!this.peer?.hasConsumerId(consumerId)) {
      this.emit("error", new ConsumerNotFoundError("consumer not found for consumer event", { consumerId }) as any);
      return;
    }

    if (this.setPreferredEncodingProcessing) {
      this.ctx.logger.warn("setPreferredEncoding already processing", { consumerId, id });
      return;
    }

    const args = { consumerId, id };

    this.setPreferredEncodingProcessing = true;
    this._request("setPreferredEncoding", args)
      .catch((err) => {
        const inner = err instanceof Error ? err : null;

        if (this._consumerNotFoundError(consumerId, inner)) {
          // If the consumerId still exists in VDC but not on server, issue a warning rather than an error
          if (this.peer?.hasConsumerId(consumerId)) {
            this.ctx.logger.warn("consumer does not exist on server, but still exists in videoclient", {
              err: err?.message,
              consumerId,
            });
          }
          // If consumerId does not exist in VDC or server, ignore this error.
          return;
        }

        this.emit(
          "error",
          new WSRequestError("ws request error", {
            inner,
            request: "setPreferredEncoding",
            args,
            internalCall: this,
          }) as any,
        );
        return err;
      })
      .finally(() => {
        this.setPreferredEncodingProcessing = false;
      });
  }

  private _consumerNotFoundError(consumerId: string, err: Error | null): boolean {
    if (err == null || err?.message !== "request error: consumer not found") {
      return false;
    }
    return true;
  }

  async setVideoStreamProvider(
    streamName: string,
    streamProvider: MediasoupSource,
    options: PeerProducerOptions,
    execute: boolean,
    preview = false,
  ): Promise<types.Producer | null> {
    if (this._streams[streamName] == null) {
      const msg = "setVideoStreamRetriever on invalid stream";
      this.ctx.logger.error(msg);
      throw new Error(msg);
    }

    return this._streams[streamName].setVideoStreamProvider(streamProvider, options, execute, preview);
  }

  async setAudioStreamProvider(
    streamName: string,
    streamProvider: MediasoupSource,
    options: PeerProducerOptions,
    execute: boolean,
    preview = false,
  ): Promise<types.Producer | null> {
    if (this._streams[streamName] == null) {
      const msg = "setAudioStreamRetriever on invalid stream";
      this.ctx.logger.error(msg);
      throw new Error(msg);
    }

    return this._streams[streamName].setAudioStreamProvider(streamProvider, options, execute, preview);
  }

  muteAudio(streamName: string): void {
    if (this._streams[streamName] == null) {
      const msg = "muteAudio on invalid stream";
      this.ctx.logger.error(msg, {
        streamName,
      });
      throw new Error(msg);
    }

    this._streams[streamName].pauseAudio();
  }

  async unmuteAudio(streamName: string, options: PeerProducerOptions = {}): Promise<void> {
    if (this._streams[streamName] == null) {
      const msg = "unmuteAudio on invalid stream";
      this.ctx.logger.error(msg, {
        streamName,
      });
      throw new Error(msg);
    }

    return this._streams[streamName].enableAudio(options, false);
  }

  disableAudio(streamName: string): void {
    if (this._streams[streamName] == null) {
      const msg = "disableAudio on invalid stream";
      this.ctx.logger.error(msg, {
        streamName,
      });
      throw new Error(msg);
    }

    this._streams[streamName].disableAudio();
  }

  async enableAudio(
    options: PeerProducerOptions = {},
    streamName: string,
    refresh: boolean,
    preview = false,
  ): Promise<void> {
    this.ctx.logger.debug("enable audio", { streamName, refresh });

    if (this._streams[streamName] == null) {
      const msg = "enableAudio on invalid stream";
      this.ctx.logger.error(msg, {
        streamName,
      });
      throw new Error(msg);
    }

    return this._streams[streamName].enableAudio(options, refresh, preview);
  }

  async enableVideo(
    options: PeerProducerOptions = {},
    streamName: string,
    refresh: boolean,
    preview = false,
  ): Promise<void> {
    if (!options.audioOnly) {
      this.ctx.logger.debug("enable video", { streamName, refresh });

      if (this._streams[streamName] == null) {
        const msg = "enableVideo on invalid stream";
        this.ctx.logger.error(msg, {
          streamName,
        });
        throw new Error(msg);
      }

      return this._streams[streamName].enableVideo(options, refresh, preview);
    }
    this.ctx.logger.debug("enable video", { streamName, refresh });
  }

  disableVideo(streamName: string): void {
    if (this._streams[streamName] == null) {
      const msg = "disableVideo on invalid stream";
      this.ctx.logger.error(msg, {
        streamName,
      });
      throw new Error(msg);
    }

    this._streams[streamName].disableVideo();
  }

  async unpauseVideo(streamName: string, options: PeerProducerOptions = {}): Promise<void> {
    if (this._streams[streamName] == null) {
      const msg = "unpauseVideo on invalid stream";
      this.ctx.logger.error(msg, {
        streamName,
      });
      throw new Error(msg);
    }

    return this._streams[streamName].enableVideo(options, false);
  }

  pauseVideo(streamName: string): void {
    this.ctx.logger.debug("calls-core/call: pause video");
    if (this._streams[streamName] == null) {
      const msg = "pauseVideo on invalid stream";
      this.ctx.logger.error(msg, {
        streamName,
      });
      throw new Error(msg);
    }

    return this._streams[streamName].pauseVideo();
  }

  // _pauseProducer pauses server side producer
  _pauseProducer(producerId: string, streamName: string): void {
    if (this.__wsTransport == null) {
      const err = new Error("_wsTransport is null");
      this.ctx.logger.warn("call _pauseProducer", { err: err?.message });
      throw err;
    }

    this.__wsTransport
      .request("pauseProducer", {
        producerId,
      })
      .catch((err) => {
        this.ctx.logger.warn("call.pauseProducer: error pausing server producer", err);
      });
    this.emit("CALL_SET_PRODUCER_PAUSED", {
      streamName,
      producerId,
      originator: "client",
    });
  }

  // _resumeProducer resumes server site producer
  _resumeProducer(producerId: string, streamName: string): void {
    if (this.__wsTransport == null) {
      const err = new Error("_wsTransport is null");
      this.ctx.logger.warn("call _pauseProducer", { err: err?.message });
      throw err;
    }

    this.__wsTransport
      .request("resumeProducer", {
        producerId,
      })
      .catch((err) => {
        this.ctx.logger.warn("call.resumeProducer: error resuming server producer", err);
      });
    this.emit("CALL_SET_PRODUCER_RESUMED", {
      streamName,
      producerId,
      originator: "client",
    });
  }

  // _closeProducer closes server side producer
  _closeProducer(producerId: string, streamName: string): void {
    if (this.__wsTransport == null) {
      this.ctx.logger.debug("_closeProducer was called when websocket was closed");
      return;
    }

    this.__wsTransport
      .request("closeProducer", {
        producerId,
      })
      .catch((err) => {
        this.ctx.logger.warn("call.closeProducer: error closing server producer", { err: `${err}`, producerId });
      });
    this.emit("CALL_REMOVE_PRODUCER", {
      streamName,
      producerId,
      originator: "client",
    });
  }

  async hotswapProducer(
    kind: types.MediaKind,
    streamName: string,
    producerOptions: PeerProducerOptions,
  ): Promise<void> {
    this.ctx.logger.debug("calls-core/call: hotswwap producer");
    if (this._streams[streamName] == null) {
      const msg = "hotswapProducer on invalid stream";
      this.ctx.logger.error(msg, {
        streamName,
      });
      throw new Error(msg);
    }

    await this._streams[streamName].hotswapProducer(kind, producerOptions);
  }

  toJSON(): Json {
    return {
      streamNames: this.streamNames,
      peer: this._peer,

      aggregates: {
        support: this.ctx.support.hash,
        url: this.call.callUri,
        callId: this.call.id,
        userId: this.user?.userId ?? "",
        peerId: this.peerId,
        isConnected: this._wsTransport?.connected,
      },
    };
  }
}

const callSfuUri = (
  ctx: VcContext,
  call: SFUJoinCall,
  token: string | undefined | null,
  options: Partial<SFUCallOptions>,
): string => {
  const q: {
    support: string;
    debug?: boolean;
    t?: string;

    client_name?: string;
    displayName?: string;

    client_version?: string;
    videoOnly?: boolean;
    audioOnly?: boolean;
    esn?: string;
    udata?: boolean;
    bpeerId?: string;
    rsrc?: string;
    xkey?: string; // public key
    sp?: boolean;
  } = {
    support: ctx.support.hash,
  };

  const debug = device.isImplements(Feature.LOCAL_STORAGE)
    ? device.localStorage.getItem("pvc:debug") === "true"
    : false;
  if (debug) {
    q.debug = true;
  }

  if (typeof token === "string" && token.length > 0) {
    q.t = token;
  }

  if (options.test) {
    q.t = "test";
  }

  if (options.clientName != null) {
    q.client_name = options.clientName;
  }

  if (options.displayName != null) {
    q.displayName = options.displayName;
  }

  if (options.clientVersion != null) {
    q.client_version = options.clientVersion;
  }

  if (options.sendAudio === false && options.sendVideo !== false) {
    q.videoOnly = true;
  } else if (options.sendAudio !== false && options.sendVideo === false) {
    q.audioOnly = true;
  }

  // xcode options
  if (options.rsrc != null) {
    q.rsrc = options.rsrc;
  }
  if (options.xkey != null) {
    q.xkey = options.xkey;
  }
  if (options.bpeerId != null) {
    q.bpeerId = options.bpeerId;
  }

  if (options.suspendPlay) {
    q.sp = true;
  }

  // can be deprecated after
  if (options.user != null) {
    q.udata = true;
  }

  return `${call.sfu.uri}/v3/${call.id}?${querystring.stringify(q)}`;
};

export default async (
  ctx: VcContext,
  id: string,
  joinUrl: string,
  options: Partial<SFUCallOptions>,
  callInstance: CallAPI,
  sfuJoinParams: Join | null = null,
): Promise<Call> => {
  ctx.logger.debug("joining call", { id, options: options as any });
  // Avoid mutating provided args
  const callOptions = { ...options };
  const opts: Partial<RequestOptions> = {
    method: "post",
    callId: id,
    failoverUrls: options.failoverUrls,
  };

  if (callOptions.auth != null) {
    opts.auth = callOptions.auth;
  }

  if (callOptions.authOptions != null) {
    opts.auth = new Authorization(callOptions.authOptions);
  }
  if (callOptions.clientName == null) {
    callOptions.clientName = "";
  }
  if (callOptions.clientVersion == null) {
    callOptions.clientVersion = "";
  }

  opts.query = {};

  const context = contextId();

  if (context != null) {
    opts.query.contextId = context;
  }

  const instance = instanceId();

  if (instance != null) {
    opts.query.instanceId = instance;
  }

  if (callOptions.sfu?.id != null) {
    opts.query.id = callOptions.sfu?.id;
  }
  if (callOptions.sfu?.region != null) {
    opts.query.region = callOptions.sfu?.region;
  }
  if (callOptions.sfu?.version != null) {
    opts.query.version = callOptions.sfu?.version;
  }
  if (callOptions.sfu?.pool != null) {
    opts.query.pool = callOptions.sfu?.pool;
  }

  const body: SFUCallBody = {
    sfu: {
      msVersion: [3],
    },
  };

  if (callOptions.sfu != null) {
    body.sfu = { ...callOptions.sfu, msVersion: [3] };
  }

  opts.body = JSON.stringify(body);

  const joinCall = async (joinOptions: Partial<RequestOptions> = {}): Promise<RequestResponse<Join>> => {
    ctx.logger.debug("calls-core/call:joinCall");

    try {
      const response = await request<Join>(ctx, joinUrl, {
        ...opts,
        ...joinOptions,
        warnStatuses: [404, 422],
      });

      if (response?.body?.call == null) {
        throw new Error("response is null or incorrect");
      }

      response.body.call.callUri = callSfuUri(ctx, response.body.call, response.authToken, callOptions);
      ctx.logger.debug("joinCall() response", { response: response as any });
      callInstance.emit("callJoinEndpoint");
      return response;
    } catch (err) {
      const msg = err instanceof Error ? err.message : "unknown error";
      if (err instanceof Response) {
        if (err.status === 404) {
          ctx.logger.warn("sfu unavailable", {
            id,
            joinUrl,
            err: msg,
          });
        } else if (err.status === 503) {
          ctx.logger.error("unable to join a call - no sfus available", {
            id,
            joinUrl,
            err: msg,
          });
        }
      } else {
        ctx.logger.warn("unable to get sfu uri", {
          id,
          joinUrl,
          err: msg,
        });
      }

      throw err;
    }
  };

  let sfuParams: Join;
  if (sfuJoinParams?.call != null) {
    sfuJoinParams.call.callUri = callSfuUri(ctx, sfuJoinParams.call, sfuJoinParams.user.authorizeToken, callOptions);
    sfuParams = sfuJoinParams;
  } else {
    sfuParams = (await joinCall()).body;
  }

  return new Call(extendContext(ctx, Call), sfuParams, joinCall, callOptions);
};
