Source code for heizer._source.topic

from confluent_kafka import TopicPartition, admin

from heizer.types import Any, Dict, List, Optional


class Topic:
    """Describe a Kafka topic and build the matching confluent_kafka objects.

    On construction the instance stores the supplied settings and eagerly
    creates two helper objects from them:

    * ``_topic_partition`` -- a ``confluent_kafka.TopicPartition`` built from
      ``name``, ``partition``, ``offset`` and, when given, ``metadata`` and
      ``leader_epoch``.
    * ``_new_topic`` -- a ``confluent_kafka.admin.NewTopic`` built from
      ``name``, ``num_partitions`` and, when given, ``replication_factor``,
      ``replica_assignment`` and ``config``.

    Args:
        name: Topic name; used for both helper objects.
        partition: Partition number. Stored as ``-1`` when omitted.
        offset: Offset for the topic partition. Stored as ``-1`` when omitted.
        metadata: Optional metadata string forwarded to ``TopicPartition``.
        leader_epoch: Optional leader epoch forwarded to ``TopicPartition``.
        num_partitions: Partition count for ``NewTopic``. Stored as ``1`` when
            omitted.
        replication_factor: Optional replication factor for ``NewTopic``.
        replica_assignment: Optional replica assignment for ``NewTopic``.
        config: Optional topic configuration mapping for ``NewTopic``.
    """

    # Class-level declarations are kept as in the original so that class
    # attribute access keeps working. After __init__ runs, ``partition``,
    # ``offset`` and ``num_partitions`` on an instance are never None.
    name: str
    partition: int
    offset: Optional[int] = None
    metadata: Optional[str] = None
    leader_epoch: Optional[int] = None
    num_partitions: Optional[int] = None
    replication_factor: Optional[int] = None
    replica_assignment: Optional[List[Any]] = None
    config: Optional[Dict[Any, Any]] = None

    _topic_partition: TopicPartition
    _new_topic: admin.NewTopic

    def __init__(
        self,
        name: str,
        partition: Optional[int] = None,
        offset: Optional[int] = None,
        metadata: Optional[str] = None,
        leader_epoch: Optional[int] = None,
        num_partitions: Optional[int] = None,
        replication_factor: Optional[int] = None,
        replica_assignment: Optional[List[Any]] = None,
        config: Optional[Dict[Any, Any]] = None,
    ):
        self.name = name
        self.partition = partition if partition is not None else -1
        # NOTE(review): -1 appears to equal confluent_kafka.OFFSET_END, while
        # TopicPartition's own default is OFFSET_INVALID -- confirm that -1 is
        # the intended fallback. The value is kept as-is to preserve behavior.
        self.offset = offset if offset is not None else -1
        self.metadata = metadata
        self.leader_epoch = leader_epoch
        self.num_partitions = num_partitions if num_partitions is not None else 1
        self.replication_factor = replication_factor
        self.replica_assignment = replica_assignment
        self.config = config

        # ``self.offset`` has already been normalised to an int above, so it
        # is always forwarded; the former ``is not None`` guard was dead code.
        topic_partition_args: Dict[str, Any] = {
            "topic": self.name,
            "partition": self.partition,
            "offset": self.offset,
        }
        # Remaining optional fields are only passed when supplied, leaving the
        # library's own defaults in effect otherwise.
        if self.metadata is not None:
            topic_partition_args["metadata"] = self.metadata
        if self.leader_epoch is not None:
            topic_partition_args["leader_epoch"] = self.leader_epoch

        self._topic_partition = TopicPartition(**topic_partition_args)

        new_topic_args: Dict[str, Any] = {
            "topic": self.name,
            "num_partitions": self.num_partitions,
        }
        if self.replication_factor is not None:
            new_topic_args["replication_factor"] = self.replication_factor
        if self.replica_assignment is not None:
            new_topic_args["replica_assignment"] = self.replica_assignment
        if self.config is not None:
            new_topic_args["config"] = self.config

        self._new_topic = admin.NewTopic(**new_topic_args)