diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 41e51bbe..5819eabc 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -44,6 +44,7 @@ jobs: path: coverage.xml - name: Add context info to env + if: steps.tests.outcome == 'success' run: | sudo apt-get install -y --no-install-recommends libxml-xpath-perl COVERAGE=`xpath -q -e "floor(/coverage/@line-rate * 100)" coverage.xml` diff --git a/docs/changelog.rst b/docs/changelog.rst index a3ce2372..11e2f9d9 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -1,5 +1,15 @@ Changelog --------- +2.2.25 +^^^^^^ + - Add drop-frame timecode support to SCCWriter via a new + ``drop_frame=False`` parameter. When enabled, timestamps use the + standard 10-minute block method with semicolon separators. + - Refactor SCCWriter timing pipeline: exact token counting, + monotonic frame deduplication, and caption splitting at 80 tokens. + - Extract ``SCC_TOKENS_PER_CAPTION_MAX`` as a named constant. + - CI: guard coverage-parsing step on test success. + 2.2.24 ^^^^^^ - Fix SCC ingestion error when a doubled italic-off mid-row code diff --git a/pycaption/scc/__init__.py b/pycaption/scc/__init__.py index fcd0d458..8947eeda 100644 --- a/pycaption/scc/__init__.py +++ b/pycaption/scc/__init__.py @@ -335,8 +335,7 @@ def _translate_line(self, line): # Look ahead for the next command, skipping the duplicate # that SCC uses for error-correction (same word repeated). next_idx = idx + 1 - if (next_idx < len(word_list) - and word_list[next_idx].strip() == word): + if next_idx < len(word_list) and word_list[next_idx].strip() == word: next_idx += 1 next_command = ( word_list[next_idx] if next_idx < len(word_list) else None @@ -496,10 +495,11 @@ def _translate_command(self, word, next_command=None): edm_time = self.time_translator.get_time() if self.pop_ons_queue: self._pop_on(end=edm_time) - if self.buffer_dict.active_key in ("paint", "roll") \ - and not self.buffer.is_empty(): - self.caption_stash.create_and_store( - self.buffer, self.time, edm_time) + if ( + self.buffer_dict.active_key in ("paint", "roll") + and not self.buffer.is_empty() + ): + self.caption_stash.create_and_store(self.buffer, self.time, edm_time) self.buffer = self.node_creator_factory.new_creator() self.node_creator_factory.position_tracker.reset_for_new_caption() self.time = edm_time @@ -563,9 +563,17 @@ def _pop_on(self, end=0): self.caption_stash.create_and_store(pop_on_cue.buffer, pop_on_cue.start, end) +SCC_TOKENS_PER_CAPTION_MAX = 80 + +_SCC_PREFIX = ["94ae", "94ae", "9420", "9420"] +_SCC_SUFFIX = ["942c", "942c", "942f", "942f"] +_SCC_OVERHEAD = len(_SCC_PREFIX) + len(_SCC_SUFFIX) + + class SCCWriter(BaseWriter): - def __init__(self, *args, **kw): + def __init__(self, *args, drop_frame=False, **kw): super().__init__(*args, **kw) + self.drop_frame = drop_frame def write(self, caption_set): output = HEADER + "\n\n" @@ -589,24 +597,64 @@ def write(self, caption_set): # Advance start times so as to have time to write to the pop-on # buffer; possibly remove the previous clear-screen command for index, (code, start, end) in enumerate(codes): - code_words = len(code) / 5 + 8 + code_words = len(code.split()) + _SCC_OVERHEAD code_time_microseconds = code_words * MICROSECONDS_PER_CODEWORD code_start = start - code_time_microseconds - if index > 0: - previous_code, previous_start, previous_end = codes[index - 1] - if code_start < previous_start: - code_start = previous_start - if previous_end + 3 * MICROSECONDS_PER_CODEWORD >= code_start: + if code_start < 0: + code_start = 0 + if index == 0: + codes[index] = (code, code_start, end) + continue + previous_code, previous_start, previous_end = codes[index - 1] + if code_start <= previous_start + MICROSECONDS_PER_CODEWORD: + prev_words = len(previous_code.split()) + _SCC_OVERHEAD + code_start = max( + code_start, + previous_start + prev_words * MICROSECONDS_PER_CODEWORD, + ) + codes[index] = (code, code_start, end) + codes[index - 1] = (previous_code, previous_start, None) + else: + if ( + previous_end is not None + and previous_end + 3 * MICROSECONDS_PER_CODEWORD >= code_start + ): codes[index - 1] = (previous_code, previous_start, None) - codes[index] = (code, code_start, end) + codes[index] = (code, code_start, end) # PASS 3: - # Write captions. + # Deduplicate timestamps — ensure monotonically increasing frame values + last_emitted_frame = -1 + for index, (code, start, end) in enumerate(codes): + cur_frame = self._microseconds_to_frame(start) + if cur_frame <= last_emitted_frame: + while self._microseconds_to_frame(start) <= last_emitted_frame: + start += MICROSECONDS_PER_CODEWORD + codes[index] = (code, start, end) + last_emitted_frame = self._microseconds_to_frame(start) + + # PASS 4: + # Write captions, splitting if > SCC_TOKENS_PER_CAPTION_MAX tokens. + max_payload = SCC_TOKENS_PER_CAPTION_MAX - _SCC_OVERHEAD for code, start, end in codes: - output += f"{self._format_timestamp(start)}\t" - output += "94ae 94ae 9420 9420 " - output += code - output += "942c 942c 942f 942f\n\n" + code_tokens = code.split() + if len(code_tokens) + _SCC_OVERHEAD <= SCC_TOKENS_PER_CAPTION_MAX: + output += f"{self._format_timestamp(start)}\t" + output += "94ae 94ae 9420 9420 " + output += code + output += "942c 942c 942f 942f\n\n" + else: + offset = 0 + while offset < len(code_tokens): + chunk = code_tokens[offset : offset + max_payload] + line = _SCC_PREFIX + chunk + _SCC_SUFFIX + output += ( + f"{self._format_timestamp(start)}\t" + " ".join(line) + "\n\n" + ) + offset += max_payload + if offset < len(code_tokens): + start += MICROSECONDS_PER_CODEWORD + if end is not None: output += f"{self._format_timestamp(end)}\t942c 942c\n\n" @@ -668,10 +716,20 @@ def _text_to_code(self, s): code = self._maybe_align(code) return code + def _format_timestamp(self, microseconds): + if self.drop_frame: + return self._format_timestamp_df(microseconds) + return self._format_timestamp_ndf(microseconds) + + def _microseconds_to_frame(self, microseconds): + if self.drop_frame: + return math.floor(microseconds * 30 / 1_000_000 * 1000 / 1001 + 1e-9) + seconds_float = microseconds / 1_000_000.0 * 1000.0 / 1001.0 + return math.floor(seconds_float * 30) + @staticmethod - def _format_timestamp(microseconds): - seconds_float = microseconds / 1000.0 / 1000.0 - # Convert to non-drop-frame timecode + def _format_timestamp_ndf(microseconds): + seconds_float = microseconds / 1_000_000.0 seconds_float *= 1000.0 / 1001.0 hours = math.floor(seconds_float / 3600) seconds_float -= hours * 3600 @@ -682,6 +740,26 @@ def _format_timestamp(microseconds): frames = math.floor(seconds_float * 30) return f"{hours:02}:{minutes:02}:{seconds:02}:{frames:02}" + @staticmethod + def _format_timestamp_df(microseconds): + total_frames = math.floor(microseconds * 30 / 1_000_000 * 1000 / 1001 + 1e-9) + fps = 30 + frames_per_10min = 17982 # 10*60*30 - 2*9 + d = total_frames // frames_per_10min + m = total_frames % frames_per_10min + if m < 2: + tc_frames = total_frames + 18 * d + else: + tc_frames = total_frames + 18 * d + 2 * ((m - 2) // 1798) + + hours = tc_frames // (fps * 60 * 60) + rem = tc_frames % (fps * 60 * 60) + minutes = rem // (fps * 60) + rem = rem % (fps * 60) + seconds = rem // fps + frames = rem % fps + return f"{hours:02}:{minutes:02}:{seconds:02};{frames:02}" + class _SccTimeTranslator: """Converts SCC time to microseconds, keeping track of frames passed""" diff --git a/tests/test_scc_writer.py b/tests/test_scc_writer.py new file mode 100644 index 00000000..0919aab3 --- /dev/null +++ b/tests/test_scc_writer.py @@ -0,0 +1,255 @@ +import re + + +from pycaption import SCCReader, SCCWriter, SRTReader, WebVTTReader + + +class TestSCCWriterTimestampFormatting: + def test_ndf_timestamp_uses_colons(self): + assert SCCWriter._format_timestamp_ndf(0) == "00:00:00:00" + assert ";" not in SCCWriter._format_timestamp_ndf(1_000_000) + + def test_ndf_known_values(self): + assert SCCWriter._format_timestamp_ndf(0) == "00:00:00:00" + # 1 second of real time at 29.97fps NDF + # 1_000_000 us * 1000/1001 = 999.000999 coded seconds → 29 frames + assert SCCWriter._format_timestamp_ndf(1_000_000) == "00:00:00:29" + # 60 seconds real → 59.94 coded seconds → 59s + 28 frames + assert SCCWriter._format_timestamp_ndf(60_000_000) == "00:00:59:28" + + def test_df_timestamp_uses_semicolons(self): + result = SCCWriter._format_timestamp_df(1_000_000) + assert ";" in result + + def test_df_known_values(self): + assert SCCWriter._format_timestamp_df(0) == "00:00:00;00" + # 10 minutes of real time should map to exactly 10:00;00 in DF + ten_minutes_us = 10 * 60 * 1_000_000 + assert SCCWriter._format_timestamp_df(ten_minutes_us) == "00:10:00;00" + + def test_df_frame_skip_at_minute_boundary(self): + # Real frame 1800 is the first frame of minute 1 in DF. + # Minute 0 has 1800 frames (0..1799), no drops. + # Minute 1 starts at real frame 1800, displayed as 00:01:00;02 + # (frames ;00 and ;01 are dropped at non-10th minutes). + us_for_frame_1800 = round(1800 * 1001 / 1000 / 30 * 1_000_000) + result = SCCWriter._format_timestamp_df(us_for_frame_1800) + assert result == "00:01:00;02" + + def test_df_no_frame_skip_at_10_minute_boundary(self): + # At 10-minute boundaries, no frames are dropped + twenty_minutes_us = 20 * 60 * 1_000_000 + result = SCCWriter._format_timestamp_df(twenty_minutes_us) + assert result == "00:20:00;00" + + +class TestSCCWriterDropFrameFlag: + def _make_simple_captions(self): + srt = ( + "1\n" + "00:00:01,000 --> 00:00:03,000\n" + "Hello world\n\n" + "2\n" + "00:00:05,000 --> 00:00:07,000\n" + "Second line\n" + ) + return SRTReader().read(srt) + + def test_drop_frame_false_is_default(self): + writer = SCCWriter() + assert writer.drop_frame is False + + def test_drop_frame_false_uses_colons(self): + captions = self._make_simple_captions() + output = SCCWriter(drop_frame=False).write(captions) + timestamps = re.findall(r"\d{2}:\d{2}:\d{2}[:;]\d{2}", output) + for ts in timestamps: + assert ";" not in ts + + def test_drop_frame_true_uses_semicolons(self): + captions = self._make_simple_captions() + output = SCCWriter(drop_frame=True).write(captions) + timestamps = re.findall(r"\d{2}:\d{2}:\d{2}[:;]\d{2}", output) + assert len(timestamps) > 0 + for ts in timestamps: + assert ";" in ts + + def test_drop_frame_false_preserves_existing_behavior(self): + captions = self._make_simple_captions() + output_default = SCCWriter().write(captions) + output_explicit = SCCWriter(drop_frame=False).write(captions) + assert output_default == output_explicit + + +class TestSCCWriterTimestampOrdering: + def test_timestamps_monotonically_increasing_ndf(self): + vtt_input = ( + "WEBVTT\n\n" + "0\n00:00:01.529 --> 00:00:03.640\n" + "When it comes to finding the one,\n\n" + "1\n00:00:03.730 --> 00:00:07.239\n" + "I always say that if they can love you at your messiest,\n\n" + "2\n00:00:07.570 --> 00:00:09.319\n" + "calm you at your moodiest,\n\n" + "3\n00:00:09.529 --> 00:00:11.680\n" + "and laugh with you at your quirkiest,\n\n" + "4\n00:00:11.930 --> 00:00:13.760\n" + "you've probably found your person.\n" + ) + captions = WebVTTReader().read(vtt_input) + output = SCCWriter(drop_frame=False).write(captions) + timestamps = re.findall(r"(\d{2}:\d{2}:\d{2}:\d{2})", output) + for i in range(1, len(timestamps)): + assert ( + timestamps[i] >= timestamps[i - 1] + ), f"NDF timestamps out of order: {timestamps[i - 1]} > {timestamps[i]}" + + def test_timestamps_monotonically_increasing_df(self): + vtt_input = ( + "WEBVTT\n\n" + "0\n00:00:01.529 --> 00:00:03.640\n" + "When it comes to finding the one,\n\n" + "1\n00:00:03.730 --> 00:00:07.239\n" + "I always say that if they can love you at your messiest,\n\n" + "2\n00:00:07.570 --> 00:00:09.319\n" + "calm you at your moodiest,\n\n" + "3\n00:00:09.529 --> 00:00:11.680\n" + "and laugh with you at your quirkiest,\n\n" + "4\n00:00:11.930 --> 00:00:13.760\n" + "you've probably found your person.\n" + ) + captions = WebVTTReader().read(vtt_input) + output = SCCWriter(drop_frame=True).write(captions) + timestamps = re.findall(r"(\d{2}:\d{2}:\d{2};\d{2})", output) + for i in range(1, len(timestamps)): + assert ( + timestamps[i] >= timestamps[i - 1] + ), f"DF timestamps out of order: {timestamps[i - 1]} > {timestamps[i]}" + + def test_rapid_short_captions_stay_ordered(self): + """Short text followed by long text should not cause timestamp inversion.""" + vtt_input = ( + "WEBVTT\n\n" + "0\n00:00:02.200 --> 00:00:02.359\nyou know,\n\n" + "1\n00:00:02.400 --> 00:00:03.760\n" + "the way he kind of looked at me.\n\n" + "2\n00:00:04.700 --> 00:00:05.169\nAnd I said,\n\n" + "3\n00:00:05.210 --> 00:00:05.520\noh\n" + ) + captions = WebVTTReader().read(vtt_input) + for df in (True, False): + output = SCCWriter(drop_frame=df).write(captions) + sep = ";" if df else ":" + pattern = r"\d{2}:\d{2}:\d{2}" + re.escape(sep) + r"\d{2}" + timestamps = re.findall(pattern, output) + for i in range(1, len(timestamps)): + assert ( + timestamps[i] >= timestamps[i - 1] + ), f"drop_frame={df}: {timestamps[i - 1]} > {timestamps[i]}" + + +class TestSCCWriterFirstCueBackshift: + def test_first_cue_start_is_shifted_back(self): + srt = "1\n" "00:00:10,000 --> 00:00:12,000\n" "Hello world\n" + captions = SRTReader().read(srt) + output = SCCWriter(drop_frame=False).write(captions) + timestamps = re.findall(r"(\d{2}:\d{2}:\d{2}:\d{2})", output) + # The first timestamp should be earlier than 00:00:09:29 + # (10s real time -> ~9:29 NDF, minus backshift) + assert timestamps[0] < "00:00:09:29" + + def test_first_cue_at_zero_does_not_go_negative(self): + srt = "1\n" "00:00:00,100 --> 00:00:02,000\n" "Hello\n" + captions = SRTReader().read(srt) + output = SCCWriter(drop_frame=False).write(captions) + timestamps = re.findall(r"(\d{2}:\d{2}:\d{2}:\d{2})", output) + assert timestamps[0] == "00:00:00:00" + + +class TestSCCWriterOverlappingCues: + def test_overlapping_cues_suppress_clear_screen(self): + """When cues are very close together, the clear-screen (942c) for + the previous cue should be suppressed.""" + srt = ( + "1\n00:00:01,000 --> 00:00:01,900\nFirst\n\n" + "2\n00:00:02,000 --> 00:00:03,000\nSecond\n" + ) + captions = SRTReader().read(srt) + output = SCCWriter(drop_frame=False).write(captions) + # Count standalone 942c lines (clear-screen commands between captions) + clear_lines = [ + line + for line in output.split("\n") + if line.strip().endswith("942c 942c") and "94ae" not in line + ] + # With tight cues, the first cue's clear should be suppressed + # (only the last cue gets a clear-screen at its end time) + assert len(clear_lines) <= 1 + + +class TestSCCWriterSplitLongCaption: + def test_split_caption_exceeding_80_tokens(self): + """A caption that would exceed 80 SCC tokens should be split.""" + # Create a very long caption that will produce many code tokens + long_text = "A" * 32 + "\n" + "B" * 32 + "\n" + "C" * 32 + "\n" + "D" * 32 + srt = "1\n" "00:00:05,000 --> 00:00:10,000\n" f"{long_text}\n" + captions = SRTReader().read(srt) + output = SCCWriter(drop_frame=False).write(captions) + # Each output line (non-empty, non-header) should have <= 80 tokens + for line in output.split("\n"): + line = line.strip() + if not line or line == "Scenarist_SCC V1.0": + continue + # Line format: "HH:MM:SS:FF\t" + parts = line.split("\t") + if len(parts) == 2: + tokens = parts[1].split() + assert len(tokens) <= SCC_TOKENS_PER_CAPTION_MAX, ( + f"Line has {len(tokens)} tokens, " + f"exceeds {SCC_TOKENS_PER_CAPTION_MAX}" + ) + + +class TestSCCWriterRoundTrip: + def test_srt_to_scc_roundtrip_ndf(self): + srt = ( + "1\n00:00:01,000 --> 00:00:03,000\nHello world\n\n" + "2\n00:00:05,000 --> 00:00:07,000\nGoodbye world\n" + ) + captions = SRTReader().read(srt) + scc_output = SCCWriter(drop_frame=False).write(captions) + # Should be readable by SCCReader + result = SCCReader().read(scc_output) + assert not result.is_empty() + assert len(result.get_captions("en-US")) == 2 + + def test_srt_to_scc_roundtrip_df(self): + srt = ( + "1\n00:00:01,000 --> 00:00:03,000\nHello world\n\n" + "2\n00:00:05,000 --> 00:00:07,000\nGoodbye world\n" + ) + captions = SRTReader().read(srt) + scc_output = SCCWriter(drop_frame=True).write(captions) + # SCCReader already supports semicolon (DF) timestamps + result = SCCReader().read(scc_output) + assert not result.is_empty() + assert len(result.get_captions("en-US")) == 2 + + def test_webvtt_to_scc_roundtrip_df(self): + vtt_input = ( + "WEBVTT\n\n" + "0\n00:00:01.529 --> 00:00:03.640\n" + "When it comes to finding the one,\n\n" + "1\n00:00:03.730 --> 00:00:07.239\n" + "I always say that if they can love you\n\n" + "2\n00:00:07.570 --> 00:00:09.319\n" + "calm you at your moodiest,\n" + ) + captions = WebVTTReader().read(vtt_input) + scc_output = SCCWriter(drop_frame=True).write(captions) + result = SCCReader().read(scc_output) + assert not result.is_empty() + assert len(result.get_captions("en-US")) == 3 + + +from pycaption.scc import SCC_TOKENS_PER_CAPTION_MAX # noqa: E402