Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
path: coverage.xml

- name: Add context info to env
if: steps.tests.outcome == 'success'
run: |
sudo apt-get install -y --no-install-recommends libxml-xpath-perl
COVERAGE=`xpath -q -e "floor(/coverage/@line-rate * 100)" coverage.xml`
Expand Down
10 changes: 10 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
Changelog
---------
2.2.25
^^^^^^
- Add drop-frame timecode support to SCCWriter via a new
``drop_frame=False`` parameter. When enabled, timestamps use the
standard 10-minute block method with semicolon separators.
- Refactor SCCWriter timing pipeline: exact token counting,
monotonic frame deduplication, and caption splitting at 80 tokens.
- Extract ``SCC_TOKENS_PER_CAPTION_MAX`` as a named constant.
- CI: guard coverage-parsing step on test success.

2.2.24
^^^^^^
- Fix SCC ingestion error when a doubled italic-off mid-row code
Expand Down
122 changes: 100 additions & 22 deletions pycaption/scc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,7 @@ def _translate_line(self, line):
# Look ahead for the next command, skipping the duplicate
# that SCC uses for error-correction (same word repeated).
next_idx = idx + 1
if (next_idx < len(word_list)
and word_list[next_idx].strip() == word):
if next_idx < len(word_list) and word_list[next_idx].strip() == word:
next_idx += 1
next_command = (
word_list[next_idx] if next_idx < len(word_list) else None
Expand Down Expand Up @@ -496,10 +495,11 @@ def _translate_command(self, word, next_command=None):
edm_time = self.time_translator.get_time()
if self.pop_ons_queue:
self._pop_on(end=edm_time)
if self.buffer_dict.active_key in ("paint", "roll") \
and not self.buffer.is_empty():
self.caption_stash.create_and_store(
self.buffer, self.time, edm_time)
if (
self.buffer_dict.active_key in ("paint", "roll")
and not self.buffer.is_empty()
):
self.caption_stash.create_and_store(self.buffer, self.time, edm_time)
self.buffer = self.node_creator_factory.new_creator()
self.node_creator_factory.position_tracker.reset_for_new_caption()
self.time = edm_time
Expand Down Expand Up @@ -563,9 +563,17 @@ def _pop_on(self, end=0):
self.caption_stash.create_and_store(pop_on_cue.buffer, pop_on_cue.start, end)


SCC_TOKENS_PER_CAPTION_MAX = 80

_SCC_PREFIX = ["94ae", "94ae", "9420", "9420"]
_SCC_SUFFIX = ["942c", "942c", "942f", "942f"]
_SCC_OVERHEAD = len(_SCC_PREFIX) + len(_SCC_SUFFIX)


class SCCWriter(BaseWriter):
def __init__(self, *args, **kw):
def __init__(self, *args, drop_frame=False, **kw):
super().__init__(*args, **kw)
self.drop_frame = drop_frame

def write(self, caption_set):
output = HEADER + "\n\n"
Expand All @@ -589,24 +597,64 @@ def write(self, caption_set):
# Advance start times so as to have time to write to the pop-on
# buffer; possibly remove the previous clear-screen command
for index, (code, start, end) in enumerate(codes):
code_words = len(code) / 5 + 8
code_words = len(code.split()) + _SCC_OVERHEAD
code_time_microseconds = code_words * MICROSECONDS_PER_CODEWORD
code_start = start - code_time_microseconds
if index > 0:
previous_code, previous_start, previous_end = codes[index - 1]
if code_start < previous_start:
code_start = previous_start
if previous_end + 3 * MICROSECONDS_PER_CODEWORD >= code_start:
if code_start < 0:
code_start = 0
if index == 0:
codes[index] = (code, code_start, end)
continue
previous_code, previous_start, previous_end = codes[index - 1]
if code_start <= previous_start + MICROSECONDS_PER_CODEWORD:
prev_words = len(previous_code.split()) + _SCC_OVERHEAD
code_start = max(
code_start,
previous_start + prev_words * MICROSECONDS_PER_CODEWORD,
)
codes[index] = (code, code_start, end)
codes[index - 1] = (previous_code, previous_start, None)
else:
if (
previous_end is not None
and previous_end + 3 * MICROSECONDS_PER_CODEWORD >= code_start
):
codes[index - 1] = (previous_code, previous_start, None)
codes[index] = (code, code_start, end)
codes[index] = (code, code_start, end)

# PASS 3:
# Write captions.
# Deduplicate timestamps — ensure monotonically increasing frame values
last_emitted_frame = -1
for index, (code, start, end) in enumerate(codes):
cur_frame = self._microseconds_to_frame(start)
if cur_frame <= last_emitted_frame:
while self._microseconds_to_frame(start) <= last_emitted_frame:
start += MICROSECONDS_PER_CODEWORD
codes[index] = (code, start, end)
last_emitted_frame = self._microseconds_to_frame(start)

# PASS 4:
# Write captions, splitting if > SCC_TOKENS_PER_CAPTION_MAX tokens.
max_payload = SCC_TOKENS_PER_CAPTION_MAX - _SCC_OVERHEAD
for code, start, end in codes:
output += f"{self._format_timestamp(start)}\t"
output += "94ae 94ae 9420 9420 "
output += code
output += "942c 942c 942f 942f\n\n"
code_tokens = code.split()
if len(code_tokens) + _SCC_OVERHEAD <= SCC_TOKENS_PER_CAPTION_MAX:
output += f"{self._format_timestamp(start)}\t"
output += "94ae 94ae 9420 9420 "
output += code
output += "942c 942c 942f 942f\n\n"
else:
offset = 0
while offset < len(code_tokens):
chunk = code_tokens[offset : offset + max_payload]
line = _SCC_PREFIX + chunk + _SCC_SUFFIX
output += (
f"{self._format_timestamp(start)}\t" + " ".join(line) + "\n\n"
)
offset += max_payload
if offset < len(code_tokens):
start += MICROSECONDS_PER_CODEWORD

if end is not None:
output += f"{self._format_timestamp(end)}\t942c 942c\n\n"

Expand Down Expand Up @@ -668,10 +716,20 @@ def _text_to_code(self, s):
code = self._maybe_align(code)
return code

def _format_timestamp(self, microseconds):
if self.drop_frame:
return self._format_timestamp_df(microseconds)
return self._format_timestamp_ndf(microseconds)

def _microseconds_to_frame(self, microseconds):
if self.drop_frame:
return math.floor(microseconds * 30 / 1_000_000 * 1000 / 1001 + 1e-9)
seconds_float = microseconds / 1_000_000.0 * 1000.0 / 1001.0
return math.floor(seconds_float * 30)

@staticmethod
def _format_timestamp(microseconds):
seconds_float = microseconds / 1000.0 / 1000.0
# Convert to non-drop-frame timecode
def _format_timestamp_ndf(microseconds):
seconds_float = microseconds / 1_000_000.0
seconds_float *= 1000.0 / 1001.0
hours = math.floor(seconds_float / 3600)
seconds_float -= hours * 3600
Expand All @@ -682,6 +740,26 @@ def _format_timestamp(microseconds):
frames = math.floor(seconds_float * 30)
return f"{hours:02}:{minutes:02}:{seconds:02}:{frames:02}"

@staticmethod
def _format_timestamp_df(microseconds):
total_frames = math.floor(microseconds * 30 / 1_000_000 * 1000 / 1001 + 1e-9)
fps = 30
frames_per_10min = 17982 # 10*60*30 - 2*9
d = total_frames // frames_per_10min
m = total_frames % frames_per_10min
if m < 2:
tc_frames = total_frames + 18 * d
else:
tc_frames = total_frames + 18 * d + 2 * ((m - 2) // 1798)

hours = tc_frames // (fps * 60 * 60)
rem = tc_frames % (fps * 60 * 60)
minutes = rem // (fps * 60)
rem = rem % (fps * 60)
seconds = rem // fps
frames = rem % fps
return f"{hours:02}:{minutes:02}:{seconds:02};{frames:02}"


class _SccTimeTranslator:
"""Converts SCC time to microseconds, keeping track of frames passed"""
Expand Down
Loading
Loading