FAQs
- What is error_code X?
- What is offsets_not_available_at_group_coordinator?
- What is offsets_out_of_range?
- What is MessageTooBigException?
- How do I prevent data loss?
- How should I configure Kafunk to run across a WAN?
- How do I consume faster?
- How do I produce faster?
- Should I use Consumer.stream or Consumer.consume?
- I'm seeing lots of consumer heartbeat errors, and group rebalancing. What's up?
- How does Kafunk handle failures?
- How does logging work?
- How do I monitor lag/progress?
What is offsets_not_available_at_group_coordinator?
The event offsets_not_available_at_group_coordinator indicates that the Kafka broker responsible for storing a consumer's offsets - the group coordinator - doesn't have offsets for some or all of the partitions of a given topic and consumer group combination. This can happen if the consumer's ConsumerConfig.autoOffsetReset strategy is set to AutoOffsetReset.Halt or AutoOffsetReset.TryStartFromCommittedOffsets and one of the following is true:
- The consumer is running for the first time.
- The consumer hasn't committed offsets for longer than the offset retention period.
- The broker lost the offsets.
In any case, the consumer should be pointed at offsets explicitly using either Consumer.commitOffsets or Consumer.commitOffsetsToTime, or the consumer should use AutoOffsetReset.StartFromTime. Note that with the latter, there is a potential for message loss. A sketch of the explicit approach follows below.
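As a rough sketch of the explicit approach - the host, group, and topic names here are hypothetical, and the signatures are assumed from the Kafunk README rather than verified against this version:

```fsharp
// Hypothetical names; ConsumerConfig.create, Consumer.commitOffsets and
// Consumer.commitOffsetsToTime signatures are assumed from the Kafunk README.
let conn = Kafka.connHost "localhost"

let cfg =
  ConsumerConfig.create (
    groupId = "my-group",
    topic = "my-topic",
    autoOffsetReset = AutoOffsetReset.TryStartFromCommittedOffsets)

let consumer = Consumer.create conn cfg

// Explicitly commit offset 0 for partition 0...
Consumer.commitOffsets consumer [| 0, 0L |]
|> Async.RunSynchronously

// ...or commit the offsets corresponding to a point in time
// (Time.EarliestOffset is assumed to denote the earliest retained offsets).
Consumer.commitOffsetsToTime consumer Time.EarliestOffset
|> Async.RunSynchronously
```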
Scenario 2 can happen if the consumer goes offline for longer than the offset retention period - since the consumer is offline, offsets aren't being committed and eventually get deleted by the broker. The offset retention period can be extended, either by updating the server side configuration offsets.retention.minutes, or on the client side by overriding ConsumerConfig.offsetRetentionTime. If consuming with periodic offset commits (with Consumer.consumePeriodicCommit, or otherwise) then Kafunk will commit offsets periodically even if no new messages are received. However, it will only commit offsets that exist. Therefore, if offsets are missing, they must be committed explicitly as described above.
Scenario 3 shouldn't happen in theory; however, there have been incidents in practice.
What is offsets_out_of_range?
The event offsets_out_of_range indicates that the offsets requested in a FetchRequest aren't available. This can happen in the following scenarios:
- The consumer is running for the first time and doesn't have an automatic autoOffsetReset value. Consumer.commitOffsets should be called to explicitly initialize the consumer to an offset.
- The consumer was assigned invalid offsets, using Consumer.commitOffsets, for example. This is simply user error. Valid offsets for a topic can be discovered using the Offsets module.
- The consumer fell behind the topic retention window. This can happen if the consumer was offline for a period of time, or the consumer is unable to keep up with the pace of retention even when online. In this case, messages have been lost. Action must be taken to ensure the consumer can keep up with both the producers and the retention window.
- A lagging broker became the leader for a partition, and it didn't have an offset that the prior leader had. This is the opposite of the previous scenario in that the consumer is ahead of where the broker is rather than behind. Note that simply having the consumer reset from an earlier available offset will not prevent message loss because the desired offset will be overwritten by a different message than what was there prior to the new broker becoming leader.
By default, Kafunk is configured to halt when it detects out-of-range offsets. This behavior can be changed via ConsumerConfig.autoOffsetReset, as sketched below.
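For example, to resume from the earliest available offset instead of halting - a sketch only, with hypothetical group and topic names, and the AutoOffsetReset.StartFromTime / Time.EarliestOffset shapes assumed from the Kafunk README:

```fsharp
// Hypothetical names; signature assumed from the Kafunk README.
let cfg =
  ConsumerConfig.create (
    groupId = "my-group",
    topic = "my-topic",
    autoOffsetReset = AutoOffsetReset.StartFromTime Time.EarliestOffset)
```

As noted above, resetting to an earlier offset after a leader change does not prevent message loss; it only allows the consumer to make progress.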
See also:
- What is offsets_not_available_at_group_coordinator?
- How do I consume faster?
- How do I prevent data loss?
What is MessageTooBigException?
Kafunk raises a MessageTooBigException when, while decoding a FetchResponse, it encounters a message in a message set that is bigger than the entire message set. This means that the message was truncated before being sent, and is therefore corrupt. To fix this, make sure that ConsumerConfig.fetchMaxBytes is greater than the size of the largest message in the topic. The applicable Kafka server configuration point is max.message.bytes.
How do I prevent data loss?
By default, Kafunk is configured to either prevent data loss or to fail fast when a potential loss of data has been detected. The applicable Kafunk configuration points are:
- ProducerConfig.requiredAcks = RequiredAcks.AllInSync: this setting works in conjunction with the server-side min.insync.replicas setting to determine the number of replicas which must acknowledge a produce request. By default, Kafunk requires all in-sync replicas (ISR) to acknowledge a produce request. By default, Kafka configures min.insync.replicas = 1, so you may want to increase this setting. Note that this comes at the cost of increased produce latency since more synchronization is required. (A producer sketch follows the server-side list below.)
- ConsumerConfig.autoOffsetReset = AutoOffsetReset.TryStartFromCommittedOffsets: this setting controls the behavior of the consumer when it doesn't have offsets to start from and when it detects an out-of-range offset in a fetch request. The former can happen if the consumer is running for the first time or if the consumer's offsets have been lost due to offset retention. The latter can happen if the consumer is falling behind the topic retention window, or if a lagging broker has been elected as the leader for a partition.
- ConsumerConfig.offsetRetentionTime: this setting overrides the server-side offset retention setting. This can be used to extend the offset retention window, allowing consumers to be offline for longer periods.
The applicable server side configuration points are:
- min.insync.replicas
- offsets.retention.minutes
- offsets.topic.replication.factor
- unclean.leader.election.enable
- log.retention.*
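To illustrate the producer side - the topic name is hypothetical, and the ProducerConfig / Producer signatures are assumed from the Kafunk README:

```fsharp
// Hypothetical topic name; signatures assumed from the Kafunk README.
let conn = Kafka.connHost "localhost"

let producerCfg =
  ProducerConfig.create (
    "my-topic",
    Partitioner.roundRobin,
    requiredAcks = RequiredAcks.AllInSync)

let producer =
  Producer.createAsync conn producerCfg
  |> Async.RunSynchronously

// The produce call completes only once all in-sync replicas have
// acknowledged the request.
let res =
  Producer.produce producer (ProducerMessage.ofBytes ("hello world"B))
  |> Async.RunSynchronously

printfn "partition=%i offset=%i" res.partition res.offset
```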
How should I configure Kafunk to run across a WAN?
Increased latencies across a WAN connection can warrant larger TCP window sizes. The applicable configuration points are:
- ChanConfig.receiveBufferSize controls the TCP receive buffer size.
- ChanConfig.sendBufferSize controls the TCP send buffer size.
The following consumer and producer configuration points are also applicable:
- ConsumerConfig.fetchMaxBytes controls the maximum amount of data Kafka will send for each partition of a fetch request. Increasing this value will increase throughput. Note however that setting this value too high may cause heartbeat requests to slow down beyond the session timeout period, causing the consumer instance to be ejected from the group.
- ProducerConfig.batchSizeBytes controls the maximum size of a batch of client side produce requests before being sent to the server. Increasing this value will increase throughput at the cost of latency.
- ProducerConfig.batchLingerMs controls the maximum amount of time to wait before sending a batch of client side produce requests to the server. Increasing this value will increase throughput at the cost of latency.
Note that the tradeoff being made is that of throughput versus latency. For some rules of thumb on tuning the TCP window with respect to network latency, look here.
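As a configuration sketch - the buffer sizes are illustrative, and the ChanConfig.create / KafkaConfig.create keyword arguments are assumptions rather than verified API:

```fsharp
// Illustrative values; ChanConfig.create and KafkaConfig.create keyword
// arguments are assumed, not verified against this version of Kafunk.
let chanCfg =
  ChanConfig.create (
    receiveBufferSize = 512 * 1024,   // larger TCP receive window for the WAN link
    sendBufferSize = 512 * 1024)      // larger TCP send window

let kafkaCfg =
  KafkaConfig.create (
    [ KafkaUri.parse "kafka://remote-host:9092" ],
    tcpConfig = chanCfg)

let conn = Kafka.conn kafkaCfg
```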
How do I consume faster?
See How should I configure Kafunk to run across a WAN?
How do I produce faster?
See How should I configure Kafunk to run across a WAN?
Should I use Consumer.stream or Consumer.consume?
Both Consumer.stream and Consumer.consume are implemented using Consumer.generations, but they differ in the delegation of control. Consumer.consume takes a function which is then called for every received message set, in parallel across partitions but sequentially within a partition; the message sets are pushed to the provided function. Consumer.stream returns an async sequence which must be pulled. Before being iterated, however, this async sequence can be transformed, allowing for data-driven parallelism.
As a rule of thumb, use Consumer.stream only if you need fine grained control over parallelism, or if you're explicitly defining a streaming workflow. In all other cases, prefer to use Consumer.consume or its variants.
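To make the contrast concrete - the handler bodies are illustrative, Consumer.consume's handler signature is taken from the Kafunk README, and Consumer.stream is assumed to yield state/message-set pairs as an AsyncSeq:

```fsharp
// Push-based: Kafunk drives the handler, in parallel across partitions but
// sequentially within a partition. Signatures assumed from the Kafunk README.
Consumer.consume consumer
  (fun (state:ConsumerState) (ms:ConsumerMessageSet) -> async {
    printfn "topic=%s partition=%i" ms.topic ms.partition
    // commit the offsets of the message set just handled
    do! Consumer.commitOffsets consumer (ConsumerMessageSet.commitPartitionOffsets ms) })
|> Async.RunSynchronously

// Pull-based: an AsyncSeq that can be transformed before it is iterated.
// The element type (state * message set) is an assumption.
Consumer.stream consumer
|> AsyncSeq.iterAsync (fun (state, ms) -> async {
    printfn "topic=%s partition=%i" ms.topic ms.partition })
|> Async.RunSynchronously
```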
I'm seeing lots of consumer heartbeat errors, and group rebalancing. What's up?
Symptoms:
- Slow or stalled consumer progress.
- Event logs:
  - heartbeat_error
  - rejoining_group
  - join_group_error
  - sync_group_error
- Kafka error codes:
- 22 - IllegalGenerationCode
- 25 - UnknownMemberIdCode
- 27 - RebalanceInProgressCode
Possible causes:
- Restarting consumers: if consumers are frequently restarting due to errors, they will attempt to rejoin the consumer group and this will cause frequent rebalancing which can prevent the consumer from making progress.
- Low session timeout: if ConsumerConfig.sessionTimeout is too low, consumers may have insufficient time to join the consumer group and synchronize before a rebalance is initiated. Related to this is ConsumerConfig.fetchMaxBytes: if set very high, it can slow the rate at which a group coordinator which is also a leader for a partition acknowledges heartbeats, and as a result it can initiate a rebalance. Increasing ConsumerConfig.sessionTimeout often fixes the issue, as sketched below.
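A configuration sketch along those lines - the values are illustrative (sessionTimeout is assumed to be in milliseconds), and the ConsumerConfig.create keyword arguments are assumptions:

```fsharp
// Illustrative values; ConsumerConfig.create keyword arguments assumed.
let cfg =
  ConsumerConfig.create (
    groupId = "my-group",
    topic = "my-topic",
    sessionTimeout = 30000,      // give consumers more time before a rebalance
    heartbeatFrequency = 10,     // heartbeats per session timeout window
    fetchMaxBytes = 100000)      // keep fetches small enough not to starve heartbeats
```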
How does Kafunk handle failures?
In general, Kafunk handles failed operations by retrying according to a RetryPolicy. If all retry attempts fail, Kafunk throws an exception. Users of Kafunk shouldn't have to retry Kafunk operations; instead, tune the retry policies to suit your needs.
Note that even though you can tune a retry policy to retry indefinitely, consider letting the operation crash instead, to allow other environmental factors to take effect. Take, for example, the retry policy for a TCP connection to an individual broker. It might make sense to retry an operation on that connection once or twice, but beyond that, you're better off rediscovering the state of the cluster and attempting the operation with the new state taken into account - the broker may no longer be active, in which case retrying the operation is pointless. Likewise, you may try connecting to the configured list of bootstrap hosts several times, but it's possible that the host name has changed entirely, in which case it's better to restart the entire application and pick up new configuration. As such, tune the retry policies with respect to your particular environment. Retry policies are effective for transient errors, allowing you to continue operation without incurring a costly application restart, but they aren't a complete fault tolerance solution for your entire service.
The applicable configuration points are:
- ChanConfig.requestRetryPolicy controls the retry policy for requests to an individual broker. This is helpful for recovering from transient network failures or timeouts, but it can't recover from cluster topology changes. Most of the time, this retry policy should be short so that cluster topology changes are detected sooner. See KafkaConfig.requestRetryPolicy below.
- ChanConfig.requestTimeout controls the maximum duration of requests to individual brokers after which the request is cancelled, making it available for retry.
- ChanConfig.connectRetryPolicy controls the retry policy for connection attempts to an individual Kafka broker. This is helpful for recovering from transient network failures or timeouts, but it can't recover from cluster topology changes. See KafkaConfig.bootstrapConnectRetryPolicy below.
- ChanConfig.connectTimeout controls the maximum duration of a TCP connection attempt to an individual broker.
- KafkaConfig.requestRetryPolicy controls the retry policy for all requests. This setting works in conjunction with ChanConfig.requestRetryPolicy to determine how a request is retried. The latter controls the retry policy with respect to an individual broker and defines the first tier of retries; these are meant to recover from transient network issues or timeouts, but aren't able to recover from cluster topology changes. This is where KafkaConfig.requestRetryPolicy comes into play - it controls retries which consist of re-discovering the state of the cluster and retrying the operation on a potentially new broker.
- KafkaConfig.bootstrapConnectRetryPolicy controls the retry policy for connecting to a bootstrap broker. This setting works in conjunction with ChanConfig.connectRetryPolicy to determine how connections are retried. The latter controls the connection retry policy with respect to an individual broker; the former controls connection retries with respect to the entire bootstrap broker list.
A Kafunk consumer handles consumer group failures by rejoining the group. The applicable configuration points are:
- ConsumerConfig.sessionTimeout controls the time window during which a consumer must send heartbeats to the group coordinator; failure to do so results in a group rebalance.
- ConsumerConfig.heartbeatFrequency controls the number of times heartbeats are sent during sessionTimeout.
How does logging work?
Kafunk publishes all logging events to Log.Event. By default, log events are written to STDOUT at the INFO level; this may be disabled by disposing Log.ConsolePrinterSubscription. The default log level is INFO and may be adjusted by mutating the field Log.MinLevel. You may add your own logger by simply subscribing to Log.Event. This area will soon undergo changes as tracked here.
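For instance - a sketch that assumes Log.Event is exposed as a standard .NET event and that the level type is named LogLevel (both are assumptions here):

```fsharp
// Assumptions: Log.Event is subscribable as a standard .NET IEvent, and the
// level enumeration is named LogLevel. Verify against the Kafunk source.

// Silence the default console output.
Log.ConsolePrinterSubscription.Dispose ()

// Raise the minimum level.
Log.MinLevel <- LogLevel.Warn

// Attach a custom logger.
Log.Event
|> Event.add (fun e -> eprintfn "kafunk: %A" e)
```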
How do I monitor lag/progress?
You may use existing tools for monitoring Kafka progress. However, you may also monitor consumer progress directly using the Kafka API. The ConsumerInfo.progress function takes a connection, a group id, a topic, and an optional set of partitions, and returns a ConsumerProgressInfo object representing the progress of the consumer in the topic. Take a look at the ConsumerProgress script for an example.
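A sketch of such a check - the host, group, and topic names are hypothetical, the empty partition array is assumed to mean all partitions, and the ConsumerProgressInfo field name is an assumption:

```fsharp
// Hypothetical names; the meaning of the empty partition array and the
// ConsumerProgressInfo field names are assumptions.
let conn = Kafka.connHost "localhost"

let progress =
  ConsumerInfo.progress conn "my-group" "my-topic" [||]
  |> Async.RunSynchronously

// e.g. log the total lag across all partitions (field name assumed)
printfn "total lag=%i" progress.totalLag
```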