handle streamed deletions

This commit is contained in:
Nolan Lawson 2018-02-16 19:38:21 -08:00
parent 6dfed9d12a
commit 5a1016d1c9
7 changed files with 246 additions and 85 deletions

View file

@ -0,0 +1,55 @@
import throttle from 'lodash/throttle'
import { getIdsThatTheseStatusesReblogged } from './statuses'
import { database } from '../_database/database'
import { mark, stop } from '../_utils/marks'
import { store } from '../_store/store'
import { scheduleIdleTask } from '../_utils/scheduleIdleTask'
async function getExistingItemIdsSet (instanceName, timelineName) {
let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || []
if (timelineName === 'notifications') {
return new Set(timelineItemIds)
}
let reblogIds = await getIdsThatTheseStatusesReblogged(instanceName, timelineItemIds)
return new Set([].concat(timelineItemIds).concat(reblogIds))
}
async function removeDuplicates (instanceName, timelineName, updates) {
// remove duplicates, including duplicates due to reblogs
let existingItemIds = await getExistingItemIdsSet(instanceName, timelineName)
return updates.filter(update => !existingItemIds.has(update.id))
}
async function processFreshUpdates (instanceName, timelineName) {
mark('processFreshUpdates')
let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates')
if (freshUpdates && freshUpdates.length) {
let updates = freshUpdates.slice()
store.setForTimeline(instanceName, timelineName, {freshUpdates: []})
updates = await removeDuplicates(instanceName, timelineName, updates)
await database.insertTimelineItems(instanceName, timelineName, updates)
let itemIdsToAdd = store.getForTimeline(instanceName, timelineName, 'itemIdsToAdd') || []
if (updates && updates.length) {
itemIdsToAdd = itemIdsToAdd.concat(updates.map(_ => _.id))
console.log('adding ', itemIdsToAdd.length, 'items to itemIdsToAdd')
store.setForTimeline(instanceName, timelineName, {itemIdsToAdd: itemIdsToAdd})
}
stop('processFreshUpdates')
}
}
const lazilyProcessFreshUpdates = throttle((instanceName, timelineName) => {
scheduleIdleTask(() => {
/* no await */ processFreshUpdates(instanceName, timelineName)
})
}, 5000)
export function addStatusOrNotification (instanceName, timelineName, newStatusOrNotification) {
let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') || []
freshUpdates.push(newStatusOrNotification)
store.setForTimeline(instanceName, timelineName, {freshUpdates: freshUpdates})
lazilyProcessFreshUpdates(instanceName, timelineName)
}

View file

@ -0,0 +1,41 @@
import { getIdsThatRebloggedThisStatus, getIdThatThisStatusReblogged, getNotificationIdsForStatuses } from './statuses'
import { store } from '../_store/store'
import { scheduleIdleTask } from '../_utils/scheduleIdleTask'
import { database } from '../_database/database'
import identity from 'lodash/identity'
function deleteStatusIdsFromStore (instanceName, idsToDelete) {
let idsToDeleteSet = new Set(idsToDelete)
let timelines = store.get('timelines')
if (timelines && timelines[instanceName]) {
Object.keys(timelines[instanceName]).forEach(timelineName => {
let timelineData = timelines[instanceName][timelineName]
if (timelineName !== 'notifications') {
timelineData.timelineItemIds = timelineData.timelineItemIds.filter(_ => !idsToDeleteSet.has(_))
timelineData.itemIdsToAdd = timelineData.itemIdsToAdd.filter(_ => !idsToDeleteSet.has(_))
}
})
store.set({timelines: timelines})
}
}
async function deleteStatusesAndNotifications (instanceName, statusIdsToDelete, notificationIdsToDelete) {
deleteStatusIdsFromStore(instanceName, statusIdsToDelete)
await database.deleteStatusesAndNotifications(instanceName, statusIdsToDelete, notificationIdsToDelete)
}
async function doDeleteStatus (instanceName, statusId) {
let reblogId = await getIdThatThisStatusReblogged(instanceName, statusId)
let rebloggedIds = await getIdsThatRebloggedThisStatus(reblogId || statusId)
let statusIdsToDelete = Array.from(new Set([statusId, reblogId].concat(rebloggedIds).filter(identity)))
let notificationIdsToDelete = new Set(await getNotificationIdsForStatuses(instanceName, statusIdsToDelete))
await Promise.all([
deleteStatusesAndNotifications(instanceName, statusIdsToDelete, notificationIdsToDelete)
])
}
export function deleteStatus (instanceName, statusId) {
scheduleIdleTask(() => {
/* no await */ doDeleteStatus(instanceName, statusId)
})
}

View file

@ -0,0 +1,22 @@
import identity from 'lodash/identity'
import { database } from '../_database/database'
export async function getIdThatThisStatusReblogged (instanceName, statusId) {
let status = await database.getStatus(instanceName, statusId)
return status.reblog && status.reblog.id
}
export async function getIdsThatTheseStatusesReblogged (instanceName, statusIds) {
let reblogIds = await Promise.all(statusIds.map(async statusId => {
return getIdThatThisStatusReblogged(instanceName, statusId)
}))
return reblogIds.filter(identity)
}
export async function getIdsThatRebloggedThisStatus (instanceName, statusId) {
return database.getReblogsForStatus(instanceName, statusId)
}
export async function getNotificationIdsForStatuses (instanceName, statusIds) {
return database.getNotificationIdsForStatuses(instanceName, statusIds)
}

View file

@ -1,71 +1,8 @@
import { TimelineStream } from '../_api/TimelineStream'
import identity from 'lodash/identity'
import { database } from '../_database/database'
import { store } from '../_store/store'
import { scheduleIdleTask } from '../_utils/scheduleIdleTask'
import throttle from 'lodash/throttle'
import { mark, stop } from '../_utils/marks'
async function getReblogIds (instanceName, statusIds) {
let reblogIds = await Promise.all(statusIds.map(async timelineItemId => {
let status = await database.getStatus(instanceName, timelineItemId)
return status.reblog && status.reblog.id
}))
return reblogIds.filter(identity)
}
async function getExistingItemIdsSet (instanceName, timelineName) {
let timelineItemIds = store.getForTimeline(instanceName, timelineName, 'timelineItemIds') || []
if (timelineName === 'notifications') {
return new Set(timelineItemIds)
}
let reblogIds = await getReblogIds(instanceName, timelineItemIds)
return new Set([].concat(timelineItemIds).concat(reblogIds))
}
async function removeDuplicates (instanceName, timelineName, updates) {
// remove duplicates, including duplicates due to reblogs
let existingItemIds = await getExistingItemIdsSet(instanceName, timelineName)
return updates.filter(update => !existingItemIds.has(update.id))
}
async function processFreshUpdates (instanceName, timelineName) {
mark('processFreshUpdates')
let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates')
if (freshUpdates && freshUpdates.length) {
let updates = freshUpdates.slice()
store.setForTimeline(instanceName, timelineName, {freshUpdates: []})
updates = await removeDuplicates(instanceName, timelineName, updates)
await database.insertTimelineItems(instanceName, timelineName, updates)
let itemIdsToAdd = store.getForTimeline(instanceName, timelineName, 'itemIdsToAdd') || []
if (updates && updates.length) {
itemIdsToAdd = itemIdsToAdd.concat(updates.map(_ => _.id))
console.log('adding ', itemIdsToAdd.length, 'items to itemIdsToAdd')
store.setForTimeline(instanceName, timelineName, {itemIdsToAdd: itemIdsToAdd})
}
stop('processFreshUpdates')
}
}
const lazilyProcessFreshUpdates = throttle((instanceName, timelineName) => {
scheduleIdleTask(() => {
/* no await */ processFreshUpdates(instanceName, timelineName)
})
}, 5000)
function processUpdate (instanceName, timelineName, update) {
let freshUpdates = store.getForTimeline(instanceName, timelineName, 'freshUpdates') || []
freshUpdates.push(update)
store.setForTimeline(instanceName, timelineName, {freshUpdates: freshUpdates})
lazilyProcessFreshUpdates(instanceName, timelineName)
}
function processDelete (instanceName, deletion) {
// TODO
}
import { deleteStatus } from './deleteStatuses'
import { addStatusOrNotification } from './addStatusOrNotification'
function processMessage (instanceName, timelineName, message) {
mark('processMessage')
@ -73,13 +10,13 @@ function processMessage (instanceName, timelineName, message) {
let parsedPayload = JSON.parse(payload)
switch (event) {
case 'delete':
processDelete(instanceName, parsedPayload)
deleteStatus(instanceName, parsedPayload)
break
case 'update':
processUpdate(instanceName, timelineName, parsedPayload)
addStatusOrNotification(instanceName, timelineName, parsedPayload)
break
case 'notification':
processUpdate(instanceName, 'notifications', parsedPayload)
addStatusOrNotification(instanceName, 'notifications', parsedPayload)
break
}
stop('processMessage')

View file

@ -64,3 +64,8 @@ export function hasInCache (cache, instanceName, key) {
}
return res
}
export function deleteFromCache (cache, instanceName, key) {
let instanceCache = getOrCreateInstanceCache(cache, instanceName)
instanceCache.delete(key)
}

View file

@ -7,13 +7,13 @@ import {
NOTIFICATIONS_STORE,
NOTIFICATION_TIMELINES_STORE,
PINNED_STATUSES_STORE,
TIMESTAMP
TIMESTAMP, REBLOG_ID
} from './constants'
const openReqs = {}
const databaseCache = {}
const DB_VERSION = 1
const DB_VERSION = 3
export function getDatabase (instanceName) {
if (!instanceName) {
@ -32,20 +32,29 @@ export function getDatabase (instanceName) {
}
req.onupgradeneeded = (e) => {
let db = req.result
db.createObjectStore(STATUSES_STORE, {keyPath: 'id'})
.createIndex(TIMESTAMP, TIMESTAMP)
db.createObjectStore(STATUS_TIMELINES_STORE, {keyPath: 'id'})
.createIndex('statusId', 'statusId')
db.createObjectStore(NOTIFICATIONS_STORE, {keyPath: 'id'})
.createIndex(TIMESTAMP, TIMESTAMP)
db.createObjectStore(NOTIFICATION_TIMELINES_STORE, {keyPath: 'id'})
.createIndex('notificationId', 'notificationId')
db.createObjectStore(ACCOUNTS_STORE, {keyPath: 'id'})
.createIndex(TIMESTAMP, TIMESTAMP)
db.createObjectStore(RELATIONSHIPS_STORE, {keyPath: 'id'})
.createIndex(TIMESTAMP, TIMESTAMP)
db.createObjectStore(META_STORE, {keyPath: 'key'})
db.createObjectStore(PINNED_STATUSES_STORE, {keyPath: 'id'})
let tx = e.currentTarget.transaction
if (e.oldVersion < 1) {
db.createObjectStore(STATUSES_STORE, {keyPath: 'id'})
.createIndex(TIMESTAMP, TIMESTAMP)
db.createObjectStore(STATUS_TIMELINES_STORE, {keyPath: 'id'})
.createIndex('statusId', 'statusId')
db.createObjectStore(NOTIFICATIONS_STORE, {keyPath: 'id'})
.createIndex(TIMESTAMP, TIMESTAMP)
db.createObjectStore(NOTIFICATION_TIMELINES_STORE, {keyPath: 'id'})
.createIndex('notificationId', 'notificationId')
db.createObjectStore(ACCOUNTS_STORE, {keyPath: 'id'})
.createIndex(TIMESTAMP, TIMESTAMP)
db.createObjectStore(RELATIONSHIPS_STORE, {keyPath: 'id'})
.createIndex(TIMESTAMP, TIMESTAMP)
db.createObjectStore(META_STORE, {keyPath: 'key'})
db.createObjectStore(PINNED_STATUSES_STORE, {keyPath: 'id'})
}
if (e.oldVersion < 2) {
tx.objectStore(STATUSES_STORE).createIndex(REBLOG_ID, REBLOG_ID)
}
if (e.oldVersion < 3) {
tx.objectStore(NOTIFICATIONS_STORE).createIndex('statusId', 'statusId')
}
}
req.onsuccess = () => resolve(req.result)
})

View file

@ -1,7 +1,10 @@
import { toPaddedBigInt, toReversePaddedBigInt } from './utils'
import { cloneForStorage } from './helpers'
import { dbPromise, getDatabase } from './databaseLifecycle'
import { accountsCache, getInCache, hasInCache, notificationsCache, setInCache, statusesCache } from './cache'
import {
accountsCache, deleteFromCache, getInCache, hasInCache, notificationsCache, setInCache,
statusesCache
} from './cache'
import { scheduleCleanup } from './cleanup'
import {
ACCOUNTS_STORE,
@ -253,6 +256,82 @@ export async function getNotification (instanceName, id) {
return result
}
//
// lookup by reblogs
//
export async function getReblogsForStatus (instanceName, id) {
const db = await getDatabase(instanceName)
await dbPromise(db, STATUSES_STORE, 'readonly', (statusesStore, callback) => {
statusesStore.index(REBLOG_ID).getAll(IDBKeyRange.only(id)).onsuccess = e => {
callback(e.target.result)
}
})
}
//
// deletes
//
export async function deleteStatusesAndNotifications (instanceName, statusIds, notificationIds) {
for (let statusId of statusIds) {
deleteFromCache(statusesCache, instanceName, statusId)
}
for (let notificationId of notificationIds) {
deleteFromCache(notificationsCache, instanceName, notificationId)
}
const db = await getDatabase(instanceName)
let storeNames = [
STATUSES_STORE,
STATUS_TIMELINES_STORE,
NOTIFICATIONS_STORE,
NOTIFICATION_TIMELINES_STORE,
PINNED_STATUSES_STORE
]
await dbPromise(db, storeNames, 'readwrite', (stores) => {
let [
statusesStore,
statusTimelinesStore,
notificationsStore,
notificationTimelinesStore,
pinnedStatusesStore
] = stores
function deleteStatus (statusId) {
pinnedStatusesStore.delete(statusId).onerror = e => {
e.preventDefault()
e.stopPropagation()
}
statusesStore.delete(statusId)
let getAllReq = statusTimelinesStore.index('statusId')
.getAllKeys(IDBKeyRange.only(statusId))
getAllReq.onsuccess = e => {
for (let result of e.target.result) {
statusTimelinesStore.delete(result)
}
}
}
function deleteNotification (notificationId) {
notificationsStore.delete(notificationId)
let getAllReq = notificationTimelinesStore.index('statusId')
.getAllKeys(IDBKeyRange.only(notificationId))
getAllReq.onsuccess = e => {
for (let result of e.target.result) {
notificationTimelinesStore.delete(result)
}
}
}
for (let statusId of statusIds) {
deleteStatus(statusId)
}
for (let notificationId of notificationIds) {
deleteNotification(notificationId)
}
})
}
//
// pinned statuses
//
@ -296,3 +375,16 @@ export async function getPinnedStatuses (instanceName, accountId) {
}
})
}
//
// notifications by status
//
export async function getNotificationIdsForStatus (instanceName, statusId) {
const db = await getDatabase(instanceName)
return dbPromise(db, NOTIFICATIONS_STORE, 'readonly', (notificationStore, callback) => {
notificationStore.index(statusId).getAllKeys(IDBKeyRange.only(statusId)).onsuccess = e => {
callback(Array.from(e.target.result))
}
})
}