diff --git a/src/client.js b/src/client.js index e5312105..64e80bbf 100644 --- a/src/client.js +++ b/src/client.js @@ -3,6 +3,7 @@ import { loadPolyfills } from './routes/_utils/loadPolyfills' import './routes/_utils/serviceWorkerClient' import './routes/_utils/historyEvents' import './routes/_utils/loadingMask' +import './routes/_utils/forceOnline' loadPolyfills().then(() => { console.log('init()') diff --git a/src/routes/_actions/stream/fillStreamingGap.js b/src/routes/_actions/stream/fillStreamingGap.js new file mode 100644 index 00000000..930a34ff --- /dev/null +++ b/src/routes/_actions/stream/fillStreamingGap.js @@ -0,0 +1,15 @@ +// TODO: should probably just keep fetching timeline items in the gap, not stop at 40 +import { addStatusesOrNotifications } from '../addStatusOrNotification' +import { getTimeline } from '../../_api/timelines' + +const TIMELINE_GAP_BATCH_SIZE = 40 + +// fill in the "streaming gap" – i.e. fetch the most recent items so that there isn't +// a big gap in the timeline if you haven't looked at it in awhile +export async function fillStreamingGap (instanceName, accessToken, timelineName, firstTimelineItemId) { + let newTimelineItems = await getTimeline(instanceName, accessToken, + timelineName, null, firstTimelineItemId, TIMELINE_GAP_BATCH_SIZE) + if (newTimelineItems.length) { + addStatusesOrNotifications(instanceName, timelineName, newTimelineItems) + } +} diff --git a/src/routes/_actions/streaming.js b/src/routes/_actions/stream/processMessage.js similarity index 50% rename from src/routes/_actions/streaming.js rename to src/routes/_actions/stream/processMessage.js index 0476f857..4a59e571 100644 --- a/src/routes/_actions/streaming.js +++ b/src/routes/_actions/stream/processMessage.js @@ -1,11 +1,16 @@ -import { TimelineStream } from '../_api/TimelineStream' -import { mark, stop } from '../_utils/marks' -import { deleteStatus } from './deleteStatuses' -import { addStatusOrNotification } from './addStatusOrNotification' +import { mark, stop } from '../../_utils/marks' +import { deleteStatus } from '../deleteStatuses' +import { addStatusOrNotification } from '../addStatusOrNotification' -function processMessage (instanceName, timelineName, message) { - mark('processMessage') +const KNOWN_EVENTS = ['update', 'delete', 'notification', 'conversation'] + +export function processMessage (instanceName, timelineName, message) { let { event, payload } = message + if (!KNOWN_EVENTS.includes(event)) { + console.error("don't know how to handle event", message) + return + } + mark('processMessage') if (['update', 'notification', 'conversation'].includes(event)) { payload = JSON.parse(payload) // only these payloads are JSON-encoded for some reason } @@ -34,33 +39,3 @@ function processMessage (instanceName, timelineName, message) { } stop('processMessage') } - -export function createStream (streamingApi, instanceName, accessToken, - timelineName, onOpenStream) { - return new TimelineStream(streamingApi, accessToken, timelineName, { - onMessage (msg) { - if ( - msg.event !== 'update' && - msg.event !== 'delete' && - msg.event !== 'notification' && - msg.event !== 'conversation' - ) { - console.error("don't know how to handle event", msg) - return - } - processMessage(instanceName, timelineName, msg) - }, - onOpen () { - if (onOpenStream) { - onOpenStream() - } - console.log('opened stream for timeline', timelineName) - }, - onClose () { - console.log('closed stream for timeline', timelineName) - }, - onReconnect () { - console.log('reconnected stream for timeline', timelineName) - } - }) -} diff --git a/src/routes/_actions/stream/streaming.js b/src/routes/_actions/stream/streaming.js new file mode 100644 index 00000000..f5b551f7 --- /dev/null +++ b/src/routes/_actions/stream/streaming.js @@ -0,0 +1,52 @@ +import { TimelineStream } from '../../_api/stream/TimelineStream' +import { processMessage } from './processMessage' +import { fillStreamingGap } from './fillStreamingGap' +import { store } from '../../_store/store' + +export function createStream (api, instanceName, accessToken, timelineName, firstStatusId, firstNotificationId) { + console.log(`streaming ${instanceName} ${timelineName}: createStream`, 'firstStatusId', firstStatusId, + 'firstNotificationId', firstNotificationId) + + const fillGap = (timelineName, timelineItemId) => { + if (timelineItemId) { + console.log(`streaming ${instanceName} ${timelineName}: fillGap since`, timelineItemId) + /* no await */ fillStreamingGap(instanceName, accessToken, timelineName, timelineItemId) + } + } + + const onMessage = message => { + processMessage(instanceName, timelineName, message) + } + + const onOpen = () => { + console.log(`streaming ${instanceName} ${timelineName}: opened`) + fillGap(timelineName, firstStatusId) + if (timelineName === 'home') { + // special case - home timeline stream also handles notifications + fillGap('notifications', firstNotificationId) + } + } + + const onClose = () => { + console.log(`streaming ${instanceName} ${timelineName}: closed`) + } + + const onReconnect = () => { + console.log(`streaming ${instanceName} ${timelineName}: reconnected`) + // When reconnecting, we recompute the firstStatusId and firstNotificationId because these may have + // changed since we first started streaming. + let newFirstStatusId = store.getFirstTimelineItemId(instanceName, timelineName) + fillGap(timelineName, newFirstStatusId) + if (timelineName === 'home') { + // special case - home timeline stream also handles notifications + let newFirstNotificationId = store.getFirstTimelineItemId(instanceName, timelineName) + fillGap('notifications', newFirstNotificationId) + } + } + + return new TimelineStream(api, accessToken, timelineName) + .on('message', onMessage) + .on('open', onOpen) + .on('close', onClose) + .on('reconnect', onReconnect) +} diff --git a/src/routes/_api/TimelineStream.js b/src/routes/_api/TimelineStream.js deleted file mode 100644 index 2274ac87..00000000 --- a/src/routes/_api/TimelineStream.js +++ /dev/null @@ -1,106 +0,0 @@ -import { paramsString } from '../_utils/ajax' -import noop from 'lodash-es/noop' -import WebSocketClient from '@gamestdio/websocket' -import lifecycle from 'page-lifecycle/dist/lifecycle.mjs' - -function getStreamName (timeline) { - switch (timeline) { - case 'local': - return 'public:local' - case 'federated': - return 'public' - case 'home': - return 'user' - case 'notifications': - return 'user:notification' - case 'direct': - return 'direct' - } - if (timeline.startsWith('tag/')) { - return 'hashtag' - } - if (timeline.startsWith('list/')) { - return 'list' - } -} - -function getUrl (streamingApi, accessToken, timeline) { - let url = `${streamingApi}/api/v1/streaming` - let streamName = getStreamName(timeline) - - let params = { - stream: streamName - } - - if (timeline.startsWith('tag/')) { - params.tag = timeline.split('/').slice(-1)[0] - } else if (timeline.startsWith('list/')) { - params.list = timeline.split('/').slice(-1)[0] - } - - if (accessToken) { - params.access_token = accessToken - } - - return url + '?' + paramsString(params) -} - -export class TimelineStream { - constructor (streamingApi, accessToken, timeline, opts) { - this._streamingApi = streamingApi - this._accessToken = accessToken - this._timeline = timeline - this._opts = opts - this._onStateChange = this._onStateChange.bind(this) - this._setupWebSocket() - this._setupLifecycle() - } - - close () { - this._closed = true - this._closeWebSocket() - this._teardownLifecycle() - } - - _closeWebSocket () { - if (this._ws) { - this._ws.close() - this._ws = null - } - } - - _setupWebSocket () { - const url = getUrl(this._streamingApi, this._accessToken, this._timeline) - const ws = new WebSocketClient(url, null, { backoff: 'fibonacci' }) - - ws.onopen = this._opts.onOpen || noop - ws.onmessage = this._opts.onMessage ? e => this._opts.onMessage(JSON.parse(e.data)) : noop - ws.onclose = this._opts.onClose || noop - ws.onreconnect = this._opts.onReconnect || noop - - this._ws = ws - } - - _setupLifecycle () { - lifecycle.addEventListener('statechange', this._onStateChange) - } - - _teardownLifecycle () { - lifecycle.removeEventListener('statechange', this._onStateChange) - } - - _onStateChange (event) { - if (this._closed) { - return - } - // when the page enters or exits a frozen state, pause or resume websocket polling - if (event.newState === 'frozen') { // page is frozen - console.log('frozen') - this._closeWebSocket() - } else if (event.oldState === 'frozen') { // page is unfrozen - console.log('unfrozen') - this._closeWebSocket() - this._setupWebSocket() - } - } -} diff --git a/src/routes/_api/stream/TimelineStream.js b/src/routes/_api/stream/TimelineStream.js new file mode 100644 index 00000000..c11a7eef --- /dev/null +++ b/src/routes/_api/stream/TimelineStream.js @@ -0,0 +1,109 @@ +import WebSocketClient from '@gamestdio/websocket' +import lifecycle from 'page-lifecycle/dist/lifecycle.mjs' +import { getStreamUrl } from './getStreamUrl' +import { EventEmitter } from 'events-light' +import { eventBus } from '../../_utils/eventBus' + +export class TimelineStream extends EventEmitter { + constructor (streamingApi, accessToken, timeline) { + super() + this._streamingApi = streamingApi + this._accessToken = accessToken + this._timeline = timeline + this._onStateChange = this._onStateChange.bind(this) + this._onOnlineForced = this._onOnlineForced.bind(this) + this._setupWebSocket() + this._setupEvents() + } + + close () { + this._closed = true + this._closeWebSocket() + this._teardownEvents() + // events-light currently does not support removeAllListeners() + // https://github.com/patrick-steele-idem/events-light/issues/2 + for (let event of ['open', 'close', 'reconnect', 'message']) { + this.removeAllListeners(event) + } + } + + _closeWebSocket () { + if (this._ws) { + this.emit('close') + this._ws.onopen = null + this._ws.onmessage = null + this._ws.onclose = null + this._ws.close() + this._ws = null + } + } + + _setupWebSocket () { + const url = getStreamUrl(this._streamingApi, this._accessToken, this._timeline) + const ws = new WebSocketClient(url, null, { backoff: 'fibonacci' }) + + ws.onopen = () => { + if (!this._opened) { + this.emit('open') + this._opened = true + } else { + // we may close or reopen websockets due to freeze/unfreeze events + // and we want to fire "reconnect" rather than "open" in that case + this.emit('reconnect') + } + } + ws.onmessage = (e) => this.emit('message', JSON.parse(e.data)) + ws.onclose = () => this.emit('close') + // The ws "onreconnect" event seems unreliable. When the server goes down and comes back up, + // it doesn't fire (but "open" does). When we freeze and unfreeze, it fires along with the + // "open" event. The above is my attempt to normalize it. + + this._ws = ws + } + + _setupEvents () { + lifecycle.addEventListener('statechange', this._onStateChange) + eventBus.on('forcedOnline', this._onOnlineForced) + } + + _teardownEvents () { + lifecycle.removeEventListener('statechange', this._onStateChange) + eventBus.removeListener('forcedOnline', this._onOnlineForced) + } + + _pause () { + if (this._closed) { + return + } + this._closeWebSocket() + } + + _unpause () { + if (this._closed) { + return + } + this._closeWebSocket() + this._setupWebSocket() + } + + _onStateChange (event) { + // when the page enters or exits a frozen state, pause or resume websocket polling + if (event.newState === 'frozen') { // page is frozen + console.log('frozen') + this._pause() + } else if (event.oldState === 'frozen') { // page is unfrozen + console.log('unfrozen') + this._unpause() + } + } + + _onOnlineForced (online) { + if (online) { + console.log('online forced') + this._unpause() + } else { + console.log('offline forced') + this._pause() + } + } +} diff --git a/src/routes/_api/stream/getStreamUrl.js b/src/routes/_api/stream/getStreamUrl.js new file mode 100644 index 00000000..32a6e99b --- /dev/null +++ b/src/routes/_api/stream/getStreamUrl.js @@ -0,0 +1,43 @@ +import { paramsString } from '../../_utils/ajax' + +function getStreamName (timeline) { + switch (timeline) { + case 'local': + return 'public:local' + case 'federated': + return 'public' + case 'home': + return 'user' + case 'notifications': + return 'user:notification' + case 'direct': + return 'direct' + } + if (timeline.startsWith('tag/')) { + return 'hashtag' + } + if (timeline.startsWith('list/')) { + return 'list' + } +} + +export function getStreamUrl (streamingApi, accessToken, timeline) { + let url = `${streamingApi}/api/v1/streaming` + let streamName = getStreamName(timeline) + + let params = { + stream: streamName + } + + if (timeline.startsWith('tag/')) { + params.tag = timeline.split('/').slice(-1)[0] + } else if (timeline.startsWith('list/')) { + params.list = timeline.split('/').slice(-1)[0] + } + + if (accessToken) { + params.access_token = accessToken + } + + return url + '?' + paramsString(params) +} diff --git a/src/routes/_store/mixins/timelineMixins.js b/src/routes/_store/mixins/timelineMixins.js index ecec0632..13c93eae 100644 --- a/src/routes/_store/mixins/timelineMixins.js +++ b/src/routes/_store/mixins/timelineMixins.js @@ -1,4 +1,5 @@ import { pickBy, get } from '../../_utils/lodash-lite' +import { getFirstIdFromItemSummaries } from '../../_utils/getIdFromItemSummaries' export function timelineMixins (Store) { Store.prototype.setForTimeline = function (instanceName, timelineName, obj) { @@ -25,6 +26,11 @@ export function timelineMixins (Store) { return root[instanceName] || {} } + Store.prototype.getFirstTimelineItemId = function (instanceName, timelineName) { + let summaries = this.getForTimeline(instanceName, timelineName, 'timelineItemSummaries') + return getFirstIdFromItemSummaries(summaries) + } + Store.prototype.setForCurrentTimeline = function (obj) { let { currentInstance, currentTimeline } = this.get() this.setForTimeline(currentInstance, currentTimeline, obj) diff --git a/src/routes/_store/observers/instanceObservers.js b/src/routes/_store/observers/instanceObservers.js index b98095df..9caa2406 100644 --- a/src/routes/_store/observers/instanceObservers.js +++ b/src/routes/_store/observers/instanceObservers.js @@ -1,15 +1,11 @@ import { updateInstanceInfo, updateVerifyCredentialsForInstance } from '../../_actions/instances' import { updateListsForInstance } from '../../_actions/lists' -import { createStream } from '../../_actions/streaming' +import { createStream } from '../../_actions/stream/streaming' import { updatePushSubscriptionForInstance } from '../../_actions/pushSubscription' import { updateCustomEmojiForInstance } from '../../_actions/emoji' -import { addStatusesOrNotifications } from '../../_actions/addStatusOrNotification' -import { getTimeline } from '../../_api/timelines' -import { TIMELINE_BATCH_SIZE } from '../../_static/timelines' import { scheduleIdleTask } from '../../_utils/scheduleIdleTask' import { mark, stop } from '../../_utils/marks' import { store } from '../store' -import { getFirstIdFromItemSummaries } from '../../_utils/getIdFromItemSummaries' // stream to watch for home timeline updates and notifications let currentInstanceStream @@ -57,45 +53,19 @@ async function refreshInstanceData (instanceName) { } function stream (store, instanceName, currentInstanceInfo) { - let homeTimelineItemSummaries = store.getForTimeline(instanceName, - 'home', 'timelineItemSummaries') - let firstHomeTimelineItemId = getFirstIdFromItemSummaries(homeTimelineItemSummaries) - let notificationItemSummaries = store.getForTimeline(instanceName, - 'notifications', 'timelineItemSummaries') - let firstNotificationTimelineItemId = getFirstIdFromItemSummaries(notificationItemSummaries) - let { accessToken } = store.get() let streamingApi = currentInstanceInfo.urls.streaming_api + let firstStatusId = store.getFirstTimelineItemId(instanceName, 'home') + let firstNotificationId = store.getFirstTimelineItemId(instanceName, 'notifications') - function onOpenStream () { - if (currentInstanceChanged(store, instanceName)) { - return - } - - /* no await */ fillGap(instanceName, accessToken, 'home', firstHomeTimelineItemId) - /* no await */ fillGap(instanceName, accessToken, 'notifications', firstNotificationTimelineItemId) - } - - currentInstanceStream = createStream(streamingApi, instanceName, accessToken, 'home', onOpenStream) + currentInstanceStream = createStream(streamingApi, instanceName, accessToken, 'home', + firstStatusId, firstNotificationId) if (process.env.NODE_ENV !== 'production') { window.currentInstanceStream = currentInstanceStream } } -// fill in the "streaming gap" – i.e. fetch the most recent 20 items so that there isn't -// a big gap in the timeline if you haven't looked at it in awhile -async function fillGap (instanceName, accessToken, timelineName, firstTimelineItemId) { - if (!firstTimelineItemId) { - return - } - let newTimelineItems = await getTimeline(instanceName, accessToken, - timelineName, null, firstTimelineItemId, TIMELINE_BATCH_SIZE) - if (newTimelineItems.length) { - addStatusesOrNotifications(instanceName, timelineName, newTimelineItems) - } -} - export function instanceObservers () { store.observe('currentInstance', async (currentInstance) => { if (!process.browser) { diff --git a/src/routes/_store/observers/timelineObservers.js b/src/routes/_store/observers/timelineObservers.js index 6bb06903..45c18925 100644 --- a/src/routes/_store/observers/timelineObservers.js +++ b/src/routes/_store/observers/timelineObservers.js @@ -1,10 +1,6 @@ import { updateInstanceInfo } from '../../_actions/instances' -import { createStream } from '../../_actions/streaming' -import { getTimeline } from '../../_api/timelines' -import { addStatusesOrNotifications } from '../../_actions/addStatusOrNotification' -import { TIMELINE_BATCH_SIZE } from '../../_static/timelines' +import { createStream } from '../../_actions/stream/streaming' import { store } from '../store' -import { getFirstIdFromItemSummaries } from '../../_utils/getIdFromItemSummaries' export function timelineObservers () { // stream to watch for local/federated/etc. updates. home and notification @@ -60,27 +56,12 @@ export function timelineObservers () { return } - let timelineItemSummaries = store.getForTimeline(currentInstance, - currentTimeline, 'timelineItemSummaries') - let firstTimelineItemId = getFirstIdFromItemSummaries(timelineItemSummaries) - - let onOpenStream = async () => { - if (!firstTimelineItemId || !currentTimelineIsUnchanged()) { - return - } - // fill in the "streaming gap" – i.e. fetch the most recent 20 items so that there isn't - // a big gap in the timeline if you haven't looked at it in awhile - let newTimelineItems = await getTimeline(currentInstance, accessToken, - currentTimeline, null, firstTimelineItemId, TIMELINE_BATCH_SIZE) - if (newTimelineItems.length) { - addStatusesOrNotifications(currentInstance, currentTimeline, newTimelineItems) - } - } - + let firstStatusId = store.getFirstTimelineItemId(currentInstance, currentTimeline) let { currentInstanceInfo } = store.get() let streamingApi = currentInstanceInfo.urls.streaming_api + currentTimelineStream = createStream(streamingApi, currentInstance, accessToken, - currentTimeline, onOpenStream) + currentTimeline, firstStatusId) if (process.env.NODE_ENV !== 'production') { window.currentTimelineStream = currentTimelineStream diff --git a/src/routes/_store/store.js b/src/routes/_store/store.js index b8483398..865732d5 100644 --- a/src/routes/_store/store.js +++ b/src/routes/_store/store.js @@ -80,8 +80,3 @@ observers(store) if (process.browser && process.env.NODE_ENV !== 'production') { window.store = store // for debugging } - -// needed for tests -if (process.browser) { - window.__forceOnline = online => store.set({ online }) -} diff --git a/src/routes/_utils/eventBus.js b/src/routes/_utils/eventBus.js index 65ba075c..a9c32bb3 100644 --- a/src/routes/_utils/eventBus.js +++ b/src/routes/_utils/eventBus.js @@ -1,6 +1,6 @@ import EventEmitter from 'events-light' -const eventBus = new EventEmitter() +export const eventBus = new EventEmitter() if (process.browser && process.env.NODE_ENV !== 'production') { window.eventBus = eventBus diff --git a/src/routes/_utils/forceOnline.js b/src/routes/_utils/forceOnline.js new file mode 100644 index 00000000..7dce07bd --- /dev/null +++ b/src/routes/_utils/forceOnline.js @@ -0,0 +1,20 @@ +import { store } from '../_store/store' +import { emit } from './eventBus' + +// Force online/offline state. Needed for integration tests. +// It would be nice not to actually ship this in production, but *shrug* +if (process.browser) { + const globalFetch = window.fetch + + window.__forceOnline = online => { + store.set({ online }) + + if (online) { + window.fetch = globalFetch + emit('forcedOnline', true) + } else { + window.fetch = () => Promise.reject(new Error('force offline')) + emit('forcedOnline', false) + } + } +} diff --git a/tests/spec/107-streaming-gap.js b/tests/spec/107-streaming-gap.js index ed2cfd93..14f06711 100644 --- a/tests/spec/107-streaming-gap.js +++ b/tests/spec/107-streaming-gap.js @@ -1,6 +1,8 @@ import { loginAsFoobar } from '../roles' import { - getNthStatus, homeNavButton, localTimelineNavButton, sleep + forceOffline, + forceOnline, + getNthStatus, homeNavButton, localTimelineNavButton, notificationBadge, notificationsNavButton, sleep } from '../utils' import { postAs @@ -9,9 +11,9 @@ import { fixture`107-streaming-gap.js` .page`http://localhost:4002` -test('fills in a status posted while away from timeline', async t => { - let timeout = 30000 +const timeout = 30000 +test('fills timeline gap while away from local timeline', async t => { await loginAsFoobar(t) await t .click(localTimelineNavButton) @@ -26,7 +28,57 @@ test('fills in a status posted while away from timeline', async t => { .click(localTimelineNavButton) .expect(getNthStatus(1).innerText).contains('posted this while you were away!', { timeout }) .expect(getNthStatus(2).innerText).contains('heyo', { timeout }) - await sleep(5000) + await sleep(2000) await postAs('admin', 'posted this while you were watching') await t.expect(getNthStatus(1).innerText).contains('posted this while you were watching', { timeout }) }) + +test('fills timeline gap while away from home timeline', async t => { + await loginAsFoobar(t) + await t + .expect(getNthStatus(1).exists).ok({ timeout }) + .hover(getNthStatus(1)) + await postAs('admin', 'hello world') + await t.expect(getNthStatus(1).innerText).contains('hello world', { timeout }) + await forceOffline() + await sleep(1000) + await postAs('admin', 'posted this while you were offline') + await sleep(1000) + await forceOnline() + await t.expect(getNthStatus(1).innerText).contains('posted this while you were offline', { timeout }) +}) + +test('fills timeline gap while away from notifications timeline', async t => { + await loginAsFoobar(t) + await t + .click(notificationsNavButton) + .expect(getNthStatus(1).exists).ok({ timeout }) + .hover(getNthStatus(1)) + await postAs('admin', '@foobar yo yo yo') + await t.expect(getNthStatus(1).innerText).contains('yo yo yo', { timeout }) + await forceOffline() + await sleep(1000) + await postAs('admin', '@foobar mentioning you while you are offline!') + await sleep(1000) + await forceOnline() + await t.expect(getNthStatus(1).innerText).contains('mentioning you while you are offline!', { timeout }) +}) + +test('fills timeline gap while away from notifications timeline - badge updates', async t => { + await loginAsFoobar(t) + await t + .expect(getNthStatus(1).exists).ok({ timeout }) + .hover(getNthStatus(1)) + await postAs('admin', '@foobar hi hi hi') + await t.expect(getNthStatus(1).innerText).contains('hi hi hi', { timeout }) + await forceOffline() + await sleep(1000) + await postAs('admin', '@foobar sneaky mention!') + await sleep(1000) + await forceOnline() + await t + .expect(notificationBadge.innerText).eql('1', { timeout }) + .click(notificationsNavButton) + .expect(notificationBadge.exists).notOk() + .expect(getNthStatus(1).innerText).contains('sneaky mention!', { timeout }) +})