const WebSocket = require('ws') const config = require('@config/index') const DB = require('@libs/database') const { msgSplit } = require('./parser') const event = require('@root/event') // const _ = require('lodash') class WS { constructor () { /** @type {WebSocket} */ this.ws = null // cache struct // cache = { // [channel]: { // time: timestamp, // list: [] // } // } this.cache = {} this.cmdQueue = [] this.tick = null this.join = [] this.wait = 0 event.on('twitchJoin', (channel) => { // this.joinChannel(channel) this.addCmdQueue(this.joinChannel, [channel]) }) event.on('twitchSend', data => { if (!('msg' in data) || typeof data.msg !== 'string') return if (!('channel' in data) || typeof data.channel !== 'string') return this.queueMsg(data.channel, data.msg) }) } runBot () { if (!config.twitch.chat_host || !config.twitch.bot_oauth) return if (this.ws !== null) return this.ws = new WebSocket(`wss://${config.twitch.chat_host}:443`, 'irc') this.ws.on('open', this.handleOpen.bind(this)) this.ws.on('message', this.handleMessage.bind(this)) this.ws.on('error', this.handleError.bind(this)) this.ws.on('close', this.handleExit.bind(this)) } handleError (err) { console.log(err) clearInterval(this.tick) this.ws = null this.join = [] this.runBot() } handleExit (code) { clearInterval(this.tick) this.ws = null this.join = [] this.runBot() } handleMessage (data) { // socket message convert to string let d = data.toString() // split message with \n // filter array remove empty element // replace \r to empty let darr = d.split(/\n/).filter(t => t).map(t => t.replace(/\r$/, '')) for (let i in darr) { // parse message console.log('IRC::: > ', darr[i]) msgSplit(this.ws, darr[i]).then(() => { /* pass */ }) } } async handleOpen () { this.tick = setInterval(this.runTick.bind(this), 1000) // from pool get db connection let db = await DB.connect() // login to twitch irc this.ws.send('PASS ' + config.twitch.bot_oauth) this.ws.send('NICK mtfosbot') this.ws.send('CAP REQ :twitch.tv/membership :twitch.tv/commands') // 取得要加入的頻道列表 let text = `select "name" from "public"."twitch_channel" where "join" = true` let result = await db.query({ text }) // release pool connection await db.release() if (result !== null && result.rowCount > 0) { for (let row of result.rows) { if ('name' in row) { // this.joinChannel(row.name) this.addCmdQueue(this.joinChannel, [row.name]) // this.ws.send('JOIN #' + row.name) // this.join.push(row.name) } } } } queueMsg (channel, msg) { if (typeof channel !== 'string' || channel.trim().length === 0) return if (typeof msg !== 'string' || msg.trim().length === 0) return if (channel in this.cache) { if ('list' in this.cache[channel]) { this.cache[channel].list = [...this.cache[channel].list, msg] } else { this.cache[channel].list = [msg] } } else { this.cache[channel] = { time: -1, list: [msg] } } } async reJoinChannel () { this.leaveAllChannel() // from pool get db connection let db = await DB.connect() // 取得要加入的頻道列表 let text = `select "name" from "public"."twitch_channel" where "join" = true` let result = await db.query({ text }) // release pool connection await db.release() if (result !== null && result.rowCount > 0) { for (let row of result.rows) { if ('name' in row) { // this.joinChannel(row.name) this.addCmdQueue(this.joinChannel, [row.name]) // this.ws.send('JOIN #' + row.name) // this.join.push(row.name) } } } } leaveAllChannel () { for (let i of this.join) { this.addCmdQueue(this.leaveChannel, [i]) } } addCmdQueue (act = null, arg = []) { if (act === null || typeof act !== 'function') return if (!Array.isArray(arg)) return act = act.bind(this) this.cmdQueue.push({act, arg}) } runTick () { this.wait++ if (this.wait > 1800) { this.wait = 0 this.reJoinChannel().then(() => {}).catch(() => {}) } try { if (this.cmdQueue.length > 0 && this.ws) { let tmp = this.cmdQueue.shift() if ('act' in tmp && typeof tmp.act === 'function' && 'arg' in tmp && Array.isArray(tmp.arg)) { tmp.act(...tmp.arg) } } } catch (err) { console.log('tick error :::: ', err) } if (this.cache === null || typeof this.cache !== 'object') return let time = Date.now() for (let i in this.cache) { if ('time' in this.cache[i] && this.cache[i].time <= (time - 1500)) { if ('list' in this.cache[i] && Array.isArray(this.cache[i].list) && this.cache[i].list.length > 0) { let msg = this.cache[i].list[0] this.cache[i].list = [...this.cache[i].list.slice(1)] this.sendMsg(i, msg) } this.cache[i].time = time } else { if ('list' in this.cache[i] && Array.isArray(this.cache[i].list) && this.cache[i].list.length > 0) { let msg = this.cache[i].list[0] this.cache[i].list = [...this.cache[i].list.slice(1)] this.sendMsg(i, msg) } this.cache[i].time = time } } } sendMsg (channel = null, message = null) { if (this.ws === null || !('send' in this.ws) || typeof this.ws.send !== 'function') return null if (channel === null || typeof channel !== 'string' || channel.trim().length === 0) return null if (message === null || typeof message !== 'string' || message.trim().length === 0) return null if (this.join.indexOf(channel) === -1) return null console.log(`IRC::: < ${`PRIVMSG #${channel} :${message}`}`) this.ws.send(`PRIVMSG #${channel} :${message}`) } joinChannel (channel = null) { if (this.ws === null || !('send' in this.ws) || typeof this.ws.send !== 'function') return null if (channel === null || typeof channel !== 'string' || channel.trim().length === 0) return null if (this.join.indexOf(channel) !== -1) return null console.log(`IRC::: < ${`JOIN #${channel.trim()}`}`) this.ws.send(`JOIN #${channel.trim()}`) this.join.push(channel.trim()) } leaveChannel (channel = null) { if (this.ws === null || !('send' in this.ws) || typeof this.ws.send !== 'function') return null if (channel === null || typeof channel !== 'string' || channel.trim().length === 0) return null console.log(`IRC::: < ${`PART #${channel.trim()}`}`) this.ws.send(`PART #${channel.trim()}`) let idx = this.join.indexOf(channel) this.join.splice(idx, 1) } } module.exports = new WS()