This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

Kafka requests isolation concept

Andrey Dyachkov edited this page Nov 9, 2016 · 8 revisions

Status: Draft

Summary

Prevent a complete Nakadi failure caused by slow Kafka nodes.

Goals

  • Do we really need this?
  • Identify the criteria for opening the circuit.
  • Add support for short-circuiting Kafka client requests: send and poll.

Non-Goals

  • Request isolation for components other than Kafka.
  • Which short-circuit implementation to use. Just use Hystrix (it is on the tech radar).

Success Metrics

Even if the whole Kafka cluster goes down, Nakadi still responds with proper HTTP status codes.

Motivation

Currently, Kafka unavailability increases response times for clients. For the consumer it works as follows: since we use one thread per consumer, a slow Kafka call blocks the whole thread, which waits for up to 30 seconds. That in turn blocks other clients from accessing Nakadi, as well as the AWS health check mechanism. The producer behaves almost the same, except that it works in only one thread. This wait can be significantly reduced by implementing a circuit breaker. The motivation is a more resilient Nakadi cluster that does not go down when the Kafka cluster goes down, fully or partially.

Description

This section will evolve over time as the work progresses, ultimately becoming the authoritative high-level description of the end result. Include hyperlinks to additional documents as

Hystrix

Hystrix is a latency and fault tolerance library designed to isolate points of access to remote systems, services and 3rd party libraries, stop cascading failure and enable resilience in complex distributed systems where failure is inevitable.

Hystrix group key

Topic

The implementation for this is already done (producer part) and did not require much effort: https://github.com/zalando/nakadi/pull/470

The circuit opens once the topic reaches the error threshold. The downside of this approach is that the whole topic becomes unavailable even if some of its partitions are still available for reads and writes.

Reason not to use: under a high request rate, the long Kafka timeout can keep the circuit from opening in time. Nakadi then goes down because many threads are waiting and nothing can be served, not even the health check.

Topic + partition

If Hystrix can handle such a load (800 topics * 8 partitions ~ 2000 group keys), we can also wrap this in a Hystrix command and be done. I have already asked about this in the Hystrix Google Group. The current mechanism for sending events to Kafka has to be reimplemented to support 207 (Multi-Status) responses for the provided partitions, because right now we do not take partitions into account.

To implement this for consumption, the Kafka client has to be improved to return the failed partition. Kafka Streams may help here; this needs investigation.

Reason not to use: under a high request rate, the long Kafka timeout can keep the circuit from opening in time. Nakadi then goes down because many threads are waiting and nothing can be served, not even the health check.
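The difference between the two group-key granularities discussed so far can be illustrated with a small keyed breaker. This is only a sketch in plain Java, not Hystrix and not Nakadi code: the class `KeyedCircuitBreaker`, its methods and the key format are hypothetical, and it opens on a count of consecutive failures, which is a simplification of the error threshold mentioned above. It also has no timed reset.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified per-key circuit breaker (a stand-in for Hystrix group keys). */
public class KeyedCircuitBreaker {
    private final int failureThreshold;
    // Consecutive failures per group key; a missing entry means "no recent failures".
    private final Map<String, AtomicInteger> consecutiveFailures = new ConcurrentHashMap<>();

    public KeyedCircuitBreaker(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    /** The circuit for a key is open once its consecutive failures reach the threshold. */
    public boolean isOpen(String key) {
        AtomicInteger failures = consecutiveFailures.get(key);
        return failures != null && failures.get() >= failureThreshold;
    }

    public void recordFailure(String key) {
        consecutiveFailures.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
    }

    /** A success clears the failure count for that key only. */
    public void recordSuccess(String key) {
        consecutiveFailures.remove(key);
    }

    /** Coarse key: one circuit for the whole topic. */
    public static String topicKey(String topic) {
        return topic;
    }

    /** Fine key: one circuit per partition (assumed key format). */
    public static String topicPartitionKey(String topic, int partition) {
        return topic + "-" + partition;
    }
}
```

With `topicPartitionKey`, failures on partition 0 of a topic leave the circuit for partition 1 closed; with `topicKey`, the same failures would make every partition of the topic unavailable, which is the downside described for the topic-level key.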

Broker

In this case the group key has to be the broker id (IP or some internal Kafka client representation). This approach has less overhead than topic + partition because the number of Hystrix group keys is bounded by the number of brokers. Downside: we do not know the broker id beforehand, so some logic has to be implemented to identify it before creating the Hystrix command.

To implement this for consumption, the Kafka client has to be improved to return the failed brokers.
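The "identify the broker before creating the command" step could look roughly like the lookup below. It is a sketch under assumptions: `BrokerGroupKeys`, its methods and the `broker-<id>` key format are hypothetical, and the leader map stands in for cluster metadata that would have to be obtained from the Kafka client (for the producer, presumably from the partition metadata it exposes via `partitionsFor`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical lookup from topic/partition to a broker-based group key. */
public class BrokerGroupKeys {
    // topic -> (partition -> leader broker id); an assumed snapshot of cluster metadata.
    private final Map<String, Map<Integer, Integer>> leaders = new HashMap<>();

    public void setLeader(String topic, int partition, int brokerId) {
        leaders.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, brokerId);
    }

    /** Returns the group key for the partition's leader, or empty if the leader is unknown. */
    public Optional<String> groupKeyFor(String topic, int partition) {
        return Optional.ofNullable(leaders.get(topic))
                .map(byPartition -> byPartition.get(partition))
                .map(brokerId -> "broker-" + brokerId);
    }
}
```

Partitions led by the same broker share one key, so the number of keys stays bounded by the number of brokers. The empty result makes the open question explicit: what to do when the leader is not known yet is not decided in this proposal.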

Short circuit criteria

  • Producer: TimeoutException, NetworkException, UnknownServerException.
  • Consumer: TimeoutException, NetworkException, UnknownServerException, NoAvailableBrokersException.
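One way to apply these criteria is a small classifier that decides whether an error should count towards opening the circuit. The sketch below is an assumption, not existing Nakadi code: `ShortCircuitCriteria` is a hypothetical helper, and it matches on simple class names to stay free of a Kafka dependency, so it would also match same-named exceptions from other packages. A real implementation would more likely use `instanceof` checks against the Kafka exception classes. It walks the cause chain because a failure surfaced through `Future.get()` arrives wrapped in an `ExecutionException`.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical classifier for the short circuit criteria listed above. */
public class ShortCircuitCriteria {
    private static final Set<String> PRODUCER = new HashSet<>(Arrays.asList(
            "TimeoutException", "NetworkException", "UnknownServerException"));
    private static final Set<String> CONSUMER = new HashSet<>(Arrays.asList(
            "TimeoutException", "NetworkException", "UnknownServerException",
            "NoAvailableBrokersException"));

    public static boolean countsForProducer(Throwable error) {
        return matches(error, PRODUCER);
    }

    public static boolean countsForConsumer(Throwable error) {
        return matches(error, CONSUMER);
    }

    // Checks the error and its causes; the depth limit guards against cyclic cause chains.
    private static boolean matches(Throwable error, Set<String> names) {
        int depth = 0;
        for (Throwable t = error; t != null && depth < 16; t = t.getCause(), depth++) {
            if (names.contains(t.getClass().getSimpleName())) {
                return true;
            }
        }
        return false;
    }
}
```

Errors outside these lists, such as a validation failure on the event itself, would then be reported to the client without affecting the circuit state.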

Implementation

  • Messages order should be preserved.

It is not possible to simply wrap the existing logic in a Hystrix command: the current implementation sends batches in a loop, gathers the futures and waits for all of them to complete, whereas a Hystrix command has to succeed or fail for each batch separately. The solution is to send a single batch within the command and wait for its future to complete.

The method KafkaTopicRepository.syncPostBatch is changed to throw a RuntimeException, which is wrapped in an EventPublishingException in the calling method.

ProducerSendCommand is a Hystrix command that calls Kafka and waits for the response.
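The "one batch per command, wait for its future" approach can be sketched in plain Java as follows. This is not the code from the pull request: `SequentialBatchSender`, `sendInOrder` and the `send` function are hypothetical, and the body of the loop stands in for what a command such as ProducerSendCommand would do for a single batch. Waiting for each future before sending the next batch is also what keeps the messages in order.

```java
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical sketch: send batches one at a time and wait for each acknowledgement. */
public class SequentialBatchSender {

    /**
     * Sends the batches in order and stops at the first failure or timeout.
     * Returns the number of batches acknowledged before that point.
     */
    public static <B> int sendInOrder(List<B> batches,
                                      Function<B, Future<?>> send,
                                      long timeoutMs) {
        int acknowledged = 0;
        for (B batch : batches) {
            try {
                // One batch per iteration: this is the unit that succeeds or fails on its own.
                send.apply(batch).get(timeoutMs, TimeUnit.MILLISECONDS);
                acknowledged++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            } catch (Exception e) {
                // ExecutionException, TimeoutException or a failure thrown by send itself.
                break;
            }
        }
        return acknowledged;
    }
}
```

The trade-off is throughput: batches are no longer in flight concurrently, so the per-batch timeout (the `timeoutMs` parameter here) becomes one of the values that has to be tuned together with the circuit threshold.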

Alternatives

One alternative is to rely on support from the Kafka client, but it does not offer any kind of short-circuiting. After a short discussion with Kafka developers, I understood that they react very quickly to issues in order to prevent complete failures. It is also not clear what their setup is, since they did not share it (maybe they use several small clusters, etc.).

Another possible solution is to have the Kafka client return the broker id or partition, for example in the TimeoutException, and to use it in subsequent Hystrix command calls.

Another alternative is to have several clusters, for example a smaller cluster per group of event types. From my point of view this is not related to isolating Kafka calls: even a few clusters can have a few slow nodes, and we would still encounter threads blocked for 30 seconds for some of the topics, partitions or brokers.

Testing

There are several ways to test it:

  • Load the CPU of a Kafka node.
  • Saturate the network between Nakadi and Kafka.
  • Break the Kafka cluster.

Risks and Assumptions

This solution has to be tested and adopted very carefully, because it involves configuration parameters that have to be tuned for our current load, for example the circuit opening threshold.