fix: fix streaming gap (#1349)

This commit is contained in:
Nolan Lawson 2019-07-21 15:31:26 -07:00 committed by GitHub
parent 6fafe19454
commit 4d098d6b46
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 323 additions and 210 deletions

View file

@ -3,6 +3,7 @@ import { loadPolyfills } from './routes/_utils/loadPolyfills'
import './routes/_utils/serviceWorkerClient'
import './routes/_utils/historyEvents'
import './routes/_utils/loadingMask'
import './routes/_utils/forceOnline'
loadPolyfills().then(() => {
console.log('init()')

View file

@ -0,0 +1,15 @@
// TODO: should probably just keep fetching timeline items in the gap, not stop at 40
import { addStatusesOrNotifications } from '../addStatusOrNotification'
import { getTimeline } from '../../_api/timelines'
const TIMELINE_GAP_BATCH_SIZE = 40
// fill in the "streaming gap" i.e. fetch the most recent items so that there isn't
// a big gap in the timeline if you haven't looked at it in awhile
export async function fillStreamingGap (instanceName, accessToken, timelineName, firstTimelineItemId) {
let newTimelineItems = await getTimeline(instanceName, accessToken,
timelineName, null, firstTimelineItemId, TIMELINE_GAP_BATCH_SIZE)
if (newTimelineItems.length) {
addStatusesOrNotifications(instanceName, timelineName, newTimelineItems)
}
}

View file

@ -1,11 +1,16 @@
import { TimelineStream } from '../_api/TimelineStream'
import { mark, stop } from '../_utils/marks'
import { deleteStatus } from './deleteStatuses'
import { addStatusOrNotification } from './addStatusOrNotification'
import { mark, stop } from '../../_utils/marks'
import { deleteStatus } from '../deleteStatuses'
import { addStatusOrNotification } from '../addStatusOrNotification'
function processMessage (instanceName, timelineName, message) {
mark('processMessage')
const KNOWN_EVENTS = ['update', 'delete', 'notification', 'conversation']
export function processMessage (instanceName, timelineName, message) {
let { event, payload } = message
if (!KNOWN_EVENTS.includes(event)) {
console.error("don't know how to handle event", message)
return
}
mark('processMessage')
if (['update', 'notification', 'conversation'].includes(event)) {
payload = JSON.parse(payload) // only these payloads are JSON-encoded for some reason
}
@ -34,33 +39,3 @@ function processMessage (instanceName, timelineName, message) {
}
stop('processMessage')
}
export function createStream (streamingApi, instanceName, accessToken,
timelineName, onOpenStream) {
return new TimelineStream(streamingApi, accessToken, timelineName, {
onMessage (msg) {
if (
msg.event !== 'update' &&
msg.event !== 'delete' &&
msg.event !== 'notification' &&
msg.event !== 'conversation'
) {
console.error("don't know how to handle event", msg)
return
}
processMessage(instanceName, timelineName, msg)
},
onOpen () {
if (onOpenStream) {
onOpenStream()
}
console.log('opened stream for timeline', timelineName)
},
onClose () {
console.log('closed stream for timeline', timelineName)
},
onReconnect () {
console.log('reconnected stream for timeline', timelineName)
}
})
}

View file

@ -0,0 +1,52 @@
import { TimelineStream } from '../../_api/stream/TimelineStream'
import { processMessage } from './processMessage'
import { fillStreamingGap } from './fillStreamingGap'
import { store } from '../../_store/store'
export function createStream (api, instanceName, accessToken, timelineName, firstStatusId, firstNotificationId) {
console.log(`streaming ${instanceName} ${timelineName}: createStream`, 'firstStatusId', firstStatusId,
'firstNotificationId', firstNotificationId)
const fillGap = (timelineName, timelineItemId) => {
if (timelineItemId) {
console.log(`streaming ${instanceName} ${timelineName}: fillGap since`, timelineItemId)
/* no await */ fillStreamingGap(instanceName, accessToken, timelineName, timelineItemId)
}
}
const onMessage = message => {
processMessage(instanceName, timelineName, message)
}
const onOpen = () => {
console.log(`streaming ${instanceName} ${timelineName}: opened`)
fillGap(timelineName, firstStatusId)
if (timelineName === 'home') {
// special case - home timeline stream also handles notifications
fillGap('notifications', firstNotificationId)
}
}
const onClose = () => {
console.log(`streaming ${instanceName} ${timelineName}: closed`)
}
const onReconnect = () => {
console.log(`streaming ${instanceName} ${timelineName}: reconnected`)
// When reconnecting, we recompute the firstStatusId and firstNotificationId because these may have
// changed since we first started streaming.
let newFirstStatusId = store.getFirstTimelineItemId(instanceName, timelineName)
fillGap(timelineName, newFirstStatusId)
if (timelineName === 'home') {
// special case - home timeline stream also handles notifications
let newFirstNotificationId = store.getFirstTimelineItemId(instanceName, timelineName)
fillGap('notifications', newFirstNotificationId)
}
}
return new TimelineStream(api, accessToken, timelineName)
.on('message', onMessage)
.on('open', onOpen)
.on('close', onClose)
.on('reconnect', onReconnect)
}

View file

@ -1,106 +0,0 @@
import { paramsString } from '../_utils/ajax'
import noop from 'lodash-es/noop'
import WebSocketClient from '@gamestdio/websocket'
import lifecycle from 'page-lifecycle/dist/lifecycle.mjs'
function getStreamName (timeline) {
switch (timeline) {
case 'local':
return 'public:local'
case 'federated':
return 'public'
case 'home':
return 'user'
case 'notifications':
return 'user:notification'
case 'direct':
return 'direct'
}
if (timeline.startsWith('tag/')) {
return 'hashtag'
}
if (timeline.startsWith('list/')) {
return 'list'
}
}
function getUrl (streamingApi, accessToken, timeline) {
let url = `${streamingApi}/api/v1/streaming`
let streamName = getStreamName(timeline)
let params = {
stream: streamName
}
if (timeline.startsWith('tag/')) {
params.tag = timeline.split('/').slice(-1)[0]
} else if (timeline.startsWith('list/')) {
params.list = timeline.split('/').slice(-1)[0]
}
if (accessToken) {
params.access_token = accessToken
}
return url + '?' + paramsString(params)
}
export class TimelineStream {
constructor (streamingApi, accessToken, timeline, opts) {
this._streamingApi = streamingApi
this._accessToken = accessToken
this._timeline = timeline
this._opts = opts
this._onStateChange = this._onStateChange.bind(this)
this._setupWebSocket()
this._setupLifecycle()
}
close () {
this._closed = true
this._closeWebSocket()
this._teardownLifecycle()
}
_closeWebSocket () {
if (this._ws) {
this._ws.close()
this._ws = null
}
}
_setupWebSocket () {
const url = getUrl(this._streamingApi, this._accessToken, this._timeline)
const ws = new WebSocketClient(url, null, { backoff: 'fibonacci' })
ws.onopen = this._opts.onOpen || noop
ws.onmessage = this._opts.onMessage ? e => this._opts.onMessage(JSON.parse(e.data)) : noop
ws.onclose = this._opts.onClose || noop
ws.onreconnect = this._opts.onReconnect || noop
this._ws = ws
}
_setupLifecycle () {
lifecycle.addEventListener('statechange', this._onStateChange)
}
_teardownLifecycle () {
lifecycle.removeEventListener('statechange', this._onStateChange)
}
_onStateChange (event) {
if (this._closed) {
return
}
// when the page enters or exits a frozen state, pause or resume websocket polling
if (event.newState === 'frozen') { // page is frozen
console.log('frozen')
this._closeWebSocket()
} else if (event.oldState === 'frozen') { // page is unfrozen
console.log('unfrozen')
this._closeWebSocket()
this._setupWebSocket()
}
}
}

View file

@ -0,0 +1,109 @@
import WebSocketClient from '@gamestdio/websocket'
import lifecycle from 'page-lifecycle/dist/lifecycle.mjs'
import { getStreamUrl } from './getStreamUrl'
import { EventEmitter } from 'events-light'
import { eventBus } from '../../_utils/eventBus'
export class TimelineStream extends EventEmitter {
constructor (streamingApi, accessToken, timeline) {
super()
this._streamingApi = streamingApi
this._accessToken = accessToken
this._timeline = timeline
this._onStateChange = this._onStateChange.bind(this)
this._onOnlineForced = this._onOnlineForced.bind(this)
this._setupWebSocket()
this._setupEvents()
}
close () {
this._closed = true
this._closeWebSocket()
this._teardownEvents()
// events-light currently does not support removeAllListeners()
// https://github.com/patrick-steele-idem/events-light/issues/2
for (let event of ['open', 'close', 'reconnect', 'message']) {
this.removeAllListeners(event)
}
}
_closeWebSocket () {
if (this._ws) {
this.emit('close')
this._ws.onopen = null
this._ws.onmessage = null
this._ws.onclose = null
this._ws.close()
this._ws = null
}
}
_setupWebSocket () {
const url = getStreamUrl(this._streamingApi, this._accessToken, this._timeline)
const ws = new WebSocketClient(url, null, { backoff: 'fibonacci' })
ws.onopen = () => {
if (!this._opened) {
this.emit('open')
this._opened = true
} else {
// we may close or reopen websockets due to freeze/unfreeze events
// and we want to fire "reconnect" rather than "open" in that case
this.emit('reconnect')
}
}
ws.onmessage = (e) => this.emit('message', JSON.parse(e.data))
ws.onclose = () => this.emit('close')
// The ws "onreconnect" event seems unreliable. When the server goes down and comes back up,
// it doesn't fire (but "open" does). When we freeze and unfreeze, it fires along with the
// "open" event. The above is my attempt to normalize it.
this._ws = ws
}
_setupEvents () {
lifecycle.addEventListener('statechange', this._onStateChange)
eventBus.on('forcedOnline', this._onOnlineForced)
}
_teardownEvents () {
lifecycle.removeEventListener('statechange', this._onStateChange)
eventBus.removeListener('forcedOnline', this._onOnlineForced)
}
_pause () {
if (this._closed) {
return
}
this._closeWebSocket()
}
_unpause () {
if (this._closed) {
return
}
this._closeWebSocket()
this._setupWebSocket()
}
_onStateChange (event) {
// when the page enters or exits a frozen state, pause or resume websocket polling
if (event.newState === 'frozen') { // page is frozen
console.log('frozen')
this._pause()
} else if (event.oldState === 'frozen') { // page is unfrozen
console.log('unfrozen')
this._unpause()
}
}
_onOnlineForced (online) {
if (online) {
console.log('online forced')
this._unpause()
} else {
console.log('offline forced')
this._pause()
}
}
}

View file

@ -0,0 +1,43 @@
import { paramsString } from '../../_utils/ajax'
function getStreamName (timeline) {
switch (timeline) {
case 'local':
return 'public:local'
case 'federated':
return 'public'
case 'home':
return 'user'
case 'notifications':
return 'user:notification'
case 'direct':
return 'direct'
}
if (timeline.startsWith('tag/')) {
return 'hashtag'
}
if (timeline.startsWith('list/')) {
return 'list'
}
}
export function getStreamUrl (streamingApi, accessToken, timeline) {
let url = `${streamingApi}/api/v1/streaming`
let streamName = getStreamName(timeline)
let params = {
stream: streamName
}
if (timeline.startsWith('tag/')) {
params.tag = timeline.split('/').slice(-1)[0]
} else if (timeline.startsWith('list/')) {
params.list = timeline.split('/').slice(-1)[0]
}
if (accessToken) {
params.access_token = accessToken
}
return url + '?' + paramsString(params)
}

View file

@ -1,4 +1,5 @@
import { pickBy, get } from '../../_utils/lodash-lite'
import { getFirstIdFromItemSummaries } from '../../_utils/getIdFromItemSummaries'
export function timelineMixins (Store) {
Store.prototype.setForTimeline = function (instanceName, timelineName, obj) {
@ -25,6 +26,11 @@ export function timelineMixins (Store) {
return root[instanceName] || {}
}
Store.prototype.getFirstTimelineItemId = function (instanceName, timelineName) {
let summaries = this.getForTimeline(instanceName, timelineName, 'timelineItemSummaries')
return getFirstIdFromItemSummaries(summaries)
}
Store.prototype.setForCurrentTimeline = function (obj) {
let { currentInstance, currentTimeline } = this.get()
this.setForTimeline(currentInstance, currentTimeline, obj)

View file

@ -1,15 +1,11 @@
import { updateInstanceInfo, updateVerifyCredentialsForInstance } from '../../_actions/instances'
import { updateListsForInstance } from '../../_actions/lists'
import { createStream } from '../../_actions/streaming'
import { createStream } from '../../_actions/stream/streaming'
import { updatePushSubscriptionForInstance } from '../../_actions/pushSubscription'
import { updateCustomEmojiForInstance } from '../../_actions/emoji'
import { addStatusesOrNotifications } from '../../_actions/addStatusOrNotification'
import { getTimeline } from '../../_api/timelines'
import { TIMELINE_BATCH_SIZE } from '../../_static/timelines'
import { scheduleIdleTask } from '../../_utils/scheduleIdleTask'
import { mark, stop } from '../../_utils/marks'
import { store } from '../store'
import { getFirstIdFromItemSummaries } from '../../_utils/getIdFromItemSummaries'
// stream to watch for home timeline updates and notifications
let currentInstanceStream
@ -57,45 +53,19 @@ async function refreshInstanceData (instanceName) {
}
function stream (store, instanceName, currentInstanceInfo) {
let homeTimelineItemSummaries = store.getForTimeline(instanceName,
'home', 'timelineItemSummaries')
let firstHomeTimelineItemId = getFirstIdFromItemSummaries(homeTimelineItemSummaries)
let notificationItemSummaries = store.getForTimeline(instanceName,
'notifications', 'timelineItemSummaries')
let firstNotificationTimelineItemId = getFirstIdFromItemSummaries(notificationItemSummaries)
let { accessToken } = store.get()
let streamingApi = currentInstanceInfo.urls.streaming_api
let firstStatusId = store.getFirstTimelineItemId(instanceName, 'home')
let firstNotificationId = store.getFirstTimelineItemId(instanceName, 'notifications')
function onOpenStream () {
if (currentInstanceChanged(store, instanceName)) {
return
}
/* no await */ fillGap(instanceName, accessToken, 'home', firstHomeTimelineItemId)
/* no await */ fillGap(instanceName, accessToken, 'notifications', firstNotificationTimelineItemId)
}
currentInstanceStream = createStream(streamingApi, instanceName, accessToken, 'home', onOpenStream)
currentInstanceStream = createStream(streamingApi, instanceName, accessToken, 'home',
firstStatusId, firstNotificationId)
if (process.env.NODE_ENV !== 'production') {
window.currentInstanceStream = currentInstanceStream
}
}
// fill in the "streaming gap" i.e. fetch the most recent 20 items so that there isn't
// a big gap in the timeline if you haven't looked at it in awhile
async function fillGap (instanceName, accessToken, timelineName, firstTimelineItemId) {
if (!firstTimelineItemId) {
return
}
let newTimelineItems = await getTimeline(instanceName, accessToken,
timelineName, null, firstTimelineItemId, TIMELINE_BATCH_SIZE)
if (newTimelineItems.length) {
addStatusesOrNotifications(instanceName, timelineName, newTimelineItems)
}
}
export function instanceObservers () {
store.observe('currentInstance', async (currentInstance) => {
if (!process.browser) {

View file

@ -1,10 +1,6 @@
import { updateInstanceInfo } from '../../_actions/instances'
import { createStream } from '../../_actions/streaming'
import { getTimeline } from '../../_api/timelines'
import { addStatusesOrNotifications } from '../../_actions/addStatusOrNotification'
import { TIMELINE_BATCH_SIZE } from '../../_static/timelines'
import { createStream } from '../../_actions/stream/streaming'
import { store } from '../store'
import { getFirstIdFromItemSummaries } from '../../_utils/getIdFromItemSummaries'
export function timelineObservers () {
// stream to watch for local/federated/etc. updates. home and notification
@ -60,27 +56,12 @@ export function timelineObservers () {
return
}
let timelineItemSummaries = store.getForTimeline(currentInstance,
currentTimeline, 'timelineItemSummaries')
let firstTimelineItemId = getFirstIdFromItemSummaries(timelineItemSummaries)
let onOpenStream = async () => {
if (!firstTimelineItemId || !currentTimelineIsUnchanged()) {
return
}
// fill in the "streaming gap" i.e. fetch the most recent 20 items so that there isn't
// a big gap in the timeline if you haven't looked at it in awhile
let newTimelineItems = await getTimeline(currentInstance, accessToken,
currentTimeline, null, firstTimelineItemId, TIMELINE_BATCH_SIZE)
if (newTimelineItems.length) {
addStatusesOrNotifications(currentInstance, currentTimeline, newTimelineItems)
}
}
let firstStatusId = store.getFirstTimelineItemId(currentInstance, currentTimeline)
let { currentInstanceInfo } = store.get()
let streamingApi = currentInstanceInfo.urls.streaming_api
currentTimelineStream = createStream(streamingApi, currentInstance, accessToken,
currentTimeline, onOpenStream)
currentTimeline, firstStatusId)
if (process.env.NODE_ENV !== 'production') {
window.currentTimelineStream = currentTimelineStream

View file

@ -80,8 +80,3 @@ observers(store)
if (process.browser && process.env.NODE_ENV !== 'production') {
window.store = store // for debugging
}
// needed for tests
if (process.browser) {
window.__forceOnline = online => store.set({ online })
}

View file

@ -1,6 +1,6 @@
import EventEmitter from 'events-light'
const eventBus = new EventEmitter()
export const eventBus = new EventEmitter()
if (process.browser && process.env.NODE_ENV !== 'production') {
window.eventBus = eventBus

View file

@ -0,0 +1,20 @@
import { store } from '../_store/store'
import { emit } from './eventBus'
// Force online/offline state. Needed for integration tests.
// It would be nice not to actually ship this in production, but *shrug*
if (process.browser) {
const globalFetch = window.fetch
window.__forceOnline = online => {
store.set({ online })
if (online) {
window.fetch = globalFetch
emit('forcedOnline', true)
} else {
window.fetch = () => Promise.reject(new Error('force offline'))
emit('forcedOnline', false)
}
}
}

View file

@ -1,6 +1,8 @@
import { loginAsFoobar } from '../roles'
import {
getNthStatus, homeNavButton, localTimelineNavButton, sleep
forceOffline,
forceOnline,
getNthStatus, homeNavButton, localTimelineNavButton, notificationBadge, notificationsNavButton, sleep
} from '../utils'
import {
postAs
@ -9,9 +11,9 @@ import {
fixture`107-streaming-gap.js`
.page`http://localhost:4002`
test('fills in a status posted while away from timeline', async t => {
let timeout = 30000
const timeout = 30000
test('fills timeline gap while away from local timeline', async t => {
await loginAsFoobar(t)
await t
.click(localTimelineNavButton)
@ -26,7 +28,57 @@ test('fills in a status posted while away from timeline', async t => {
.click(localTimelineNavButton)
.expect(getNthStatus(1).innerText).contains('posted this while you were away!', { timeout })
.expect(getNthStatus(2).innerText).contains('heyo', { timeout })
await sleep(5000)
await sleep(2000)
await postAs('admin', 'posted this while you were watching')
await t.expect(getNthStatus(1).innerText).contains('posted this while you were watching', { timeout })
})
test('fills timeline gap while away from home timeline', async t => {
await loginAsFoobar(t)
await t
.expect(getNthStatus(1).exists).ok({ timeout })
.hover(getNthStatus(1))
await postAs('admin', 'hello world')
await t.expect(getNthStatus(1).innerText).contains('hello world', { timeout })
await forceOffline()
await sleep(1000)
await postAs('admin', 'posted this while you were offline')
await sleep(1000)
await forceOnline()
await t.expect(getNthStatus(1).innerText).contains('posted this while you were offline', { timeout })
})
test('fills timeline gap while away from notifications timeline', async t => {
await loginAsFoobar(t)
await t
.click(notificationsNavButton)
.expect(getNthStatus(1).exists).ok({ timeout })
.hover(getNthStatus(1))
await postAs('admin', '@foobar yo yo yo')
await t.expect(getNthStatus(1).innerText).contains('yo yo yo', { timeout })
await forceOffline()
await sleep(1000)
await postAs('admin', '@foobar mentioning you while you are offline!')
await sleep(1000)
await forceOnline()
await t.expect(getNthStatus(1).innerText).contains('mentioning you while you are offline!', { timeout })
})
test('fills timeline gap while away from notifications timeline - badge updates', async t => {
await loginAsFoobar(t)
await t
.expect(getNthStatus(1).exists).ok({ timeout })
.hover(getNthStatus(1))
await postAs('admin', '@foobar hi hi hi')
await t.expect(getNthStatus(1).innerText).contains('hi hi hi', { timeout })
await forceOffline()
await sleep(1000)
await postAs('admin', '@foobar sneaky mention!')
await sleep(1000)
await forceOnline()
await t
.expect(notificationBadge.innerText).eql('1', { timeout })
.click(notificationsNavButton)
.expect(notificationBadge.exists).notOk()
.expect(getNthStatus(1).innerText).contains('sneaky mention!', { timeout })
})