Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 42 additions & 29 deletions sdks/python/apache_beam/runners/worker/data_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.utils.byte_limited_queue import ByteLimitedQueue

if TYPE_CHECKING:
import apache_beam.coders.slow_stream
Expand Down Expand Up @@ -455,11 +456,16 @@ class _GrpcDataChannel(DataChannel):

def __init__(self, data_buffer_time_limit_ms=0):
# type: (int) -> None

self._data_buffer_time_limit_ms = data_buffer_time_limit_ms
self._to_send = queue.Queue() # type: queue.Queue[DataOrTimers]
self._to_send = ByteLimitedQueue(
maxsize=10000,
maxbytes=100 << 20) # type: ByteLimitedQueue[DataOrTimers]
self._received = collections.defaultdict(
lambda: queue.Queue(maxsize=5)
) # type: DefaultDict[str, queue.Queue[DataOrTimers]]
lambda: ByteLimitedQueue(
maxsize=5,
maxbytes=100 << 20)
) # type: DefaultDict[str, ByteLimitedQueue[DataOrTimers]]

# Keep a cache of completed instructions. Data for completed instructions
# must be discarded. See input_elements() and _clean_receiving_queue().
Expand All @@ -474,15 +480,15 @@ def __init__(self, data_buffer_time_limit_ms=0):

def close(self):
  # type: () -> None

  """Enqueues the end-of-writes sentinel and marks this channel as closed.

  The `_WRITES_FINISHED` sentinel is put on the send queue exactly once. It
  is accounted as 0 bytes so that it only counts against the queue's element
  limit and never against its byte limit.
  """
  # ByteLimitedQueue.put() requires an explicit byte size for every item;
  # the sentinel carries no payload, hence 0.
  self._to_send.put(self._WRITES_FINISHED, 0)
  self._closed = True

def wait(self, timeout=None):
  # type: (Optional[int]) -> None

  """Blocks until `_reads_finished` is signalled or `timeout` elapses.

  Args:
    timeout: Maximum time to wait, forwarded unchanged to
      `self._reads_finished.wait()`. None means wait without a time limit.
      NOTE(review): presumably seconds, as for threading.Event.wait() --
      confirm against where `_reads_finished` is created.

  Returns:
    None. The result of the underlying wait() call is intentionally dropped.
  """
  self._reads_finished.wait(timeout)

def _receiving_queue(self, instruction_id):
# type: (str) -> Optional[queue.Queue[DataOrTimers]]
# type: (str) -> Optional[ByteLimitedQueue[DataOrTimers]]

"""
Gets or creates the queue for an instruction_id, or returns None if the
Expand Down Expand Up @@ -585,21 +591,19 @@ def output_stream(self, instruction_id, transform_id):
def add_to_send_queue(data):
# type: (bytes) -> None
if data:
self._to_send.put(
beam_fn_api_pb2.Elements.Data(
instruction_id=instruction_id,
transform_id=transform_id,
data=data))
elem = beam_fn_api_pb2.Elements.Data(
instruction_id=instruction_id, transform_id=transform_id, data=data)
self._to_send.put(elem, self._get_element_bytes(elem))

def close_callback(data):
# type: (bytes) -> None
add_to_send_queue(data)
# End of stream marker.
self._to_send.put(
beam_fn_api_pb2.Elements.Data(
instruction_id=instruction_id,
transform_id=transform_id,
is_last=True))
elem = beam_fn_api_pb2.Elements.Data(
instruction_id=instruction_id,
transform_id=transform_id,
is_last=True)
self._to_send.put(elem, self._get_element_bytes(elem))

return ClosableOutputStream.create(
close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
Expand All @@ -614,23 +618,23 @@ def output_timer_stream(
def add_to_send_queue(timer):
# type: (bytes) -> None
if timer:
self._to_send.put(
beam_fn_api_pb2.Elements.Timers(
instruction_id=instruction_id,
transform_id=transform_id,
timer_family_id=timer_family_id,
timers=timer,
is_last=False))
elem = beam_fn_api_pb2.Elements.Timers(
instruction_id=instruction_id,
transform_id=transform_id,
timer_family_id=timer_family_id,
timers=timer,
is_last=False)
self._to_send.put(elem, self._get_element_bytes(elem))

def close_callback(timer):
# type: (bytes) -> None
add_to_send_queue(timer)
self._to_send.put(
beam_fn_api_pb2.Elements.Timers(
instruction_id=instruction_id,
transform_id=transform_id,
timer_family_id=timer_family_id,
is_last=True))
elem = beam_fn_api_pb2.Elements.Timers(
instruction_id=instruction_id,
transform_id=transform_id,
timer_family_id=timer_family_id,
is_last=True)
self._to_send.put(elem, self._get_element_bytes(elem))

return ClosableOutputStream.create(
close_callback, add_to_send_queue, self._data_buffer_time_limit_ms)
Expand Down Expand Up @@ -665,6 +669,15 @@ def _write_outputs(self):
raise ValueError('Unexpected output element type %s' % type(stream))
yield beam_fn_api_pb2.Elements(data=data_stream, timers=timer_stream)

def _get_element_bytes(self, element):
  # type: (Union[beam_fn_api_pb2.Elements.Data, beam_fn_api_pb2.Elements.Timers]) -> int

  """Returns the payload size of `element` in bytes.

  Despite the name, this returns an int size, not raw bytes. The value is
  what callers pass as `item_bytes` when putting `element` on a
  ByteLimitedQueue.

  Args:
    element: The element about to be queued.

  Returns:
    len(element.data) for an Elements.Data, len(element.timers) for an
    Elements.Timers, and 0 for any other object.
  """
  if isinstance(element, beam_fn_api_pb2.Elements.Data):
    return len(element.data)
  if isinstance(element, beam_fn_api_pb2.Elements.Timers):
    return len(element.timers)
  # Unknown element types are not counted against the byte limit.
  return 0

def _read_inputs(self, elements_iterator):
# type: (Iterable[beam_fn_api_pb2.Elements]) -> None

Expand All @@ -691,7 +704,7 @@ def _put_queue(instruction_id, element):
next_discard_log_time = current_time + 10
return
try:
input_queue.put(element, timeout=1)
input_queue.put(element, self._get_element_bytes(element), timeout=1)
return
except queue.Full:
current_time = time.time()
Expand Down
30 changes: 30 additions & 0 deletions sdks/python/apache_beam/utils/byte_limited_queue.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# cython: overflowcheck=True

cdef class ByteLimitedQueue(object):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to add the .py counterpart to the list passed to

extensions = cythonize([

for the cythonized extension to be built.

You can also compare its performance against the regular queue to make sure there are no surprises.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new queue is faster than the normal queue when there is no blocking, because the normal queue does extra work for task tracking. It is about the same when a single producer is blocking (likely how it is used in batch, where there is generally a single active bundle per SDK); I added caching of condition variables to help this case, since the allocations and GC were slowing it down. The new queue is slower when it blocks frequently, because of the list it maintains in order to be fair, but it is still fast: about 23us per element.

cdef readonly int max_elements
cdef readonly int max_bytes
cdef readonly int _byte_size
cdef readonly object _mutex
cdef readonly object _not_empty
cdef readonly object _waiting_writers
cdef readonly object _queue
cdef readonly int _blocked_bytes

cpdef bint _is_full_locked(self, int item_bytes) except -1
Comment thread
scwhittle marked this conversation as resolved.
Outdated
196 changes: 196 additions & 0 deletions sdks/python/apache_beam/utils/byte_limited_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A thread-safe queue that limits capacity by total byte size."""

import collections
import queue
import threading
import time
import types


class ByteLimitedQueue(object):
  """A fair, thread-safe FIFO queue bounded by element count and total bytes.

  Writers that cannot be admitted immediately wait in arrival order: each
  blocked writer owns a condition variable stored in `_waiting_writers`, and
  only the writer at the head of that deque may enqueue. A new writer never
  jumps ahead of writers that are already waiting.

  A single element is always admitted into an empty queue, even if it alone
  exceeds `maxbytes`, to avoid deadlock.

  NOTE: the companion byte_limited_queue.pxd declares this class's instance
  attributes. Do not add instance attributes here without updating it.
  """
  __class_getitem__ = classmethod(types.GenericAlias)

  def __init__(
      self,
      maxsize=0,  # type: int
      maxbytes=0,  # type: int
  ):
    # type: (...) -> None

    """Initializes a ByteLimitedQueue.

    Args:
      maxsize: The maximum number of items allowed in the queue. If 0 or
        negative, there is no limit on the number of elements.
      maxbytes: The maximum accumulated bytes allowed in the queue. If 0 or
        negative, there is no limit on the total bytes of the elements.
    """
    self.max_elements = maxsize
    self.max_bytes = maxbytes
    # Sum of item_bytes over the elements currently stored in _queue.
    self._byte_size = 0
    # Sum of item_bytes over the writers currently blocked in put().
    self._blocked_bytes = 0
    # One lock guards all state; every condition below is built on it.
    self._mutex = threading.Lock()
    self._not_empty = threading.Condition(self._mutex)
    # Per-writer conditions, in arrival order. Only the head may enqueue.
    self._waiting_writers = collections.deque()
    # Stored entries are (item, item_bytes) tuples.
    self._queue = collections.deque()

  def put(self, item, item_bytes, block=True, timeout=None):
    """Put an item into the queue.

    If the queue is full, or other writers are already waiting, block until
    this writer reaches the head of the waiting line and the item fits,
    unless `block` is false or a timeout occurs.

    Args:
      item: The item to put into the queue.
      item_bytes: The size of the item, counted against `maxbytes`.
      block: If True, block until space is available. If False, raise
        queue.Full immediately if the item cannot be admitted right away.
      timeout: If block is True, wait for at most `timeout` seconds. If None,
        block indefinitely.

    Raises:
      ValueError: If timeout or item_bytes is negative.
      queue.Full: If the queue is full and block is False or the timeout
        occurs.
    """
    if timeout is not None and timeout < 0:
      raise ValueError("'timeout' must be a non-negative number")
    if item_bytes < 0:
      raise ValueError("'item_bytes' must be a non-negative number")

    with self._mutex:
      # Fast path: nobody is ahead of us and the item fits.
      if not self._waiting_writers and not self._is_full_locked(item_bytes):
        self._append_locked(item, item_bytes)
        return

      if not block:
        raise queue.Full

      my_cond = threading.Condition(self._mutex)
      endtime = None if timeout is None else time.monotonic() + timeout
      # Account for the blocked bytes before entering the try block so that
      # the cleanup below only ever undoes bookkeeping that actually happened.
      self._blocked_bytes += item_bytes
      try:
        self._waiting_writers.append(my_cond)
        while True:
          # The mutex has been held since the fast-path check failed, so
          # nothing can have changed yet: wait first, then re-check.
          if endtime is None:
            my_cond.wait()
          else:
            remaining = endtime - time.monotonic()
            if remaining <= 0.0:
              raise queue.Full
            my_cond.wait(remaining)

          if (self._waiting_writers[0] is my_cond and
              not self._is_full_locked(item_bytes)):
            break

        self._append_locked(item, item_bytes)
      finally:
        # Runs on success, timeout and any other exception, so a writer
        # never stays registered after leaving put().
        self._blocked_bytes -= item_bytes
        if self._waiting_writers:
          was_first = self._waiting_writers[0] is my_cond
          if was_first:
            self._waiting_writers.popleft()
          else:
            self._waiting_writers.remove(my_cond)
          # The head left the line: give the next writer a chance to check
          # whether its item fits.
          if was_first and self._waiting_writers:
            self._waiting_writers[0].notify()

  def get(self, block=True, timeout=None):
    """Remove and return an item from the queue.

    If the queue is empty, block until an item is available, unless `block`
    is false or a timeout occurs.

    Args:
      block: If True, block until an item is available. If False, raise
        queue.Empty immediately if the queue is empty.
      timeout: If block is True, wait for at most `timeout` seconds. If None,
        block indefinitely.

    Returns:
      The item removed from the queue.

    Raises:
      ValueError: If timeout is negative.
      queue.Empty: If the queue is empty and block is False or the timeout
        occurs.
    """
    if timeout is not None and timeout < 0:
      raise ValueError("'timeout' must be a non-negative number")

    # Entering the condition acquires the shared mutex.
    with self._not_empty:
      if not block:
        if not self._queue:
          raise queue.Empty
      elif timeout is None:
        while not self._queue:
          self._not_empty.wait()
      else:
        endtime = time.monotonic() + timeout
        while not self._queue:
          remaining = endtime - time.monotonic()
          if remaining <= 0.0:
            raise queue.Empty
          self._not_empty.wait(remaining)

      item, item_bytes = self._queue.popleft()
      self._byte_size -= item_bytes

      # Space was freed: let the writer at the head of the line re-check.
      if self._waiting_writers:
        self._waiting_writers[0].notify()

      return item

  def get_nowait(self):
    """Remove and return an item from the queue without blocking.

    Raises:
      queue.Empty: If the queue is empty.
    """
    return self.get(block=False)

  def byte_size(self):
    """Return the total byte size of elements in the queue."""
    with self._mutex:
      return self._byte_size

  def blocked_byte_size(self):
    """Return the total byte size of elements whose put() is blocked.

    These elements are waiting to enter the queue; they are not included in
    byte_size() or qsize().
    """
    with self._mutex:
      return self._blocked_bytes

  def qsize(self):
    """Return the total number of elements in the queue."""
    with self._mutex:
      return len(self._queue)

  def _append_locked(self, item, item_bytes):
    """Enqueue `item` and wake one reader. The caller must hold `_mutex`."""
    self._queue.append((item, item_bytes))
    self._byte_size += item_bytes
    self._not_empty.notify()

  def _is_full_locked(self, item_bytes):
    """Return True if an item of `item_bytes` cannot be admitted right now.

    The caller must hold `_mutex`.
    """
    # Always let in a single element, regardless of size.
    if not self._queue:
      return False
    if self.max_elements > 0 and len(self._queue) >= self.max_elements:
      return True
    if self.max_bytes > 0 and self._byte_size + item_bytes > self.max_bytes:
      return True
    return False
Loading
Loading