Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@

* Fixed BigQueryEnrichmentHandler batch mode dropping earlier requests when multiple requests share the same enrichment key (Python) ([#38035](https://github.com/apache/beam/issues/38035)).
* Added `max_batch_duration_secs` passthrough support in Python Enrichment BigQuery and CloudSQL handlers so batching duration can be forwarded to `BatchElements` ([#38243](https://github.com/apache/beam/issues/38243)).
* `DoFn.process` returning a `str`, `bytes`, or `dict` (instead of an iterable wrapping one) now raises a clear `TypeError` rather than silently iterating per-character/byte/key (Python) ([#18712](https://github.com/apache/beam/issues/18712)).

## Security Fixes

Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ cdef class _OutputHandler(OutputHandler):
cdef object output_batch_converter
cdef bint _process_batch_yields_elements
cdef bint _process_yields_batches
cdef bint _check_user_dofn_output

@cython.locals(windowed_value=WindowedValue,
windowed_batch=WindowedBatch,
Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1475,6 +1475,7 @@ def __init__(
do_fn_signature.process_batch_method.method_value,
'_beam_yields_elements',
False),
check_user_dofn_output=not isinstance(fn, core.CallableWrapperDoFn),
)

if do_fn_signature.is_stateful_dofn() and not user_state_context:
Expand Down Expand Up @@ -1633,6 +1634,7 @@ def __init__(
output_batch_converter, # type: Optional[BatchConverter]
process_yields_batches, # type: bool
process_batch_yields_elements, # type: bool
check_user_dofn_output=False, # type: bool
):
"""Initializes ``_OutputHandler``.

Expand All @@ -1642,6 +1644,12 @@ def __init__(
tagged_receivers: main receiver object.
per_element_output_counter: per_element_output_counter of one work_item.
Could be None if the experimental flag is turned off.
check_user_dofn_output: if True, validate that a user-class DoFn does not
return a str/bytes/dict (a common bug — see
https://github.com/apache/beam/issues/18712).
Skipped for callable-wrapped DoFns (Map/FlatMap)
where iterating a returned str/bytes/dict is a
legitimate flatten use case.
"""
self.window_fn = window_fn
self.main_receivers = main_receivers
Expand All @@ -1654,6 +1662,7 @@ def __init__(
self.output_batch_converter = output_batch_converter
self._process_yields_batches = process_yields_batches
self._process_batch_yields_elements = process_batch_yields_elements
self._check_user_dofn_output = check_user_dofn_output

def handle_process_outputs(
self, windowed_input_element, results, watermark_estimator=None):
Expand All @@ -1667,6 +1676,13 @@ def handle_process_outputs(
if results is None:
results = []

if self._check_user_dofn_output and isinstance(results, (str, bytes, dict)):
object_type = type(results).__name__
raise TypeError(
'Returning a %s from a ParDo or FlatMap is discouraged. '
'Please use list("%s") if you really want this behavior.' %
(object_type, results))
Comment thread
chrisqiqiu marked this conversation as resolved.

# TODO(https://github.com/apache/beam/issues/20404): Verify that the
# results object is a valid iterable type if
# performance_runtime_type_check is active, without harming performance
Expand Down
37 changes: 37 additions & 0 deletions sdks/python/apache_beam/runners/common_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,43 @@ def process(self, element, mykey=DoFn.KeyParam):
test_stream = (TestStream().advance_watermark_to(10).add_elements([1, 2]))
(p | test_stream | beam.ParDo(DoFnProcessWithKeyparam()))

def test_dofn_returning_str_raises_clear_error(self):
  """Regression test for https://github.com/apache/beam/issues/18712.

  A DoFn whose ``process`` returns a bare ``str`` (instead of an iterable
  wrapping one) used to be silently iterated per character. Running such a
  DoFn should now fail with a clear error message.
  """
  class StrReturningDoFn(DoFn):
    def process(self, element):
      # Deliberately wrong: a bare str rather than e.g. ['hello'].
      return 'hello'

  expected_message = 'Returning a str from a ParDo or FlatMap is discouraged'
  # NOTE(review): this matches the broad Exception type rather than
  # TypeError; presumably the runner may surface the error under a different
  # type -- confirm whether TypeError can be asserted directly.
  with self.assertRaisesRegex(Exception, expected_message):
    with TestPipeline() as p:
      _ = p | beam.Create([0]) | beam.ParDo(StrReturningDoFn())

def test_dofn_returning_bytes_raises_clear_error(self):
  """Regression test for https://github.com/apache/beam/issues/18712.

  A DoFn whose ``process`` returns a bare ``bytes`` object (instead of an
  iterable wrapping one) should fail with a clear error message.
  """
  class BytesReturningDoFn(DoFn):
    def process(self, element):
      # Deliberately wrong: a bare bytes rather than e.g. [b'hello'].
      return b'hello'

  expected_message = (
      'Returning a bytes from a ParDo or FlatMap is discouraged')
  # NOTE(review): this matches the broad Exception type rather than
  # TypeError; presumably the runner may surface the error under a different
  # type -- confirm whether TypeError can be asserted directly.
  with self.assertRaisesRegex(Exception, expected_message):
    with TestPipeline() as p:
      _ = p | beam.Create([0]) | beam.ParDo(BytesReturningDoFn())

def test_dofn_returning_dict_raises_clear_error(self):
  """Regression test for https://github.com/apache/beam/issues/18712.

  A DoFn whose ``process`` returns a bare ``dict`` (instead of an iterable
  wrapping one) should fail with a clear error message.
  """
  class DictReturningDoFn(DoFn):
    def process(self, element):
      # Deliberately wrong: a bare dict rather than e.g. [{'k': 'v'}].
      return {'k': 'v'}

  expected_message = (
      'Returning a dict from a ParDo or FlatMap is discouraged')
  # NOTE(review): this matches the broad Exception type rather than
  # TypeError; presumably the runner may surface the error under a different
  # type -- confirm whether TypeError can be asserted directly.
  with self.assertRaisesRegex(Exception, expected_message):
    with TestPipeline() as p:
      _ = p | beam.Create([0]) | beam.ParDo(DictReturningDoFn())

def test_pardo_with_unbounded_per_element_dofn(self):
class UnboundedDoFn(beam.DoFn):
@beam.DoFn.unbounded_per_element()
Expand Down
Loading