Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
dd45b6b
scaffold oauthbearer-aws extra + subpackage placeholders
prashah-confluent May 27, 2026
8a9610e
Implement KvStringParser + marker constants + extensions parser + JWT…
prashah-confluent May 27, 2026
8219064
Implement AwsOAuthBearerConfig + AwsStsTokenProvider
prashah-confluent May 27, 2026
8ab0309
Implement aws autowire contract
prashah-confluent May 27, 2026
975d1a5
C-extension dispatcher resolve_aws_oauthbearer_marker
prashah-confluent May 27, 2026
f6558de
real-AWS integration test for the autowire path
prashah-confluent May 27, 2026
96c2523
Fix comments
prashah-confluent Jun 4, 2026
3e7da4d
Fix black/isort + flake8
prashah-confluent Jun 8, 2026
92c908d
Add boto3 version check
prashah-confluent Jun 8, 2026
05144a9
Fix black formatting in kv_string_parser.py
prashah-confluent Jun 8, 2026
d6ce11c
Treat unsupported aws_debug values uniformly; drop .NET framing
prashah-confluent Jun 8, 2026
f6293b2
Add example for aws_iam
prashah-confluent Jun 8, 2026
8da9923
Add requirements-oauthbearer-aws entry in requirements-tests-install
prashah-confluent Jun 9, 2026
491ba0e
Add missing entries requirements-oauthbearer-aws.txt
prashah-confluent Jun 9, 2026
df5f96a
Update the version in semaphore.yml
prashah-confluent Jun 9, 2026
31ae270
version update to v2.14.2-aws-iam.2-dev
prashah-confluent Jun 10, 2026
b3aded6
Merge branch 'master' into dev_prashah_aws_iam_tag
prashah-confluent Jun 10, 2026
9069e89
Fix the tag version string
prashah-confluent Jun 10, 2026
d3699c7
Change the version to v2.14.2.dev2
prashah-confluent Jun 10, 2026
bab8859
Remove strip + sentinel
prashah-confluent Jun 11, 2026
003caa2
Disable python github assestations
prashah-confluent Jun 11, 2026
f2cd9de
Update the version to v2.14.2.dev3
prashah-confluent Jun 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .semaphore/publish-test-pypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ blocks:
- name: Verify
commands:
- checkout
# mise refuses to install some Python versions when GitHub artifact
# attestations are missing; disable that check for the verify loop.
- export MISE_PYTHON_GITHUB_ATTESTATIONS=false
- sem-version python 3.11
- export CK_VERSION=$(python -c "import tomllib; print(tomllib.load(open('pyproject.toml','rb'))['project']['version'])")
- tools/test-released-wheels.sh $CK_VERSION test
Expand Down
2 changes: 1 addition & 1 deletion .semaphore/semaphore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ execution_time_limit:
global_job_config:
env_vars:
- name: LIBRDKAFKA_VERSION
value: v2.14.2
value: v2.14.2-aws-iam.2-dev
prologue:
commands:
- checkout
Expand Down
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
# Confluent Python Client for Apache Kafka - CHANGELOG

## v2.xx.x
## v2.14.2.dev3

### Enhancements

- Add AWS IAM OAUTHBEARER authentication via optional `oauthbearer-aws` extra. Set
`sasl.oauthbearer.method=oidc` + `sasl.oauthbearer.metadata.authentication.type=aws_iam`
+ `sasl.oauthbearer.config="region=... audience=..."` to autowire a refresh
callback that mints fresh JWTs via AWS STS `GetWebIdentityToken` on every
token refresh, using boto3's default credential chain. Cross-language wire
parity with .NET / JS / Go. See `examples/oauth_oidc_ccloud_aws_iam.py`.

### Fixes

Expand Down
2 changes: 1 addition & 1 deletion examples/docker/Dockerfile.alpine
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ FROM alpine:3.12

COPY . /usr/src/confluent-kafka-python

ENV LIBRDKAFKA_VERSION="v2.14.2"
ENV LIBRDKAFKA_VERSION="v2.14.2-aws-iam.2-dev"
ENV KCAT_VERSION="master"
ENV CKP_VERSION="master"

Expand Down
227 changes: 227 additions & 0 deletions examples/oauth_oidc_ccloud_aws_iam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2026 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""End-to-end example for AWS IAM OAUTHBEARER authentication.

Creates a unique topic, produces messages for ``--run-for`` seconds, and
consumes them back — exercising the autowire path across AdminClient,
Producer, and Consumer.

Activation is config-only: setting
``sasl.oauthbearer.metadata.authentication.type=aws_iam`` is enough. The C
extension detects the marker and wires up the OAUTHBEARER refresh callback —
no ``import confluent_kafka.oauthbearer.aws`` is needed at the call site.

With ``--duration-seconds 60`` (the AWS-STS minimum) and the default
``--run-for 120``, the ``debug=security`` log stream shows librdkafka refresh
the token mid-run (it refreshes at ~80% of the token lifetime).

Install:
pip install 'confluent-kafka[oauthbearer-aws]'

Prerequisites:
1. Runs on AWS compute (EC2 / EKS / ECS / Fargate / Lambda) with an IAM role
attached — boto3's default credential chain resolves it, no static keys.
2. The role's trust policy allows ``sts:GetWebIdentityToken`` for the audience.
3. ``aws iam enable-outbound-web-identity-federation`` has been run once on
the account by an administrator.
4. The role has produce + consume + create-topic rights on the cluster.

To run:
python oauth_oidc_ccloud_aws_iam.py \\
-b pkc-xxxx.aws.confluent.cloud:9092 \\
--region us-east-1 \\
--audience https://confluent.cloud/oidc \\
--extensions logicalCluster=lkc-abc,identityPoolId=pool-xyz
"""

import argparse
import logging
import time
import uuid

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka.serialization import StringSerializer


def common_config(args):
"""SASL config shared by Producer, Consumer, and AdminClient."""
conf = {
'bootstrap.servers': args.bootstrap_servers,
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
# The four AWS IAM autowire keys: method=oidc is required (the AWS path
# runs inside librdkafka's OIDC subsystem); the marker triggers
# autowiring; the config string is the AWS wire grammar
# (whitespace-separated key=value — region and audience required).
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.metadata.authentication.type': 'aws_iam',
'sasl.oauthbearer.config': f'region={args.region} '
f'audience={args.audience} '
f'duration_seconds={args.duration_seconds}',
# Surfaces the SASL handshake + OAUTHBEARER refresh events on stderr.
'debug': 'security',
}

# Optional SASL extensions (RFC 7628) forwarded verbatim to the broker,
# e.g. logicalCluster=lkc-abc,identityPoolId=pool-xyz
if args.extensions:
conf['sasl.oauthbearer.extensions'] = args.extensions

return conf


def consumer_config(args, group_id):
cfg = common_config(args)
cfg['group.id'] = group_id
cfg['auto.offset.reset'] = 'earliest'
cfg['enable.auto.offset.store'] = False # commit offsets manually
return cfg


def create_topic(admin_conf, topic_name, num_partitions=1, replication_factor=3):
"""Create the topic (RF=3 is the Confluent Cloud default)."""
admin = AdminClient(admin_conf)
futures = admin.create_topics(
[
NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor),
]
)
for topic, future in futures.items():
try:
future.result()
print(f"[admin] Topic '{topic}' created " f"({num_partitions} partition(s), RF={replication_factor})")
except Exception as exc:
print(f"[admin] Failed to create topic '{topic}': {exc}")
raise


def delivery_report(err, msg):
if err is not None:
print(f"[producer] Delivery failed: {err}")
return
print(
f"[producer] Produced to {msg.topic()} [{msg.partition()}] "
f"at offset {msg.offset()}: {msg.value().decode('utf-8')}"
)


def main(args):
# Unique topic + group per run so the example is self-contained.
topic_name = f"aws-iam-{uuid.uuid4()}"
group_id = f"aws-iam-consumer-{uuid.uuid4()}"

p_conf = common_config(args)
c_conf = consumer_config(args, group_id)
a_conf = common_config(args)

logging.basicConfig(level=logging.INFO)

print("\n=== AWS IAM OAUTHBEARER end-to-end example ===")
print(f"bootstrap.servers: {args.bootstrap_servers}")
print(f"region: {args.region}")
print(f"audience: {args.audience}")
print(f"duration_seconds: {args.duration_seconds} " f"(auto-refresh at ~{int(args.duration_seconds * 0.8)}s)")
print(f"run-for: {args.run_for}s")
print(f"topic (generated): {topic_name}")
print(f"group.id (generated): {group_id}\n")

create_topic(a_conf, topic_name)

producer = Producer(p_conf)
consumer = Consumer(c_conf)
consumer.subscribe([topic_name])
serializer = StringSerializer('utf_8')

start = time.time()
end_at = start + args.run_for
produced = 0
consumed = 0

print(
f"[loop] Producing/consuming for {args.run_for}s — "
f"watch the debug=security logs for token-refresh events.\n"
)

try:
while time.time() < end_at:
elapsed = time.time() - start
msg = f"hello-from-aws-iam T+{elapsed:.1f}s"

producer.produce(
topic_name,
value=serializer(msg),
on_delivery=delivery_report,
)
producer.poll(0)
produced += 1

received = consumer.poll(1.0)
if received is None:
pass # poll timeout, no message yet
elif received.error() is not None:
print(f"[consumer] error: {received.error()}")
else:
consumer.store_offsets(received)
consumed += 1
print(
f"[consumer] Received from "
f"{received.topic()} [{received.partition()}] "
f"at offset {received.offset()}: "
f"{received.value().decode('utf-8')}"
)

time.sleep(args.interval)
except KeyboardInterrupt:
print("\n[main] Interrupted — flushing.")
finally:
print(f"\n[summary] Produced {produced}, consumed {consumed} " f"in {time.time() - start:.1f}s. Flushing...")
producer.flush(timeout=10)
consumer.close()
print("[summary] Done.")


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='End-to-end OAUTHBEARER example via AWS IAM autowire ' '(produce + consume + admin).',
)
parser.add_argument('-b', dest='bootstrap_servers', required=True, help='Bootstrap broker(s) (host[:port])')
parser.add_argument('--region', required=True, help='AWS region (e.g. us-east-1)')
parser.add_argument(
'--audience',
required=True,
help='OIDC audience claim the broker expects ' '(e.g. https://confluent.cloud/oidc)',
)
parser.add_argument(
'--extensions',
default=None,
help='Optional sasl.oauthbearer.extensions value ' '(comma-separated key=value pairs)',
)
parser.add_argument(
'--duration-seconds',
dest='duration_seconds',
type=int,
default=60,
help='STS DurationSeconds (default 60 = AWS minimum); ' 'librdkafka auto-refreshes at ~80%% of it.',
)
parser.add_argument(
'--run-for', dest='run_for', type=int, default=120, help='Run duration in seconds (default 120).'
)
parser.add_argument('--interval', type=float, default=5.0, help='Seconds between produce calls (default 5).')

main(parser.parse_args())
12 changes: 8 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "confluent-kafka"
version = "2.14.2"
version = "2.14.2.dev3"
description = "Confluent's Python client for Apache Kafka"
classifiers = [
"Development Status :: 5 - Production/Stable",
Expand Down Expand Up @@ -106,6 +106,7 @@ optional-dependencies.rules = { file = ["requirements/requirements-rules.txt", "
optional-dependencies.avro = { file = ["requirements/requirements-avro.txt", "requirements/requirements-schemaregistry.txt"] }
optional-dependencies.json = { file = ["requirements/requirements-json.txt", "requirements/requirements-schemaregistry.txt"] }
optional-dependencies.protobuf = { file = ["requirements/requirements-protobuf.txt", "requirements/requirements-schemaregistry.txt"] }
optional-dependencies.oauthbearer-aws = { file = ["requirements/requirements-oauthbearer-aws.txt"] }
optional-dependencies.dev = { file = [
"requirements/requirements-docs.txt",
"requirements/requirements-examples.txt",
Expand All @@ -114,7 +115,8 @@ optional-dependencies.dev = { file = [
"requirements/requirements-rules.txt",
"requirements/requirements-avro.txt",
"requirements/requirements-json.txt",
"requirements/requirements-protobuf.txt"] }
"requirements/requirements-protobuf.txt",
"requirements/requirements-oauthbearer-aws.txt"] }
optional-dependencies.docs = { file = [
"requirements/requirements-docs.txt",
"requirements/requirements-schemaregistry.txt",
Expand All @@ -128,7 +130,8 @@ optional-dependencies.tests = { file = [
"requirements/requirements-rules.txt",
"requirements/requirements-avro.txt",
"requirements/requirements-json.txt",
"requirements/requirements-protobuf.txt"] }
"requirements/requirements-protobuf.txt",
"requirements/requirements-oauthbearer-aws.txt"] }
optional-dependencies.examples = { file = ["requirements/requirements-examples.txt"] }
optional-dependencies.soaktest = { file = ["requirements/requirements-soaktest.txt"] }
optional-dependencies.all = { file = [
Expand All @@ -140,7 +143,8 @@ optional-dependencies.all = { file = [
"requirements/requirements-rules.txt",
"requirements/requirements-avro.txt",
"requirements/requirements-json.txt",
"requirements/requirements-protobuf.txt"] }
"requirements/requirements-protobuf.txt",
"requirements/requirements-oauthbearer-aws.txt"] }

[tool.pytest.ini_options]
asyncio_mode = "auto"
Expand Down
3 changes: 2 additions & 1 deletion requirements/requirements-all.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
-r requirements-examples.txt
-r requirements-tests.txt
-r requirements-docs.txt
-r requirements-soaktest.txt
-r requirements-soaktest.txt
-r requirements-oauthbearer-aws.txt
1 change: 1 addition & 0 deletions requirements/requirements-oauthbearer-aws.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
boto3>=1.42.25
1 change: 1 addition & 0 deletions requirements/requirements-tests-install.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
-r requirements-avro.txt
-r requirements-protobuf.txt
-r requirements-json.txt
-r requirements-oauthbearer-aws.txt
tests/trivup/trivup-0.14.0.tar.gz
Loading