add twitch stream notify

This commit is contained in:
Jay 2018-06-28 17:37:33 +08:00
parent e2eeef9d82
commit 171df6193b
13 changed files with 552 additions and 116 deletions

View File

@ -1,9 +1,7 @@
const cron = require('cron')
const DB = require('./libs/database')
const fbParser = require('./libs/facebook-pageparse')
const {
pushMessage
} = require('./libs/line-message')
const api = require('./libs/api-action')
new cron.CronJob({ // eslint-disable-line
cronTime: '00 */2 * * * *',
@ -29,72 +27,173 @@ new cron.CronJob({ // eslint-disable-line
const runCheckPage = async function (t) {
let db = await DB.connect()
fbParser.getLastPost(t.page)
.then(async (data) => {
let n = Math.floor(Date.now() / 1000)
if (t.post === data.id || data.time < (n - 10 * 60)) {
return
}
t.post = data.id
try {
let text = `update "public"."facebook_page" set "lastpost" = $1, "mtime" = now() where "id" = $2`
let values = [data.id, t.page]
await db.query({
text,
values
})
} catch (err) {
console.log(err)
}
let rt = null
try {
let text = `select rt."facebook" as page, rt."tmpl" as tmpl, rt."line" as group
from "public"."line_fb_rt" rt
left join "public"."line_group" line
on line."id" = rt."line"
where
line."notify" = true
and rt."facebook" = $1`
let values = [t.page]
rt = await db.query({
text,
values
})
} catch (err) {
console.log(err)
}
if (rt === null || rt.rowCount === 0) {
return
}
let data = null
try {
data = await fbParser.getLastPost(t.page)
} catch (err) {
db.release()
}
if (data === null) {
db.release()
return
}
for (let i in rt.rows) {
let tmp = rt.rows[i]
let msg = tmp.tmpl || ''
if (typeof msg !== 'string' || msg.trim().length === 0) {
msg = `${data.txt}\n${data.link}`
} else {
msg = msg.replace(/{link}/, data.link).replace(/{txt}/, data.txt).replace(/\\n/, '\n')
}
let n = Math.floor(Date.now() / 1000)
if (t.post === data.id || data.time < (n - 10 * 60)) {
db.release()
return
}
t.post = data.id
try {
let text = `update "public"."facebook_page" set "lastpost" = $1, "mtime" = now() where "id" = $2`
let values = [data.id, t.page]
await db.query({
text,
values
})
} catch (err) {
console.log(err)
}
pushMessage(tmp.group, msg).then(() => {}).catch(() => {})
}
})
.then(() => {
db.release()
})
.catch(() => {
db.release()
let rt = null
try {
let text = `select rt."facebook" as page, rt."tmpl" as tmpl, rt."line" as group
from "public"."line_fb_rt" rt
left join "public"."line_group" line
on line."id" = rt."line"
where
line."notify" = true
and rt."facebook" = $1`
let values = [t.page]
rt = await db.query({
text,
values
})
} catch (err) {
console.log(err)
}
if (rt === null || rt.rowCount === 0) {
db.release()
return
}
for (let i in rt.rows) {
let tmp = rt.rows[i]
let msg = tmp.tmpl || ''
if (typeof msg !== 'string' || msg.trim().length === 0) {
msg = `${data.txt}\n${data.link}`
} else {
msg = msg.replace(/{link}/, data.link).replace(/{txt}/, data.txt).replace(/\\n/, '\n')
}
api.line.pushMessage(tmp.group, msg).then(() => { }).catch(() => { })
}
db.release()
}
// register twitch streaming webhook
new cron.CronJob({ //eslint-disable-line
cronTime: '00 00 0,6,12,18 * * *',
cronTime: '*/20 * * * * *',
onTick: async () => {
let db = await DB.connect()
let text = `select "id" from "public"."twitch_channel"`
try {
let twch = await db.query({
text
})
console.log('check twitch channel number :::: ', twch.rowCount)
let ids = twch.rows.map(t => t.id)
let streams = await api.twitch.getUserStream(ids)
if (streams !== null && Array.isArray(streams)) {
streams.forEach(t => {
sendStreamNotify(t).then(() => {}).catch(() => {})
})
}
} catch (err) {
console.log(err)
db.release()
return
}
db.release()
},
runOnInit: true,
start: true,
timeZone: 'Asia/Taipei'
})
const sendStreamNotify = async (streamer = null) => {
console.log(streamer)
if (streamer === null || typeof streamer !== 'object' || !('user_id' in streamer) || !('id' in streamer)) return null
let db = await DB.connect()
try {
let result = await db.query({
text: `select "id" from "public"."twitch_channel" where "id" = $1 and "laststream" = $2`,
values: [streamer.user_id, streamer.id]
})
if (result.rowCount > 0) {
db.release()
return
}
} catch (err) {
console.log(err)
db.release()
return
}
try {
let text = `update "public"."twitch_channel" set "laststream" = $1, "mtime" = now() where "id" = $2`
let values = [streamer.id, streamer.user_id]
await db.query({
text,
values
})
} catch (err) {
console.log(err)
}
let rt = null
try {
let text = `select rt."tmpl" as tmpl, line."id" as group, twitch."laststream" as laststream, twitch."name" as user from "public"."line_twitch_rt" rt
left join "public"."line_group" line
on line."id" = rt."line"
left join "public"."twitch_channel" twitch
on twitch."id" = rt."twitch"
where
twitch."id" = $1
and line."notify" = true`
let values = [streamer.user_id]
rt = await db.query({
text,
values
})
} catch (err) {
console.log(err)
db.release()
return
}
if (rt === null || rt.rowCount === 0) {
db.release()
return
}
let twLink = 'https://twitch.tv/'
for (let i in rt.rows) {
let tmp = rt.rows[i]
let link = twLink + tmp.user
let msg = tmp.tmpl || ''
if (typeof msg !== 'string' || msg.trim().length === 0) {
msg = `${streamer.title || ''}\n${link}`
} else {
msg = msg.replace(/{link}/, link).replace(/{txt}/, streamer.title).replace(/\\n/, '\n')
}
api.line.pushMessage(tmp.group, msg).then(() => { }).catch(() => { })
}
db.release()
}

View File

@ -1,6 +1,8 @@
{
"versions":[
{"file": "main.sql", "version": 1}
{"file": "main.sql", "version": 1},
{"file": "20180628-1.sql", "version": 2},
{"file": "20180628-2.sql", "version": 3}
],
"test": []
}

View File

@ -1,5 +1,6 @@
module.exports = {
port: process.env.NODE_PORT || 10230,
url: process.env.HOST_URL || '',
line: {
secret: process.env.LINE_SECRET || '',
access: process.env.LINE_ACCESS || ''

7
libs/api-action/index.js Normal file
View File

@ -0,0 +1,7 @@
const twitch = require('./twitch')
const line = require('./line')
module.exports = {
twitch,
line
}

56
libs/api-action/line.js Normal file
View File

@ -0,0 +1,56 @@
const axios = require('axios')
const config = require('../../config')
const client = axios.create({
baseURL: 'https://api.line.me/v2/bot',
headers: {
Authorization: `Bearer ${config.line.access}`
}
})
const pushMessage = async (target, message = '') => {
if (typeof target !== 'string' || target.trim().length === 0) return
if (typeof message !== 'string' || message.trim().length === 0) return
let data = {
to: target,
messages: [
{
type: 'text',
text: message
}
]
}
let opts = {
method: 'post',
url: '/message/push',
data
}
await client(opts)
}
const replyMessage = async (replyToken, message) => {
let url = '/message/reply'
let opts = {
method: 'post',
url,
data: {
replyToken,
messages: [
{
type: 'text',
text: message
}
]
}
}
await client(opts)
}
module.exports = {
pushMessage,
replyMessage
}

98
libs/api-action/twitch.js Normal file
View File

@ -0,0 +1,98 @@
const axios = require('axios')
const config = require('../../config')
const client = axios.create({
baseURL: 'https://api.twitch.tv/helix',
headers: {
'Client-ID': config.twitch.clientid
}
})
const querystring = require('querystring')
/**
* get twitch user id by login name
* @param {string} name login name
*/
const getUserIDByName = async (name = '') => {
if (typeof name !== 'string' || name.trim().length === 0) return null
name = name.trim()
let url = '/users'
let params = {
login: name
}
let res = await client({
url,
method: 'get',
params
})
if (!('data' in res) || !('data' in res.data) || !Array.isArray(res.data.data) || res.data.data.length === 0) return null
let userInfo = res.data.data[0]
if (!('id' in userInfo) || !('login' in userInfo)) return null
return {
id: userInfo.id,
login: userInfo.login
}
}
const registerWebHook = async (uid = '', type = null) => {
if (!typeof uid === 'string' || uid.trim().length === 0) return null
uid = uid.trim()
let topic = null
switch (type) {
case 'live':
topic = `https://api.twitch.tv/helix/streams?user_id=${encodeURIComponent(uid)}`
break
}
if (topic === null) return null
let url = '/webhooks/hub'
let data = {
'hub.mode': 'subscribe',
'hub.topic': topic,
'hub.callback': `${config.url.replace(/\/$/, '')}/twitch/webhook?uid=${encodeURIComponent(uid)}&type=${encodeURIComponent(type)}`,
'hub.lease_seconds': 864000,
'hub.secret': config.twitch.subsecret
}
try {
await client({
method: 'post',
data,
url
})
} catch (err) {
console.log(err)
}
return null
}
const getUserStream = async (uids = []) => {
if (!uids || !Array.isArray(uids) || uids.length === 0) return null
let res = []
while (uids.length > 0) {
let tmp = uids.splice(0, 50)
let url = '/streams'
let params = {
user_id: [...tmp]
}
try {
let result = await client({
method: 'get',
url,
params,
paramsSerializer: function (params) {
return querystring.stringify(params)
}
})
if (!('data' in result) || !('data' in result.data)) continue
res = [...result.data.data]
} catch (err) {
console.log(err)
}
}
return res
}
module.exports = {
registerWebHook,
getUserIDByName,
getUserStream
}

View File

@ -70,9 +70,14 @@ const getLastPost = async (pageid = '') => {
if (m !== null && m.length > 1) {
id = m[1]
}
} else if (/\/videos\/(\d+)/.test(link)) {
let m = link.match(/\/videos\/(\d+)/)
if (m !== null && m.length > 1) {
id = m[1]
}
}
}
console.log(time, link, txt, id)
// console.log(time, link, txt, id)
if (!time || !link || !id) return null
let tmp = {
txt,

View File

@ -1,3 +1,4 @@
const api = require('../../api-action')
/**
* add group to database
* @param {string} txt command body format => groupName notifyEnable(0,1)
@ -45,7 +46,7 @@ const addPage = async (txt, source = {}, db) => {
if (!('type' in source) || !('groupId' in source) || !('userId' in source)) return null
let {groupId, userId} = source
let arr = txt.split(' ')
if (arr.length < 3) return null
if (arr.length < 2) return null
let page = arr[0]
let tmpl = arr.slice(1).join(' ')
let text = `select "id","owner" from "public"."line_group" where "id" = $1`
@ -113,7 +114,167 @@ const addPage = async (txt, source = {}, db) => {
return { reply }
}
/**
* del facebook page notify from group
* @param {string} txt command body format => pageid tmpl
* @param {object} source
* @param {object} db
*/
const delPage = async (txt = '', source = {}, db) => {
if (!db) return null
if (!('type' in source) || !('groupId' in source) || !('userId' in source)) return null
let {groupId, userId} = source
let arr = txt.split(' ')
if (arr.length < 1) return null
let page = arr[0]
let text = `select "id","owner" from "public"."line_group" where "id" = $1`
let values = [groupId]
let result = await db.query({
text,
values
})
if (result.rowCount === 0) {
let reply = 'group not register'
return { reply }
}
if (result.rows[0].owner !== userId) {
let reply = 'not owner'
return { reply }
}
text = `select count(rt.*) as c from "public"."line_fb_rt" rt
left join "public"."facebook_page" fb
on fb."id" = rt."facebook"
where
fb."id" = $1`
values = [page]
let count = await db.query({
text,
values
})
if (count.rowCount === 0 || count.rows[0].c === 0) return null
text = `select rt.* as c from "public"."line_fb_rt" rt
left join "public"."facebook_page" fb
on fb."id" = rt."facebook"
where
fb."id" = $1
and rt."line" = $2`
values = [page, groupId]
let rt = await db.query({
text,
values
})
if (rt.rowCount === 0) return null
if (rt.rowCount == 1 && count.rows[0].c == 1) { // eslint-disable-line
let text = `delete from "public"."facebook_page" where "id" = $1`
let values = [page]
await db.query({
text,
values
})
} else {
let text = `delete from "public"."line_fb_rt" where "line" = $1 and "facebook" = $2`
let values = [groupId, page]
await db.query({
text,
values
})
}
return {
reply: 'delete page notify success'
}
}
const addTwitch = async (txt = '', source = {}, db) => {
if (!db) return null
if (!('type' in source) || !('groupId' in source) || !('userId' in source)) return null
let {groupId, userId} = source
let arr = txt.split(' ')
if (arr.length < 3) return null
let name = arr[0]
let type = arr[1]
let tmpl = arr.slice(2).join(' ')
if (['live'].indexOf(type) < 0) return null
let text = `select "id","owner" from "public"."line_group" where "id" = $1`
let values = [groupId]
let result = await db.query({
text,
values
})
if (result.rowCount === 0) {
let reply = 'group not register'
return { reply }
}
if (result.rows[0].owner !== userId) {
let reply = 'not owner'
return { reply }
}
let twitchUser = await api.twitch.getUserIDByName(name)
if (twitchUser === null) return null
// check channel count
text = `select "id" from "public"."twitch_channel" where "id" = $1`
values = [twitchUser.id]
let twch = await db.query({
text,
values
})
if (twch.rowCount === 0) {
let text = `insert into "public"."twitch_channel" ("id", "name", "ctime", "mtime") values
($1, $2, now(), now())`
let values = [twitchUser.id, twitchUser.login]
await db.query({
text,
values
})
text = `insert into "public"."line_twitch_rt" ("line", "twitch", "tmpl", "type") values
($1, $2, $3, $4)`
values = [groupId, twitchUser.id, tmpl, type]
await db.query({
text,
values
})
} else {
let text = `select rt.* from "public"."line_twitch_rt" rt
left join "public"."twitch_channel" twitch
on twitch."id" = rt."twitch"
left join "public"."line_group" line
on line."id" = rt."line"
where
line."id" = $1
and twitch."id" = $2
and rt."type" = $3`
let values = [groupId, twitchUser.id, type]
let rt = await db.query({
text,
values
})
if (rt.rowCount === 0) {
let text = `insert into "public"."line_twitch_rt" ("line", "twitch", "tmpl", "type") values
($1, $2, $3, $4)`
let values = [groupId, twitchUser.id, tmpl, type]
await db.query({
text,
values
})
}
}
// api.twitch.registerWebHook(twitchUser.id, type)
return {
reply: 'add channel success'
}
}
module.exports = {
addgroup: addGroup,
addpage: addPage
addpage: addPage,
delpage: delPage,
addtwitch: addTwitch
}

View File

@ -1,39 +1,7 @@
const axios = require('axios')
const config = require('../../config')
const commands = require('./commands')
const client = axios.create({
baseURL: 'https://api.line.me/v2/bot',
headers: {
Authorization: `Bearer ${config.line.access}`
}
})
const pushMessage = async (target, message = '') => {
if (typeof target !== 'string' || target.trim().length === 0) return
if (typeof message !== 'string' || message.trim().length === 0) return
let data = {
to: target,
messages: [
{
type: 'text',
text: message
}
]
}
let opts = {
method: 'post',
url: '/message/push',
data
}
await client(opts)
}
const api = require('../api-action')
const textMessage = async (evt) => {
let url = '/message/reply'
let {replyToken, source, message} = evt
if (!source || !('type' in source) || source.type !== 'group') return
if (!message || message.type !== 'text') return
@ -45,25 +13,10 @@ const textMessage = async (evt) => {
let result = await commands(text, source)
if (result === null) return
if (typeof result === 'object' && 'reply' in result) {
let opts = {
method: 'post',
url,
data: {
replyToken,
messages: [
{
type: 'text',
text: result.reply
}
]
}
}
await client(opts)
await api.line.replyMessage(replyToken, result.reply)
}
}
module.exports = {
textMessage,
pushMessage
textMessage
}

View File

@ -2,6 +2,6 @@ const Router = require('koa-router')
const r = new Router()
r.use('/line', require('./line').routes())
r.use('/twitch', require('./twitch').routes())
// r.use('/twitch', require('./twitch').routes())
module.exports = r

View File

@ -3,9 +3,11 @@ const r = new Router()
const {
getRaw
} = require('../../libs/middleware')
const config = require('../../config')
const DB = require('../../libs/database')
const api = require('../../libs/api-action')
// const config = require('../../config')
r.get('/', async (c, n) => {
r.get('/webhook', async (c, n) => {
let mode = c.query['hub.mode']
let token = c.query['hub.secret']
let challenge = c.query['hub.challenge']
@ -22,10 +24,59 @@ r.get('/', async (c, n) => {
}
})
r.post('/', getRaw, async (c, n) => {
console.log(JSON.stringify(c.request.body, null, 2))
r.post('/webhook', getRaw, async (c, n) => {
// middleware
c.db = await DB.connect()
try {
await n()
} catch (err) {
console.log(err)
}
c.db.release()
c.body = 'success'
c.status = 200
}, async (c, n) => {
console.log(JSON.stringify(c.request.body, null, 2))
if (!('data' in c.request.body) || !Array.isArray(c.request.body.data)) return
if (c.request.body.data.length === 0) return // length > 0 live, length === 0 down
let data = c.request.body.data[0]
let uid = c.query['uid'] || ''
let type = c.query['type'] || ''
if (!uid || !type) return
let text = `select rt."line" as group, rt."twitch" as twitch, rt."tmpl" as tmpl, twitch."name" as user from "public"."line_twitch_rt" rt
left join "public"."twitch_channel" twitch
on twitch."id" = rt."twitch"
left join "public"."line_group" line
on line."id" = rt."line"
where
line."notify" = true
and twitch."id" = $1
and rt."type" = $2`
let values = [uid, type]
let twch = await c.db.query({
text,
values
})
if (twch.rowCount === 0) return
let chLink = `https://twitch.tv/`
for (let i in twch.rows) {
let tmp = twch.rows[i]
let msg = tmp.tmpl || ''
let link = chLink + tmp.user
if (typeof msg !== 'string' || msg.trim().length === 0) {
msg = `${data.title}\n${link}`
} else {
msg = msg.replace(/{link}/, link).replace(/{txt}/, data.title).replace(/\\n/, '\n')
}
await api.line.pushMessage(tmp.group, msg).then(() => { }).catch(() => { })
}
})
module.exports = r

2
schema/20180628-1.sql Normal file
View File

@ -0,0 +1,2 @@
ALTER TABLE public.twitch_channel DROP type;
ALTER TABLE public.line_twitch_rt ADD type varchar(100) DEFAULT '' NOT NULL;

1
schema/20180628-2.sql Normal file
View File

@ -0,0 +1 @@
ALTER TABLE public.twitch_channel ADD laststream varchar(100) DEFAULT '' NOT NULL;