diff --git a/dlio_benchmark/data_generator/tf_generator.py b/dlio_benchmark/data_generator/tf_generator.py index b97c71ae..d4db97be 100644 --- a/dlio_benchmark/data_generator/tf_generator.py +++ b/dlio_benchmark/data_generator/tf_generator.py @@ -77,6 +77,6 @@ def generate(self): filename = os.path.basename(out_path_spec) self.storage.create_node(index_folder, exist_ok=True) tfrecord_idx = f"{index_folder}/{filename}.idx" - if not os.path.isfile(tfrecord_idx): - call([tfrecord2idx_script, out_path_spec, tfrecord_idx]) + if not self.storage.isfile(tfrecord_idx): + call([tfrecord2idx_script, out_path_spec, self.storage.get_uri(tfrecord_idx)]) np.random.seed() diff --git a/dlio_benchmark/framework/framework.py b/dlio_benchmark/framework/framework.py index f37a62b9..80a5729c 100644 --- a/dlio_benchmark/framework/framework.py +++ b/dlio_benchmark/framework/framework.py @@ -105,3 +105,6 @@ def put_data(self, id, data, offset=None, length=None): def get_data(self, id, data, offset=None, length=None): return None + def isfile(self, id): + return False + diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py index 175ddbb9..6566ab39 100644 --- a/dlio_benchmark/framework/tf_framework.py +++ b/dlio_benchmark/framework/tf_framework.py @@ -31,6 +31,7 @@ DataLoaderType import tensorflow as tf +import tensorflow_io as tfio from tensorflow.python.framework import errors tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) @@ -102,7 +103,7 @@ def is_nativeio_available(self): @dlp.log def create_node(self, id, exist_ok=False): - tf.io.gfile.mkdir(id) + tf.io.gfile.makedirs(id) return True @dlp.log @@ -140,3 +141,7 @@ def get_data(self, id, data, offset=None, length=None): with tf.io.gfile.GFile(id, "r") as fd: data = fd.read() return data + + @dlp.log + def isfile(self, id): + return tf.io.gfile.exists(id) and not tf.io.gfile.isdir(id) diff --git a/dlio_benchmark/storage/file_storage.py b/dlio_benchmark/storage/file_storage.py index 8b2aa6a6..19208975 100644 --- a/dlio_benchmark/storage/file_storage.py +++ b/dlio_benchmark/storage/file_storage.py @@ -99,5 +99,9 @@ def get_data(self, id, data, offset=None, length=None): data = fd.read() return data + @dlp.log + def isfile(self, id): + return os.path.isfile(id) + def get_basename(self, id): return os.path.basename(id) diff --git a/dlio_benchmark/storage/s3_storage.py b/dlio_benchmark/storage/s3_storage.py index f28c2eaa..1e76bd52 100644 --- a/dlio_benchmark/storage/s3_storage.py +++ b/dlio_benchmark/storage/s3_storage.py @@ -72,5 +72,9 @@ def put_data(self, id, data, offset=None, length=None): def get_data(self, id, data, offset=None, length=None): return super().get_data(self.get_uri(id), data, offset, length) + @dlp.log + def isfile(self, id): + return super().isfile(self.get_uri(id)) + def get_basename(self, id): return os.path.basename(id) \ No newline at end of file diff --git a/dlio_benchmark/storage/storage_handler.py b/dlio_benchmark/storage/storage_handler.py index 44da1db8..3dd084fa 100644 --- a/dlio_benchmark/storage/storage_handler.py +++ b/dlio_benchmark/storage/storage_handler.py @@ -123,3 +123,10 @@ def get_data(self, id, data, offset=None, length=None): return self.framework.get_data(id, data, offset, length) return None + def isfile(self, id): + """ + This method checks if the given path is a file + """ + if self.is_framework_nativeio_available: + return self.framework.isfile(id) + return None diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py index cf607cfd..e9ecf2f9 100644 --- a/dlio_benchmark/utils/config.py +++ b/dlio_benchmark/utils/config.py @@ -300,7 +300,9 @@ def validate(self): if self.num_checkpoints_write > 0: if self.num_checkpoints_read > self.num_checkpoints_write: raise Exception(f"Number of checkpoints to read {self.num_checkpoints_read} cannot be larger than number of checkpoints to write {self.num_checkpoints_write}") - + if self.ksm_present and self.checkpoint_randomize_tensor: + raise Exception(f"checkpoint.ksm is {self.ksm_present} which requires checkpoint.randomize_tensor to be False") + @staticmethod def reset(): ConfigArguments.__instance = None diff --git a/docs/source/config.rst b/docs/source/config.rst index 9f421350..49f83ba7 100644 --- a/docs/source/config.rst +++ b/docs/source/config.rst @@ -443,12 +443,13 @@ checkpoint | Available options are: default, subset. * - randomize_tensor - True - - | randomize the tensors data + - | randomize the tensors data. If it is False, all the checkpoint data will be tensor of ones. * - ksm - (omitted) - | Optional subsection to configure and enable Kernel Samepage Merging (KSM) optimization. | **Simply adding this ``ksm:`` section (even if empty, e.g., ``ksm: {}``) enables KSM features.** - | See the KSM Configuration table below for optional nested keys to fine-tune KSM behavior. + | See the KSM Configuration table below for optional nested keys to fine-tune KSM behavior. + | To use ksm, one has to set randomize_tensor = False. **KSM Configuration (Optional keys under `checkpoint.ksm`)** @@ -486,6 +487,7 @@ checkpoint checkpoint: checkpoint_folder: checkpoints/another_model # ... other checkpoint settings ... + randomize_tensor: False ksm: high_ram_trigger: 25.0 await_time: 150 diff --git a/requirements.txt b/requirements.txt index 11c7539c..8eb1a5d3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,8 @@ pandas>=1.5.1 psutil>=5.9.8 pydftracer==1.0.11 pytest -tensorflow>=2.11.0 +tensorflow>=2.13.1 +tensorflow_io>=0.33.0 torch>=2.2.0 torchaudio torchvision diff --git a/setup.py b/setup.py index 7df0cff7..3b694eff 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,8 @@ x86_deps = [ f"hydra-core>={HYDRA_VERSION}", "nvidia-dali-cuda120>=1.34.0", - "tensorflow>=2.11.0", + "tensorflow>=2.13.1", + "tensorflow_io>=0.33.0", "torch>=2.2.0", "torchaudio", "torchvision", diff --git a/tests/dlio_benchmark_test.py b/tests/dlio_benchmark_test.py index 3f2d2f09..b9a0e0e7 100644 --- a/tests/dlio_benchmark_test.py +++ b/tests/dlio_benchmark_test.py @@ -352,8 +352,9 @@ def test_checkpoint_ksm_config() -> None: '++workload.checkpoint.ksm={}', '++workload.workflow.generate_data=False', '++workload.workflow.train=False', - '++workload.checkpoint.num_checkpoints_write=0', - '++workload.checkpoint.num_checkpoints_read=0' + '++workload.checkpoint.num_checkpoints_write=1', + '++workload.checkpoint.num_checkpoints_read=1', + '++workload.checkpoint.randomize_tensor=False', ]) ConfigArguments.reset() # Pass only the workload part of the config @@ -384,8 +385,9 @@ def test_checkpoint_ksm_config() -> None: '++workload.checkpoint.ksm.await_time=100', '++workload.workflow.generate_data=False', '++workload.workflow.train=False', - '++workload.checkpoint.num_checkpoints_write=0', - '++workload.checkpoint.num_checkpoints_read=0' + '++workload.checkpoint.num_checkpoints_write=1', + '++workload.checkpoint.num_checkpoints_read=1', + '++workload.checkpoint.randomize_tensor=False' ]) ConfigArguments.reset() benchmark = DLIOBenchmark(cfg['workload']) @@ -412,8 +414,9 @@ def test_checkpoint_ksm_config() -> None: '++workload.workflow.checkpoint=True', '++workload.workflow.generate_data=False', '++workload.workflow.train=False', - '++workload.checkpoint.num_checkpoints_write=0', - '++workload.checkpoint.num_checkpoints_read=0' + '++workload.checkpoint.num_checkpoints_write=1', + '++workload.checkpoint.num_checkpoints_read=1', + '++workload.checkpoint.randomize_tensor=False' ]) ConfigArguments.reset() benchmark = DLIOBenchmark(cfg['workload'])