diff --git a/src/workflow/FileManager.py b/src/workflow/FileManager.py index 46227bb..9dd6077 100644 --- a/src/workflow/FileManager.py +++ b/src/workflow/FileManager.py @@ -1,4 +1,5 @@ import gzip +import os import shutil import string import random @@ -50,6 +51,13 @@ def __init__( self._connect_to_sql() def _connect_to_sql(self): + # In a loaded demo workspace cache.db may be a symlink into the + # read-only ground truth; sqlite3 writes the database in place and would + # follow the link. Replace it with an independent copy (preserving the + # demo's existing index rows) before connecting. + self._materialize_if_symlink( + Path(self.cache_path, 'cache.db'), preserve_content=True + ) self.cache_connection = sqlite3.connect( Path(self.cache_path, 'cache.db'), isolation_level=None ) @@ -79,6 +87,38 @@ def __setstate__(self, state): self.__dict__.update(state) self._connect_to_sql() + def _materialize_if_symlink(self, path: Path, preserve_content: bool) -> None: + """ + Replace a symlinked cache entry with an independent real file inside the + workspace, so that writes never follow the link back to the read-only + demo ground truth. + + On Linux, loading a demo workspace materializes it by symlinking cache + files to the shared source under ``example-data/``. Writing through such + a symlink (e.g. via ``sqlite3``, ``open(..., 'wb')`` or ``shutil.copy``) + would modify the ground truth instead of the user's workspace. Calling + this just before a write makes the workspace diverge on first write + while leaving the ground truth untouched. + + Args: + path (Path): The cache file that is about to be written. + preserve_content (bool): If True, copy the link target's bytes first + (used for ``cache.db``, whose existing index rows must survive). + If False, simply unlink the symlink (used for result files that + are about to be fully overwritten). + """ + if not path.is_symlink(): + return + if preserve_content: + source = path.resolve() + tmp = path.with_name(path.name + '.materialize.tmp') + shutil.copy2(source, tmp) + # os.replace swaps the symlink entry for the real copy atomically; + # it does not write through the link. + os.replace(tmp, path) + else: + path.unlink() + def get_files( self, files: Union[List[Union[str, Path]], Path, str, List[List[str]]], @@ -312,6 +352,7 @@ def _store_data(self, dataset_id: str, name_tag: str, data, row_group_size=None) # Polars DataFrames and LazyFrames are stored as parquet if isinstance(data, (pl.DataFrame, pl.LazyFrame)): path = Path(path, f"{name_tag}.pq") + self._materialize_if_symlink(path, preserve_content=False) if isinstance(data, pl.LazyFrame): # Keep the streaming sink when no bounded row groups are requested # (default callers). Only materialize when row_group_size is set, @@ -326,12 +367,14 @@ def _store_data(self, dataset_id: str, name_tag: str, data, row_group_size=None) # Pandas DataFrames are stored as parquet elif isinstance(data, pd.DataFrame): path = Path(path, f"{name_tag}.pq") + self._materialize_if_symlink(path, preserve_content=False) with open(path, 'wb') as f: data.to_parquet(f, row_group_size=row_group_size) return path # Other data structures are stored as compressed pickle else: path = Path(path, f"{name_tag}.pkl.gz") + self._materialize_if_symlink(path, preserve_content=False) with gzip.open(path, 'wb') as f: pkl.dump(data, f) return path @@ -399,6 +442,9 @@ def store_file(self, dataset_id: str, name_tag: str, file: Path | BytesIO, ) target_path.parent.mkdir(parents=True, exist_ok=True) + # Never write through a symlink into the read-only demo ground truth + self._materialize_if_symlink(target_path, preserve_content=False) + # Store file in path if isinstance(file, BytesIO): with open(target_path, 'wb') as f: diff --git a/tests/test_filemanager_symlink_cow.py b/tests/test_filemanager_symlink_cow.py new file mode 100644 index 0000000..3888a8b --- /dev/null +++ b/tests/test_filemanager_symlink_cow.py @@ -0,0 +1,163 @@ +""" +Tests for FileManager copy-on-write protection of demo "ground truth". + +Bug being fixed: when a demo workspace is loaded in online mode on Linux, the +workspace is materialized by symlinking every committed demo file back to the +read-only ground truth under ``example-data/`` (see ``_symlink_tree`` / +``copy_demo_workspace`` in ``src/common/common.py``). The committed demo ships +writable cache artifacts (``cache.db`` and ``cache/files//*``). Because +writing to a symlink follows it to its target, reprocessing data used to write +through those symlinks and overwrite the committed ground truth: + + * ``cache.db`` (guaranteed): ``sqlite3`` writes the database in place on every + ``store_*`` call. + * result files: clobbered whenever the reprocessed ``dataset_id`` collided + with a demo dataset id. + +FileManager now performs copy-on-write: it materializes a real file in the +workspace before any in-place write, so the ground truth is never modified. + +These tests pin that behaviour by simulating a symlinked cache and asserting the +ground-truth bytes survive a reprocess while the workspace diverges. +""" + +import os +import sys + +import pytest + +# FileManager imports these at module import time; skip the suite if a stripped +# environment lacks them (matches the convention of the other test modules). +pd = pytest.importorskip("pandas") +pl = pytest.importorskip("polars") +pytest.importorskip("pyarrow") + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from pathlib import Path + +from src.workflow.FileManager import FileManager + + +# Result artifacts the demo ships, one per FileManager write path: +PICKLE_TAG = "orig_pickle" # store_data -> .pkl.gz (gzip.open write branch) +FRAME_TAG = "orig_frame" # store_data -> .pq (pandas to_parquet branch) +POLARS_TAG = "orig_polars" # store_data -> .pq (polars write_parquet branch) +FILE_TAG = "orig_file" # store_file -> .bin (shutil.copy branch) +DEMO_DS = "demods" + + +def _seed_ground_truth(gt_cache: Path) -> None: + """Populate a cache dir the way the committed demo does: a real cache.db + plus real result files under files//.""" + fm = FileManager(gt_cache.parent / "gt-wf", cache_path=gt_cache) + fm.store_data(DEMO_DS, PICKLE_TAG, {"demo": True}) + fm.store_data(DEMO_DS, FRAME_TAG, pd.DataFrame({"a": [1, 2, 3]})) + fm.store_data(DEMO_DS, POLARS_TAG, pl.DataFrame({"b": [4, 5, 6]})) + src = gt_cache.parent / "seed_src.bin" + src.write_bytes(b"GROUND TRUTH BIN") + fm.store_file(DEMO_DS, FILE_TAG, src, file_name="orig_file.bin") + fm.cache_connection.close() + + +def _symlink_workspace(gt_cache: Path, ws_cache: Path) -> None: + """Mimic _symlink_tree: real directories, but the cache files are symlinks + pointing at the ground-truth source (absolute, like item.resolve()).""" + (ws_cache / "files" / DEMO_DS).mkdir(parents=True) + (ws_cache / "cache.db").symlink_to((gt_cache / "cache.db").resolve()) + for name in ( + f"{PICKLE_TAG}.pkl.gz", + f"{FRAME_TAG}.pq", + f"{POLARS_TAG}.pq", + "orig_file.bin", + ): + link = ws_cache / "files" / DEMO_DS / name + link.symlink_to((gt_cache / "files" / DEMO_DS / name).resolve()) + + +def _gt_files(gt_cache: Path) -> list[Path]: + base = gt_cache / "files" / DEMO_DS + return [ + gt_cache / "cache.db", + base / f"{PICKLE_TAG}.pkl.gz", + base / f"{FRAME_TAG}.pq", + base / f"{POLARS_TAG}.pq", + base / "orig_file.bin", + ] + + +def test_cache_db_symlink_is_materialized_on_connect(tmp_path): + """Opening a workspace whose cache.db is a symlink must replace it with an + independent real copy (preserving the demo index) and leave ground truth + untouched.""" + gt_cache = tmp_path / "ground_truth" / "cache" + gt_cache.mkdir(parents=True) + _seed_ground_truth(gt_cache) + + gt_db = gt_cache / "cache.db" + gt_db_bytes = gt_db.read_bytes() + + ws_cache = tmp_path / "workspace" / "cache" + _symlink_workspace(gt_cache, ws_cache) + assert (ws_cache / "cache.db").is_symlink() # precondition + + fm = FileManager(tmp_path / "workspace" / "wf", cache_path=ws_cache) + try: + # Workspace cache.db is now a real, independent file... + assert not (ws_cache / "cache.db").is_symlink() + assert (ws_cache / "cache.db").is_file() + # ...the demo's index rows were preserved in the workspace copy... + assert fm.result_exists(DEMO_DS, PICKLE_TAG) + # ...and the ground-truth database is byte-for-byte unchanged. + assert gt_db.read_bytes() == gt_db_bytes + finally: + fm.cache_connection.close() + + +def test_reprocess_does_not_write_through_symlinks(tmp_path): + """Reprocessing a dataset whose id collides with the demo must not modify + any ground-truth file; the workspace copies diverge instead.""" + gt_cache = tmp_path / "ground_truth" / "cache" + gt_cache.mkdir(parents=True) + _seed_ground_truth(gt_cache) + + snapshot = {p: p.read_bytes() for p in _gt_files(gt_cache)} + + ws_cache = tmp_path / "workspace" / "cache" + _symlink_workspace(gt_cache, ws_cache) + + fm = FileManager(tmp_path / "workspace" / "wf", cache_path=ws_cache) + try: + # Reprocess: overwrite every artifact through what are currently symlinks. + fm.store_data(DEMO_DS, PICKLE_TAG, {"reprocessed": 999}) + fm.store_data(DEMO_DS, FRAME_TAG, pd.DataFrame({"a": [10, 20]})) + fm.store_data(DEMO_DS, POLARS_TAG, pl.DataFrame({"b": [70, 80]})) + new_src = tmp_path / "new_src.bin" + new_src.write_bytes(b"WORKSPACE NEW BIN") + fm.store_file(DEMO_DS, FILE_TAG, new_src, file_name="orig_file.bin") + + # Ground truth is completely unchanged. + for p, original in snapshot.items(): + assert p.read_bytes() == original, f"ground truth modified: {p}" + assert (gt_cache / "files" / DEMO_DS / "orig_file.bin").read_bytes() == ( + b"GROUND TRUTH BIN" + ) + + # Workspace files are now real files (not symlinks) with new content. + ws_files = ws_cache / "files" / DEMO_DS + for name in ( + f"{PICKLE_TAG}.pkl.gz", + f"{FRAME_TAG}.pq", + f"{POLARS_TAG}.pq", + "orig_file.bin", + ): + wp = ws_files / name + assert not wp.is_symlink(), f"{name} is still a symlink into ground truth" + assert wp.is_file() + assert (ws_files / "orig_file.bin").read_bytes() == b"WORKSPACE NEW BIN" + + # Reading back through the workspace FileManager returns new content. + assert fm.get_results(DEMO_DS, [PICKLE_TAG])[PICKLE_TAG] == {"reprocessed": 999} + assert fm.get_results(DEMO_DS, [FRAME_TAG])[FRAME_TAG]["a"].tolist() == [10, 20] + finally: + fm.cache_connection.close()