Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dlio_benchmark/data_generator/tf_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,6 @@ def generate(self):
filename = os.path.basename(out_path_spec)
self.storage.create_node(index_folder, exist_ok=True)
tfrecord_idx = f"{index_folder}/{filename}.idx"
if not os.path.isfile(tfrecord_idx):
call([tfrecord2idx_script, out_path_spec, tfrecord_idx])
if not self.storage.isfile(tfrecord_idx):
call([tfrecord2idx_script, out_path_spec, self.storage.get_uri(tfrecord_idx)])
np.random.seed()
3 changes: 3 additions & 0 deletions dlio_benchmark/framework/framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,6 @@ def put_data(self, id, data, offset=None, length=None):
def get_data(self, id, data, offset=None, length=None):
    """Placeholder read operation of the base framework; always returns None.

    The ``id``, ``data``, ``offset`` and ``length`` arguments are accepted
    but not used by this implementation.
    """
    return None

def isfile(self, id):
    """Placeholder file check of the base framework; always returns False.

    The ``id`` argument is accepted but not inspected here.
    """
    return False

30 changes: 28 additions & 2 deletions dlio_benchmark/framework/tf_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
DataLoaderType

import tensorflow as tf
import tensorflow_io as tfio
from tensorflow.python.framework import errors
import boto3

tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

Expand All @@ -52,6 +54,9 @@ def __init__(self, profiling):
else:
self.tensorboard = ProfilerFactory.get_profiler(Profiler.TENSORBOARD)
self.reader_handler = None
self.s3 = boto3.client("s3", \
endpoint_url=os.getenv('S3_ENDPOINT'), \
region_name=os.getenv('AWS_REGION'))

@dlp.log
def init_loader(self, format_type, epoch=0, data_loader=None):
Expand Down Expand Up @@ -102,7 +107,7 @@ def is_nativeio_available(self):

@dlp.log
def create_node(self, id, exist_ok=False):
tf.io.gfile.mkdir(id)
tf.io.gfile.makedirs(id)
return True

@dlp.log
Expand All @@ -119,7 +124,24 @@ def get_node(self, id):
def walk_node(self, id, use_pattern=False):
try:
if not use_pattern:
return tf.io.gfile.listdir(id)
# parse id to get bucket name and prefix

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the motivation in overriding the function?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the bug that you referred to?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because when I ran tf.io.gfile.listdir, I encountered an IndexError saying that the position passed into s.substr() is greater than s.size().
I looked into the TensorFlow code: tf.io.gfile.listdir is implemented by list_directory_v2(), which calls GetChildren(). GetChildren() is defined in tensorflow/io and basically returns all the objects in the bucket with the given prefix. This function tries to call s.substr(s.size()+1). There is a detailed code walk-through in tensorflow/io#2149 that explains the bug.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnugeorge does this look good to you? Are you able to do a quick test?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this break the storage/framework abstraction? Right now, this PR will tie S3 storage to the tf framework.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnugeorge yes, but this is just to fix the original implementation, right? The original implementation is in the tf framework as well. All the create_node and walk_node functions are part of the framework class.

For pytorch, we can implement the same storage functions under pytorch_framework class.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@annie-anna, after discussing with Johnu, we cannot accept the change in the framework layer. All the changes should be inside the storage layer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhenghh04 @johnugeorge ack, I reverted changes in walk_node.

scheme_end = id.find('://') + 2
bucket_end = id.find('/', scheme_end + 1)
bucket = id[scheme_end + 1 : bucket_end]
prefix = id[bucket_end + 1 :]
if not prefix.endswith('/'):
prefix = prefix + '/'

resp = []
paginator = self.s3.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/')
for page in pages:
if page['KeyCount'] == 0:
continue
for obj in page['Contents']:
filename = obj['Key'].split('/')[-1]
resp.append(filename)
return resp
else:
return tf.io.gfile.glob(id)
except errors.NotFoundError:
Expand All @@ -140,3 +162,7 @@ def get_data(self, id, data, offset=None, length=None):
with tf.io.gfile.GFile(id, "r") as fd:
data = fd.read()
return data

@dlp.log
def isfile(self, id):
    """Report whether ``id`` exists and is not a directory.

    Both checks go through ``tf.io.gfile``; a path that does not exist
    yields a falsy result without the directory check being made.
    """
    if not tf.io.gfile.exists(id):
        return False
    # The path exists, so it counts as a file unless it is a directory.
    return not tf.io.gfile.isdir(id)
4 changes: 4 additions & 0 deletions dlio_benchmark/storage/file_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,9 @@ def get_data(self, id, data, offset=None, length=None):
data = fd.read()
return data

@dlp.log
def isfile(self, id):
    """Return the result of ``os.path.isfile`` for the path ``id``."""
    is_regular_file = os.path.isfile(id)
    return is_regular_file

def get_basename(self, id):
    """Return the final path component of ``id``.

    This is the tail half of ``os.path.split``, which is the same value
    ``os.path.basename`` yields (an empty string when ``id`` ends with a
    separator).
    """
    _head, tail = os.path.split(id)
    return tail
4 changes: 4 additions & 0 deletions dlio_benchmark/storage/s3_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,9 @@ def put_data(self, id, data, offset=None, length=None):
def get_data(self, id, data, offset=None, length=None):
    """Read data for ``id`` through the parent class implementation.

    ``id`` is first converted with ``self.get_uri``; the resulting URI is
    passed, along with ``data``, ``offset`` and ``length``, to the parent
    class's ``get_data``, whose return value is returned unchanged.
    """
    uri = self.get_uri(id)
    return super().get_data(uri, data, offset, length)

@dlp.log
def isfile(self, id):
    """Check whether ``id`` is a file, via the parent class implementation.

    ``id`` is first converted with ``self.get_uri``; the parent class's
    ``isfile`` result for that URI is returned unchanged.
    """
    uri = self.get_uri(id)
    return super().isfile(uri)

def get_basename(self, id):
    """Return the final path component of ``id``.

    ``id`` is used as given (it is not passed through ``get_uri``); the
    result is the tail half of ``os.path.split``, the same value
    ``os.path.basename`` yields.
    """
    _head, tail = os.path.split(id)
    return tail
7 changes: 7 additions & 0 deletions dlio_benchmark/storage/storage_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,10 @@ def get_data(self, id, data, offset=None, length=None):
return self.framework.get_data(id, data, offset, length)
return None

def isfile(self, id):
    """
    Check whether the given path is a file.

    When ``self.is_framework_nativeio_available`` is truthy, the check is
    delegated to ``self.framework.isfile(id)`` and its result is returned.
    Otherwise this method returns None.
    """
    if not self.is_framework_nativeio_available:
        # No framework-native IO: there is nothing to delegate to.
        return None
    return self.framework.isfile(id)
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ pandas>=1.5.1
psutil>=5.9.8
pydftracer==1.0.11
pytest
tensorflow>=2.11.0
tensorflow==2.13.1
tensorflow_io==0.33.0
torch>=2.2.0
torchaudio
torchvision
boto3
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
"omegaconf>=2.2.0",
"pandas>=1.5.1",
"psutil>=5.9.8",
"boto3",
]
x86_deps = [
f"hydra-core>={HYDRA_VERSION}",
"nvidia-dali-cuda120>=1.34.0",
"tensorflow>=2.11.0",
"tensorflow==2.13.1",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we set tensorflow>=2.13.1?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment below.

"tensorflow_io==0.33.0",
"torch>=2.2.0",
"torchaudio",
"torchvision",
Expand Down
Loading