Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
version,
)
from .deserializing_consumer import DeserializingConsumer
from .deserializing_share_consumer import DeserializingShareConsumer
from .error import KafkaError, KafkaException
from .serializing_producer import SerializingProducer

Expand All @@ -73,6 +74,7 @@
"OFFSET_STORED",
"Producer",
"DeserializingConsumer",
"DeserializingShareConsumer",
"SerializingProducer",
"TIMESTAMP_CREATE_TIME",
"TIMESTAMP_LOG_APPEND_TIME",
Expand Down
1 change: 1 addition & 0 deletions src/confluent_kafka/cimpl.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ class Message:
def set_headers(self, headers: HeadersType) -> None: ...
def set_key(self, key: Any) -> None: ...
def set_value(self, value: Any) -> None: ...
def set_error(self, error: Optional[KafkaError]) -> None: ...
def __len__(self) -> int: ...

class TopicPartition:
Expand Down
162 changes: 162 additions & 0 deletions src/confluent_kafka/deserializing_share_consumer.py
Comment thread
pranavrth marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2026 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Any, Dict, List

from confluent_kafka.cimpl import KafkaError, Message
from confluent_kafka.cimpl import ShareConsumer as _ShareConsumerImpl

from .serialization import MessageField, SerializationContext


class DeserializingShareConsumer(_ShareConsumerImpl):
"""
A high level KIP-932 share consumer with deserialization capabilities.

`This class is experimental and likely to be removed, or subject to incompatible API
changes in future versions of the library. To avoid breaking changes on upgrading, we
recommend using deserializers directly.`

Derived from the :py:class:`ShareConsumer` class, overriding the
:py:func:`ShareConsumer.poll` method to add deserialization capabilities.

Additional configuration properties:

+-------------------------+---------------------+-----------------------------------------------------+
| Property Name | Type | Description |
+=========================+=====================+=====================================================+
| | | Callable(bytes, SerializationContext) -> obj |
| ``key.deserializer`` | callable | |
| | | Deserializer used for message keys. |
+-------------------------+---------------------+-----------------------------------------------------+
| | | Callable(bytes, SerializationContext) -> obj |
| ``value.deserializer`` | callable | |
| | | Deserializer used for message values. |
+-------------------------+---------------------+-----------------------------------------------------+

Deserializers for string, integer and double (:py:class:`StringDeserializer`, :py:class:`IntegerDeserializer`
and :py:class:`DoubleDeserializer`) are supplied out-of-the-box in the ``confluent_kafka.serialization``
namespace.

Deserializers for Protobuf, JSON Schema and Avro (:py:class:`ProtobufDeserializer`, :py:class:`JSONDeserializer`
and :py:class:`AvroDeserializer`) with Confluent Schema Registry integration are supplied out-of-the-box
in the ``confluent_kafka.schema_registry`` namespace.

Unlike :py:class:`DeserializingConsumer`, :py:func:`poll` returns a *list* of
messages (mirroring :py:class:`ShareConsumer`), and a deserialization failure on
one record does not discard the rest of the batch. A record whose key or value
cannot be deserialized is left in the returned list with its raw bytes intact and
its :py:func:`Message.error` set to a ``_KEY_DESERIALIZATION`` or
``_VALUE_DESERIALIZATION`` error, so the application can detect it with the same
``if msg.error():`` check it already uses for broker errors and acknowledge it
accordingly (e.g. with :py:attr:`AcknowledgeType.REJECT`).

Deserialization mutates each message in place, so the returned messages remain
valid arguments to :py:func:`ShareConsumer.acknowledge` (acknowledgement is keyed
on topic, partition and offset, which are left untouched).

See Also:
- The :ref:`Configuration Guide <pythonclient_configuration>` for in depth information on how to configure the client.
- `CONFIGURATION.md <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_ for a comprehensive set of configuration properties.
- The :py:class:`ShareConsumer` class for inherited methods.

Args:
conf (dict): DeserializingShareConsumer configuration.

Raises:
ValueError: if configuration validation fails
""" # noqa: E501

def __init__(self, conf: Dict[str, Any]) -> None:
conf_copy = conf.copy()
self._key_deserializer = conf_copy.pop('key.deserializer', None)
self._value_deserializer = conf_copy.pop('value.deserializer', None)

super(DeserializingShareConsumer, self).__init__(conf_copy)

def poll(self, timeout: float = -1) -> List[Message]:
"""
Consume messages and deserialize their keys and values in place.

Args:
timeout (float): Maximum time to block waiting for messages (Seconds).

Returns:
list(:py:class:`Message`): A list of messages (possibly empty on
timeout). Each message is the same object returned by the underlying
:py:class:`ShareConsumer`, with its key and value replaced by the
deserialized objects.

Records that arrived with an error (``msg.error()`` is not None) are
returned unchanged. Records whose key or value fails to deserialize are
returned with their raw bytes preserved and ``msg.error()`` set to a
``_KEY_DESERIALIZATION`` or ``_VALUE_DESERIALIZATION`` error. That error
is a :py:class:`KafkaError`, so a caller can tell the two cases apart
with ``msg.error().code()``.
"""

messages = super(DeserializingShareConsumer, self).poll(timeout)
for msg in messages:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check if we want to send error or exception in a message. We should use the pythonic way. Let's add a TODO to discuss it in the final interfaces part.

# broker/transport errors carry no payload to deserialize
if msg.error() is not None:
continue
self._deserialize(msg)
return messages

def _deserialize(self, msg: Message) -> None:
"""
Deserialize a single message's value and key.

Both fields are deserialized into locals and written back to the
message only once *both* succeed, so a deserialization failure leaves
the record's raw key and value bytes untouched (and therefore still
acknowledgeable). On a deserialization failure the record is marked via
:py:func:`Message.set_error` rather than raising, so the rest of the
batch (already fetched from the broker) is not lost. The deserializer
calls are guarded, so a failure marks only this record instead of
aborting the batch.

A message with no topic is a broken invariant rather than a per-record
data error, so it raises :py:exc:`TypeError` (matching
:py:class:`DeserializingConsumer`).
"""

topic = msg.topic()
if topic is None:
raise TypeError("Message topic is None")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add TODO to verify if this is correct or not for GA.

We have 3 options:

  1. TypeError - but user didn't give the topic name so I think TypeError is not correct.
  2. RuntimeError - I feel this is correct as this is internal issue not related to user.
  3. KafkaError - set in the message field so that we don't throw exception.


ctx = SerializationContext(topic, MessageField.VALUE, msg.headers())
try:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move try to after if self._value_deserializer is not None: part.

value = msg.value()
if self._value_deserializer is not None:
value = self._value_deserializer(value, ctx)
except Exception as se:
msg.set_error(KafkaError(KafkaError._VALUE_DESERIALIZATION, str(se)))
return

try:
key = msg.key()
if self._key_deserializer is not None:
ctx.field = MessageField.KEY
key = self._key_deserializer(key, ctx)
except Exception as se:
msg.set_error(KafkaError(KafkaError._KEY_DESERIALIZATION, str(se)))
return
Comment thread
pranavrth marked this conversation as resolved.
Comment thread
pranavrth marked this conversation as resolved.

msg.set_key(key)
msg.set_value(value)
22 changes: 21 additions & 1 deletion tests/common/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import time
import uuid

from confluent_kafka import Consumer, ShareConsumer
from confluent_kafka import Consumer, DeserializingShareConsumer, ShareConsumer
from confluent_kafka.admin import AlterConfigOpType, ConfigEntry, ConfigResource

_GROUP_PROTOCOL_ENV = 'TEST_CONSUMER_GROUP_PROTOCOL'
Expand All @@ -43,7 +43,7 @@
time.sleep(delay_seconds)
os.kill(os.getpid(), signal.SIGINT)

# TODO KIP-932: broker_version() previously branched on

Check warning on line 46 in tests/common/__init__.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Complete the task associated to this "TODO" comment.

[S1135] Track uses of "TODO" tags See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2265&issues=e4fcd6e6-91e6-4e46-865f-49f7e1bb8f38&open=e4fcd6e6-91e6-4e46-865f-49f7e1bb8f38
# use_group_protocol_consumer() to return '4.0.0' or '3.9.0'. It is now
# hardcoded to '4.2.0' because share groups require >=4.2.0.
# Remove this method if not needed in other contexts
Expand All @@ -70,7 +70,7 @@
'group.share.min.record.lock.duration.ms=1000',
]

# TODO KIP-932: use_kraft() used to honor the TEST_TRIVUP_CLUSTER_TYPE env

Check warning on line 73 in tests/common/__init__.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Complete the task associated to this "TODO" comment.

[S1135] Track uses of "TODO" tags See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2265&issues=4a8c31db-cb29-42de-9fc9-8ec5706080ba&open=4a8c31db-cb29-42de-9fc9-8ec5706080ba
# var (and the now-deleted _trivup_cluster_type_kraft helper) so callers
# could opt into ZooKeeper. It now hardcodes True because broker 4.2.0 is
# KRaft-only. Callers that need ZK (e.g. tests/integration/admin/
Expand Down Expand Up @@ -214,7 +214,7 @@
return received


def poll_ack_commit_loop(

Check failure on line 217 in tests/common/__init__.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Refactor this function to reduce its Cognitive Complexity from 23 to the 15 allowed.

[S3776] Cognitive Complexity of functions should not be too high See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2265&issues=c96e8378-b431-48eb-9549-d7b584d607f1&open=c96e8378-b431-48eb-9549-d7b584d607f1
consumers,
total_messages,
*,
Expand Down Expand Up @@ -292,3 +292,23 @@
if conf:
effective_conf.update(conf)
super().__init__(effective_conf, **kwargs)


class TestDeserializingShareConsumer(DeserializingShareConsumer):
"""Test wrapper around DeserializingShareConsumer.

Mirrors :class:`TestShareConsumer`: injects a default bootstrap.servers so
unit tests can construct an instance without a broker. Share consumers have
their own group protocol, so (unlike TestDeserializingConsumer) there is no
group.protocol fixup here.
"""

__test__ = False # not a pytest collection target despite the Test* prefix

def __init__(self, conf=None, **kwargs):
effective_conf = {
'bootstrap.servers': DEFAULT_BOOTSTRAP_SERVERS,
}
if conf:
effective_conf.update(conf)
super().__init__(effective_conf, **kwargs)
60 changes: 51 additions & 9 deletions tests/integration/cluster_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
)
from confluent_kafka.schema_registry._async.schema_registry_client import AsyncSchemaRegistryClient
from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient
from tests.common import TestConsumer, TestShareConsumer
from tests.common import TestConsumer, TestDeserializingShareConsumer, TestShareConsumer
from tests.common._async.consumer import TestAsyncDeserializingConsumer
from tests.common._async.producer import TestAsyncSerializingProducer
from tests.common.schema_registry import TestDeserializingConsumer
Expand Down Expand Up @@ -134,23 +134,23 @@
Consumer: A new Consumer instance

"""
consumer_conf = self.client_conf({'group.id': str(uuid1()), 'auto.offset.reset': 'earliest'})

Check failure on line 137 in tests/integration/cluster_fixture.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal 'auto.offset.reset' 4 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2265&issues=c2ed395a-a2d8-4c54-af71-f60a66f2da62&open=c2ed395a-a2d8-4c54-af71-f60a66f2da62

Check failure on line 137 in tests/integration/cluster_fixture.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal 'group.id' 5 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2265&issues=15701dc3-e956-444a-938c-60875fd0a0f0&open=15701dc3-e956-444a-938c-60875fd0a0f0

if conf is not None:
consumer_conf.update(conf)

return TestConsumer(consumer_conf)

def share_consumer(self, conf=None):
def _share_consumer_conf(self, conf=None):
"""
Returns a share consumer bound to this cluster.
Builds share-consumer config bound to this cluster, including the
per-group share.auto.offset.reset setup shared by every share consumer.

Args:
conf (dict): ShareConsumer config overrides

Returns:
ShareConsumer: A new TestShareConsumer instance

dict: the resolved share-consumer configuration
"""
share_conf = self.client_conf({'group.id': str(uuid1())})

Expand All @@ -159,7 +159,7 @@

# KIP-932: share.auto.offset.reset is a per-group broker-side config with default as "latest".
# Set to "earliest" so tests don't have to set it in every call.
# TODO KIP-932: this shouldn't live in the share_consumer fixture —
# TODO KIP-932: this shouldn't live in a per-consumer fixture —

Check warning on line 162 in tests/integration/cluster_fixture.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Complete the task associated to this "TODO" comment.

[S1135] Track uses of "TODO" tags See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2265&issues=5d972e24-059a-44d1-95f8-0cc72420430f&open=5d972e24-059a-44d1-95f8-0cc72420430f
# share.auto.offset.reset is a property of the group, not the consumer.
# Re-issuing the alter on every consumer construction for the same
# group.id is unnecessary. Lift this to a per-group setup step.
Expand All @@ -175,10 +175,52 @@
),
],
)
for f in self.admin().incremental_alter_configs([res]).values():
f.result()
for future in self.admin().incremental_alter_configs([res]).values():
future.result()

return share_conf

def share_consumer(self, conf=None):
"""
Returns a share consumer bound to this cluster.

Args:
conf (dict): ShareConsumer config overrides

Returns:
ShareConsumer: A new TestShareConsumer instance

"""
return TestShareConsumer(self._share_consumer_conf(conf))

def deserializing_share_consumer(self, conf=None, key_deserializer=None, value_deserializer=None):
"""
Returns a DeserializingShareConsumer bound to this cluster.

Mirrors :func:`share_consumer` (including the per-group
share.auto.offset.reset=earliest setup) but returns a
DeserializingShareConsumer with the supplied key/value deserializers.

Args:
conf (dict): ShareConsumer config overrides

key_deserializer (Deserializer): deserializer to apply to message key

value_deserializer (Deserializer): deserializer to apply to
message value

Returns:
DeserializingShareConsumer: A new TestDeserializingShareConsumer instance
"""
share_conf = self._share_consumer_conf(conf)

if key_deserializer is not None:
share_conf['key.deserializer'] = key_deserializer

Check failure on line 218 in tests/integration/cluster_fixture.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal 'key.deserializer' 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2265&issues=68852308-d909-420b-bdb1-d60c78dfe500&open=68852308-d909-420b-bdb1-d60c78dfe500

if value_deserializer is not None:
share_conf['value.deserializer'] = value_deserializer

Check failure on line 221 in tests/integration/cluster_fixture.py

View check run for this annotation

SonarQube-Confluent / SonarQube Code Analysis

Define a constant instead of duplicating this literal 'value.deserializer' 3 times.

[S1192] String literals should not be duplicated See more on https://sonarqube.confluent.io/project/issues?id=confluent-kafka-python&pullRequest=2265&issues=14e95830-e48b-404a-895e-98dd4a56b0b5&open=14e95830-e48b-404a-895e-98dd4a56b0b5

return TestShareConsumer(share_conf)
return TestDeserializingShareConsumer(share_conf)

def consumer(self, conf=None, key_deserializer=None, value_deserializer=None):
"""
Expand Down
Loading
Loading