[ https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16708392#comment-16708392 ]

Dmitry Buykin edited comment on KAFKA-7695 at 12/4/18 8:40 AM:
---------------------------------------------------------------

[~mjsax] I think it's a bug, because KStreams derives its settings from the 
Consumer but does not honor the Consumer's contract for those settings. I don't 
think a "try and fail" approach is the best way to discover which settings 
KStreams supports.
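
A minimal sketch of the fail-fast alternative to "try and fail", assuming a 
hypothetical validator class (StreamsConsumerConfigCheck is not part of Kafka 
Streams). It rejects consumer settings that, in this sketch, are assumed to be 
owned by KStreams before any client is created. Only the key string 
"partition.assignment.strategy" comes from the issue; the list of controlled 
keys is an illustrative assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical fail-fast check; Kafka Streams does not ship this class.
class StreamsConsumerConfigCheck {

    // Consumer settings that this sketch assumes are managed by KStreams itself
    // and therefore must not be overridden by the user (assumption, not the real list).
    static final List<String> STREAMS_CONTROLLED_KEYS =
            Arrays.asList("partition.assignment.strategy");

    /** Throws at configuration time instead of letting a client crash at runtime. */
    static void validate(Properties props) {
        for (String key : STREAMS_CONTROLLED_KEYS) {
            if (props.containsKey(key)) {
                throw new IllegalArgumentException(
                        "'" + key + "' is managed by Kafka Streams and cannot be overridden");
            }
        }
    }
}
```

Calling validate(props) before constructing the streams application would turn 
the crash described in this issue into an immediate, descriptive configuration 
error.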

About your question "How partition assignment could help with throttling?": 

My idea was to use the same approach as in 
[KIP-169|https://cwiki.apache.org/confluence/display/KAFKA/KIP-169+-+Lag-Aware+Partition+Assignment+Strategy]
 with restrictions specific to KStreams, such as sticking partitions (tasks) to 
valid local state, extended by enabling/disabling some topics as a function of 
consumer lag (temporarily, during some stage of processing) and redistributing 
topics between stream threads/consumer clients inside the same KStreams 
instance (currently this is round-robin). More topics per thread/consumer slows 
processing down; fewer topics per thread/consumer speeds it up. So switching 
off the fast base topic would help to pre-load data into the RocksDB stores of 
the side streams in left joins, and after a restart processing would continue 
with all topics enabled. Yes, it's complex, but it would help solve the current 
issue of disproportionate processing between "slow" and "fast" streams.
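
A minimal sketch of the lag-aware redistribution described above, assuming a 
hypothetical helper (LagAwareTopicDistributor is not a Kafka Streams or KIP-169 
API). Topics that are switched off for the current stage are left unassigned, 
and the remaining topics are spread over the stream threads by a greedy 
largest-lag-first heuristic instead of round-robin. Real KStreams assigns tasks 
(partitions), not whole topics; working at topic level is a simplification.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of lag-aware topic distribution; none of this exists in Kafka Streams.
class LagAwareTopicDistributor {

    /**
     * Distributes enabled topics over numThreads stream threads.
     * Topics in disabledTopics (e.g. the fast base topic while side streams
     * pre-load their state stores) are not assigned to any thread.
     */
    static List<List<String>> distribute(Map<String, Long> lagByTopic,
                                         List<String> disabledTopics,
                                         int numThreads) {
        List<List<String>> topicsPerThread = new ArrayList<>();
        long[] lagPerThread = new long[numThreads];
        for (int i = 0; i < numThreads; i++) {
            topicsPerThread.add(new ArrayList<>());
        }

        List<String> enabled = new ArrayList<>(lagByTopic.keySet());
        enabled.removeAll(disabledTopics);
        // Largest lag first; ties broken by topic name for a deterministic result.
        enabled.sort(Comparator.comparing((String t) -> lagByTopic.get(t)).reversed()
                .thenComparing(Comparator.<String>naturalOrder()));

        // Greedy longest-processing-time heuristic: each topic goes to the thread
        // with the smallest total lag so far, evening out the backlog per thread.
        for (String topic : enabled) {
            int target = 0;
            for (int i = 1; i < numThreads; i++) {
                if (lagPerThread[i] < lagPerThread[target]) {
                    target = i;
                }
            }
            topicsPerThread.get(target).add(topic);
            lagPerThread[target] += lagByTopic.get(topic);
        }
        return topicsPerThread;
    }
}
```

Re-running distribute() with an empty disabledTopics list models the second 
stage, where processing continues with all topics enabled.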

I think it would be hard to make this dynamic, but in my project that is 
acceptable. I am working on a project that loads data from legacy systems using 
CDC, so I deal with a huge number of events (~one year's worth) on the initial 
refresh and big join windows to accommodate reshuffled timestamps from the 
source system, before switching to operational mode with small windows and fast 
processing.

One additional comment about the StreamsPartitionAssignor implementation and 
its ugly design. I think it mixes three different behaviors which should be 
separated: a) sticking partitions, as tasks, to local state storage; b) 
balancing the workload between StreamThreads in the same KStreams instance; c) 
balancing partitions proportionally between Kafka consumers. And it does this 
by extending PartitionAssignor, which was designed to balance consumers only. 
Sometimes it behaves really strangely. For example, when some metadata has to 
be loaded (and filtered) from several "slow" topics with few partitions, 
StreamsPartitionAssignor assigns these topics to some unlucky StreamThread, 
which then has nothing left to do once it finishes processing these "slow" 
topics.

I think the distribution of topics between threads should be dynamically 
balanced with respect to local state storage.
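
A minimal sketch of keeping concerns a) and b) apart, assuming a hypothetical 
class (StickyThenBalanceAssignor is not how StreamsPartitionAssignor works). 
Step a) keeps each task on the thread that already holds its local state, up 
to a per-thread capacity; step b) then gives the remaining tasks to the 
least-loaded thread. Task IDs are plain strings and thread indexes are ints, 
both illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch separating stickiness (a) from thread balancing (b).
class StickyThenBalanceAssignor {

    static Map<String, Integer> assign(List<String> tasks,
                                       Map<String, Integer> threadWithLocalState,
                                       int numThreads) {
        Map<String, Integer> assignment = new HashMap<>();
        int[] load = new int[numThreads];
        // Ceiling of tasks/threads, so stickiness alone cannot overload one thread.
        int capacity = (tasks.size() + numThreads - 1) / numThreads;

        // (a) Stickiness: keep a task where its state store already lives,
        //     as long as that thread is under capacity.
        List<String> unassigned = new ArrayList<>();
        for (String task : tasks) {
            Integer owner = threadWithLocalState.get(task);
            if (owner != null && owner >= 0 && owner < numThreads && load[owner] < capacity) {
                assignment.put(task, owner);
                load[owner]++;
            } else {
                unassigned.add(task);
            }
        }

        // (b) Balancing: each remaining task goes to the least-loaded thread.
        for (String task : unassigned) {
            int target = 0;
            for (int i = 1; i < numThreads; i++) {
                if (load[i] < load[target]) {
                    target = i;
                }
            }
            assignment.put(task, target);
            load[target]++;
        }
        return assignment;
    }
}
```

Because the two steps are independent, the balancing rule in b) could be 
swapped for a lag-weighted one without touching the stickiness logic in a).
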
{quote}Last, for KStream-GlobalKTable joins, there is no timestamp 
synchronization by design.
{quote}
 I was referring to KStream-KStream joins only.


> Cannot override StreamsPartitionAssignor in configuration 
> ----------------------------------------------------------
>
>                 Key: KAFKA-7695
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7695
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0, 2.0.1
>            Reporter: Dmitry Buykin
>            Priority: Major
>              Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing the property 
> partition.assignment.strategy in KStreams 2.0.1, because the streams crash 
> inside KafkaClientSupplier.getGlobalConsumer. This GlobalConsumer 
> works only with RangeAssignor, which is configured by default.
> This can be reproduced by setting
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> StreamsPartitionAssignor.class.getName());`
> To me it looks like a bug.
> Opened a discussion here:
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
