4 changes: 4 additions & 0 deletions .formatignore
@@ -12,3 +12,7 @@ tests/integration/schema_registry/data/proto/TestProto_pb2.py
tests/integration/schema_registry/data/proto/common_proto_pb2.py
tests/integration/schema_registry/data/proto/exampleProtoCriteo_pb2.py
tests/integration/schema_registry/data/proto/metadata_proto_pb2.py

# Hand-maintained type stub. tools/style-format.sh runs clang-format on any
# file that isn't .py, and that corrupts the stub. Keep it out.
src/confluent_kafka/cimpl.pyi
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
@@ -11,7 +11,7 @@ global_job_config:
value: v2.14.2
# TODO KIP-932: Remove LIBRDKAFKA_BRANCH once LIBRDKAFKA_VERSION includes share consumer support
- name: LIBRDKAFKA_BRANCH
value: dev_kip-932_queues-for-kafka_additional_api_python
value: dev_kip-932_queues-for-kafka
prologue:
commands:
- checkout
1 change: 1 addition & 0 deletions src/confluent_kafka/_types.py
@@ -36,3 +36,4 @@
# These are defined here to avoid circular imports
DeliveryCallback = Callable[[Optional[Any], Any], None] # (KafkaError, Message) -> None
RebalanceCallback = Callable[[Any, List[Any]], None] # (Consumer, List[TopicPartition]) -> None
AcknowledgementCommitCallback = Callable[[Dict[Any, Any], Optional[Any]], None] # (offsets, KafkaException) -> None
25 changes: 15 additions & 10 deletions src/confluent_kafka/cimpl.pyi
@@ -35,7 +35,7 @@ maintenance burden and get type hints directly from the implementation.
"""

import builtins
from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union, overload
from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, Union, overload

try:
from typing import Self
@@ -51,6 +51,10 @@ from ._types import HeadersType
# Callback types with proper class references (defined locally to avoid circular imports)
DeliveryCallback = Callable[[Optional['KafkaError'], 'Message'], None]
RebalanceCallback = Callable[['Consumer', List['TopicPartition']], None]
# (offsets, exception) — note the order is offsets-first, opposite of on_commit.
AcknowledgementCommitCallback = Callable[
[Dict['TopicPartition', Set[int]], Optional['KafkaException']], None
]

# ===== CLASSES (Manual - stubgen missed these) =====

@@ -574,22 +578,23 @@ class ShareConsumer:
def subscribe(self, topics: List[str]) -> None: ...
def unsubscribe(self) -> None: ...
def subscription(self) -> List[str]: ...
# TODO KIP-932: poll() returns List[Message] today. Java returns a
# dedicated container object (ConsumerRecords) instead of a list so it
# can carry extra metadata alongside the records. Replace List[Message]
# with a Messages container class once we have a clear use for that
# metadata in Python.
# TODO KIP-932: poll() returns List[Message] today. Replace it with a
# Messages container class once we have a clear use for carrying extra
# metadata alongside the records.
def poll(self, timeout: float = -1) -> List[Message]: ...
def acknowledge(self, message: Message, ack_type: AcknowledgeType = ...) -> None: ...
def acknowledge_offset(
self, topic: str, partition: int, offset: int, ack_type: AcknowledgeType = ...
) -> None: ...
# TODO KIP-932: Java's share-consumer commit returns a map keyed by
# TopicIdPartition (topic name + topic UUID + partition). Python uses
# the existing TopicPartition (no UUID) for now. Add a TopicIdPartition
# class once the interface is finalized.
# TODO KIP-932: commit_sync() is keyed by the existing TopicPartition
# (no UUID) for now. A future TopicIdPartition (topic name + topic UUID
# + partition) would carry the UUID; add that class once the interface
# is finalized.
def commit_sync(self, timeout: float = 60) -> Dict[TopicPartition, Optional[KafkaError]]: ...
def commit_async(self) -> None: ...
def set_acknowledgement_commit_callback(
self, callback: Optional[AcknowledgementCommitCallback]
) -> None: ...
def close(self) -> None: ...
def __enter__(self) -> "ShareConsumer": ...
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
205 changes: 154 additions & 51 deletions src/confluent_kafka/kafkatest/README.md
@@ -1,89 +1,192 @@
# Running Apache Kafka's client system-tests (kafkatests) with the Python client
# Verifiable Clients for Apache Kafka System Tests

Apache Kafka's system-tests (also called kafkatests) allow the Java
Producer and Consumer to be replaced with an alternate client using the
pluggable VerifiableClient mixin.
Two Python programs that implement Apache Kafka's "verifiable client"
contract so the Ducktape-based Python system tests in `apache/kafka` can
exercise confluent-kafka-python the same way they exercise the Java client:

These instructions walk you through the required steps to run
the Confluent Kafka Python client with kafkatests.
- `verifiable_producer.py` — produces messages with optional throttling
- `verifiable_share_consumer.py` — KIP-932 share consumer

## Pre-requisites
Each program accepts a standardized CLI and emits newline-delimited JSON
events on stdout. The Python harness in
`apache/kafka/tests/kafkatest/services/verifiable_*.py` parses those events
to track progress and verify correctness.

The assumption is that the host system is running Linux with Docker installed,
and that the kafkatest environment is set up according to the instructions in
the Kafka repository's `tests/README.md` file.
Both clients share `verifiable_client.py`, which provides event emission,
signal handling, and Java -> librdkafka config translation.

Only the Vagrant-based kafkatest runs are supported.
(Ducker (docker-based) kafkatest runs are currently not supported due
to deployment issues.)
## Events

1. Clone kafka into a directory of your choice
(hereafter referred to as `$KAFKA_DIR`)
2. Build Kafka
3. Set up the Vagrant-based kafkatest environment according
to `$KAFKA_DIR/tests/README.md`
4. Run `vagrant status` to verify that workers are running.
Both clients emit `startup_complete` on start and `shutdown_complete` after
a clean SIGTERM/SIGINT. Beyond that:

**Producer**
- `producer_send_success` `{topic, partition, offset, key, value}`
- `producer_send_error` `{topic, key, value, message, exception}`
- `tool_data` `{sent, acked, target_throughput, avg_throughput}` (final summary)

**Share consumer**
- `records_consumed` `{count, partitions:[{topic, partition, count, offsets:[]}]}`
- `offsets_acknowledged` `{count, partitions:[...], success, error?}`
- `record_data` `{topic, partition, offset, key, value}` (only with `--verbose`)
- `offset_reset_strategy_set` `{offsetResetStrategy}` (only with `--offset-reset-strategy`)

## Create self-contained wheels
All events also carry a `timestamp` field (epoch milliseconds).
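
As a rough illustration of the newline-delimited JSON contract described above, the sketch below emits one event per line and parses a stream of such lines back into dictionaries. The helper names `emit_event` and `parse_events` are hypothetical, and the use of a `name` key to carry the event type is an assumption; only the `timestamp` field and the per-event payload fields are taken from the list above.

```python
import json
import time
from typing import Any, Dict, Iterable, Iterator


def emit_event(name: str, **fields: Any) -> str:
    """Serialize one event as a single JSON line (hypothetical helper).

    Assumption: the event type is carried in a "name" key.
    The "timestamp" field is epoch milliseconds, as stated above.
    """
    event = {"name": name, "timestamp": int(time.time() * 1000), **fields}
    return json.dumps(event)


def parse_events(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield one dict per JSON line, skipping blank and non-JSON lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # tolerate stray non-JSON output such as log lines
        if isinstance(event, dict):
            yield event


if __name__ == "__main__":
    line = emit_event("producer_send_success",
                      topic="t", partition=0, offset=5, key=None, value="v")
    for event in parse_events([line, "", "not json"]):
        print(event["name"], event["offset"])
```

Skipping unparseable lines rather than failing is a design choice for this sketch; a stricter harness might treat them as errors.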

To ease deployment of the Python client and its dependencies to
the kafkatest worker instances, we'll build self-contained binary linux
wheels.
## CLI

From the confluent-kafka-python top-level directory, run:
Both accept `--bootstrap-server` / `--broker-list`, `--command-config <file>`
(Java properties), `-X key=value` (raw librdkafka properties), and `--debug
<flags>`.
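
One way the shared options could be combined is sketched below: read `key=value` lines from a Java properties file, then apply `-X key=value` overrides on top. This is an assumption-laden sketch, not the logic in `verifiable_client.py`: the function names are hypothetical, the precedence (`-X` wins over the file) is assumed, and no Java-to-librdkafka property renaming is attempted.

```python
from typing import Dict, Iterable, List


def parse_properties(lines: Iterable[str]) -> Dict[str, str]:
    """Parse simple Java-properties-style lines (key=value, # or ! comments)."""
    props: Dict[str, str] = {}
    for raw in lines:
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # skip blanks and comment lines
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def build_config(property_lines: Iterable[str], x_args: List[str]) -> Dict[str, str]:
    """Merge file properties with -X overrides (assumed: -X takes precedence)."""
    conf = parse_properties(property_lines)
    for arg in x_args:
        key, sep, value = arg.partition("=")
        if not sep:
            raise ValueError(f"expected key=value, got {arg!r}")
        conf[key.strip()] = value.strip()
    return conf


if __name__ == "__main__":
    file_lines = ["# comment", "bootstrap.servers=localhost:9092", "acks = all"]
    print(build_config(file_lines, ["acks=1"]))
```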

$ CIBW_SKIP="cp3* *i686*" tools/cibuildwheel-build.sh wheels
**Producer:** `--topic`, `--max-messages` (-1=infinite), `--throughput`
(-1=unlimited), `--acks`, `--value-prefix`, `--repeating-keys`,
`--message-create-time`, `--producer.config` (deprecated alias of
`--command-config`).

After about 5 minutes the resulting Python wheels should be available in
the wheels/ directory.
**Share consumer:** `--topic`, `--group-id`, `--max-messages` (-1=infinite),
`--acknowledgement-mode auto|sync|async`, `--offset-reset-strategy
earliest|latest`, `--verbose`.

The CIBW_SKIP part makes sure only Python 2.7-x64 packages are built.
The three acknowledgement modes all run with
`share.acknowledgement.mode=implicit` (poll() auto-accepts the previous
batch); they differ only in how they commit:

| mode | commit | offsets_acknowledged source |
|-------|-----------------------|-------------------------------------|
| auto | implicit (next poll) | acknowledgement-commit callback |
| sync | `commit_sync()` | acknowledgement-commit callback |
| async | `commit_async()` | acknowledgement-commit callback |

## Prepare deploy directory
`offsets_acknowledged` is always emitted from the acknowledgement-commit
callback (`ShareConsumer.set_acknowledgement_commit_callback`), so async/auto
acks carry real broker per-partition results, not optimistic guesses.
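
A minimal sketch of what such a callback might look like follows. It uses the `(offsets, exception)` argument order from the `AcknowledgementCommitCallback` type in `cimpl.pyi`, where `offsets` maps a topic partition to a set of offsets. The `TP` namedtuple is a stand-in for `TopicPartition` so the example runs without a broker. The function names, the `name` key, and the exact shape of each `partitions` entry are assumptions modelled on the event list above.

```python
import json
from collections import namedtuple
from typing import Any, Dict, Optional, Set

# Stand-in for confluent_kafka.TopicPartition so the sketch is self-contained.
TP = namedtuple("TP", ["topic", "partition"])


def build_offsets_acknowledged(
    offsets: Dict[Any, Set[int]], exception: Optional[Exception] = None
) -> Dict[str, Any]:
    """Build an offsets_acknowledged event from the callback arguments."""
    partitions = [
        {
            "topic": tp.topic,
            "partition": tp.partition,
            "count": len(offs),
            "offsets": sorted(offs),
        }
        for tp, offs in sorted(
            offsets.items(), key=lambda kv: (kv[0].topic, kv[0].partition)
        )
    ]
    event: Dict[str, Any] = {
        "name": "offsets_acknowledged",  # assumed key for the event type
        "count": sum(p["count"] for p in partitions),
        "partitions": partitions,
        "success": exception is None,
    }
    if exception is not None:
        event["error"] = str(exception)  # only present on failure
    return event


def on_ack_commit(offsets: Dict[Any, Set[int]], exception: Optional[Exception]) -> None:
    """Offsets-first callback: print the event as one JSON line."""
    print(json.dumps(build_offsets_acknowledged(offsets, exception)), flush=True)


if __name__ == "__main__":
    on_ack_commit({TP("t", 0): {3, 1, 2}}, None)
```

With a real consumer, a function like this would presumably be registered through `ShareConsumer.set_acknowledgement_commit_callback(on_ack_commit)`.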

The Python wheels and a deploy script need to be copied to a location reachable
from the kafkatest worker instances (which is anywhere in `$KAFKA_DIR`).
Run the provided script to take care of this:
## How System Tests Work

$ confluent_kafka/kafkatests/deploy.sh --prepare $KAFKA_DIR wheels
Apache Kafka's system tests run on
[Ducktape](https://github.com/confluentinc/ducktape). A driver container
orchestrates worker containers over SSH: it spins up brokers, starts the
producer/consumer programs, and parses their stdout events.

Synchronize shared directory with workers:
To plug in a custom client, point Ducktape at a `--globals` file. The Python
harness resolves `VerifiableProducer` / `VerifiableShareConsumer` to
`VerifiableClientApp`, which on each worker:

$ cd $KAFKA_DIR
$ vagrant rsync
1. Runs the `deploy` script once before the first test.
2. Invokes `exec_cmd` (with the harness-supplied flags appended) and parses
its stdout as JSON events.

This directory's `globals.json` wires both clients to
`python -m confluent_kafka.kafkatest.verifiable_*` running from a virtualenv
built by `deploy.sh`. Paths assume the confluent-kafka-python repo is mounted
at `/confluent-kafka-python` and a librdkafka checkout at `/librdkafka` inside
each worker.

## Verify Python client setup
`deploy.sh` apt-installs build deps, then builds and installs the **mounted**
librdkafka. The apt package is intentionally not used: this branch needs a
newer librdkafka than any distro `librdkafka-dev` package provides (KIP-932
share APIs, IncrementalAlterConfigs, the modern admin types). The script then
creates a virtualenv at `/opt/cfk-python/venv` and `pip install -e`'s the repo
into it, linking the C extension against the just-installed librdkafka. It
honors `REPO_DIR` (default `/confluent-kafka-python`), `LIBRDKAFKA_DIR`
(default `/librdkafka`), `VENV_DIR` (default `/opt/cfk-python/venv`), and
`DEPLOY_SKIP_APT=1` for pre-provisioned images. It uses a flock'd sentinel so
concurrent deploys sharing the librdkafka mount don't race the build.

$ cd $KAFKA_DIR
$ ducktape --globals tests/confluent-kafka-python/globals.json tests/kafkatest/tests/client/pluggable_test.py
$ grep confluent-kafka-python results/latest/PluggableConsumerTest/test_start_stop/1/test_log.debug
## Running Share Consumer Tests

The test should PASS and the grep command should return some output.
Assumes Docker is running with ~10 GB of free RAM for containers.

### Prerequisites

## Run kafkatests
- A checkout of `confluentinc/confluent-kafka-python` (this repo)
- A checkout of `confluentinc/librdkafka` with the KIP-932 share APIs
- A checkout of `apache/kafka`

$ cd $KAFKA_DIR
```bash
export CFK_PYTHON_DIR=/path/to/confluent-kafka-python
export LIBRDKAFKA_DIR=/path/to/librdkafka
export KAFKA_DIR=/path/to/apache/kafka
```

To run a sub-set of tests:
### 1. Compile apache/kafka's system-test libraries

$ ducktape --globals tests/confluent-kafka-python/globals.json tests/kafkatest/tests/client/consumer_test.py::AssignmentValidationTest
```bash
cd "$KAFKA_DIR"
./gradlew systemTestLibs
```

To run the full kafkatest client test-suite:
First run takes ~10 minutes; subsequent runs are fast.

$ ducktape --globals tests/confluent-kafka-python/globals.json tests/kafkatest/tests/client
### 2. Patch ducker-ak to mount confluent-kafka-python and librdkafka

Open `$KAFKA_DIR/tests/docker/ducker-ak`, find the `docker_run()` function,
and add `-v` flags for both repos:

```diff
must_do -v ${container_runtime} run --init --privileged \
-d -t -h "${node}" --network ducknet "${expose_ports}" \
--memory=${docker_run_memory_limit} ... \
- -v "${kafka_dir}:/opt/kafka-dev" --name "${node}" -- "${image_name}"
+ -v "${kafka_dir}:/opt/kafka-dev" \
+ ${CFK_PYTHON_DIR:+-v "${CFK_PYTHON_DIR}:/confluent-kafka-python"} \
+ ${LIBRDKAFKA_DIR:+-v "${LIBRDKAFKA_DIR}:/librdkafka"} \
+ --name "${node}" -- "${image_name}"
```

This is a one-time local edit.

## Stand-alone usage (fwiw)
### 3. Make `deploy.sh` executable

The kafkatest client can be run directly with:
```bash
chmod +x "$CFK_PYTHON_DIR/src/confluent_kafka/kafkatest/deploy.sh"
```

python -m confluent_kafka.kafkatest.verifiable_consumer <options>
### 4. Bring up the cluster and run the tests

python -m confluent_kafka.kafkatest.verifiable_producer <options>
```bash
cd "$KAFKA_DIR"
CFK_PYTHON_DIR=$CFK_PYTHON_DIR LIBRDKAFKA_DIR=$LIBRDKAFKA_DIR ./tests/docker/ducker-ak up

CFK_PYTHON_DIR=$CFK_PYTHON_DIR LIBRDKAFKA_DIR=$LIBRDKAFKA_DIR ./tests/docker/ducker-ak test \
tests/kafkatest/tests/client/share_consumer_test.py \
-- --globals /confluent-kafka-python/src/confluent_kafka/kafkatest/globals.json
```

The harness runs `deploy.sh` once per worker before the first test; that
script builds the virtualenv and installs the client. The first test starts a
few minutes after launch (deploy time); subsequent tests reuse the venv.

### 5. View results

```bash
cat "$KAFKA_DIR/results/latest/report.txt"
```

Per-test stdout (debugging):

```bash
find "$KAFKA_DIR/results/latest" -name "verifiable_*.stdout"
```

### 6. Tear down

```bash
cd "$KAFKA_DIR"
./tests/docker/ducker-ak down
```

## Stand-alone usage

The clients can be run directly for local debugging:

```bash
python -m confluent_kafka.kafkatest.verifiable_producer \
--topic t --bootstrap-server localhost:9092 --max-messages 10

python -m confluent_kafka.kafkatest.verifiable_share_consumer \
--topic t --group-id g --bootstrap-server localhost:9092 \
--acknowledgement-mode sync --offset-reset-strategy earliest \
--max-messages 10
```