Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyrightconfig.stricter.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"stubs/icalendar/icalendar/timezone/provider.pyi",
"stubs/jsonschema",
"stubs/jwcrypto",
"stubs/kafka-python",
"stubs/ldap3",
"stubs/m3u8/m3u8/model.pyi",
"stubs/Markdown",
Expand Down
21 changes: 21 additions & 0 deletions stubs/kafka-python/@tests/stubtest_allowlist.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Command-line entry points are not a useful typed API surface.
kafka.__main__
kafka.admin.__main__
kafka.consumer.__main__
kafka.producer.__main__

# Concrete subclasses define these abstract properties as class attributes.
kafka.protocol.api.Request.API_KEY
kafka.protocol.api.Request.API_VERSION
kafka.protocol.api.Request.RESPONSE_TYPE
kafka.protocol.api.Request.SCHEMA
kafka.protocol.api.Response.API_KEY
kafka.protocol.api.Response.API_VERSION
kafka.protocol.api.Response.SCHEMA

# Vendored compatibility modules are implementation details.
kafka.vendor
kafka.vendor.enum34
kafka.vendor.selectors34
kafka.vendor.six
kafka.vendor.socketpair
2 changes: 2 additions & 0 deletions stubs/kafka-python/@tests/stubtest_allowlist_darwin.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# The bytes subclass object layout differs by platform/Python build.
kafka.protocol.message.PartialMessage
5 changes: 5 additions & 0 deletions stubs/kafka-python/METADATA.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
version = "2.3.*"
upstream-repository = "https://github.com/dpkp/kafka-python"

[tool.stubtest]
stubtest-dependencies = ["pyperf"]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this dependency needed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is required by the kafka.benchmark.* modules. But arguably those modules should be deemed internal and allowlisted instead.

13 changes: 13 additions & 0 deletions stubs/kafka-python/kafka/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging

from kafka.admin import KafkaAdminClient as KafkaAdminClient
from kafka.client_async import KafkaClient as KafkaClient
from kafka.conn import BrokerConnection as BrokerConnection
from kafka.consumer import KafkaConsumer as KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener as ConsumerRebalanceListener
from kafka.producer import KafkaProducer as KafkaProducer

__all__ = ["BrokerConnection", "ConsumerRebalanceListener", "KafkaAdminClient", "KafkaClient", "KafkaConsumer", "KafkaProducer"]

class NullHandler(logging.Handler):
def emit(self, record) -> None: ...
30 changes: 30 additions & 0 deletions stubs/kafka-python/kafka/admin/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from kafka.admin.acl_resource import (
ACL as ACL,
ACLFilter as ACLFilter,
ACLOperation as ACLOperation,
ACLPermissionType as ACLPermissionType,
ACLResourcePatternType as ACLResourcePatternType,
ResourcePattern as ResourcePattern,
ResourcePatternFilter as ResourcePatternFilter,
ResourceType as ResourceType,
)
from kafka.admin.client import KafkaAdminClient as KafkaAdminClient
from kafka.admin.config_resource import ConfigResource as ConfigResource, ConfigResourceType as ConfigResourceType
from kafka.admin.new_partitions import NewPartitions as NewPartitions
from kafka.admin.new_topic import NewTopic as NewTopic

__all__ = [
"ConfigResource",
"ConfigResourceType",
"KafkaAdminClient",
"NewTopic",
"NewPartitions",
"ACL",
"ACLFilter",
"ResourcePattern",
"ResourcePatternFilter",
"ACLOperation",
"ResourceType",
"ACLPermissionType",
"ACLResourcePatternType",
]
91 changes: 91 additions & 0 deletions stubs/kafka-python/kafka/admin/acl_resource.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from enum import IntEnum

class ResourceType(IntEnum):
UNKNOWN = 0
ANY = 1
CLUSTER = 4
DELEGATION_TOKEN = 6
GROUP = 3
TOPIC = 2
TRANSACTIONAL_ID = 5

class ACLOperation(IntEnum):
UNKNOWN = 0
ANY = 1
ALL = 2
READ = 3
WRITE = 4
CREATE = 5
DELETE = 6
ALTER = 7
DESCRIBE = 8
CLUSTER_ACTION = 9
DESCRIBE_CONFIGS = 10
ALTER_CONFIGS = 11
IDEMPOTENT_WRITE = 12
CREATE_TOKENS = 13
DESCRIBE_TOKENS = 13

class ACLPermissionType(IntEnum):
UNKNOWN = 0
ANY = 1
DENY = 2
ALLOW = 3

class ACLResourcePatternType(IntEnum):
UNKNOWN = 0
ANY = 1
MATCH = 2
LITERAL = 3
PREFIXED = 4

class ACLFilter:
principal: str | None
host: str | None
operation: ACLOperation
permission_type: ACLPermissionType
resource_pattern: ResourcePatternFilter
def __init__(
self,
principal: str | None,
host: str | None,
operation: ACLOperation,
permission_type: ACLPermissionType,
resource_pattern: ResourcePatternFilter,
) -> None: ...
def validate(self) -> None: ...
def __eq__(self, other): ...
def __hash__(self): ...

class ACL(ACLFilter):
resource_pattern: ResourcePattern
def __init__(
self,
principal: str,
host: str,
operation: ACLOperation,
permission_type: ACLPermissionType,
resource_pattern: ResourcePattern,
) -> None: ...
def validate(self) -> None: ...

class ResourcePatternFilter:
resource_type: ResourceType
resource_name: str | None
pattern_type: ACLResourcePatternType
def __init__(self, resource_type: ResourceType, resource_name: str | None, pattern_type: ACLResourcePatternType) -> None: ...
def validate(self) -> None: ...
def __eq__(self, other): ...
def __hash__(self): ...

class ResourcePattern(ResourcePatternFilter):
resource_name: str
def __init__(
self,
resource_type: ResourceType,
resource_name: str,
pattern_type: ACLResourcePatternType = ACLResourcePatternType.LITERAL,
) -> None: ...
def validate(self) -> None: ...

def valid_acl_operations(int_vals) -> set[ACLOperation]: ...
114 changes: 114 additions & 0 deletions stubs/kafka-python/kafka/admin/client.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import selectors
import ssl
from _typeshed import Incomplete
from collections.abc import Callable, Iterable, Mapping, Sequence
from typing import Literal, TypeAlias, TypedDict, type_check_only
from typing_extensions import Unpack

from kafka.admin.acl_resource import ACL, ACLFilter
from kafka.admin.config_resource import ConfigResource
from kafka.admin.new_partitions import NewPartitions
from kafka.admin.new_topic import NewTopic
from kafka.errors import KafkaError
from kafka.protocol.admin import ElectionType
from kafka.structs import GroupInformation, OffsetAndMetadata, TopicPartition

_ApiVersion: TypeAlias = tuple[int, ...]
_BootstrapServers: TypeAlias = str | Sequence[str]
_KafkaClientFactory: TypeAlias = Callable[..., object]
_SaslMechanism: TypeAlias = Literal["PLAIN", "GSSAPI", "OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512"]
_SecurityProtocol: TypeAlias = Literal["PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"]
_SocketOption: TypeAlias = tuple[int, int, int]

@type_check_only
class _KafkaAdminClientConfig(TypedDict, total=False):
bootstrap_servers: _BootstrapServers
client_id: str
request_timeout_ms: int
connections_max_idle_ms: int
reconnect_backoff_ms: int
reconnect_backoff_max_ms: int
max_in_flight_requests_per_connection: int
receive_buffer_bytes: int | None
send_buffer_bytes: int | None
socket_options: Sequence[_SocketOption]
sock_chunk_bytes: int
sock_chunk_buffer_count: int
retry_backoff_ms: int
metadata_max_age_ms: int
security_protocol: _SecurityProtocol
ssl_context: ssl.SSLContext | None
ssl_check_hostname: bool
ssl_cafile: str | None
ssl_certfile: str | None
ssl_keyfile: str | None
ssl_password: str | None
ssl_crlfile: str | None
api_version: _ApiVersion | None
api_version_auto_timeout_ms: int
selector: type[selectors.BaseSelector]
sasl_mechanism: _SaslMechanism | None
sasl_plain_username: str | None
sasl_plain_password: str | None
sasl_kerberos_name: object | None
sasl_kerberos_service_name: str
sasl_kerberos_domain_name: str | None
sasl_oauth_token_provider: object | None
socks5_proxy: str | None
metric_reporters: Sequence[type[object]]
metrics_num_samples: int
metrics_sample_window_ms: int
kafka_client: _KafkaClientFactory

@type_check_only
class _CreateAclsResult(TypedDict):
succeeded: list[ACL]
failed: list[tuple[ACL, KafkaError]]

log: Incomplete

class KafkaAdminClient:
DEFAULT_CONFIG: Incomplete
config: Incomplete
def __init__(self, **configs: Unpack[_KafkaAdminClientConfig]) -> None: ...
def close(self) -> None: ...
def send_request(self, request, node_id=None): ...
def send_requests(self, requests_and_node_ids, response_fn=...): ...
def create_topics(self, new_topics: Sequence[NewTopic], timeout_ms: int | None = None, validate_only: bool = False): ...
def delete_topics(self, topics: Sequence[str], timeout_ms: int | None = None): ...
def list_topics(self) -> list[str]: ...
def describe_topics(self, topics: Sequence[str] | None = None) -> list[dict[str, Incomplete]]: ...
def describe_cluster(self) -> dict[str, Incomplete]: ...
def describe_acls(self, acl_filter: ACLFilter) -> tuple[list[ACL], KafkaError]: ...
def create_acls(self, acls: Sequence[ACL]) -> _CreateAclsResult: ...
def delete_acls(
self, acl_filters: Sequence[ACLFilter]
) -> list[tuple[ACLFilter, list[tuple[ACL, KafkaError]], KafkaError]]: ...
def describe_configs(self, config_resources: Sequence[ConfigResource], include_synonyms: bool = False): ...
def alter_configs(self, config_resources: Sequence[ConfigResource]): ...
def create_partitions(
self, topic_partitions: Mapping[str, NewPartitions], timeout_ms: int | None = None, validate_only: bool = False
): ...
def delete_records(
self,
records_to_delete: Mapping[TopicPartition, int],
timeout_ms: float | None = None,
partition_leader_id: int | None = None,
) -> dict[TopicPartition, Incomplete]: ...
def describe_consumer_groups(
self, group_ids: Sequence[str], group_coordinator_id: int | None = None, include_authorized_operations: bool = False
) -> list[GroupInformation]: ...
def list_consumer_groups(self, broker_ids: Sequence[int] | None = None) -> list[tuple[str, str]]: ...
def list_consumer_group_offsets(
self, group_id: str, group_coordinator_id: int | None = None, partitions: Iterable[TopicPartition] | None = None
) -> dict[TopicPartition, OffsetAndMetadata]: ...
def delete_consumer_groups(
self, group_ids: Sequence[str], group_coordinator_id: int | None = None
) -> list[tuple[str, KafkaError]]: ...
def perform_leader_election(
self,
election_type: int | ElectionType,
topic_partitions: Mapping[str, Sequence[int]] | None = None,
timeout_ms: int | None = None,
): ...
def describe_log_dirs(self): ...
12 changes: 12 additions & 0 deletions stubs/kafka-python/kafka/admin/config_resource.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from collections.abc import Mapping
from enum import IntEnum

class ConfigResourceType(IntEnum):
BROKER = 4
TOPIC = 2

class ConfigResource:
resource_type: ConfigResourceType
name: str
configs: Mapping[str, str] | None
def __init__(self, resource_type: ConfigResourceType, name: str, configs: Mapping[str, str] | None = None) -> None: ...
6 changes: 6 additions & 0 deletions stubs/kafka-python/kafka/admin/new_partitions.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from collections.abc import Sequence

class NewPartitions:
total_count: int
new_assignments: Sequence[Sequence[int]] | None
def __init__(self, total_count: int, new_assignments: Sequence[Sequence[int]] | None = None) -> None: ...
16 changes: 16 additions & 0 deletions stubs/kafka-python/kafka/admin/new_topic.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from collections.abc import Mapping, Sequence

class NewTopic:
name: str
num_partitions: int
replication_factor: int
replica_assignments: Mapping[int, Sequence[int]] | None
topic_configs: Mapping[str, str] | None
def __init__(
self,
name: str,
num_partitions: int = -1,
replication_factor: int = -1,
replica_assignments: Mapping[int, Sequence[int]] | None = None,
topic_configs: Mapping[str, str] | None = None,
) -> None: ...
Empty file.
18 changes: 18 additions & 0 deletions stubs/kafka-python/kafka/benchmarks/consumer_performance.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import threading
from _typeshed import Incomplete

class ConsumerPerformance:
@staticmethod
def run(args) -> None: ...

class StatsReporter(threading.Thread):
interval: Incomplete
consumer: Incomplete
event: Incomplete
raw_metrics: Incomplete
def __init__(self, interval, consumer, event=None, raw_metrics: bool = False) -> None: ...
def print_stats(self) -> None: ...
def print_final(self) -> None: ...
def run(self) -> None: ...

def get_args_parser(): ...
24 changes: 24 additions & 0 deletions stubs/kafka-python/kafka/benchmarks/load_example.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import threading
from _typeshed import Incomplete

class Producer(threading.Thread):
bootstrap_servers: Incomplete
topic: Incomplete
stop_event: Incomplete
big_msg: Incomplete
def __init__(self, bootstrap_servers, topic, stop_event, msg_size) -> None: ...
sent: int
def run(self) -> None: ...

class Consumer(threading.Thread):
bootstrap_servers: Incomplete
topic: Incomplete
stop_event: Incomplete
msg_size: Incomplete
def __init__(self, bootstrap_servers, topic, stop_event, msg_size) -> None: ...
valid: int
invalid: int
def run(self) -> None: ...

def get_args_parser(): ...
def main(args) -> None: ...
18 changes: 18 additions & 0 deletions stubs/kafka-python/kafka/benchmarks/producer_performance.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import threading
from _typeshed import Incomplete

class ProducerPerformance:
@staticmethod
def run(args) -> None: ...

class StatsReporter(threading.Thread):
interval: Incomplete
producer: Incomplete
event: Incomplete
raw_metrics: Incomplete
def __init__(self, interval, producer, event=None, raw_metrics: bool = False) -> None: ...
def print_stats(self) -> None: ...
def print_final(self) -> None: ...
def run(self) -> None: ...

def get_args_parser(): ...
Loading
Loading