KafkaConsumer
public class KafkaConsumer: KafkaClient
A KafkaClient for consuming messages on a topic from a broker.
Usage Example:
do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    config.autoOffsetReset = .beginning
    let consumer = try KafkaConsumer(config: config)
    consumer.connect(brokers: "localhost:9092")
    try consumer.subscribe(topics: ["test"])
    // poll() is declared as throwing, so it must be called with try
    let records = try consumer.poll()
    print(records)
} catch {
    print("Error creating consumer: \(error)")
}
-
Create a new KafkaConsumer. If a groupId is not provided in the config, a random UUID will be used.
Declaration
Swift
public init(config: KafkaConfig = KafkaConfig()) throws
Parameters
config
The KafkaConfig that will configure your Kafka consumer.
-
Subscribe to one or more topics. The consumer will be assigned partitions based on its consumer group. The subscribe() method is asynchronous and returns immediately. Background threads will (re)join the group, wait for the group rebalance, invoke any registered rebalance callback (rebalance_cb), assign() the assigned partitions, and then start fetching messages. This cycle may take session.timeout.ms * 2 or longer to complete.
Throws
A KafkaError if the consumer fails to subscribe to the topics.
Declaration
Swift
public func subscribe(topics: [String]) throws
Parameters
topics
An array of Kafka topics to subscribe to.
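Because subscribe() returns before the group rebalance has finished, the first few calls to poll() may legitimately return nothing. The sketch below keeps polling until records arrive or a deadline passes. It assumes the module is imported as SwiftKafka; the helper pollUntilFirstRecords and the 20-second deadline (twice a hypothetical 10-second session.timeout.ms) are illustrative and not part of the library.

```swift
import Foundation
import SwiftKafka  // assumed module name

/// Hypothetical helper: polls until at least one record arrives
/// or `deadline` seconds have elapsed, whichever comes first.
func pollUntilFirstRecords(consumer: KafkaConsumer,
                           deadline: TimeInterval) throws -> [KafkaConsumerRecord] {
    let giveUpAt = Date().addingTimeInterval(deadline)
    while Date() < giveUpAt {
        // Each poll blocks for at most one second.
        let records = try consumer.poll(timeout: 1)
        if !records.isEmpty {
            return records
        }
    }
    return []  // nothing arrived before the deadline
}

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    let consumer = try KafkaConsumer(config: config)
    consumer.connect(brokers: "localhost:9092")
    try consumer.subscribe(topics: ["test"])
    // Assumed: session.timeout.ms is 10 s, so allow roughly 2 x 10 s for the rebalance.
    let records = try pollUntilFirstRecords(consumer: consumer, deadline: 20)
    print("Received \(records.count) record(s)")
} catch {
    print("Error consuming: \(error)")
}
```

An empty result here means only that no records arrived within the deadline; it does not by itself indicate that the subscription failed.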
-
Assign this consumer to a single topic with a specified partition to consume from. The offset to begin consuming from can also be specified; otherwise the consumer defaults to the end of the current messages.
Throws
A KafkaError if the consumer fails to subscribe to the topic.
Declaration
Swift
public func assign(topic: String, partition: Int, offset: Int = Int(RD_KAFKA_OFFSET_END)) throws
Parameters
topic
The Kafka topic to subscribe to.
partition
The topic partition to consume from.
offset
The topic offset to begin consuming from. Defaults to RD_KAFKA_OFFSET_END (-1), which represents the end of the current messages.
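As a rough illustration of manual assignment, the sketch below reads partition 0 of a topic starting from an explicit offset instead of joining a consumer group rebalance. The module name SwiftKafka, the topic "test", the partition number, and the starting offset are all assumptions chosen for the example.

```swift
import SwiftKafka  // assumed module name

do {
    let consumer = try KafkaConsumer(config: KafkaConfig())
    consumer.connect(brokers: "localhost:9092")

    // Start reading partition 0 of "test" at offset 0 (illustrative values).
    // Omitting `offset` would start at the end of the current messages.
    try consumer.assign(topic: "test", partition: 0, offset: 0)

    let records = try consumer.poll(timeout: 2)
    print(records)
} catch {
    print("Error assigning or polling: \(error)")
}
```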
-
Consume messages from the topics you are subscribed to. Consumption resumes from where your last call to poll() left off. This function will block for at most timeout seconds.
Declaration
Swift
public func poll(timeout: TimeInterval = 1) throws -> [KafkaConsumerRecord]
Parameters
timeout
The maximum TimeInterval in seconds that this call will block for while consuming messages.
Return Value
An array of KafkaConsumerRecord that have been consumed.
-
Commit the offsets for the current partition assignment on the broker.
This marks the records since the last poll as processed so they will not be reassigned during a rebalance. If you are using commitSync(), enableAutoCommit on KafkaConfig should be set to false.
Declaration
Swift
public func commitSync() throws
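One way to use manual commits, sketched under the assumption that the module is imported as SwiftKafka: disable auto-commit, process each polled batch, and call commitSync() only once processing has succeeded. The fixed loop count and the print call standing in for real processing are illustrative.

```swift
import SwiftKafka  // assumed module name

do {
    let config = KafkaConfig()
    config.groupId = "Kitura"
    // Manual commits: turn auto-commit off, as the description above requires.
    config.enableAutoCommit = false

    let consumer = try KafkaConsumer(config: config)
    consumer.connect(brokers: "localhost:9092")
    try consumer.subscribe(topics: ["test"])

    for _ in 0..<10 {  // bounded loop, for illustration only
        let records = try consumer.poll(timeout: 1)
        guard !records.isEmpty else { continue }

        for record in records {
            print(record)  // stand-in for real processing
        }
        // Commit only after the whole batch has been processed.
        try consumer.commitSync()
    }
} catch {
    print("Error consuming or committing: \(error)")
}
```

Committing after processing means a crash mid-batch may lead to those records being delivered again, so the processing step should tolerate duplicates.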
-
Close down the KafkaConsumer. This call will block until the consumer has revoked its assignment. The maximum blocking time is roughly limited to session.timeout.ms. If you don’t call this function, the consumer will be closed when it is deinitialized.
Declaration
Swift
public func close() throws
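If you prefer to close the consumer explicitly rather than rely on deinitialization, a defer block is one possible pattern. The sketch below assumes the module name SwiftKafka; the function consumeOnce is hypothetical.

```swift
import SwiftKafka  // assumed module name

/// Hypothetical helper: polls once, then always closes the consumer.
func consumeOnce() throws {
    let consumer = try KafkaConsumer(config: KafkaConfig())
    consumer.connect(brokers: "localhost:9092")
    defer {
        // A defer body cannot propagate errors, so close() failures are handled here.
        do {
            try consumer.close()
        } catch {
            print("Error closing consumer: \(error)")
        }
    }

    try consumer.subscribe(topics: ["test"])
    let records = try consumer.poll(timeout: 5)
    print("Received \(records.count) record(s)")
}

do {
    try consumeOnce()
} catch {
    print("Error consuming: \(error)")
}
```

Since close() can block for roughly session.timeout.ms, calling it at a known point keeps that delay out of whatever code path happens to release the last reference.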