import { Injectable } from '@angular/core';
import { environment } from 'src/environments/environment';
import { webSocket } from 'rxjs/webSocket';
import { BehaviorSubject, Observable, Observer } from 'rxjs';
import { HttpTransportType, HubConnection, HubConnectionBuilder, LogLevel } from '@microsoft/signalr';
import { socket } from '../models/socketModel';
import { UserStateService } from '../_common/services/user-state.service';

@Injectable({
  providedIn: 'root'
})
export class WebsocketService {
  Queue$: BehaviorSubject<any> = new BehaviorSubject<any>(null)
  ArchiveQueue$:BehaviorSubject<any>=new BehaviorSubject<any>(null)
  private _hubConnection: HubConnection;
  socket$: Observable<socket>
  connectionObj: socket;
  constructor(private userStateService: UserStateService) { }
  connect(slt: string) {
    const access_token = this.userStateService.accessToken$.getValue()
    this._hubConnection = new HubConnectionBuilder().withUrl(environment.wss + `/queue?slt=${slt}`, {
      accessTokenFactory: () => access_token
      , transport: HttpTransportType.WebSockets
    })
      .build();

    return this.socket$ = new Observable<socket>(subscriber => {
this._hubConnection.onclose((close)=>{
  this.connectionObj.isConntected=false,
  this.connectionObj.id=''
  
  subscriber.next(this.connectionObj)
  subscriber.complete()
})
      this._hubConnection.on('RecievedId', (message) => {

        this.connectionObj = {
          id: message,
          isConntected: true,
          messageQueue: this.Queue$,
          archiveQueue:this.ArchiveQueue$
        }
        subscriber.next(this.connectionObj)
      })
      this._hubConnection.on('SendStream', (stream) => {
        console.log(stream)
        var streamObj = JSON.parse(stream || {})
        this.Queue$.next(streamObj)
        this._hubConnection.send('RequestQueued',streamObj)
      })
      this._hubConnection.on('SyncArchiveResponse', (stream) => {
        this.SyncArchiveInternal(stream)
      })
      this._hubConnection.start()
        .then(() => { console.log('Hub connected') })
        .catch((error) => {
          console.log('error' + error)
          subscriber.error(error)
        })

    })

  }
  private PublishToQueue(msg: any) {
    if (msg.data != null) {
      const stream = JSON.parse(msg.data) || {}
      this.Queue$.next(stream)
    }

  }
  public Close(id:string) {
    this._hubConnection.invoke('CloseStream', id).then((x) => {
      if(x){
        this.Queue$.next(null)
        // this.Queue$.complete()
        this._hubConnection.stop()
      }
      
    }).catch((error)=>{
   
    });

  }
  public Action(id:string, action:string){
    return this._hubConnection.invoke('Action',id,action)
  }
  public SyncArchive(){
    return this._hubConnection.invoke('SyncArchiveRequest').then((x)=>{
      this.SyncArchiveInternal(x)
    })
  }
  private SyncArchiveInternal(stream:any){
    if(stream){
      var streamObj = JSON.parse(stream || {})
      this.ArchiveQueue$.next(streamObj)
    }
  }
}
