TSR Device Feedback -> Rundown (SOFIE-311)#1731
Conversation
WalkthroughIntroduces an end-to-end external event system enabling peripheral devices to report TSR device events that are processed by blueprints. Adds a Changes
Sequence Diagram(s)sequenceDiagram
participant TSRGateway as TSR Gateway
participant Core as Core Server
participant JobQueue as Job Queue
participant Playout as Playout Engine
participant Blueprint as Blueprint Handler
TSRGateway->>TSRGateway: Device emits stateEvent
TSRGateway->>Core: reportExternalEvents(deviceId, events)
activate Core
Core->>Core: Validate device & token
Core->>JobQueue: QueueOrUpdateStudioJob(OnExternalEvents, studioId, merge events)
activate JobQueue
JobQueue->>JobQueue: Merge with pending job or enqueue new
deactivate JobQueue
deactivate Core
JobQueue->>Playout: Process OnExternalEvents job
activate Playout
Playout->>Playout: Load active playlists & rundowns
Playout->>Playout: Find active part instance
Playout->>Playout: Fetch show-style blueprint
Playout->>Blueprint: onExternalEvent(context, persistentState, events[])
activate Blueprint
Blueprint->>Blueprint: Execute blueprint logic (actions, state changes)
Blueprint->>Playout: Apply execution side effects
deactivate Blueprint
Playout->>Playout: Save persistent state & notifications
deactivate Playout
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Review rate limit: 0/1 reviews remaining, refill in 60 minutes.Comment |
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (5)
packages/playout-gateway/package.json (1)
58-58: Verify feature-branch nightly resolver is intentional before merging tomain.Line 58 pins a feature-branch nightly (
nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0). The dependency is used across 7 imports in playout-gateway (DevicesRegistry, MediaObject, DeviceOptionsAny, ActionExecutionResult, BaseRemoteDeviceIntegration, TSRDevicesManifestEntry, and AtemMediaPool types) and no conflicting TSR versions exist elsewhere in the repo. All imports use standard public API surface. If this PR lands onmain, plan to switch to anightly-mainor release build once available.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/playout-gateway/package.json` at line 58, The dependency "timeline-state-resolver" is pinned to a feature-branch nightly ("10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0") used by imports DevicesRegistry, MediaObject, DeviceOptionsAny, ActionExecutionResult, BaseRemoteDeviceIntegration, TSRDevicesManifestEntry, and AtemMediaPool; before merging to main either replace that pinned feature-nightly with the intended stable/nightly-main or a release version (or add a clear TODO and justification) in package.json so main doesn't depend on a feature branch—update the version string accordingly and run install/build to confirm no type or API breaks across those seven import sites.packages/job-worker/src/ingest/generationRundown.ts (1)
302-304: Add a focused regression test for this new persisted field.This introduces new rundown persistence behavior; please add an ingest test asserting
externalEventSubscriptionsfrom blueprint output are stored and retrievable.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/job-worker/src/ingest/generationRundown.ts` around lines 302 - 304, Add a regression test that verifies the new persisted field externalEventSubscriptions produced by the blueprint is saved and can be retrieved: in the ingest test suite that exercises generationRundown (where translateUserEditsFromBlueprint(...) and externalEventSubscriptions are passed into the persistence call), create a blueprint output containing a non-empty externalEventSubscriptions array, run the existing ingestion path, then query the persisted rundown and assert the stored rundown.rundown.externalEventSubscriptions strictly equals the blueprint value; ensure the test covers both save and subsequent retrieval to catch regressions.meteor/server/worker/jobQueue.ts (1)
364-402: Add focused tests for merge-vs-enqueue behavior.Please add coverage for:
- merge into matching high-priority tail,
- enqueue when tail is different/null,
- no mutation of ordering across mixed job names.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@meteor/server/worker/jobQueue.ts` around lines 364 - 402, Add unit tests for mergeOrQueueJob covering three scenarios: (1) when the last high-priority job has the same name, verify that mergeOrQueueJob calls generateData with the existing tail.spec.data, mutates tail.spec.data in-place, does not change queue.jobsHighPriority length, and logs the merge (use mergeOrQueueJob and inspect the queue returned by `#getOrCreateQueue` or access the queue object after invoking); (2) when the tail is absent or has a different name, verify a new JobEntry is pushed with id from getRandomString(), name equal to jobName and data from generateData(null), that queue.metricsTotal.inc() is called and `#notifyWorker` is invoked (spy/mock generateData, getRandomString, metricsTotal.inc, and `#notifyWorker`); (3) when multiple job names exist verify ordering is preserved (calls that merge should only affect the very last entry and must not reorder earlier entries) by setting up a queue.jobsHighPriority array with mixed named entries, calling mergeOrQueueJob and asserting only the tail changed or a new entry appended. Use spies/mocks for generateData and minimal assertions on queue.jobsHighPriority length, element identities, and tail.spec.data to ensure no unintended reordering or replacement.meteor/server/worker/worker.ts (1)
254-265: Align startup-test guard withQueueStudioJob.
QueueStudioJobblocks during startup tests (isInTestWrite()), but this new path does not. Keeping the same guard avoids accidental writes in test bootstrap flows.Suggested fix
export function QueueOrUpdateStudioJob<T extends keyof StudioJobFunc>( jobName: T, studioId: StudioId, generateData: (existing: Parameters<StudioJobFunc[T]>[0] | null) => Parameters<StudioJobFunc[T]>[0] ): void { + if (isInTestWrite()) throw new Meteor.Error(404, 'Should not be reachable during startup tests') if (!studioId) throw new Meteor.Error(500, 'Missing studioId') queueManager.mergeOrQueueJob(🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@meteor/server/worker/worker.ts` around lines 254 - 265, Add the same startup-test guard used in QueueStudioJob to QueueOrUpdateStudioJob: at the top of QueueOrUpdateStudioJob check isInTestWrite() and return early (no-op) when true to avoid test-time writes; ensure the guard runs before any throws or calls to queueManager.mergeOrQueueJob and keep the existing studioId null check and mergeOrQueueJob call intact (function names: QueueOrUpdateStudioJob, QueueStudioJob, isInTestWrite, queueManager.mergeOrQueueJob).packages/job-worker/src/playout/externalEvents.ts (1)
109-113: Documented type cast acknowledged; consider runtime guard.The comment explains that
PeripheralDeviceExternalTSREventuses a loosedeviceType: stringto accommodate custom TSR plugins, whileBlueprintExternalTSREventuses the strongly-typed TSR enum. The cast throughunknownis intentional but bypasses compile-time type checking.If a future event source (non-TSR) is added, this blanket cast could silently allow mismatched shapes into the blueprint. Consider adding a lightweight runtime assertion (e.g., verifying
event.type === 'tsr') to fail fast on unexpected event types.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@packages/job-worker/src/playout/externalEvents.ts` around lines 109 - 113, The blanket cast from wireEvents to BlueprintExternalEvent[] (wireEvents -> blueprintEvents) can hide mismatched shapes; add a lightweight runtime guard before or during the cast that asserts each event is a TSR event (e.g., verify event.type === 'tsr' for all entries in wireEvents) and either filter out non-TSR events or throw a clear error; update the code around the blueprintEvents assignment (the wireEvents variable and the resulting blueprintEvents) to perform this check and fail fast if an unexpected event type is encountered, while keeping the documented cast for deviceType handling (PeripheralDeviceExternalTSREvent -> BlueprintExternalEvent).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@meteor/server/api/peripheralDevice.ts`:
- Around line 316-337: reportExternalEvents currently appends unbounded incoming
events into the pending StudioJobs.OnExternalEvents job, which can grow without
limit under bursts; update the reportExternalEvents handler to validate each
incoming PeripheralDeviceExternalEvent (type/required fields) and enforce a hard
maximum batch size before merging (e.g. define a MAX_EVENTS_PER_JOB constant),
then when calling QueueOrUpdateStudioJob merge sanitized events and truncate the
resulting array to that maximum (choose a policy: drop oldest or newest) so the
stored job data cannot grow unbounded; reference the reportExternalEvents
function, StudioJobs.OnExternalEvents and QueueOrUpdateStudioJob when making the
change.
In `@meteor/server/publications/externalEventSubscriptions.ts`:
- Around line 163-166: Add runtime validation for the `type` parameter in the
same function that currently does `check(deviceId, String)` (in
meteor/server/publications/externalEventSubscriptions.ts) by calling
`check(type, String)` before using `type` to filter subscription documents and
construct the observer key (e.g., where `peripheralDevice` is obtained via
`checkAccessAndGetPeripheralDevice`); this ensures `type` is a string and
prevents invalid values from affecting the publication observer behavior.
In `@packages/job-worker/src/playout/externalEvents.ts`:
- Around line 118-124: The category string passed into
storeNotificationsForCategory currently uses getRandomId()
("externalEvent:${getRandomId()}"), causing a new category on every call and
unbounded notification growth; change the category to a deterministic identifier
(for example "externalEvent:${playlist.playlistId}" or
"externalEvent:${playlist.rundownId}" or include blueprint.blueprintId) so
repeated external-event calls overwrite the same category. Locate the call to
storeNotificationsForCategory in externalEvents.ts and replace the getRandomId()
suffix with a stable field from the playlist or rundown (using
playlist.playlistId, playlist.rundownId, or blueprint.blueprintId) so
notifications are replaced instead of accumulated.
In `@packages/playout-gateway/src/tsrHandler.ts`:
- Around line 873-910: The _updateEventSubscriptions method only runs on DB
collection changes, so add listeners to the TSR connection manager to refresh
subscriptions whenever connection topology changes: in the class initialization
(where tsr is available) subscribe to the connection manager's
connection-added/connection-removed/connection-updated (or equivalent) events
and call this._updateEventSubscriptions() when those fire (also handle cases
where a connection's deviceId changes); ensure you remove those listeners on
teardown. Reference the existing _updateEventSubscriptions method and
tsr.connectionManager.getConnections()/container.device to locate where to wire
the event handlers.
---
Nitpick comments:
In `@meteor/server/worker/jobQueue.ts`:
- Around line 364-402: Add unit tests for mergeOrQueueJob covering three
scenarios: (1) when the last high-priority job has the same name, verify that
mergeOrQueueJob calls generateData with the existing tail.spec.data, mutates
tail.spec.data in-place, does not change queue.jobsHighPriority length, and logs
the merge (use mergeOrQueueJob and inspect the queue returned by
`#getOrCreateQueue` or access the queue object after invoking); (2) when the tail
is absent or has a different name, verify a new JobEntry is pushed with id from
getRandomString(), name equal to jobName and data from generateData(null), that
queue.metricsTotal.inc() is called and `#notifyWorker` is invoked (spy/mock
generateData, getRandomString, metricsTotal.inc, and `#notifyWorker`); (3) when
multiple job names exist verify ordering is preserved (calls that merge should
only affect the very last entry and must not reorder earlier entries) by setting
up a queue.jobsHighPriority array with mixed named entries, calling
mergeOrQueueJob and asserting only the tail changed or a new entry appended. Use
spies/mocks for generateData and minimal assertions on queue.jobsHighPriority
length, element identities, and tail.spec.data to ensure no unintended
reordering or replacement.
In `@meteor/server/worker/worker.ts`:
- Around line 254-265: Add the same startup-test guard used in QueueStudioJob to
QueueOrUpdateStudioJob: at the top of QueueOrUpdateStudioJob check
isInTestWrite() and return early (no-op) when true to avoid test-time writes;
ensure the guard runs before any throws or calls to queueManager.mergeOrQueueJob
and keep the existing studioId null check and mergeOrQueueJob call intact
(function names: QueueOrUpdateStudioJob, QueueStudioJob, isInTestWrite,
queueManager.mergeOrQueueJob).
In `@packages/job-worker/src/ingest/generationRundown.ts`:
- Around line 302-304: Add a regression test that verifies the new persisted
field externalEventSubscriptions produced by the blueprint is saved and can be
retrieved: in the ingest test suite that exercises generationRundown (where
translateUserEditsFromBlueprint(...) and externalEventSubscriptions are passed
into the persistence call), create a blueprint output containing a non-empty
externalEventSubscriptions array, run the existing ingestion path, then query
the persisted rundown and assert the stored
rundown.rundown.externalEventSubscriptions strictly equals the blueprint value;
ensure the test covers both save and subsequent retrieval to catch regressions.
In `@packages/job-worker/src/playout/externalEvents.ts`:
- Around line 109-113: The blanket cast from wireEvents to
BlueprintExternalEvent[] (wireEvents -> blueprintEvents) can hide mismatched
shapes; add a lightweight runtime guard before or during the cast that asserts
each event is a TSR event (e.g., verify event.type === 'tsr' for all entries in
wireEvents) and either filter out non-TSR events or throw a clear error; update
the code around the blueprintEvents assignment (the wireEvents variable and the
resulting blueprintEvents) to perform this check and fail fast if an unexpected
event type is encountered, while keeping the documented cast for deviceType
handling (PeripheralDeviceExternalTSREvent -> BlueprintExternalEvent).
In `@packages/playout-gateway/package.json`:
- Line 58: The dependency "timeline-state-resolver" is pinned to a
feature-branch nightly
("10.0.0-nightly-feat-tsr-device-feedback-20260429-145317-4079605bd.0") used by
imports DevicesRegistry, MediaObject, DeviceOptionsAny, ActionExecutionResult,
BaseRemoteDeviceIntegration, TSRDevicesManifestEntry, and AtemMediaPool; before
merging to main either replace that pinned feature-nightly with the intended
stable/nightly-main or a release version (or add a clear TODO and justification)
in package.json so main doesn't depend on a feature branch—update the version
string accordingly and run install/build to confirm no type or API breaks across
those seven import sites.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6b94813a-bfd3-48dc-b4e5-06cef5485324
⛔ Files ignored due to path filters (2)
meteor/yarn.lockis excluded by!**/yarn.lock,!**/*.lockpackages/yarn.lockis excluded by!**/yarn.lock,!**/*.lock
📒 Files selected for processing (28)
meteor/server/api/peripheralDevice.tsmeteor/server/publications/_publications.tsmeteor/server/publications/externalEventSubscriptions.tsmeteor/server/publications/rundown.tsmeteor/server/worker/jobQueue.tsmeteor/server/worker/worker.tspackages/blueprints-integration/src/api/showStyle.tspackages/blueprints-integration/src/context/adlibActionContext.tspackages/blueprints-integration/src/context/externalEventContext.tspackages/blueprints-integration/src/context/index.tspackages/blueprints-integration/src/context/playoutActionContext.tspackages/blueprints-integration/src/externalEvent.tspackages/blueprints-integration/src/index.tspackages/corelib/src/dataModel/Rundown.tspackages/corelib/src/worker/studio.tspackages/job-worker/src/ingest/generationRundown.tspackages/job-worker/src/ingest/model/IngestModel.tspackages/job-worker/src/ingest/model/implementation/IngestModelImpl.tspackages/job-worker/src/playout/adlibAction.tspackages/job-worker/src/playout/externalEvents.tspackages/job-worker/src/workers/studio/jobs.tspackages/playout-gateway/package.jsonpackages/playout-gateway/src/coreHandler.tspackages/playout-gateway/src/tsrHandler.tspackages/shared-lib/package.jsonpackages/shared-lib/src/peripheralDevice/externalEvents.tspackages/shared-lib/src/peripheralDevice/methodsAPI.tspackages/shared-lib/src/pubsub/peripheralDevice.ts
| export async function reportExternalEvents( | ||
| context: MethodContext, | ||
| deviceId: PeripheralDeviceId, | ||
| token: string, | ||
| events: PeripheralDeviceExternalEvent[] | ||
| ): Promise<void> { | ||
| const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, context) | ||
|
|
||
| if (!peripheralDevice.studioAndConfigId) | ||
| throw new Error(`PeripheralDevice "${peripheralDevice._id}" sent reportExternalEvents, but has no studioId`) | ||
|
|
||
| if (!events.length) return | ||
|
|
||
| const studioId = peripheralDevice.studioAndConfigId.studioId | ||
|
|
||
| // Merge events into the last pending OnExternalEvents job in the queue, or enqueue a new one. | ||
| // This prevents queue flooding when many events arrive in a burst, or when multiple gateways | ||
| // report events for the same studio simultaneously. | ||
| QueueOrUpdateStudioJob(StudioJobs.OnExternalEvents, studioId, (existing) => ({ | ||
| events: [...(existing?.events ?? []), ...events], | ||
| })) | ||
| } |
There was a problem hiding this comment.
Bound and validate incoming event batches before merge.
Right now every call blindly appends to pending job data. Under burst traffic (or a stalled worker), this can grow unbounded and cause memory pressure.
Suggested fix
export async function reportExternalEvents(
context: MethodContext,
deviceId: PeripheralDeviceId,
token: string,
events: PeripheralDeviceExternalEvent[]
): Promise<void> {
const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, context)
+ check(events, Array)
if (!peripheralDevice.studioAndConfigId)
throw new Error(`PeripheralDevice "${peripheralDevice._id}" sent reportExternalEvents, but has no studioId`)
if (!events.length) return
const studioId = peripheralDevice.studioAndConfigId.studioId
+ const MAX_PENDING_EXTERNAL_EVENTS = 1000
// Merge events into the last pending OnExternalEvents job in the queue, or enqueue a new one.
// This prevents queue flooding when many events arrive in a burst, or when multiple gateways
// report events for the same studio simultaneously.
QueueOrUpdateStudioJob(StudioJobs.OnExternalEvents, studioId, (existing) => ({
- events: [...(existing?.events ?? []), ...events],
+ events: [...(existing?.events ?? []), ...events].slice(-MAX_PENDING_EXTERNAL_EVENTS),
}))
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@meteor/server/api/peripheralDevice.ts` around lines 316 - 337,
reportExternalEvents currently appends unbounded incoming events into the
pending StudioJobs.OnExternalEvents job, which can grow without limit under
bursts; update the reportExternalEvents handler to validate each incoming
PeripheralDeviceExternalEvent (type/required fields) and enforce a hard maximum
batch size before merging (e.g. define a MAX_EVENTS_PER_JOB constant), then when
calling QueueOrUpdateStudioJob merge sanitized events and truncate the resulting
array to that maximum (choose a policy: drop oldest or newest) so the stored job
data cannot grow unbounded; reference the reportExternalEvents function,
StudioJobs.OnExternalEvents and QueueOrUpdateStudioJob when making the change.
| ) { | ||
| check(deviceId, String) | ||
|
|
||
| const peripheralDevice = await checkAccessAndGetPeripheralDevice(deviceId, token, this) |
There was a problem hiding this comment.
Missing input validation for type parameter.
The deviceId is validated with check(deviceId, String), but the type parameter is passed directly without validation. Since type is used to filter subscription documents and forms part of the observer key, an invalid or unexpected value could cause subtle misbehavior.
Add a check() call for the type parameter to ensure runtime safety.
🛡️ Proposed fix to add type validation
async function (
pub: CustomPublish<ExternalEventSubscriptionDocument>,
type: PeripheralDeviceExternalEvent['type'],
deviceId: PeripheralDeviceId,
token: string | undefined
) {
check(deviceId, String)
+ check(type, String)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@meteor/server/publications/externalEventSubscriptions.ts` around lines 163 -
166, Add runtime validation for the `type` parameter in the same function that
currently does `check(deviceId, String)` (in
meteor/server/publications/externalEventSubscriptions.ts) by calling
`check(type, String)` before using `type` to filter subscription documents and
construct the observer key (e.g., where `peripheralDevice` is obtained via
`checkAccessAndGetPeripheralDevice`); this ensures `type` is a string and
prevents invalid values from affecting the publication observer behavior.
| storeNotificationsForCategory( | ||
| playoutModel, | ||
| `externalEvent:${getRandomId()}`, | ||
| blueprint.blueprintId, | ||
| actionContext.notes, | ||
| playlist.currentPartInfo ?? playlist.nextPartInfo | ||
| ) |
There was a problem hiding this comment.
Random notification category may cause unbounded notification accumulation.
Using getRandomId() in the notification category key (externalEvent:${getRandomId()}) means every invocation creates a unique category. Consequently, notifications from previous external event batches will never be replaced—they'll accumulate indefinitely. If external events fire frequently, this could flood the notification store.
Consider using a deterministic category tied to the playlist or rundown (or a stable prefix without random suffix) so that repeated invocations overwrite stale notifications.
🐛 Suggested fix for deterministic category
storeNotificationsForCategory(
playoutModel,
- `externalEvent:${getRandomId()}`,
+ `externalEvent:${playlist._id}`,
blueprint.blueprintId,
actionContext.notes,
playlist.currentPartInfo ?? playlist.nextPartInfo
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| storeNotificationsForCategory( | |
| playoutModel, | |
| `externalEvent:${getRandomId()}`, | |
| blueprint.blueprintId, | |
| actionContext.notes, | |
| playlist.currentPartInfo ?? playlist.nextPartInfo | |
| ) | |
| storeNotificationsForCategory( | |
| playoutModel, | |
| `externalEvent:${playlist._id}`, | |
| blueprint.blueprintId, | |
| actionContext.notes, | |
| playlist.currentPartInfo ?? playlist.nextPartInfo | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/job-worker/src/playout/externalEvents.ts` around lines 118 - 124,
The category string passed into storeNotificationsForCategory currently uses
getRandomId() ("externalEvent:${getRandomId()}"), causing a new category on
every call and unbounded notification growth; change the category to a
deterministic identifier (for example "externalEvent:${playlist.playlistId}" or
"externalEvent:${playlist.rundownId}" or include blueprint.blueprintId) so
repeated external-event calls overwrite the same category. Locate the call to
storeNotificationsForCategory in externalEvents.ts and replace the getRandomId()
suffix with a stable field from the playlist or rundown (using
playlist.playlistId, playlist.rundownId, or blueprint.blueprintId) so
notifications are replaced instead of accumulated.
| private async _updateEventSubscriptions() { | ||
| const subscriptionDocs = this._coreHandler.core | ||
| .getCollection(PeripheralDevicePubSubCollectionsNames.externalEventSubscriptions) | ||
| .find({}) | ||
|
|
||
| this.logger.debug(`_updateEventSubscriptions: ${subscriptionDocs.length} subscription doc(s) in collection`) | ||
|
|
||
| // Aggregate subscriptions, group by deviceId | ||
| const subscriptionsByDeviceId = new Map<string, Set<string>>() | ||
| for (const sub of subscriptionDocs) { | ||
| if (sub.type !== 'tsr') continue | ||
|
|
||
| let events = subscriptionsByDeviceId.get(sub.deviceId) | ||
| if (!events) { | ||
| events = new Set() | ||
| subscriptionsByDeviceId.set(sub.deviceId, events) | ||
| } | ||
| events.add(sub.event) | ||
| } | ||
|
|
||
| if (subscriptionsByDeviceId.size === 0) { | ||
| this.logger.debug('_updateEventSubscriptions: no subscriptions — clearing all devices') | ||
| } | ||
|
|
||
| await Promise.allSettled( | ||
| _.map(this.tsr.connectionManager.getConnections(), async (container) => { | ||
| const events = subscriptionsByDeviceId.get(container.deviceId) ?? new Set<string>() | ||
| this.logger.debug( | ||
| `_updateEventSubscriptions: setting ${events.size} event subscription(s) on device "${container.deviceId}": [${Array.from(events).join(', ')}]` | ||
| ) | ||
| await container.device.setEventSubscriptions(Array.from(events)).catch((e) => { | ||
| this.logger.error( | ||
| `Error setting event subscriptions for device "${container.deviceId}": ${stringifyError(e)}` | ||
| ) | ||
| }) | ||
| }) | ||
| ) | ||
| } |
There was a problem hiding this comment.
Refresh device subscriptions when connection topology changes.
_updateEventSubscriptions() only runs on collection mutations. If a TSR connection is added/updated after observer callbacks have fired, that device can keep stale/empty subscriptions until the next DB change.
Suggested fix
diff --git a/packages/playout-gateway/src/tsrHandler.ts b/packages/playout-gateway/src/tsrHandler.ts
@@
this.tsr.connectionManager.on('connectionAdded', (id, container) => {
@@
this._triggerupdateExpectedPlayoutItems() // So that any recently created devices will get all the ExpectedPlayoutItems
+ this._triggerUpdateEventSubscriptions() // Ensure new device gets current external event subscriptions
})
@@
private async _updateDevices(): Promise<void> {
@@
this.tsr.connectionManager.setConnections(connections)
+ this._triggerUpdateEventSubscriptions() // Re-apply subscriptions after connection set changes
}
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/playout-gateway/src/tsrHandler.ts` around lines 873 - 910, The
_updateEventSubscriptions method only runs on DB collection changes, so add
listeners to the TSR connection manager to refresh subscriptions whenever
connection topology changes: in the class initialization (where tsr is
available) subscribe to the connection manager's
connection-added/connection-removed/connection-updated (or equivalent) events
and call this._updateEventSubscriptions() when those fire (also handle cases
where a connection's deviceId changes); ensure you remove those listeners on
teardown. Reference the existing _updateEventSubscriptions method and
tsr.connectionManager.getConnections()/container.device to locate where to wire
the event handlers.
About the Contributor
This pull request is posted on behalf of the BBC
Type of Contribution
This is a: Feature
New Behavior
RFC: #1670
Allow TSR devices to report events when external state changes occur. This triggers a new blueprints method to allow for custom handling of these state changes.
Events are batched to avoid flooding the bluerpints/job-worker with a large backlog of events to handle
Testing
Affected areas
Time Frame
Other Information
Status