From 64350ad912b71dce6f97d4bcf7eaf5e29d63a589 Mon Sep 17 00:00:00 2001 From: Nolan Lawson Date: Sun, 11 Feb 2018 13:46:57 -0800 Subject: [PATCH] v1 of streaming --- routes/_actions/instances.js | 15 ++++ routes/_actions/streaming.js | 74 +++++++++++++++++++ routes/_actions/timeline.js | 6 +- .../{StatusStream.js => TimelineStream.js} | 34 ++++++--- routes/_components/timeline/Timeline.html | 34 +++++++-- routes/_store/instanceComputations.js | 6 ++ routes/_store/instanceObservers.js | 13 ++++ routes/_store/observers.js | 12 +-- routes/_store/store.js | 3 +- routes/_store/timelineComputations.js | 1 + routes/_store/timelineObservers.js | 38 ++++++++++ routes/_utils/asyncModules.js | 6 +- 12 files changed, 213 insertions(+), 29 deletions(-) create mode 100644 routes/_actions/streaming.js rename routes/_api/{StatusStream.js => TimelineStream.js} (55%) create mode 100644 routes/_store/instanceObservers.js create mode 100644 routes/_store/timelineObservers.js diff --git a/routes/_actions/instances.js b/routes/_actions/instances.js index 5ad962c1..893f6602 100644 --- a/routes/_actions/instances.js +++ b/routes/_actions/instances.js @@ -5,6 +5,7 @@ import { toast } from '../_utils/toast' import { database } from '../_database/database' import { goto } from 'sapper/runtime.js' import { cacheFirstUpdateAfter } from '../_utils/sync' +import { getInstanceInfo } from '../_api/instance' export function changeTheme (instanceName, newTheme) { let instanceThemes = store.get('instanceThemes') @@ -65,3 +66,17 @@ export async function updateVerifyCredentialsForInstance (instanceName) { verifyCredentials => setStoreVerifyCredentials(instanceName, verifyCredentials) ) } + + +export async function updateInstanceInfo(instanceName) { + await cacheFirstUpdateAfter( + () => getInstanceInfo(instanceName), + () => database.getInstanceInfo(instanceName), + info => database.setInstanceInfo(instanceName, info), + info => { + let instanceInfos = store.get('instanceInfos') + instanceInfos[instanceName] = info + store.set({instanceInfos: instanceInfos}) + } + ) +} \ No newline at end of file diff --git a/routes/_actions/streaming.js b/routes/_actions/streaming.js new file mode 100644 index 00000000..a584fef7 --- /dev/null +++ b/routes/_actions/streaming.js @@ -0,0 +1,74 @@ +import { TimelineStream } from '../_api/TimelineStream' +import identity from 'lodash/identity' +import { database } from '../_database/database' +import { store } from '../_store/store' +import { scheduleIdleTask } from '../_utils/scheduleIdleTask' + +async function removeDuplicates (instanceName, timelineName, updates) { + // remove duplicates, including duplicates due to reblogs + let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') + let reblogIds = (await Promise.all(timelineItemIds.map(async timelineItemId => { + let status = await database.getStatus(instanceName, timelineItemId) + return status.reblog && status.reblog.id + }))).filter(identity) + let existingItemIds = new Set([].concat(timelineItemIds).concat(reblogIds)) + return updates.filter(update => !existingItemIds.has(update.id)) +} + +async function handleFreshChanges (instanceName, timelineName) { + console.log('handleFreshChanges') + let freshChanges = store.getForTimeline(instanceName, timelineName, 'freshChanges') + console.log('freshChanges', freshChanges) + if (freshChanges.updates && freshChanges.updates.length) { + let updates = freshChanges.updates.slice() + freshChanges.updates = [] + store.setForTimeline(instanceName, timelineName, {freshChanges: freshChanges}) + + console.log('before removing duplicates, updates are ', updates.length) + updates = await removeDuplicates(instanceName, timelineName, updates) + console.log('after removing duplicates, updates are ', updates.length) + + await database.insertTimelineItems(instanceName, timelineName, updates) + + let itemIdsToAdd = store.getForTimeline(instanceName, timelineName, 'itemIdsToAdd') || [] + itemIdsToAdd = itemIdsToAdd.concat(updates.map(_ => _.id)) + store.setForTimeline(instanceName, timelineName, {itemIdsToAdd: itemIdsToAdd}) + } +} + +function handleStreamMessage (instanceName, timelineName, message) { + console.log('handleStreamMessage') + let { event, payload } = message + let key = event === 'update' ? 'updates' : 'deletes' + let freshChanges = store.getForTimeline(instanceName, timelineName, 'freshChanges') || {} + freshChanges[key] = freshChanges[key] || [] + freshChanges[key].push(JSON.parse(payload)) + store.setForTimeline(instanceName, timelineName, {freshChanges: freshChanges}) + scheduleIdleTask(() => { + handleFreshChanges(instanceName, timelineName) + }) +} + +export function createStream (streamingApi, instanceName, accessToken, timelineName) { + new TimelineStream(streamingApi, accessToken, timelineName, { + onMessage(msg) { + console.log('message', msg) + if (msg.event !== 'update' && msg.event !== 'delete') { + console.error("don't know how to handle event", msg) + return + } + scheduleIdleTask(() => { + handleStreamMessage(instanceName, timelineName, msg) + }) + }, + onOpen() { + console.log('opened stream for timeline', timelineName) + }, + onClose() { + console.log('closed stream for timeline', timelineName) + }, + onReconnect() { + console.log('reconnected stream for timeline', timelineName) + } + }) +} \ No newline at end of file diff --git a/routes/_actions/timeline.js b/routes/_actions/timeline.js index 6d8422bd..b935b963 100644 --- a/routes/_actions/timeline.js +++ b/routes/_actions/timeline.js @@ -30,10 +30,14 @@ async function addTimelineItems (instanceName, timelineName, newItems) { console.log('addTimelineItems, length:', newItems.length) mark('addTimelineItems') let newIds = newItems.map(item => item.id) + addTimelineItemIds(instanceName, timelineName, newIds) + stop('addTimelineItems') +} + +export async function addTimelineItemIds (instanceName, timelineName, newIds) { let oldIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || [] let merged = mergeArrays(oldIds, newIds) store.setForTimeline(instanceName, timelineName, { timelineItemIds: merged }) - stop('addTimelineItems') } async function fetchTimelineItemsAndPossiblyFallBack () { diff --git a/routes/_api/StatusStream.js b/routes/_api/TimelineStream.js similarity index 55% rename from routes/_api/StatusStream.js rename to routes/_api/TimelineStream.js index 26cd04f6..952f0211 100644 --- a/routes/_api/StatusStream.js +++ b/routes/_api/TimelineStream.js @@ -1,6 +1,6 @@ import { paramsString } from '../_utils/ajax' import noop from 'lodash/noop' -import WebSocketClient from '@gamestdio/websocket' +import { importWebSocketClient } from '../_utils/asyncModules' function getStreamName (timeline) { switch (timeline) { @@ -16,6 +16,9 @@ function getStreamName (timeline) { if (timeline.startsWith('tag/')) { return 'hashtag' } + if (timeline.startsWith('list/')) { + return 'list' + } } function getUrl (streamingApi, accessToken, timeline) { @@ -28,6 +31,8 @@ function getUrl (streamingApi, accessToken, timeline) { if (timeline.startsWith('tag/')) { params.tag = timeline.split('/').slice(-1)[0] + } else if (timeline.startsWith('list/')) { + params.list = timeline.split('/').slice(-1)[0] } if (accessToken) { @@ -37,22 +42,29 @@ function getUrl (streamingApi, accessToken, timeline) { return url + '?' + paramsString(params) } -export class StatusStream { +export class TimelineStream { constructor (streamingApi, accessToken, timeline, opts) { let url = getUrl(streamingApi, accessToken, timeline) + importWebSocketClient().then(WebSocketClient => { + if (this.__closed) { + return + } + const ws = new WebSocketClient(url, null, { backoff: 'exponential' }) + const onMessage = opts.onMessage || noop - const ws = new WebSocketClient(url, null, { backoff: 'exponential' }) - const onMessage = opts.onMessage || noop + ws.onopen = opts.onOpen || noop + ws.onmessage = e => onMessage(JSON.parse(e.data)) + ws.onclose = opts.onClose || noop + ws.onreconnect = opts.onReconnect || noop - ws.onopen = opts.onOpen || noop - ws.onmessage = e => onMessage(JSON.parse(e.data)) - ws.onclose = opts.onClose || noop - ws.onreconnect = opts.onReconnect || noop - - this._ws = ws + this._ws = ws + }) } close () { - this._ws.close() + this.__closed = true + if (this._ws) { + this._ws.close() + } } } diff --git a/routes/_components/timeline/Timeline.html b/routes/_components/timeline/Timeline.html index 42c83ac4..f7dd9138 100644 --- a/routes/_components/timeline/Timeline.html +++ b/routes/_components/timeline/Timeline.html @@ -61,21 +61,27 @@ import { initializeTimeline, fetchTimelineItemsOnScrollToBottom, setupTimeline } from '../../_actions/timeline' import LoadingPage from '../LoadingPage.html' import { focusWithCapture, blurWithCapture } from '../../_utils/events' + import { addTimelineItemIds } from '../../_actions/timeline' export default { oncreate() { console.log('timeline oncreate()') - this.onPushState = this.onPushState.bind(this) - this.store.setForCurrentTimeline({ignoreBlurEvents: false}) - window.addEventListener('pushState', this.onPushState) + this.setupFocus() setupTimeline() if (this.store.get('initialized')) { this.restoreFocus() } + let instanceName = this.store.get('currentInstance') + let timelineName = this.get('timeline') + this.observe('itemIdsToAdd', itemIdsToAdd => { + console.log('itemIdsToAdd', itemIdsToAdd) + addTimelineItemIds(instanceName, timelineName, itemIdsToAdd) + this.store.setForTimeline(instanceName, timelineName, { itemIdsToAdd: [] }) + }) }, ondestroy() { console.log('ondestroy') - window.removeEventListener('pushState', this.onPushState) + this.teardownFocus() }, data: () => ({ StatusVirtualListItem, @@ -125,6 +131,12 @@ && $firstTimelineItemId && timelineValue !== $firstTimelineItemId && timelineValue + }, + itemIdsToAdd: (timeline, $currentInstance, $timelines) => { + return ($timelines && + $timelines[$currentInstance] && + $timelines[$currentInstance][timeline] && + $timelines[$currentInstance][timeline].itemIdsToAdd) || [] } }, store: () => store, @@ -145,9 +157,6 @@ console.log('timeline initialize()') initializeTimeline() }, - onPushState() { - this.store.setForCurrentTimeline({ ignoreBlurEvents: true }) - }, onScrollToBottom() { if (!this.store.get('initialized') || this.store.get('runningUpdate') || @@ -156,6 +165,17 @@ } fetchTimelineItemsOnScrollToBottom() }, + setupFocus() { + this.onPushState = this.onPushState.bind(this) + this.store.setForCurrentTimeline({ignoreBlurEvents: false}) + window.addEventListener('pushState', this.onPushState) + }, + teardownFocus() { + window.removeEventListener('pushState', this.onPushState) + }, + onPushState() { + this.store.setForCurrentTimeline({ ignoreBlurEvents: true }) + }, saveFocus(e) { let instanceName = this.store.get('currentInstance') let timelineName = this.get('timeline') diff --git a/routes/_store/instanceComputations.js b/routes/_store/instanceComputations.js index 7dcf4fbb..f347936c 100644 --- a/routes/_store/instanceComputations.js +++ b/routes/_store/instanceComputations.js @@ -47,6 +47,12 @@ export function instanceComputations (store) { (currentInstance, verifyCredentials) => verifyCredentials && verifyCredentials[currentInstance] ) + store.compute( + 'currentInstanceInfo', + ['currentInstance', 'instanceInfos'], + (currentInstance, instanceInfos) => instanceInfos && instanceInfos[currentInstance] + ) + store.compute( 'pinnedPage', ['pinnedPages', 'currentInstance'], diff --git a/routes/_store/instanceObservers.js b/routes/_store/instanceObservers.js new file mode 100644 index 00000000..a97a2146 --- /dev/null +++ b/routes/_store/instanceObservers.js @@ -0,0 +1,13 @@ +import { updateInstanceInfo, updateVerifyCredentialsForInstance } from '../_actions/instances' +import { updateLists } from '../_actions/lists' + +export function instanceObservers (store) { + store.observe('currentInstance', (currentInstance) => { + if (!currentInstance) { + return + } + updateVerifyCredentialsForInstance(currentInstance) + updateInstanceInfo(currentInstance) + updateLists() + }) +} \ No newline at end of file diff --git a/routes/_store/observers.js b/routes/_store/observers.js index c5510600..228b5d66 100644 --- a/routes/_store/observers.js +++ b/routes/_store/observers.js @@ -1,11 +1,7 @@ -import { updateVerifyCredentialsForInstance } from '../_actions/instances' -import { updateLists } from '../_actions/lists' +import { instanceObservers } from './instanceObservers' +import { timelineObservers } from './timelineObservers' export function observers (store) { - store.observe('currentInstance', (currentInstance) => { - if (currentInstance) { - updateVerifyCredentialsForInstance(currentInstance) - updateLists() - } - }) + instanceObservers(store) + timelineObservers(store) } diff --git a/routes/_store/store.js b/routes/_store/store.js index 70071d0a..2fd1453d 100644 --- a/routes/_store/store.js +++ b/routes/_store/store.js @@ -35,7 +35,8 @@ const store = new PinaforeStore({ markMediaAsSensitive: false, pinnedPages: {}, instanceLists: {}, - pinnedStatuses: {} + pinnedStatuses: {}, + instanceInfos: {} }) mixins(PinaforeStore) diff --git a/routes/_store/timelineComputations.js b/routes/_store/timelineComputations.js index 68310fc3..0e7bbe87 100644 --- a/routes/_store/timelineComputations.js +++ b/routes/_store/timelineComputations.js @@ -14,6 +14,7 @@ export function timelineComputations (store) { computeForTimeline(store, 'initialized') computeForTimeline(store, 'lastFocusedElementSelector') computeForTimeline(store, 'ignoreBlurEvents') + computeForTimeline(store, 'itemIdsToAdd') store.compute('firstTimelineItemId', ['timelineItemIds'], (timelineItemIds) => timelineItemIds && timelineItemIds.length && timelineItemIds[0]) store.compute('lastTimelineItemId', ['timelineItemIds'], (timelineItemIds) => timelineItemIds && timelineItemIds.length && timelineItemIds[timelineItemIds.length - 1]) diff --git a/routes/_store/timelineObservers.js b/routes/_store/timelineObservers.js new file mode 100644 index 00000000..927b148f --- /dev/null +++ b/routes/_store/timelineObservers.js @@ -0,0 +1,38 @@ +import { updateInstanceInfo } from '../_actions/instances' +import { createStream } from '../_actions/streaming' + +export function timelineObservers (store) { + + let currentTimelineStream + + store.observe('currentTimeline', async (currentTimeline) => { + if (!process.browser) { + return + } + if (currentTimelineStream) { + currentTimelineStream.close() + currentTimelineStream = null + } + if (!currentTimeline) { + return + } + if (!(['home', 'local', 'federated'].includes(currentTimeline) || + currentTimeline.startsWith('list/') || + currentTimeline.startsWith('tag/'))) { + return + } + + let currentInstance = store.get('currentInstance') + let accessToken = store.get('accessToken') + await updateInstanceInfo(currentInstance) + let instanceInfo = store.get('currentInstanceInfo') + if (!(instanceInfo && + store.get('currentInstance') === currentInstance && + store.get('currentTimeline') === currentTimeline)) { + return + } + + currentTimelineStream = createStream(instanceInfo.urls.streaming_api, + currentInstance, accessToken, currentTimeline) + }) +} diff --git a/routes/_utils/asyncModules.js b/routes/_utils/asyncModules.js index e623c4d8..c225dc8d 100644 --- a/routes/_utils/asyncModules.js +++ b/routes/_utils/asyncModules.js @@ -23,4 +23,8 @@ export const importRequestIdleCallback = () => import( export const importIndexedDBGetAllShim = () => import( /* webpackChunkName: 'indexeddb-getall-shim' */ 'indexeddb-getall-shim' - ) \ No newline at end of file + ) + +export const importWebSocketClient = () => import( + /* webpackChunkName: '@gamestdio/websocket' */ '@gamestdio/websocket' + ).then(mod => mod.default) \ No newline at end of file