import { Injectable, OnDestroy, Inject } from '@angular/core';
import { Observable, SubscriptionLike, Subject, Observer, interval } from 'rxjs';
import { filter, map, share, distinctUntilChanged, takeWhile } from 'rxjs/operators';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';

import { IWebsocketService, IWsMessage, WebSocketConfig } from './websocket.interfaces';
import { config } from './websocket.config';
import { Router } from '@angular/router';
import { environment } from '../../../environments/environment';

@Injectable({
  providedIn: 'root',
})
export class WebsocketApiService implements IWebsocketService, OnDestroy {
  private readonly config: WebSocketSubjectConfig<IWsMessage<any>>;

  private websocketSub: SubscriptionLike;
  private statusSub: SubscriptionLike;

  private reconnection$: Observable<number>;
  private websocket$: WebSocketSubject<IWsMessage<any>>;
  private connection$: Observer<boolean>;
  private wsMessages$: Subject<IWsMessage<any>>;

  private reconnectInterval: number;
  private readonly reconnectAttempts: number;
  private isConnected: boolean;

  public status: Observable<boolean>;

  constructor(@Inject(config) public wsConfig: WebSocketConfig, private router: Router) {
    this.wsMessages$ = new Subject<IWsMessage<any>>();

    this.reconnectInterval = wsConfig.reconnectInterval || 5000;
    this.reconnectAttempts = wsConfig.reconnectAttempts || 10;

    this.config = {
      url: wsConfig.url + localStorage.getItem('token'),
      closeObserver: {
        next: (event: CloseEvent) => {
          console.log('WebSocket disconnected!');
          this.connection$.next(false);
          localStorage.removeItem('isLoggedIn');
          localStorage.removeItem('token');
          localStorage.removeItem('userId');
          localStorage.removeItem('fileId');
          this.send('exit', {}, String(+new Date()));
          router.navigate(['/auth']);
          this.ngOnDestroy();
          window.location.assign(environment.url + 'auth');
        },
      },
      openObserver: {
        next: (event: Event) => {
          console.log('WebSocket connected!');
          this.connection$.next(true);
        },
      },
    };

    // connection status
    this.status = new Observable<boolean>((observer) => {
      this.connection$ = observer;
    }).pipe(share(), distinctUntilChanged());

    // run reconnect if not connection
    this.statusSub = this.status.subscribe((isConnected) => {
      this.isConnected = isConnected;

      if (!this.reconnection$ && typeof isConnected === 'boolean' && !isConnected) {
        this.reconnect();
      }
    });
    this.websocketSub = this.wsMessages$.subscribe(
      () => {},
      (error: ErrorEvent) => console.error('WebSocket error!', error)
    );

    this.connect();
  }

  ngOnDestroy() {
    this.websocketSub.unsubscribe();
    this.statusSub.unsubscribe();
    this.websocket$.unsubscribe();
    this.websocket$.complete();
  }

  /**
   * подключение к websocket
   */
  private connect(): void {
    if (localStorage.getItem('token')) {
      this.websocket$ = new WebSocketSubject(this.config);

      this.websocket$.subscribe(
        (message) => this.wsMessages$.next(message),
        (error: Event) => {
          if (!this.websocket$) {
            this.reconnect();
          }
        }
      );
    }
  }

  /**
   * переподключение если нет соединения или ошибка
   */
  private reconnect(): void {
    this.reconnection$ = interval(this.reconnectInterval).pipe(
      takeWhile((v, index) => index < this.reconnectAttempts && !this.websocket$)
    );

    this.reconnection$.subscribe(
      () => this.connect(),
      () => {},
      () => {
        this.reconnection$ = null;

        if (!this.websocket$) {
          this.wsMessages$.complete();
          this.connection$.complete();
        }
      }
    );
  }

  /**
   * получение сообщений по event-у
   * @param event - для обновления папок или файлов
   * @param messageId - название event-а по которому слушаем сообщения от сервера
   * @param streamId - streamId
   */
  public on<T>(messageId: string, event?: string, streamId?: string): Observable<T> {
    return this.wsMessages$.pipe(
      filter((message: IWsMessage<T>) => {
        return message.hasOwnProperty('messageId') || message.hasOwnProperty('event')
          ? message.messageId === messageId || message.event === event
          : message.streamId === streamId || message.event === event;
      }),
      map((message: IWsMessage<T>) => {
        return message.data;
      })
    );
  }

  /**
   * on message на сервер
   * @param messageId - id запроса = String(+new Date())
   * @param event - название event-а по которому отправляем сообщения на сервер
   * @param data - отправляем измененные данные
   * @param action - для открытия/закрытия стрима
   * @param streamId = messageId по значению, но для стримов
   */
  public send(event: string, data: any = {}, messageId?: string, action?: string, streamId?: string): void {
    if ((event || this.isConnected) && this.websocket$) {
      messageId
        ? this.websocket$.next({ messageId, event, data } as any)
        : this.websocket$.next({ streamId, event, data, action } as any);
    } else {
      console.error('Send error!');
    }
  }
}
