kafka.consumer package

Submodules

kafka.consumer.base module

class kafka.consumer.base.Consumer(client, group, topic, partitions=None, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000)

Bases: object

Base class to be used by other consumers. Not intended to be used directly.

This base class provides:

  • initialization and fetching of partition metadata
  • auto-commit logic
  • APIs for fetching the pending message count

commit(partitions=None)

Commit offsets for this consumer.

Keyword Arguments:
 partitions (list) – list of partitions to commit; the default is to commit all of them

fetch_last_known_offsets(partitions=None)

pending(partitions=None)

Get the pending message count.

Keyword Arguments:
 partitions (list) – list of partitions to check; the default is to check all

stop()

kafka.consumer.kafka module

class kafka.consumer.kafka.KafkaConsumer(*topics, **configs)

Bases: object

A simpler kafka consumer

from kafka import KafkaConsumer

# A very basic 'tail' consumer, with no stored offset management
kafka = KafkaConsumer('topic1')
for m in kafka:
  print(m)

# Alternate interface: next()
print(kafka.next())

# Alternate interface: batch iteration
while True:
  for m in kafka.fetch_messages():
    print(m)
  print("Done with batch - let's do another!")

# More advanced consumer -- multiple topics w/ auto commit offset management
kafka = KafkaConsumer('topic1', 'topic2',
                      group_id='my_consumer_group',
                      auto_commit_enable=True,
                      auto_commit_interval_ms=30 * 1000,
                      auto_offset_reset='smallest')

# Infinite iteration
for m in kafka:
  process_message(m)
  kafka.task_done(m)

# Alternate interface: next()
m = kafka.next()
process_message(m)
kafka.task_done(m)

# If auto_commit_enable is False, remember to commit() periodically
kafka.commit()

# Batch process interface
while True:
  for m in kafka.fetch_messages():
    process_message(m)
    kafka.task_done(m)

Messages (m) are namedtuples with the following attributes:

  • m.topic: topic name (str)
  • m.partition: partition number (int)
  • m.offset: message offset on topic-partition log (int)
  • m.key: key (bytes - can be None)
  • m.value: message (output of deserializer_class - default is raw bytes)
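
As a rough illustration of the message shape listed above, the sketch below builds a stand-in namedtuple with the same five attributes and runs the raw value through a deserializer. `Message`, `build_message`, and `json_deserializer` are hypothetical names used only for this example; they are not part of the library.

```python
import json
from collections import namedtuple

# Hypothetical stand-in carrying the five attributes listed above.
Message = namedtuple("Message", ["topic", "partition", "offset", "key", "value"])


def json_deserializer(raw):
    """Decode raw message bytes as UTF-8 JSON (illustrative deserializer)."""
    return json.loads(raw.decode("utf-8"))


def build_message(topic, partition, offset, key, raw_value,
                  deserializer=lambda msg: msg):
    """Apply the deserializer to the raw value and wrap the result in a Message.

    The default deserializer returns the raw bytes unchanged, matching the
    documented default of deserializer_class.
    """
    return Message(topic, partition, offset, key, deserializer(raw_value))


m = build_message("topic1", 0, 42, None, b'{"id": 7}', json_deserializer)
print(m.topic, m.partition, m.offset, m.key, m.value)
```

A callable such as `json_deserializer` is the kind of value that could be passed as deserializer_class, so that m.value arrives already decoded.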

Configuration settings can be passed to the constructor; otherwise the following defaults are used:

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi

commit()

Store consumed message offsets (marked via task_done()) to the Kafka cluster for this consumer group.

Note: this functionality requires server version >= 0.8.1.1. See this wiki page.

configure(**configs)

Configuration settings can be passed to the constructor; otherwise the following defaults are used:

client_id='kafka.consumer.kafka',
group_id=None,
fetch_message_max_bytes=1024*1024,
fetch_min_bytes=1,
fetch_wait_max_ms=100,
refresh_leader_backoff_ms=200,
metadata_broker_list=None,
socket_timeout_ms=30*1000,
auto_offset_reset='largest',
deserializer_class=lambda msg: msg,
auto_commit_enable=False,
auto_commit_interval_ms=60 * 1000,
auto_commit_interval_messages=None,
consumer_timeout_ms=-1

Configuration parameters are described in more detail at http://kafka.apache.org/documentation.html#highlevelconsumerapi
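
A minimal sketch of how keyword overrides might be layered over the defaults listed above. `DEFAULTS` mirrors the documented values; `merge_config` is a hypothetical helper, and rejecting unknown keys is an assumption of this sketch rather than documented behavior.

```python
# Documented defaults, copied from the list above.
DEFAULTS = {
    "client_id": "kafka.consumer.kafka",
    "group_id": None,
    "fetch_message_max_bytes": 1024 * 1024,
    "fetch_min_bytes": 1,
    "fetch_wait_max_ms": 100,
    "refresh_leader_backoff_ms": 200,
    "metadata_broker_list": None,
    "socket_timeout_ms": 30 * 1000,
    "auto_offset_reset": "largest",
    "deserializer_class": lambda msg: msg,
    "auto_commit_enable": False,
    "auto_commit_interval_ms": 60 * 1000,
    "auto_commit_interval_messages": None,
    "consumer_timeout_ms": -1,
}


def merge_config(**overrides):
    """Return a new dict: the defaults, with any given overrides applied.

    Assumption: keys that are not in DEFAULTS are treated as errors.
    """
    unknown = set(overrides) - set(DEFAULTS)
    if unknown:
        raise ValueError("Unknown configuration keys: %s" % sorted(unknown))
    config = dict(DEFAULTS)
    config.update(overrides)
    return config


config = merge_config(group_id="my_consumer_group",
                      auto_commit_enable=True,
                      auto_offset_reset="smallest")
print(config["group_id"], config["auto_offset_reset"], config["fetch_min_bytes"])
```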

fetch_messages()

Sends FetchRequests for all topic/partitions set for consumption. Returns a generator that yields KafkaMessage structs after deserializing with the configured deserializer_class.

Refreshes metadata on errors, and resets the fetch offset on OffsetOutOfRange, per the configured auto_offset_reset policy.

Key configuration parameters:

  • fetch_message_max_bytes
  • fetch_wait_max_ms
  • fetch_min_bytes
  • deserializer_class
  • auto_offset_reset

get_partition_offsets(topic, partition, request_time_ms, max_num_offsets)

Request available fetch offsets for a single topic/partition.

Parameters:
  • topic (str)
  • partition (int)
  • request_time_ms (int) – Used to ask for all messages before a certain time (ms). There are two special values: specify -1 to receive the latest offset (i.e. the offset of the next coming message) and -2 to receive the earliest available offset. Note that because offsets are pulled in descending order, asking for the earliest offset will always return a single element.
  • max_num_offsets (int)
Returns:
 offsets (list)

next()

Return a single message from the message iterator. If consumer_timeout_ms is set, raises ConsumerTimeout when no message is available; otherwise blocks indefinitely.

Note that this is also the method called internally during iteration:

for m in consumer:
    pass

offsets(group=None)

Keyword Arguments:
 group – Either “fetch”, “commit”, “task_done”, or “highwater”. If no group is specified, returns all groups.
Returns:
 A copy of the internal offsets struct

set_topic_partitions(*topics)

Set the topic/partitions to consume. Optionally, specify offsets to start from.

Accepts types:

  • str (utf-8): topic name (will consume all available partitions)

  • tuple: (topic, partition)

  • dict:
    • { topic: partition }
    • { topic: [partition list] }
    • { topic: (partition tuple,) }

Optionally, offsets can be specified directly:

  • tuple: (topic, partition, offset)
  • dict: { (topic, partition): offset, ... }

Example:

kafka = KafkaConsumer()

# Consume topic1-all; topic2-partition2; topic3-partition0
kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})

# Consume topic1-0 starting at offset 123, and topic2-1 at offset 456
# using tuples --
kafka.set_topic_partitions(("topic1", 0, 123), ("topic2", 1, 456))

# using dict --
kafka.set_topic_partitions({ ("topic1", 0): 123, ("topic2", 1): 456 })
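
The accepted argument shapes above can be easier to follow as a small normalization routine. The sketch below is not the library's implementation; `normalize_topic_partitions` is a hypothetical helper that flattens each accepted form into (topic, partition, offset) tuples, using None where a partition or offset was not specified.

```python
def normalize_topic_partitions(*topics):
    """Flatten the accepted argument shapes into (topic, partition, offset).

    partition is None when all partitions of the topic are wanted;
    offset is None when no explicit starting offset was given.
    """
    result = []
    for arg in topics:
        if isinstance(arg, str):
            # "topic": consume all available partitions
            result.append((arg, None, None))
        elif isinstance(arg, tuple):
            if len(arg) == 2:
                # (topic, partition)
                topic, partition = arg
                result.append((topic, partition, None))
            elif len(arg) == 3:
                # (topic, partition, offset)
                result.append(tuple(arg))
            else:
                raise ValueError("expected (topic, partition[, offset]): %r" % (arg,))
        elif isinstance(arg, dict):
            for key, value in arg.items():
                if isinstance(key, tuple):
                    # {(topic, partition): offset}
                    topic, partition = key
                    result.append((topic, partition, value))
                elif isinstance(value, (list, tuple)):
                    # {topic: [partition list]} or {topic: (partition tuple,)}
                    result.extend((key, p, None) for p in value)
                else:
                    # {topic: partition}
                    result.append((key, value, None))
        else:
            raise TypeError("unsupported argument: %r" % (arg,))
    return result


print(normalize_topic_partitions("topic1", ("topic2", 2), {"topic3": 0}))
print(normalize_topic_partitions({("topic1", 0): 123, ("topic2", 1): 456}))
```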

task_done(message)

Mark a fetched message as consumed. Offsets for messages marked as “task_done” will be stored back to the Kafka cluster for this consumer group on commit().
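
The relationship between task_done() and commit() can be pictured with a small in-memory tracker. This is a simplified sketch, not the library's code: `OffsetTracker` and `Message` are hypothetical, nothing is sent to a broker, and whether the stored value is the marked offset or the next one is deliberately left out.

```python
from collections import namedtuple

# Hypothetical stand-in for a fetched message.
Message = namedtuple("Message", ["topic", "partition", "offset", "key", "value"])


class OffsetTracker:
    """Track offsets marked as done and 'commit' them to an in-memory store."""

    def __init__(self):
        self.task_done_offsets = {}  # (topic, partition) -> last marked offset
        self.committed = {}          # (topic, partition) -> last committed offset

    def task_done(self, message):
        """Record the message's offset as consumed for its topic/partition."""
        key = (message.topic, message.partition)
        self.task_done_offsets[key] = message.offset

    def commit(self):
        """Store marked offsets that differ from the committed ones.

        Returns the entries that were written, so an empty dict means
        there was nothing new to commit.
        """
        changed = {
            tp: offset
            for tp, offset in self.task_done_offsets.items()
            if self.committed.get(tp) != offset
        }
        self.committed.update(changed)
        return changed


tracker = OffsetTracker()
tracker.task_done(Message("topic1", 0, 5, None, b"payload"))
print(tracker.commit())  # the newly marked offset is written
print(tracker.commit())  # nothing new since the last commit
```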

class kafka.consumer.kafka.OffsetsStruct

Bases: tuple

OffsetsStruct(fetch, highwater, commit, task_done)

__getnewargs__()

Return self as a plain tuple. Used by copy and pickle.

__getstate__()

Exclude the OrderedDict from pickling

__repr__()

Return a nicely formatted representation string

commit

Alias for field number 2

fetch

Alias for field number 0

highwater

Alias for field number 1

task_done

Alias for field number 3
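
For orientation, the field layout documented above can be reproduced with a plain namedtuple. The sketch below is a stand-in defined locally rather than imported from the library, and the use of one dict per field keyed by (topic, partition) is an assumption made for illustration.

```python
from collections import namedtuple

# Local stand-in with the documented field order.
OffsetsStruct = namedtuple("OffsetsStruct",
                           ["fetch", "highwater", "commit", "task_done"])

# Assumption: each field holds a dict keyed by (topic, partition).
offsets = OffsetsStruct(fetch={}, highwater={}, commit={}, task_done={})
offsets.fetch[("topic1", 0)] = 123

# Fields are reachable by name or by the documented field number.
print(offsets.fetch is offsets[0])
print(offsets.task_done is offsets[3])
```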

kafka.consumer.multiprocess module

class kafka.consumer.multiprocess.MultiProcessConsumer(client, group, topic, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0)

Bases: kafka.consumer.base.Consumer

A consumer implementation that consumes partitions for a topic in parallel using multiple processes

Parameters:
  • client – a connected KafkaClient
  • group – a name for this consumer, used for offset storage; it must be unique
  • topic – the topic to consume
Keyword Arguments:
 
  • auto_commit – default True. Whether or not to auto commit the offsets
  • auto_commit_every_n – default 100. How many messages to consume before a commit
  • auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
  • num_procs – Number of processes to start for consuming messages. The available partitions will be divided among these processes
  • partitions_per_proc – Number of partitions to be allocated per process (overrides num_procs)
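
The interaction between num_procs and partitions_per_proc can be sketched as a chunking step. `assign_partitions` is a hypothetical helper; the rounding used here (ceiling division) is an assumption, and the library's exact rule may differ.

```python
import math


def assign_partitions(partitions, num_procs=1, partitions_per_proc=0):
    """Split partitions into per-process chunks.

    If partitions_per_proc is non-zero it takes precedence over num_procs;
    otherwise the chunk size is derived from num_procs (assumed: ceiling).
    """
    partitions = list(partitions)
    if not partitions_per_proc:
        partitions_per_proc = max(1, math.ceil(len(partitions) / num_procs))
    return [
        partitions[i:i + partitions_per_proc]
        for i in range(0, len(partitions), partitions_per_proc)
    ]


print(assign_partitions(range(6), num_procs=3))
print(assign_partitions(range(6), num_procs=3, partitions_per_proc=4))
```

Each inner list corresponds to the partitions one worker process would consume.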

Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.
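
The way the two triggers reset one another can be shown with a small model. This is a sketch under stated assumptions: `AutoCommitPolicy` is hypothetical, the clock is passed in explicitly so the behavior is deterministic, and the time trigger is checked when a message arrives, whereas the real consumer may drive it from a timer instead.

```python
class AutoCommitPolicy:
    """Model of count-based and time-based commit triggers that share a reset."""

    def __init__(self, every_n=100, every_t_ms=5000, now_ms=0):
        self.every_n = every_n
        self.every_t_ms = every_t_ms
        self.count_since_commit = 0
        self.last_commit_ms = now_ms
        self.commits = 0

    def commit(self, now_ms):
        """Commit (manually or via a trigger) and reset both triggers."""
        self.commits += 1
        self.count_since_commit = 0
        self.last_commit_ms = now_ms

    def on_message(self, now_ms):
        """Count one consumed message; commit if either trigger is due."""
        self.count_since_commit += 1
        count_due = (self.every_n is not None
                     and self.count_since_commit >= self.every_n)
        time_due = (self.every_t_ms is not None
                    and now_ms - self.last_commit_ms >= self.every_t_ms)
        if count_due or time_due:
            self.commit(now_ms)
            return True
        return False


policy = AutoCommitPolicy(every_n=3, every_t_ms=1000)
fired = [policy.on_message(t) for t in (10, 20, 30, 1029, 1030)]
print(fired, policy.commits)
```

In the run above, the third message fires the count trigger, which also restarts the time window; the commit at t=1030 is then fired by elapsed time and resets the message count.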

__iter__()

Iterator to consume the messages available on this consumer

get_messages(count=1, block=True, timeout=10)

Fetch the specified number of messages

Keyword Arguments:
 
  • count – the maximum number of messages to fetch
  • block – if True, the API will block until some messages are fetched
  • timeout – if block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.

stop()

kafka.consumer.simple module

class kafka.consumer.simple.FetchContext(consumer, block, timeout)

Bases: object

Class for managing the state of a consumer during fetch

__enter__()

Set fetch values based on blocking status

__exit__(type, value, traceback)

Reset values

class kafka.consumer.simple.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None)

Bases: kafka.consumer.base.Consumer

A simple consumer implementation that consumes all/specified partitions for a topic

Parameters:
  • client – a connected KafkaClient
  • group – a name for this consumer, used for offset storage; it must be unique
  • topic – the topic to consume
Keyword Arguments:
 
  • partitions – An optional list of partitions to consume the data from
  • auto_commit – default True. Whether or not to auto commit the offsets
  • auto_commit_every_n – default 100. How many messages to consume before a commit
  • auto_commit_every_t – default 5000. How much time (in milliseconds) to wait before commit
  • fetch_size_bytes – number of bytes to request in a FetchRequest
  • buffer_size – default 4K. Initial number of bytes to tell kafka we have available. This will double as needed.
  • max_buffer_size – default 32K (32768, per the constructor signature). Max number of bytes to tell kafka we have available. None means no limit.
  • iter_timeout – default None. How much time (in seconds) to wait for a message in the iterator before exiting. None means no timeout, so it will wait forever.
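
The doubling behavior of buffer_size up to max_buffer_size can be sketched as follows. The helpers are hypothetical and use the defaults from the constructor signature (4096 and 32768); raising ValueError once the cap is reached is an assumption of this sketch, not the library's documented error.

```python
def next_buffer_size(current, max_buffer_size=32768):
    """Return the next buffer size: double, capped at max_buffer_size.

    max_buffer_size=None means no limit. Raises ValueError if the buffer
    is already at the cap and cannot grow further (assumed behavior).
    """
    if max_buffer_size is not None and current >= max_buffer_size:
        raise ValueError("buffer already at max_buffer_size")
    doubled = current * 2
    if max_buffer_size is None:
        return doubled
    return min(doubled, max_buffer_size)


def growth_sequence(buffer_size=4096, max_buffer_size=32768):
    """List every size from the initial buffer up to a finite cap."""
    if buffer_size <= 0:
        raise ValueError("buffer_size must be positive")
    sizes = [buffer_size]
    while sizes[-1] < max_buffer_size:
        sizes.append(next_buffer_size(sizes[-1], max_buffer_size))
    return sizes


print(growth_sequence())
```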

Auto commit details: If both auto_commit_every_n and auto_commit_every_t are set, they will reset one another when one is triggered. These triggers simply call the commit method on this class. A manual call to commit will also reset these triggers.

get_message(block=True, timeout=0.1, get_partition_info=None)

get_messages(count=1, block=True, timeout=0.1)

Fetch the specified number of messages

Keyword Arguments:
 
  • count – the maximum number of messages to fetch
  • block – if True, the API will block until some messages are fetched
  • timeout – if block is True, the function will block for the specified time (in seconds) until count messages are fetched. If None, it will block forever.

provide_partition_info()

Indicates that partition info must be returned by the consumer

seek(offset, whence)

Alter the current offset in the consumer, similar to fseek

Parameters:
  • offset – how much to modify the offset
  • whence – where to modify it from:
    • 0 is relative to the earliest available offset (head)
    • 1 is relative to the current offset
    • 2 is relative to the latest known offset (tail)
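
The whence semantics above mirror fseek and reduce to picking a base offset and adding the delta. `resolve_seek` is a hypothetical helper for a single partition; the earliest, current, and latest offsets are passed in as plain integers instead of being looked up from a broker.

```python
def resolve_seek(offset, whence, earliest, current, latest):
    """Return the absolute offset that seek(offset, whence) would target.

    whence: 0 = relative to earliest (head), 1 = relative to current,
            2 = relative to latest (tail).
    """
    if whence == 0:
        base = earliest
    elif whence == 1:
        base = current
    elif whence == 2:
        base = latest
    else:
        raise ValueError("whence must be 0, 1, or 2")
    return base + offset


# Partition with earliest=100, current position=150, latest=200.
print(resolve_seek(10, 0, 100, 150, 200))   # 10 past the head
print(resolve_seek(-5, 1, 100, 150, 200))   # 5 before the current position
print(resolve_seek(-10, 2, 100, 150, 200))  # 10 before the tail
```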

Module contents

class kafka.consumer.SimpleConsumer(client, group, topic, auto_commit=True, partitions=None, auto_commit_every_n=100, auto_commit_every_t=5000, fetch_size_bytes=4096, buffer_size=4096, max_buffer_size=32768, iter_timeout=None)

Bases: kafka.consumer.base.Consumer

Package-level alias of kafka.consumer.simple.SimpleConsumer. See the kafka.consumer.simple module section above for the full description, parameters, and methods.

class kafka.consumer.MultiProcessConsumer(client, group, topic, auto_commit=True, auto_commit_every_n=100, auto_commit_every_t=5000, num_procs=1, partitions_per_proc=0)

Bases: kafka.consumer.base.Consumer

Package-level alias of kafka.consumer.multiprocess.MultiProcessConsumer. See the kafka.consumer.multiprocess module section above for the full description, parameters, and methods.

class kafka.consumer.KafkaConsumer(*topics, **configs)

Bases: object

Package-level alias of kafka.consumer.kafka.KafkaConsumer. See the kafka.consumer.kafka module section above for usage examples, configuration defaults, and methods.