import IndexedKV from "./IndexedKV";
import QueueItem from "./QueueItem";
import ChatWS from "./ChatWS";

let events = require('events');

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

class WebSocketQueue {
  cacheDb
  wsOptions
  currentWS
  onOffline
  lastProcessed = Date.now();
  events = new events.EventEmitter();
  options = {
    name: 'queue_default'
  }
  _memoryQueue = []
  ready = false

  constructor(options) {
    this.options = Object.assign(this.options, options)
    this.cacheDb = new IndexedKV({ db: 'offline_queue', table: 'items', onReady: this.init })

    this.events.on('ready', this.queueReady)
  }

  init = () => {
    this.ready = true;
    this.events.emit('ready')
  }

  queueReady = async () => {
    this.processQueue()
  }

  ws = (options) => {
    return new Promise(async (resolve, reject) => {
      try {
        this.wsOptions = options
        const cachedWs = await this.cacheDb.getItem(`queue_ws_${this.wsOptions.room}`) || null;
        this.currentWS = await new ChatWS(this.wsOptions)
        if (!cachedWs) {
          await this.cacheDb.add(`queue_ws_${this.wsOptions.room}`, this.wsOptions);
        }
        resolve();
      } catch (e) {
        reject(e)
      }

    })

  }

  setLastProcessed = () => {
    this.lastProcessed = Date.now();
  }

  wsCallback = (_id, ws, message, ms) => {
    return new Promise(async (resolve, reject) => {
      if (ms) {
        while (Date.now() <= this.lastProcessed + ms) {
          await sleep(ms);
        }
      }
      try {
        if (!this.currentWS) {
          const options = await this.cacheDb.getItem(`queue_ws_${ws}`) || false;
          if (options) {
            await this.ws(options)
            this.currentWS.send(JSON.stringify(message)).then(async () => {
              await this.remove(_id)
              this.setLastProcessed()
              resolve()
            }).catch(() => {
              if (typeof this.onOffline === 'function') {
                this.events.emit('offline')
                this.onOffline(message)
                this.setLastProcessed()
                resolve()
              }
            })
          }
        } else {
          if (this.currentWS.readyState !== 1) {
            return;
          }
          this.currentWS.send(JSON.stringify(message)).then(async () => {
            await this.remove(_id)
            this.setLastProcessed()
            resolve()
          }).catch(() => {
            console.log('Your client is offline, your message will be sent when you reconnect')
            if (typeof this.onOffline === 'function') {
              this.events.emit('offline')
              this.onOffline(message)
              this.setLastProcessed()
              resolve()
            }
          })

        }
      } catch (e) {
        console.log(e)
        reject(e)
      }
    })


  }

  add = async (args, action) => {
    if (!this.ready) {
      this._memoryQueue.push(await new QueueItem(args, action))
      return;
    }
    const item = await new QueueItem(args, action);
    await this.cacheDb.add(`queued_${item._id}`, item);
    this.processQueue()
  }

  remove = async (_id) => {
    return await this.cacheDb.removeItem(`queued_${_id}`);
  }

  processQueue = async () => {
    if (!this.ready) {
      return
    }

    await this.cacheDb.openCursor('queued_', async (item) => {
      return await this.wsCallback(item._id, item.body.ws, item.body.message, 2000)
    });

  }

}

export default WebSocketQueue
