Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion meteor/server/api/peripheralDevice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import { checkAccessAndGetPeripheralDevice } from '../security/check'
import { UserActionsLogItem } from '@sofie-automation/meteor-lib/dist/collections/UserActionsLog'
import { PackageManagerIntegration } from './integration/expectedPackages'
import { profiler } from './profiler'
import { QueueStudioJob } from '../worker/worker'
import { QueueStudioJob, QueueOrUpdateStudioJob } from '../worker/worker'
import { StudioJobs } from '@sofie-automation/corelib/dist/worker/studio'
import {
PlayoutChangedResults,
PeripheralDeviceInitOptions,
PeripheralDeviceStatusObject,
TimelineTriggerTimeResult,
} from '@sofie-automation/shared-lib/dist/peripheralDevice/peripheralDeviceAPI'
import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents'
import { checkStudioExists } from '../optimizations'
import {
ExpectedPackageId,
Expand Down Expand Up @@ -312,6 +313,28 @@ export namespace ServerPeripheralDeviceAPI {

transaction?.end()
}
export async function reportExternalEvents(
context: MethodContext,
deviceId: PeripheralDeviceId,
token: string,
events: PeripheralDeviceExternalEvent[]
): Promise<void> {
const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, context)

if (!peripheralDevice.studioAndConfigId)
throw new Error(`PeripheralDevice "${peripheralDevice._id}" sent reportExternalEvents, but has no studioId`)

if (!events.length) return

const studioId = peripheralDevice.studioAndConfigId.studioId

// Merge events into the last pending OnExternalEvents job in the queue, or enqueue a new one.
// This prevents queue flooding when many events arrive in a burst, or when multiple gateways
// report events for the same studio simultaneously.
QueueOrUpdateStudioJob(StudioJobs.OnExternalEvents, studioId, (existing) => ({
events: [...(existing?.events ?? []), ...events],
}))
}
Comment on lines +316 to +337
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Bound and validate incoming event batches before merge.

Right now every call blindly appends to pending job data. Under burst traffic (or a stalled worker), this can grow unbounded and cause memory pressure.

Suggested fix
 export async function reportExternalEvents(
 	context: MethodContext,
 	deviceId: PeripheralDeviceId,
 	token: string,
 	events: PeripheralDeviceExternalEvent[]
 ): Promise<void> {
 	const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, context)
+	check(events, Array)

 	if (!peripheralDevice.studioAndConfigId)
 		throw new Error(`PeripheralDevice "${peripheralDevice._id}" sent reportExternalEvents, but has no studioId`)

 	if (!events.length) return

 	const studioId = peripheralDevice.studioAndConfigId.studioId
+	const MAX_PENDING_EXTERNAL_EVENTS = 1000

 	// Merge events into the last pending OnExternalEvents job in the queue, or enqueue a new one.
 	// This prevents queue flooding when many events arrive in a burst, or when multiple gateways
 	// report events for the same studio simultaneously.
 	QueueOrUpdateStudioJob(StudioJobs.OnExternalEvents, studioId, (existing) => ({
-		events: [...(existing?.events ?? []), ...events],
+		events: [...(existing?.events ?? []), ...events].slice(-MAX_PENDING_EXTERNAL_EVENTS),
 	}))
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@meteor/server/api/peripheralDevice.ts` around lines 316 - 337,
reportExternalEvents currently appends unbounded incoming events into the
pending StudioJobs.OnExternalEvents job, which can grow without limit under
bursts; update the reportExternalEvents handler to validate each incoming
PeripheralDeviceExternalEvent (type/required fields) and enforce a hard maximum
batch size before merging (e.g. define a MAX_EVENTS_PER_JOB constant), then when
calling QueueOrUpdateStudioJob merge sanitized events and truncate the resulting
array to that maximum (choose a policy: drop oldest or newest) so the stored job
data cannot grow unbounded; reference the reportExternalEvents function,
StudioJobs.OnExternalEvents and QueueOrUpdateStudioJob when making the change.

export async function pingWithCommand(
context: MethodContext,
deviceId: PeripheralDeviceId,
Expand Down Expand Up @@ -883,6 +906,13 @@ class ServerPeripheralDeviceAPIClass extends MethodContextAPI implements NewPeri
) {
return ServerPeripheralDeviceAPI.playoutPlaybackChanged(this, deviceId, deviceToken, changedResults)
}
async reportExternalEvents(
deviceId: PeripheralDeviceId,
deviceToken: string,
events: PeripheralDeviceExternalEvent[]
) {
return ServerPeripheralDeviceAPI.reportExternalEvents(this, deviceId, deviceToken, events)
}
async reportResolveDone(
deviceId: PeripheralDeviceId,
deviceToken: string,
Expand Down
1 change: 1 addition & 0 deletions meteor/server/publications/_publications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import './lib/lib'
import './buckets'
import './blueprintUpgradeStatus/publication'
import './ingestStatus/publication'
import './externalEventSubscriptions'
import './packageManager/expectedPackages/publication'
import './packageManager/packageContainers'
import './packageManager/playoutContext'
Expand Down
178 changes: 178 additions & 0 deletions meteor/server/publications/externalEventSubscriptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import { PeripheralDeviceId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { assertNever, getHash, literal } from '@sofie-automation/corelib/dist/lib'
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
import { MongoFieldSpecifierOnesStrict } from '@sofie-automation/corelib/dist/mongo'
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist/RundownPlaylist'
import { ReadonlyDeep } from 'type-fest'
import {
CustomPublish,
CustomPublishCollection,
meteorCustomPublish,
setUpCollectionOptimizedObserver,
SetupObserversResult,
TriggerUpdate,
} from '../lib/customPublication'
import { logger } from '../logging'
import { RundownPlaylists, Rundowns } from '../collections'
import {
PeripheralDevicePubSub,
PeripheralDevicePubSubCollectionsNames,
ExternalEventSubscriptionDocument,
ExternalEventSubscriptionId,
} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice'
import type { PeripheralDeviceExternalEvent } from '@sofie-automation/shared-lib/dist/peripheralDevice/externalEvents'
import { checkAccessAndGetPeripheralDevice } from '../security/check'
import { check } from '../lib/check'

type RundownPlaylistFields = '_id' | 'activationId'
const rundownPlaylistFieldSpecifier = literal<
MongoFieldSpecifierOnesStrict<Pick<DBRundownPlaylist, RundownPlaylistFields>>
>({
_id: 1,
activationId: 1,
})

type RundownFields = '_id' | 'playlistId' | 'externalEventSubscriptions'
const rundownFieldSpecifier = literal<MongoFieldSpecifierOnesStrict<Pick<DBRundown, RundownFields>>>({
_id: 1,
playlistId: 1,
externalEventSubscriptions: 1,
})

interface ExternalEventSubscriptionsArgs {
readonly studioId: StudioId
readonly type: PeripheralDeviceExternalEvent['type']
}

// eslint-disable-next-line @typescript-eslint/no-empty-object-type
interface ExternalEventSubscriptionsState {}

interface ExternalEventSubscriptionsUpdateProps {
invalidateAll: true
}

async function setupExternalEventSubscriptionsObservers(
args: ReadonlyDeep<ExternalEventSubscriptionsArgs>,
triggerUpdate: TriggerUpdate<ExternalEventSubscriptionsUpdateProps>
): Promise<SetupObserversResult> {
const trigger = () => triggerUpdate({ invalidateAll: true })

return [
// Observe active playlists in the studio — activation/deactivation changes which rundowns are in scope
RundownPlaylists.observeChanges(
{ studioId: args.studioId },
{ added: trigger, changed: trigger, removed: trigger },
{ projection: rundownPlaylistFieldSpecifier }
),
// Observe rundowns in the studio — react only when externalEventSubscriptions or playlistId changes
Rundowns.observeChanges(
{ studioId: args.studioId },
{ added: trigger, changed: trigger, removed: trigger },
{ projection: rundownFieldSpecifier }
),
]
}

async function manipulateExternalEventSubscriptionsData(
args: ReadonlyDeep<ExternalEventSubscriptionsArgs>,
_state: Partial<ExternalEventSubscriptionsState>,
collection: CustomPublishCollection<ExternalEventSubscriptionDocument>,
_updateProps: Partial<ReadonlyDeep<ExternalEventSubscriptionsUpdateProps>> | undefined
): Promise<void> {
// Find all active playlists in the studio
const activePlaylists = (await RundownPlaylists.findFetchAsync(
{ studioId: args.studioId, activationId: { $exists: true } },
{ projection: rundownPlaylistFieldSpecifier }
)) as Pick<DBRundownPlaylist, RundownPlaylistFields>[]
const activePlaylistIds = activePlaylists.map((p) => p._id)

// Find rundowns belonging to active playlists
const activeRundowns = (await Rundowns.findFetchAsync(
{ studioId: args.studioId, playlistId: { $in: activePlaylistIds } },
{ projection: rundownFieldSpecifier }
)) as Pick<DBRundown, RundownFields>[]

// Build the set of valid IDs and the docs to upsert (filtered by type)
const validIds = new Set<ExternalEventSubscriptionId>()
const subsToUpsert: ExternalEventSubscriptionDocument[] = []

for (const rundown of activeRundowns) {
for (const sub of rundown.externalEventSubscriptions ?? []) {
if (sub.type !== args.type) continue

switch (sub.type) {
case 'tsr': {
const id = protectString<ExternalEventSubscriptionId>(
getHash(`tsr_${sub.deviceId}_${sub.deviceType}_${String(sub.event)}`)
)
validIds.add(id)
subsToUpsert.push({
_id: id,
type: 'tsr',
deviceId: sub.deviceId,
deviceType: sub.deviceType,
event: sub.event as string,
})
break
}
default:
assertNever(sub.type)
break
}
}
}

// Remove docs for subscriptions that are no longer active
collection.remove((doc) => !validIds.has(doc._id))

// Upsert each individual subscription doc
for (const sub of subsToUpsert) {
collection.replace(sub)
}
}

async function startOrJoinExternalEventSubscriptionsPublication(
pub: CustomPublish<ExternalEventSubscriptionDocument>,
studioId: StudioId,
type: PeripheralDeviceExternalEvent['type']
) {
await setUpCollectionOptimizedObserver<
ExternalEventSubscriptionDocument,
ExternalEventSubscriptionsArgs,
ExternalEventSubscriptionsState,
ExternalEventSubscriptionsUpdateProps
>(
`pub_${PeripheralDevicePubSub.externalEventSubscriptionsForDevice}_${studioId}_${type}`,
{ studioId, type },
setupExternalEventSubscriptionsObservers,
manipulateExternalEventSubscriptionsData,
pub,
100
)
}

meteorCustomPublish(
PeripheralDevicePubSub.externalEventSubscriptionsForDevice,
PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions,
async function (
pub: CustomPublish<ExternalEventSubscriptionDocument>,
type: PeripheralDeviceExternalEvent['type'],
deviceId: PeripheralDeviceId,
token: string | undefined
) {
check(deviceId, String)

const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, this)
Comment on lines +163 to +166
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing input validation for type parameter.

The deviceId is validated with check(deviceId, String), but the type parameter is passed directly without validation. Since type is used to filter subscription documents and forms part of the observer key, an invalid or unexpected value could cause subtle misbehavior.

Add a check() call for the type parameter to ensure runtime safety.

🛡️ Proposed fix to add type validation
 	async function (
 		pub: CustomPublish<ExternalEventSubscriptionDocument>,
 		type: PeripheralDeviceExternalEvent['type'],
 		deviceId: PeripheralDeviceId,
 		token: string | undefined
 	) {
 		check(deviceId, String)
+		check(type, String)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@meteor/server/publications/externalEventSubscriptions.ts` around lines 163 -
166, Add runtime validation for the `type` parameter in the same function that
currently does `check(deviceId, String)` (in
meteor/server/publications/externalEventSubscriptions.ts) by calling
`check(type, String)` before using `type` to filter subscription documents and
construct the observer key (e.g., where `peripheralDevice` is obtained via
`checkAccessAndGetPeripheralDevice`); this ensures `type` is a string and
prevents invalid values from affecting the publication observer behavior.


const studioId = peripheralDevice.studioAndConfigId?.studioId
if (!studioId) {
logger.warn(
`Publication ${PeripheralDevicePubSub.externalEventSubscriptionsForDevice}: device ${deviceId} has no studio`
)
return
}

await startOrJoinExternalEventSubscriptionsPublication(pub, studioId, type)
}
)
3 changes: 3 additions & 0 deletions meteor/server/publications/rundown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ meteorPublish(
{
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}
)
Expand All @@ -90,6 +91,7 @@ meteorPublish(
const modifier: FindOptions<DBRundown> = {
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}

Expand All @@ -112,6 +114,7 @@ meteorPublish(
const modifier: FindOptions<DBRundown> = {
projection: {
privateData: 0,
externalEventSubscriptions: 0,
},
}

Expand Down
40 changes: 40 additions & 0 deletions meteor/server/worker/jobQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,46 @@ export class WorkerJobQueueManager {
}
this.#runningJobs.clear()
}

/**
* Merge new data into the last pending job in the queue if it has the given name,
* otherwise enqueue a new fire-and-forget job.
*
* This prevents queue flooding when high-frequency events arrive: as long as the tail job
* has not yet been picked up by the worker, incoming data is merged into it in-place.
* Once the worker starts executing it or another jobName gets queued, the next call will enqueue a fresh job.
*
* @param queueName - The target queue
* @param jobName - The job type name to match against
* @param generateData - Called to produce job data. Receives the existing job's data when
* merging into a tail job, or `null` when creating a new job.
*/
mergeOrQueueJob(queueName: string, jobName: string, generateData: (existing: unknown | null) => unknown): void {
const queue = this.#getOrCreateQueue(queueName)

// Only inspect the very last entry. Scanning further back would risk updating a job in the
// middle of the queue, breaking the ordering guarantee relative to other job types.
const tail = queue.jobsHighPriority[queue.jobsHighPriority.length - 1]
if (tail && tail.spec.name === jobName) {
// Merge into the existing tail job in-place
tail.spec.data = generateData(tail.spec.data)
logger.debug(`Merged events into existing "${jobName}" job in queue "${queueName}"`)
} else {
// Tail is absent, already picked up, or a different job type — push a new job
const entry: JobEntry = {
spec: {
id: getRandomString(),
name: jobName,
data: generateData(null),
},
completionHandler: null,
}
queue.jobsHighPriority.push(entry)
queue.metricsTotal.inc()
logger.debug(`Enqueued new "${jobName}" job in queue "${queueName}"`)
this.#notifyWorker(queue)
}
}
}

function generateCompletionHandler<TRes>(
Expand Down
25 changes: 25 additions & 0 deletions meteor/server/worker/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,31 @@ export async function QueueStudioJob<T extends keyof StudioJobFunc>(
return queueManager.queueJobAndWrapResult(getStudioQueueName(studioId), jobName, jobParameters, now, options)
}

/**
* Merge new job data into the last pending job in the studio's queue if it has the same name,
* otherwise enqueue a new fire-and-forget job.
*
* Use this instead of {@link QueueStudioJob} when events can be safely batched together and you
* do not need to await the result (e.g. device feedback events from gateways).
*
* @param jobName - The job type
* @param studioId - Id of the studio
* @param generateData - Called to produce the job data. Receives the existing pending job's data
* when merging, or `null` when creating a new job. Both cases are handled by the same function.
*/
export function QueueOrUpdateStudioJob<T extends keyof StudioJobFunc>(
jobName: T,
studioId: StudioId,
generateData: (existing: Parameters<StudioJobFunc[T]>[0] | null) => Parameters<StudioJobFunc[T]>[0]
): void {
if (!studioId) throw new Meteor.Error(500, 'Missing studioId')
queueManager.mergeOrQueueJob(
getStudioQueueName(studioId),
jobName,
generateData as (existing: unknown | null) => unknown
)
}

/**
* Queue a job for ingest
* @param jobName Job name
Expand Down
10 changes: 5 additions & 5 deletions meteor/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1369,7 +1369,7 @@ __metadata:
dependencies:
"@mos-connection/model": "npm:^5.0.0-alpha.0"
kairos-lib: "npm:^1.0.0"
timeline-state-resolver-types: "npm:10.0.0-nightly-main-20260415-093148-f35c4cadc.0"
timeline-state-resolver-types: "npm:10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0"
tslib: "npm:^2.8.1"
type-fest: "npm:^4.41.0"
languageName: node
Expand Down Expand Up @@ -9779,13 +9779,13 @@ __metadata:
languageName: node
linkType: hard

"timeline-state-resolver-types@npm:10.0.0-nightly-main-20260415-093148-f35c4cadc.0":
version: 10.0.0-nightly-main-20260415-093148-f35c4cadc.0
resolution: "timeline-state-resolver-types@npm:10.0.0-nightly-main-20260415-093148-f35c4cadc.0"
"timeline-state-resolver-types@npm:10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0":
version: 10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0
resolution: "timeline-state-resolver-types@npm:10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0"
dependencies:
kairos-lib: "npm:1.0.0"
tslib: "npm:^2.8.1"
checksum: 10/712578068c41e6ac25c5f305528933f2c5c878230304fed9faa38d2d21d6fc1923bd7ad0f412fe9bf8fd684830ee628e92352ce882740c59ef9d181a98c95b7f
checksum: 10/f470c82d6329f14ebc55adb97eaa7685164819cd2e1e3a8cdc6836c96df47d5ff43e6b4dcefed341df1eeaefe7a71988d869d830cbba347660884c8829bfb865
languageName: node
linkType: hard

Expand Down
Loading
Loading