Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion meteor/server/api/peripheralDevice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import { checkAccessAndGetPeripheralDevice } from '../security/check'
import { UserActionsLogItem } from '@sofie-automation/meteor-lib/dist/collections/UserActionsLog'
import { PackageManagerIntegration } from './integration/expectedPackages'
import { profiler } from './profiler'
import { QueueStudioJob } from '../worker/worker'
import { QueueStudioJob, QueueOrUpdateStudioJob } from '../worker/worker'
import { StudioJobs } from '@sofie-automation/corelib/dist/worker/studio'
import {
PlayoutChangedResults,
Expand All @@ -46,6 +46,7 @@ import {
TimelineTriggerTimeResult,
DeviceStatusDetail,
} from '@sofie-automation/shared-lib/dist/peripheralDevice/peripheralDeviceAPI'
import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents'
import { checkStudioExists } from '../optimizations'
import {
ExpectedPackageId,
Expand Down Expand Up @@ -465,6 +466,32 @@ export namespace ServerPeripheralDeviceAPI {

transaction?.end()
}
export async function reportExternalEvents(
context: MethodContext,
deviceId: PeripheralDeviceId,
token: string,
events: PeripheralDeviceExternalEvent[]
): Promise<void> {
check(events, Array)

const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, context)

if (!peripheralDevice.studioAndConfigId)
throw new Error(`PeripheralDevice "${peripheralDevice._id}" sent reportExternalEvents, but has no studioId`)

if (!events.length) return

const studioId = peripheralDevice.studioAndConfigId.studioId
// An arbitrary cap, to avoid unbound memory growth
const MAX_PENDING_EXTERNAL_EVENTS = 1000

// Merge events into the last pending OnExternalEvents job in the queue, or enqueue a new one.
// This prevents queue flooding when many events arrive in a burst, or when multiple gateways
// report events for the same studio simultaneously.
QueueOrUpdateStudioJob(StudioJobs.OnExternalEvents, studioId, (existing) => ({
events: [...(existing?.events ?? []), ...events].slice(-MAX_PENDING_EXTERNAL_EVENTS),
}))
}
Comment thread
Julusian marked this conversation as resolved.
export async function pingWithCommand(
context: MethodContext,
deviceId: PeripheralDeviceId,
Expand Down Expand Up @@ -1036,6 +1063,13 @@ class ServerPeripheralDeviceAPIClass extends MethodContextAPI implements NewPeri
) {
return ServerPeripheralDeviceAPI.playoutPlaybackChanged(this, deviceId, deviceToken, changedResults)
}
async reportExternalEvents(
deviceId: PeripheralDeviceId,
deviceToken: string,
events: PeripheralDeviceExternalEvent[]
) {
return ServerPeripheralDeviceAPI.reportExternalEvents(this, deviceId, deviceToken, events)
}
async reportResolveDone(
deviceId: PeripheralDeviceId,
deviceToken: string,
Expand Down
1 change: 1 addition & 0 deletions meteor/server/publications/_publications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import './lib/lib'
import './buckets'
import './blueprintUpgradeStatus/publication'
import './ingestStatus/publication'
import './externalEventSubscriptions'
import './packageManager/expectedPackages/publication'
import './packageManager/packageContainers'
import './packageManager/playoutContext'
Expand Down
179 changes: 179 additions & 0 deletions meteor/server/publications/externalEventSubscriptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import { PeripheralDeviceId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { assertNever, getHash, literal } from '@sofie-automation/corelib/dist/lib'
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
import { MongoFieldSpecifierOnesStrict } from '@sofie-automation/corelib/dist/mongo'
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist/RundownPlaylist'
import { ReadonlyDeep } from 'type-fest'
import {
CustomPublish,
CustomPublishCollection,
meteorCustomPublish,
setUpCollectionOptimizedObserver,
SetupObserversResult,
TriggerUpdate,
} from '../lib/customPublication'
import { logger } from '../logging'
import { RundownPlaylists, Rundowns } from '../collections'
import {
PeripheralDevicePubSub,
PeripheralDevicePubSubCollectionsNames,
ExternalEventSubscriptionDocument,
ExternalEventSubscriptionId,
} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice'
import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents'
import { checkAccessAndGetPeripheralDevice } from '../security/check'
import { check } from '../lib/check'

type RundownPlaylistFields = '_id' | 'activationId'
const rundownPlaylistFieldSpecifier = literal<
MongoFieldSpecifierOnesStrict<Pick<DBRundownPlaylist, RundownPlaylistFields>>
>({
_id: 1,
activationId: 1,
})

type RundownFields = '_id' | 'playlistId' | 'externalEventSubscriptions'
const rundownFieldSpecifier = literal<MongoFieldSpecifierOnesStrict<Pick<DBRundown, RundownFields>>>({
_id: 1,
playlistId: 1,
externalEventSubscriptions: 1,
})

interface ExternalEventSubscriptionsArgs {
readonly studioId: StudioId
readonly type: PeripheralDeviceExternalEvent['type']
}

// eslint-disable-next-line @typescript-eslint/no-empty-object-type
interface ExternalEventSubscriptionsState {}

interface ExternalEventSubscriptionsUpdateProps {
invalidateAll: true
}

async function setupExternalEventSubscriptionsObservers(
args: ReadonlyDeep<ExternalEventSubscriptionsArgs>,
triggerUpdate: TriggerUpdate<ExternalEventSubscriptionsUpdateProps>
): Promise<SetupObserversResult> {
const trigger = () => triggerUpdate({ invalidateAll: true })

return [
// Observe active playlists in the studio — activation/deactivation changes which rundowns are in scope
RundownPlaylists.observeChanges(
{ studioId: args.studioId },
{ added: trigger, changed: trigger, removed: trigger },
{ projection: rundownPlaylistFieldSpecifier }
),
// Observe rundowns in the studio — react only when externalEventSubscriptions or playlistId changes
Rundowns.observeChanges(
{ studioId: args.studioId },
{ added: trigger, changed: trigger, removed: trigger },
{ projection: rundownFieldSpecifier }
),
]
}

async function manipulateExternalEventSubscriptionsData(
args: ReadonlyDeep<ExternalEventSubscriptionsArgs>,
_state: Partial<ExternalEventSubscriptionsState>,
collection: CustomPublishCollection<ExternalEventSubscriptionDocument>,
_updateProps: Partial<ReadonlyDeep<ExternalEventSubscriptionsUpdateProps>> | undefined
): Promise<void> {
// Find all active playlists in the studio
const activePlaylists = (await RundownPlaylists.findFetchAsync(
{ studioId: args.studioId, activationId: { $exists: true } },
{ projection: rundownPlaylistFieldSpecifier }
)) as Pick<DBRundownPlaylist, RundownPlaylistFields>[]
const activePlaylistIds = activePlaylists.map((p) => p._id)

// Find rundowns belonging to active playlists
const activeRundowns = (await Rundowns.findFetchAsync(
{ studioId: args.studioId, playlistId: { $in: activePlaylistIds } },
{ projection: rundownFieldSpecifier }
)) as Pick<DBRundown, RundownFields>[]

// Build the set of valid IDs and the docs to upsert (filtered by type)
const validIds = new Set<ExternalEventSubscriptionId>()
const subsToUpsert: ExternalEventSubscriptionDocument[] = []

for (const rundown of activeRundowns) {
for (const sub of rundown.externalEventSubscriptions ?? []) {
if (sub.type !== args.type) continue

switch (sub.type) {
case 'tsr': {
const id = protectString<ExternalEventSubscriptionId>(
getHash(`tsr_${sub.deviceId}_${sub.deviceType}_${String(sub.event)}`)
)
validIds.add(id)
subsToUpsert.push({
_id: id,
type: 'tsr',
deviceId: sub.deviceId,
deviceType: sub.deviceType,
event: sub.event as string,
})
break
}
default:
assertNever(sub.type)
break
}
}
}

// Remove docs for subscriptions that are no longer active
collection.remove((doc) => !validIds.has(doc._id))

// Upsert each individual subscription doc
for (const sub of subsToUpsert) {
collection.replace(sub)
}
}

async function startOrJoinExternalEventSubscriptionsPublication(
pub: CustomPublish<ExternalEventSubscriptionDocument>,
studioId: StudioId,
type: PeripheralDeviceExternalEvent['type']
) {
await setUpCollectionOptimizedObserver<
ExternalEventSubscriptionDocument,
ExternalEventSubscriptionsArgs,
ExternalEventSubscriptionsState,
ExternalEventSubscriptionsUpdateProps
>(
`pub_${PeripheralDevicePubSub.externalEventSubscriptionsForDevice}_${studioId}_${type}`,
{ studioId, type },
setupExternalEventSubscriptionsObservers,
manipulateExternalEventSubscriptionsData,
pub,
100
)
}

meteorCustomPublish(
PeripheralDevicePubSub.externalEventSubscriptionsForDevice,
PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions,
async function (
pub: CustomPublish<ExternalEventSubscriptionDocument>,
type: PeripheralDeviceExternalEvent['type'],
deviceId: PeripheralDeviceId,
token: string | undefined
) {
check(deviceId, String)
check(type, String)

const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, this)
Comment thread
Julusian marked this conversation as resolved.

const studioId = peripheralDevice.studioAndConfigId?.studioId
if (!studioId) {
logger.warn(
`Publication ${PeripheralDevicePubSub.externalEventSubscriptionsForDevice}: device ${deviceId} has no studio`
)
return
}

await startOrJoinExternalEventSubscriptionsPublication(pub, studioId, type)
}
)
3 changes: 3 additions & 0 deletions meteor/server/publications/rundown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ meteorPublish(
{
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}
)
Expand All @@ -90,6 +91,7 @@ meteorPublish(
const modifier: FindOptions<DBRundown> = {
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}

Expand All @@ -112,6 +114,7 @@ meteorPublish(
const modifier: FindOptions<DBRundown> = {
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}

Expand Down
Loading
Loading