-
Notifications
You must be signed in to change notification settings - Fork 132
Adding ignore list to skim load for unused skims while running with sharrow #1036
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c031c8b
ad83a54
0fa965b
27414c1
0d342b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -896,8 +896,8 @@ def _should_ignore(ignore, x): | |
| ) | ||
|
|
||
| d = _drop_unused_names(state, d) | ||
| # apply non-zarr dependent digital encoding | ||
| d = _apply_digital_encoding(d, skim_digital_encoding) | ||
| # note: digital encoding is deferred and applied later, after | ||
| # data is in its final memory location (shared memory or local) | ||
|
|
||
| if skim_tag in ("taz", "maz"): | ||
| # check alignment of TAZs that it matches land_use table | ||
|
|
@@ -909,11 +909,79 @@ def _should_ignore(ignore, x): | |
| else: | ||
| land_use_zone_id = None | ||
|
|
||
| time_periods = _dedupe_time_periods(network_los_preload) | ||
| return _finalize_skim_dataset( | ||
| d, | ||
| omx_file_paths=omx_file_paths, | ||
| omx_file_handles=omx_file_handles, | ||
| time_periods=time_periods, | ||
| land_use_zone_id=land_use_zone_id, | ||
| land_use_index=land_use.index.to_numpy(), | ||
| zone_system=network_los_preload.zone_system, | ||
| store_skims_in_shm=state.settings.store_skims_in_shm, | ||
| backing=backing, | ||
| skim_digital_encoding=skim_digital_encoding, | ||
| omx_ignore_patterns=state.settings.omx_ignore_patterns, | ||
| ) | ||
|
|
||
|
|
||
| def _finalize_skim_dataset( | ||
| d, | ||
| omx_file_paths, | ||
| omx_file_handles, | ||
| time_periods, | ||
| land_use_zone_id, | ||
| land_use_index, | ||
| zone_system, | ||
| store_skims_in_shm, | ||
| backing, | ||
| skim_digital_encoding, | ||
| omx_ignore_patterns=None, | ||
| ): | ||
| """ | ||
| Align, optionally share, and encode a skim dataset. | ||
|
|
||
| This covers the final phase of ``load_skim_dataset_to_shared_memory``: | ||
| zone alignment checks, writing into shared memory (with the deferred | ||
| ``reload_from_omx_3d`` path when possible), and digital encoding. | ||
|
|
||
| Parameters | ||
| ---------- | ||
| d : xr.Dataset | ||
| The dataset as loaded from OMX / zarr, with unused variables already | ||
| dropped. May be backed by dask arrays. | ||
| omx_file_paths : list[Path] | ||
| Paths to the OMX source files (needed for `reload_from_omx_3d`). | ||
| omx_file_handles : list[openmatrix.File] | ||
| Already-open OMX file handles. Closed before returning. | ||
| time_periods : list[str] | ||
| Deduplicated time-period labels (e.g. ``["AM", "MD", "PM"]``). | ||
| land_use_zone_id : array-like or None | ||
| Original (pre-remap) zone IDs from the land-use table, or ``None`` | ||
| for non-taz/maz skim tags. | ||
| land_use_index : array-like or None | ||
| Zero-based contiguous land-use index, or ``None``. | ||
| zone_system : int | ||
| ``ONE_ZONE``, ``TWO_ZONE``, or ``THREE_ZONE``. | ||
| store_skims_in_shm : bool | ||
| Whether to store the dataset in shared memory. | ||
| backing : str | ||
| Shared-memory backing token / memmap path. | ||
| skim_digital_encoding : list[dict] | ||
| Digital encoding instructions to apply after data is in its final | ||
| memory location. | ||
| omx_ignore_patterns : list[str] or None | ||
| User-supplied OMX ignore patterns (from settings). | ||
|
|
||
| Returns | ||
| ------- | ||
| xr.Dataset | ||
| """ | ||
| from activitysim.core.los import ONE_ZONE | ||
|
|
||
| dask_required = False | ||
| if network_los_preload.zone_system == ONE_ZONE: | ||
| if zone_system == ONE_ZONE and land_use_zone_id is not None: | ||
| # check TAZ alignment for ONE_ZONE system. | ||
| # other systems use MAZ for most lookups, which dynamically | ||
| # resolves to TAZ inside the Dataset code. | ||
| if d["otaz"].attrs.get("preprocessed") != "zero-based-contiguous": | ||
| try: | ||
| np.testing.assert_array_equal(land_use_zone_id, d.otaz) | ||
|
|
@@ -923,10 +991,10 @@ def _should_ignore(ignore, x): | |
| dask_required = True | ||
| else: | ||
| logger.info("otaz alignment ok") | ||
| d["otaz"] = land_use.index.to_numpy() | ||
| d["otaz"] = np.asarray(land_use_index) | ||
| d["otaz"].attrs["preprocessed"] = "zero-based-contiguous" | ||
| else: | ||
| np.testing.assert_array_equal(land_use.index, d.otaz) | ||
| np.testing.assert_array_equal(land_use_index, d.otaz) | ||
|
|
||
| if d["dtaz"].attrs.get("preprocessed") != "zero-based-contiguous": | ||
| try: | ||
|
|
@@ -937,24 +1005,28 @@ def _should_ignore(ignore, x): | |
| dask_required = True | ||
| else: | ||
| logger.info("dtaz alignment ok") | ||
| d["dtaz"] = land_use.index.to_numpy() | ||
| d["dtaz"] = np.asarray(land_use_index) | ||
| d["dtaz"].attrs["preprocessed"] = "zero-based-contiguous" | ||
| else: | ||
| np.testing.assert_array_equal(land_use.index, d.dtaz) | ||
| np.testing.assert_array_equal(land_use_index, d.dtaz) | ||
|
|
||
| if d.shm.is_shared_memory: | ||
| for f in omx_file_handles: | ||
| f.close() | ||
| return d | ||
| elif not state.settings.store_skims_in_shm: | ||
| elif not store_skims_in_shm: | ||
| logger.info( | ||
| "store_skims_in_shm is False, keeping skims in process-local memory" | ||
| ) | ||
| for f in omx_file_handles: | ||
| f.close() | ||
| d = _apply_digital_encoding(d, skim_digital_encoding) | ||
| return d | ||
|
Comment on lines
1018
to
1024
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @dhensle this is a legit point; can you consider and address it? I think leaving file handles dangling open indefinitely isn't the best, but it might be a better solution than closing them prematurely. Computing the dataset to force-load everything isn't ideal because we might not need everything -- especially if we're not running a full end-to-end model run. |
||
| else: | ||
| logger.info("writing skims to shared memory") | ||
| if dask_required: | ||
| # setting `load` to True uses dask to load the data into memory | ||
| d = _apply_digital_encoding(d, skim_digital_encoding) | ||
| d_shared_mem = d.shm.to_shared_memory(backing, mode="r", load=True) | ||
| else: | ||
| # setting `load` to false then calling `reload_from_omx_3d` avoids | ||
|
|
@@ -963,11 +1035,34 @@ def _should_ignore(ignore, x): | |
| # requires no realignment (i.e. the land use table and skims match | ||
| # exactly in order and length). | ||
| d_shared_mem = d.shm.to_shared_memory(backing, mode="r", load=False) | ||
| # Build an extended ignore list that includes any skims that were | ||
| # dropped as unused, so reload_from_omx_3d doesn't try to load them. | ||
| # We must account for 3D skims where OMX names have time period | ||
| # suffixes (e.g. WLK_Bus_Ivtt__AM) but the dataset variable is | ||
| # the collapsed name (e.g. WLK_Bus_Ivtt). | ||
| reload_ignore = list(omx_ignore_patterns or []) | ||
| ds_var_names = set(d.variables.keys()) | ||
| # expand dataset variable names to include their time-period | ||
| # suffixed OMX equivalents, so we don't accidentally drop them | ||
| ds_omx_names = set() | ||
| for var_name in ds_var_names: | ||
| ds_omx_names.add(var_name) | ||
| for tp in time_periods: | ||
| ds_omx_names.add(f"{var_name}__{tp}") | ||
| all_omx_names = set() | ||
| for f in omx_file_handles: | ||
| all_omx_names.update(f.list_matrices()) | ||
| dropped_names = all_omx_names - ds_omx_names | ||
| for name in dropped_names: | ||
| reload_ignore.append(f"^{re.escape(name)}$") | ||
| sh.dataset.reload_from_omx_3d( | ||
| d_shared_mem, | ||
| [str(i) for i in omx_file_paths], | ||
| ignore=state.settings.omx_ignore_patterns, | ||
| ignore=reload_ignore, | ||
| ) | ||
| # apply digital encoding AFTER reload so raw OMX values are | ||
| # properly encoded in the shared memory dataset | ||
| d_shared_mem = _apply_digital_encoding(d_shared_mem, skim_digital_encoding) | ||
| for f in omx_file_handles: | ||
| f.close() | ||
| return d_shared_mem | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.