Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion meteor/server/api/peripheralDevice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import { checkAccessAndGetPeripheralDevice } from '../security/check'
import { UserActionsLogItem } from '@sofie-automation/meteor-lib/dist/collections/UserActionsLog'
import { PackageManagerIntegration } from './integration/expectedPackages'
import { profiler } from './profiler'
import { QueueStudioJob } from '../worker/worker'
import { QueueStudioJob, QueueOrUpdateStudioJob } from '../worker/worker'
import { StudioJobs } from '@sofie-automation/corelib/dist/worker/studio'
import {
PlayoutChangedResults,
PeripheralDeviceInitOptions,
PeripheralDeviceStatusObject,
TimelineTriggerTimeResult,
} from '@sofie-automation/shared-lib/dist/peripheralDevice/peripheralDeviceAPI'
import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents'
import { checkStudioExists } from '../optimizations'
import {
ExpectedPackageId,
Expand Down Expand Up @@ -312,6 +313,28 @@ export namespace ServerPeripheralDeviceAPI {

transaction?.end()
}
export async function reportExternalEvents(
context: MethodContext,
deviceId: PeripheralDeviceId,
token: string,
events: PeripheralDeviceExternalEvent[]
): Promise<void> {
const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, context)

if (!peripheralDevice.studioAndConfigId)
throw new Error(`PeripheralDevice "${peripheralDevice._id}" sent reportExternalEvents, but has no studioId`)

if (!events.length) return

const studioId = peripheralDevice.studioAndConfigId.studioId

// Merge events into the last pending OnExternalEvents job in the queue, or enqueue a new one.
// This prevents queue flooding when many events arrive in a burst, or when multiple gateways
// report events for the same studio simultaneously.
QueueOrUpdateStudioJob(StudioJobs.OnExternalEvents, studioId, (existing) => ({
events: [...(existing?.events ?? []), ...events],
}))
}
Comment thread
Julusian marked this conversation as resolved.
export async function pingWithCommand(
context: MethodContext,
deviceId: PeripheralDeviceId,
Expand Down Expand Up @@ -883,6 +906,13 @@ class ServerPeripheralDeviceAPIClass extends MethodContextAPI implements NewPeri
) {
return ServerPeripheralDeviceAPI.playoutPlaybackChanged(this, deviceId, deviceToken, changedResults)
}
async reportExternalEvents(
deviceId: PeripheralDeviceId,
deviceToken: string,
events: PeripheralDeviceExternalEvent[]
) {
return ServerPeripheralDeviceAPI.reportExternalEvents(this, deviceId, deviceToken, events)
}
async reportResolveDone(
deviceId: PeripheralDeviceId,
deviceToken: string,
Expand Down
1 change: 1 addition & 0 deletions meteor/server/publications/_publications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import './lib/lib'
import './buckets'
import './blueprintUpgradeStatus/publication'
import './ingestStatus/publication'
import './externalEventSubscriptions'
import './packageManager/expectedPackages/publication'
import './packageManager/packageContainers'
import './packageManager/playoutContext'
Expand Down
178 changes: 178 additions & 0 deletions meteor/server/publications/externalEventSubscriptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import { PeripheralDeviceId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { assertNever, getHash, literal } from '@sofie-automation/corelib/dist/lib'
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
import { MongoFieldSpecifierOnesStrict } from '@sofie-automation/corelib/dist/mongo'
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist/RundownPlaylist'
import { ReadonlyDeep } from 'type-fest'
import {
CustomPublish,
CustomPublishCollection,
meteorCustomPublish,
setUpCollectionOptimizedObserver,
SetupObserversResult,
TriggerUpdate,
} from '../lib/customPublication'
import { logger } from '../logging'
import { RundownPlaylists, Rundowns } from '../collections'
import {
PeripheralDevicePubSub,
PeripheralDevicePubSubCollectionsNames,
ExternalEventSubscriptionDocument,
ExternalEventSubscriptionId,
} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice'
import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents'
import { checkAccessAndGetPeripheralDevice } from '../security/check'
import { check } from '../lib/check'

type RundownPlaylistFields = '_id' | 'activationId'
const rundownPlaylistFieldSpecifier = literal<
MongoFieldSpecifierOnesStrict<Pick<DBRundownPlaylist, RundownPlaylistFields>>
>({
_id: 1,
activationId: 1,
})

type RundownFields = '_id' | 'playlistId' | 'externalEventSubscriptions'
const rundownFieldSpecifier = literal<MongoFieldSpecifierOnesStrict<Pick<DBRundown, RundownFields>>>({
_id: 1,
playlistId: 1,
externalEventSubscriptions: 1,
})

interface ExternalEventSubscriptionsArgs {
readonly studioId: StudioId
readonly type: PeripheralDeviceExternalEvent['type']
}

// eslint-disable-next-line @typescript-eslint/no-empty-object-type
interface ExternalEventSubscriptionsState {}

interface ExternalEventSubscriptionsUpdateProps {
invalidateAll: true
}

async function setupExternalEventSubscriptionsObservers(
args: ReadonlyDeep<ExternalEventSubscriptionsArgs>,
triggerUpdate: TriggerUpdate<ExternalEventSubscriptionsUpdateProps>
): Promise<SetupObserversResult> {
const trigger = () => triggerUpdate({ invalidateAll: true })

return [
// Observe active playlists in the studio — activation/deactivation changes which rundowns are in scope
RundownPlaylists.observeChanges(
{ studioId: args.studioId },
{ added: trigger, changed: trigger, removed: trigger },
{ projection: rundownPlaylistFieldSpecifier }
),
// Observe rundowns in the studio — react only when externalEventSubscriptions or playlistId changes
Rundowns.observeChanges(
{ studioId: args.studioId },
{ added: trigger, changed: trigger, removed: trigger },
{ projection: rundownFieldSpecifier }
),
]
}

async function manipulateExternalEventSubscriptionsData(
args: ReadonlyDeep<ExternalEventSubscriptionsArgs>,
_state: Partial<ExternalEventSubscriptionsState>,
collection: CustomPublishCollection<ExternalEventSubscriptionDocument>,
_updateProps: Partial<ReadonlyDeep<ExternalEventSubscriptionsUpdateProps>> | undefined
): Promise<void> {
// Find all active playlists in the studio
const activePlaylists = (await RundownPlaylists.findFetchAsync(
{ studioId: args.studioId, activationId: { $exists: true } },
{ projection: rundownPlaylistFieldSpecifier }
)) as Pick<DBRundownPlaylist, RundownPlaylistFields>[]
const activePlaylistIds = activePlaylists.map((p) => p._id)

// Find rundowns belonging to active playlists
const activeRundowns = (await Rundowns.findFetchAsync(
{ studioId: args.studioId, playlistId: { $in: activePlaylistIds } },
{ projection: rundownFieldSpecifier }
)) as Pick<DBRundown, RundownFields>[]

// Build the set of valid IDs and the docs to upsert (filtered by type)
const validIds = new Set<ExternalEventSubscriptionId>()
const subsToUpsert: ExternalEventSubscriptionDocument[] = []

for (const rundown of activeRundowns) {
for (const sub of rundown.externalEventSubscriptions ?? []) {
if (sub.type !== args.type) continue

switch (sub.type) {
case 'tsr': {
const id = protectString<ExternalEventSubscriptionId>(
getHash(`tsr_${sub.deviceId}_${sub.deviceType}_${String(sub.event)}`)
)
validIds.add(id)
subsToUpsert.push({
_id: id,
type: 'tsr',
deviceId: sub.deviceId,
deviceType: sub.deviceType,
event: sub.event as string,
})
break
}
default:
assertNever(sub.type)
break
}
}
}

// Remove docs for subscriptions that are no longer active
collection.remove((doc) => !validIds.has(doc._id))

// Upsert each individual subscription doc
for (const sub of subsToUpsert) {
collection.replace(sub)
}
}

async function startOrJoinExternalEventSubscriptionsPublication(
pub: CustomPublish<ExternalEventSubscriptionDocument>,
studioId: StudioId,
type: PeripheralDeviceExternalEvent['type']
) {
await setUpCollectionOptimizedObserver<
ExternalEventSubscriptionDocument,
ExternalEventSubscriptionsArgs,
ExternalEventSubscriptionsState,
ExternalEventSubscriptionsUpdateProps
>(
`pub_${PeripheralDevicePubSub.externalEventSubscriptionsForDevice}_${studioId}_${type}`,
{ studioId, type },
setupExternalEventSubscriptionsObservers,
manipulateExternalEventSubscriptionsData,
pub,
100
)
}

meteorCustomPublish(
PeripheralDevicePubSub.externalEventSubscriptionsForDevice,
PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions,
async function (
pub: CustomPublish<ExternalEventSubscriptionDocument>,
type: PeripheralDeviceExternalEvent['type'],
deviceId: PeripheralDeviceId,
token: string | undefined
) {
check(deviceId, String)

const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, this)
Comment thread
Julusian marked this conversation as resolved.

const studioId = peripheralDevice.studioAndConfigId?.studioId
if (!studioId) {
logger.warn(
`Publication ${PeripheralDevicePubSub.externalEventSubscriptionsForDevice}: device ${deviceId} has no studio`
)
return
}

await startOrJoinExternalEventSubscriptionsPublication(pub, studioId, type)
}
)
3 changes: 3 additions & 0 deletions meteor/server/publications/rundown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ meteorPublish(
{
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}
)
Expand All @@ -90,6 +91,7 @@ meteorPublish(
const modifier: FindOptions<DBRundown> = {
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}

Expand All @@ -112,6 +114,7 @@ meteorPublish(
const modifier: FindOptions<DBRundown> = {
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}

Expand Down
40 changes: 40 additions & 0 deletions meteor/server/worker/jobQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,46 @@ export class WorkerJobQueueManager {
}
this.#runningJobs.clear()
}

/**
* Merge new data into the last pending job in the queue if it has the given name,
* otherwise enqueue a new fire-and-forget job.
*
* This prevents queue flooding when high-frequency events arrive: as long as the tail job
* has not yet been picked up by the worker, incoming data is merged into it in-place.
* Once the worker starts executing it or another jobName gets queued, the next call will enqueue a fresh job.
*
* @param queueName - The target queue
* @param jobName - The job type name to match against
* @param generateData - Called to produce job data. Receives the existing job's data when
* merging into a tail job, or `null` when creating a new job.
*/
mergeOrQueueJob(queueName: string, jobName: string, generateData: (existing: unknown | null) => unknown): void {
const queue = this.#getOrCreateQueue(queueName)

// Only inspect the very last entry. Scanning further back would risk updating a job in the
// middle of the queue, breaking the ordering guarantee relative to other job types.
const tail = queue.jobsHighPriority[queue.jobsHighPriority.length - 1]
if (tail && tail.spec.name === jobName) {
// Merge into the existing tail job in-place
tail.spec.data = generateData(tail.spec.data)
logger.debug(`Merged events into existing "${jobName}" job in queue "${queueName}"`)
} else {
// Tail is absent, already picked up, or a different job type — push a new job
const entry: JobEntry = {
spec: {
id: getRandomString(),
name: jobName,
data: generateData(null),
},
completionHandler: null,
}
queue.jobsHighPriority.push(entry)
queue.metricsTotal.inc()
logger.debug(`Enqueued new "${jobName}" job in queue "${queueName}"`)
this.#notifyWorker(queue)
}
}
}

function generateCompletionHandler<TRes>(
Expand Down
25 changes: 25 additions & 0 deletions meteor/server/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,31 @@ export async function QueueStudioJob<T extends keyof StudioJobFunc>(
return queueManager.queueJobAndWrapResult(getStudioQueueName(studioId), jobName, jobParameters, now, options)
}

/**
* Merge new job data into the last pending job in the studio's queue if it has the same name,
* otherwise enqueue a new fire-and-forget job.
*
* Use this instead of {@link QueueStudioJob} when events can be safely batched together and you
* do not need to await the result (e.g. device feedback events from gateways).
*
* @param jobName - The job type
* @param studioId - Id of the studio
* @param generateData - Called to produce the job data. Receives the existing pending job's data
* when merging, or `null` when creating a new job. Both cases are handled by the same function.
*/
export function QueueOrUpdateStudioJob<T extends keyof StudioJobFunc>(
jobName: T,
studioId: StudioId,
generateData: (existing: Parameters<StudioJobFunc[T]>[0] | null) => Parameters<StudioJobFunc[T]>[0]
): void {
if (!studioId) throw new Meteor.Error(500, 'Missing studioId')
queueManager.mergeOrQueueJob(
getStudioQueueName(studioId),
jobName,
generateData as (existing: unknown | null) => unknown
)
}

/**
* Queue a job for ingest
* @param jobName Job name
Expand Down
10 changes: 5 additions & 5 deletions meteor/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ __metadata:
dependencies:
"@mos-connection/model": "npm:^5.0.0-alpha.0"
kairos-lib: "npm:^1.0.0"
timeline-state-resolver-types: "npm:10.0.0-nightly-main-20260415-093148-f35c4cadc.0"
timeline-state-resolver-types: "npm:10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0"
tslib: "npm:^2.8.1"
type-fest: "npm:^4.41.0"
languageName: node
Expand Down Expand Up @@ -9779,13 +9779,13 @@ __metadata:
languageName: node
linkType: hard

"timeline-state-resolver-types@npm:10.0.0-nightly-main-20260415-093148-f35c4cadc.0":
version: 10.0.0-nightly-main-20260415-093148-f35c4cadc.0
resolution: "timeline-state-resolver-types@npm:10.0.0-nightly-main-20260415-093148-f35c4cadc.0"
"timeline-state-resolver-types@npm:10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0":
version: 10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0
resolution: "timeline-state-resolver-types@npm:10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0"
dependencies:
kairos-lib: "npm:1.0.0"
tslib: "npm:^2.8.1"
checksum: 10/712578068c41e6ac25c5f305528933f2c5c878230304fed9faa38d2d21d6fc1923bd7ad0f412fe9bf8fd684830ee628e92352ce882740c59ef9d181a98c95b7f
checksum: 10/f470c82d6329f14ebc55adb97eaa7685164819cd2e1e3a8cdc6836c96df47d5ff43e6b4dcefed341df1eeaefe7a71988d869d830cbba347660884c8829bfb865
languageName: node
linkType: hard

Expand Down
Loading
Loading