import { Injectable } from '@angular/core';
import Pusher, * as PusherTypes from 'pusher-js';
import { Observable, ReplaySubject, Subject } from 'rxjs';
import { take } from 'rxjs/operators';

import { ConfigService } from '@app/core/config.service';
import { AppSyncNotificationService } from '@app/shared/app-sync-notification-service';

const PUSHER_SUBSCRIPTION_SUCCESS_EVENT = 'pusher:subscription_succeeded';

@Injectable({
  providedIn: 'root',
})
export class RealtimeCommunicationService {
  private _pusher: Pusher;
  private readonly configService: ConfigService;

  constructor(configService: ConfigService, private appSyncNotificationService: AppSyncNotificationService) {
    this.configService = configService;
  }

  get pusher(): Pusher {
    this._pusher ??= new Pusher(this.configService.json.pusherKey, { forceTLS: true });
    return this._pusher;
  }

  /**
   * @deprecated use getResponse
   */
  subscribe(channelName: string): PusherTypes.Channel {
    return this.pusher.subscribe(channelName);
  }

  getResponse<T>(channelName: string, eventName: string): Observable<T> {
    const response$ = new Subject<T>();
    let channel: PusherTypes.Channel;
    const onConnectCallback = (): void => {
      this.pusher.connection.unbind('connected', onConnectCallback);

      channel = this.pusher.subscribe(channelName);
      channel.bind(eventName, (response: T) => {
        response$.next(response);
        channel.unbind(eventName);
      });
    };
    if (this.pusher.connection.state === 'connected') {
      onConnectCallback();
    } else {
      this.pusher.connection.bind('connected', onConnectCallback);
    }

    return response$.asObservable().pipe(take(1));
  }

  getAppSyncResponse<T>(resource: string, action: string, channelName: string): Observable<T> {
    const response$ = new Subject<T>();
    this.appSyncNotificationService.notificationSubscription(resource, action, channelName);
    this.appSyncNotificationService.notifications$.subscribe((data: T) => {
      response$.next(data);
    });
    return response$.asObservable().pipe(take(1));
  }

  getResponseAndSubscription<T>(channelName: string, eventName: string): [Observable<T>, Observable<void>] {
    const response$ = new Subject<T>();
    const subscribed$ = new ReplaySubject<void>();

    const onConnectCallback = () => {
      this.getChannelSubscription(response$, subscribed$, channelName, eventName);
      this.pusher.connection.unbind('connected', onConnectCallback);
    };

    if (this.pusher.connection.state === 'connected') {
      onConnectCallback();
    } else {
      this.pusher.connection.bind('connected', onConnectCallback);
    }

    return [response$.asObservable().pipe(take(1)), subscribed$.asObservable().pipe(take(1))];
  }

  private getChannelSubscription(
    response$: Subject<any>,
    subscribed$: ReplaySubject<void>,
    channelName: string,
    eventName: string,
  ): void {
    const channel = this.pusher.subscribe(channelName);
    const onSubscribedCallback = () => {
      subscribed$.next();
      channel.unbind(PUSHER_SUBSCRIPTION_SUCCESS_EVENT, onSubscribedCallback);
    };

    if (channel.subscribed) {
      onSubscribedCallback();
    } else {
      channel.bind(PUSHER_SUBSCRIPTION_SUCCESS_EVENT, onSubscribedCallback);
    }

    channel.bind(eventName, (response: any) => {
      response$.next(response);
      channel.unbind(eventName);
    });
  }
}
