
[KIP-932] Add deserializing share consumer #2265

Open
Kaushik Raina (k-raina) wants to merge 5 commits into dev_kip-932_queues-for-kafka from dev_kip-932_share_consumer_deserializing_consumer


@k-raina

Member

Summary

  • Adds DeserializingShareConsumer, a high-level KIP-932 share consumer that deserializes record keys and values via the key.deserializer / value.deserializer config (same callables as DeserializingConsumer).
  • Overrides poll() to return a list of messages (mirroring ShareConsumer) and deserializes each one in place, so the returned messages stay valid arguments to acknowledge().
  • A per-record deserialization failure no longer drops the rest of the batch: the raw bytes are kept and msg.error() is set to _KEY_DESERIALIZATION / _VALUE_DESERIALIZATION, so apps can detect it with the usual if msg.error(): check and acknowledge with REJECT.
  • Exports the class from confluent_kafka and adds the missing Message.set_error to the cimpl type stub.
  • Adds unit tests, an integration test, and supporting share-consumer test fixtures.
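
The per-record failure handling described in the summary can be sketched in plain Python. This is a minimal model, not the PR's code: FakeMessage, deserialize_batch, and the string error value are hypothetical stand-ins for Message, the overridden poll(), and KafkaError, chosen so the snippet runs without a broker or confluent_kafka installed. The "ACCEPT" / "REJECT" strings are placeholders for whatever acknowledge types the real API takes.

```python
import json


class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message (not the real class)."""

    def __init__(self, value):
        self._value = value
        self._error = None

    def value(self):
        return self._value

    def error(self):
        return self._error

    def set_value(self, value):
        self._value = value

    def set_error(self, error):
        self._error = error


def deserialize_batch(messages, value_deserializer):
    """Deserialize each message in place; flag failures instead of raising."""
    for msg in messages:
        try:
            msg.set_value(value_deserializer(msg.value()))
        except Exception as exc:
            # Keep the raw bytes and record the failure on the message itself,
            # so the remaining messages in the batch are still processed.
            msg.set_error(f"_VALUE_DESERIALIZATION: {exc}")
    return messages


batch = [FakeMessage(b'{"id": 1}'), FakeMessage(b"not json"), FakeMessage(b'{"id": 3}')]
result = deserialize_batch(batch, json.loads)

# Application-side check from the summary: reject records that failed to deserialize.
decisions = ["REJECT" if msg.error() else "ACCEPT" for msg in result]
```

Because the messages are mutated rather than replaced, the objects in result are the same objects that were polled, which is what keeps them usable as arguments to acknowledge() in the design described above.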

@confluent-cla-assistant


🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@pranavrth Pranav Rathi (pranavrth) left a comment

Member

I have left a few comments. Please address them and then I will review this PR again.

Comment on lines +133 to +137
topic = msg.topic()
if topic is None:
    # no topic to deserialize against; flag it rather than raise
    msg.set_error(KafkaError(KafkaError._VALUE_DESERIALIZATION, "Message topic is None"))
    return

Member

This should be a TypeError. Check Deserializing Consumer and make sure we are following those conventions.

Comment thread src/confluent_kafka/deserializing_share_consumer.py

Member

I think we should write integration tests with Avro, Protobuf and JSONSchema as well. One positive case and one error case for each is enough to make sure that everything works.

Comment thread src/confluent_kafka/deserializing_share_consumer.py
Comment thread tests/integration/cluster_fixture.py Outdated
if conf is not None:
    share_conf.update(conf)

reset = share_conf.pop('auto.offset.reset', 'earliest')

Member

This is an incorrect use of the share consumer. Let's fix this in the share consumer scenario above as well, and extract a common private function so both reuse the same code.

Comment thread src/confluent_kafka/cimpl.pyi Outdated
def set_headers(self, headers: HeadersType) -> None: ...
def set_key(self, key: Any) -> None: ...
def set_value(self, value: Any) -> None: ...
def set_error(self, error: KafkaError) -> None: ...

Member

Do we really want to expose this function to the user? If yes, then we have to add error handling for the case where the user passes something that is not an error in the error field.

@k-raina Kaushik Raina (k-raina) changed the base branch from dev_kip-932_share_consumer_existing_cb_support to dev_kip-932_queues-for-kafka June 8, 2026 06:57
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_share_consumer_deserializing_consumer branch from fb01516 to 3765147 Compare June 8, 2026 07:04
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_queues-for-kafka branch from d600e92 to 5e41108 Compare June 9, 2026 09:55
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_share_consumer_deserializing_consumer branch from 3765147 to 7087ef4 Compare June 10, 2026 21:47
@k-raina

Member Author

The following known test failures are fixed in PR https://github.com/confluentinc/confluent-kafka-python/pull/2253/changes#top:

1. test_subscribe_with_empty_list_raises
2. test_share_consumer_oauth_expired_token_surfaces_error_cb

if (new_error == Py_None) {
        /* None clears the error; Message stores "no error" as NULL
         * (Message_error then returns None for it). */
        self->error = NULL;

Member

We never set error to NULL in Python. It should be either a KafkaError or None.

Comment on lines +598 to +603
if (new_error != Py_None &&
    !PyObject_TypeCheck(new_error, &KafkaErrorType)) {
        PyErr_SetString(PyExc_TypeError,
                        "error must be a KafkaError or None");
        return NULL;
}

Member

Add this breaking change to the Changelog as well, and add a TODO.
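
For readers less familiar with the C API, the observable behavior of the check under discussion can be modeled in pure Python. This is only a sketch: FakeKafkaError and FakeMessage are hypothetical stand-ins for the C-implemented KafkaError and Message types, and the model says nothing about how "no error" is stored internally, which is the C-level detail raised in the review.

```python
class FakeKafkaError:
    """Hypothetical stand-in for confluent_kafka.KafkaError."""

    def __init__(self, code, reason=""):
        self.code = code
        self.reason = reason


class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message."""

    def __init__(self):
        self._error = None

    def error(self):
        return self._error

    def set_error(self, error):
        # Mirror the type check in the hunk above: only a KafkaError or None is accepted.
        if error is not None and not isinstance(error, FakeKafkaError):
            raise TypeError("error must be a KafkaError or None")
        self._error = error


msg = FakeMessage()

msg.set_error(FakeKafkaError("_VALUE_DESERIALIZATION", "bad payload"))
flagged = msg.error() is not None

msg.set_error(None)  # None clears the error
cleared = msg.error() is None

try:
    msg.set_error("not an error object")
    rejected = False
except TypeError:
    rejected = True
```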


topic = msg.topic()
if topic is None:
    raise TypeError("Message topic is None")

Member

Add a TODO to verify whether this is correct for GA.

We have 3 options:

  1. TypeError - the user didn't supply the topic name, so I don't think TypeError is correct.
  2. RuntimeError - I feel this is correct, as it is an internal issue unrelated to the user.
  3. KafkaError - set on the message's error field so that we don't throw an exception.
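
To make the trade-off concrete, the three options can be compared side by side in a small self-contained sketch. All function and class names here are hypothetical, and a plain tuple stands in for a KafkaError; none of this is code from the PR.

```python
class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message."""

    def __init__(self, topic):
        self._topic = topic
        self._error = None

    def topic(self):
        return self._topic

    def error(self):
        return self._error

    def set_error(self, error):
        self._error = error


def option_type_error(msg):
    # Option 1: raise TypeError.
    if msg.topic() is None:
        raise TypeError("Message topic is None")


def option_runtime_error(msg):
    # Option 2: raise RuntimeError, treating it as an internal failure.
    if msg.topic() is None:
        raise RuntimeError("Message topic is None")


def option_flag_message(msg):
    # Option 3: record the problem on the message and do not raise.
    if msg.topic() is None:
        msg.set_error(("_VALUE_DESERIALIZATION", "Message topic is None"))


def outcome(handler, msg):
    """Return the raised exception's class name, else 'flagged' or 'ok'."""
    try:
        handler(msg)
    except (TypeError, RuntimeError) as exc:
        return type(exc).__name__
    return "flagged" if msg.error() else "ok"


handlers = (option_type_error, option_runtime_error, option_flag_message)
outcomes = [outcome(handler, FakeMessage(topic=None)) for handler in handlers]
```

In this model, options 1 and 2 interrupt the caller for the whole call, while option 3 leaves the decision to whoever inspects msg.error() afterwards.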

"""

messages = super(DeserializingShareConsumer, self).poll(timeout)
for msg in messages:

Member

We need to decide whether to surface this as an error set on the message or as a raised exception. We should do it the Pythonic way. Let's add a TODO to discuss it when we finalize the interfaces.

    raise TypeError("Message topic is None")

try:
    ctx = SerializationContext(topic, MessageField.VALUE, msg.headers())

Member

Move this outside, as it is used below as well.

Member

This is not done.

if topic is None:
    raise TypeError("Message topic is None")

try:

Member

Move the try to after the if self._value_deserializer is not None: part.
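
One possible reading of the requests in this thread (build the context before the try, and start the try only after the deserializer check) is sketched below. The stand-ins are hypothetical: FakeMessage replaces Message, a plain tuple replaces SerializationContext, and a string replaces KafkaError. The real method in the PR may differ.

```python
class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message."""

    def __init__(self, topic, value, headers=None):
        self._topic = topic
        self._value = value
        self._headers = headers
        self._error = None

    def topic(self):
        return self._topic

    def value(self):
        return self._value

    def headers(self):
        return self._headers

    def error(self):
        return self._error

    def set_value(self, value):
        self._value = value

    def set_error(self, error):
        self._error = error


def deserialize_value(msg, value_deserializer):
    topic = msg.topic()
    if topic is None:
        raise TypeError("Message topic is None")

    # Built before the try: constructing the context is not a deserialization step.
    ctx = (topic, "value", msg.headers())

    if value_deserializer is not None:
        # Only the deserializer call itself is guarded.
        try:
            msg.set_value(value_deserializer(msg.value(), ctx))
        except Exception as exc:
            msg.set_error(f"_VALUE_DESERIALIZATION: {exc}")


def upper_text(data, ctx):
    """Toy deserializer: decode UTF-8 bytes and upper-case the text."""
    return data.decode("utf-8").upper()


ok = FakeMessage("orders", b"abc")
deserialize_value(ok, upper_text)

bad = FakeMessage("orders", b"\xff")  # invalid UTF-8, so the deserializer raises
deserialize_value(bad, upper_text)

untouched = FakeMessage("orders", b"raw")
deserialize_value(untouched, None)  # no deserializer configured
```

Keeping the try this narrow means that only failures raised by the user-supplied deserializer are converted into a message error; anything else propagates as a normal exception.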

Comment thread src/confluent_kafka/deserializing_share_consumer.py

@pranavrth Pranav Rathi (pranavrth) left a comment

Member

One comment was missed.

    raise TypeError("Message topic is None")

try:
    ctx = SerializationContext(topic, MessageField.VALUE, msg.headers())

Member

This is not done.

Comment thread src/confluent_kafka/deserializing_share_consumer.py
@airlock-confluentinc airlock-confluentinc Bot force-pushed the dev_kip-932_share_consumer_deserializing_consumer branch from 2fabcb1 to 2dce544 Compare June 12, 2026 09:59

@pranavrth Pranav Rathi (pranavrth) left a comment

Member

Looks good to me. I am approving the PR but I would prefer a pass from Robert Yokota (@rayokota) or somebody from the @confluentinc/data-governance team to review this as well.

@sonarqube-confluent


Quality Gate failed

Failed conditions
9.3% Coverage on New Code (required ≥ 80%)

See analysis details on SonarQube



2 participants