[ 
https://issues.apache.org/jira/browse/KAFKA-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069595#comment-16069595
 ] 

Seweryn Habdank-Wojewodzki edited comment on KAFKA-5530 at 6/30/17 6:44 AM:
----------------------------------------------------------------------------

A comment on the "allTopics" parameter.
Yes, I thought about that, but it is not that simple. The main point here is 
that streamFilter is not a static-like object. 
It is created per topic, as I have different filters for different topics. That 
is why I loop over all input topics instead of using the "bulk" API.
So the API I would expect is something like: 

{code}
kBuilder.stream( STRING_SERDE, STRING_SERDE, String[] ).filter( Predicate<? super K,? super V>[] ); // array of predicates
{code}

IMHO such a thing makes no sense, as the dimensions of both arrays would have 
to match.
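
One possible way around the dimension problem, given only as a rough sketch: key the filters by topic name, so each topic always carries its own predicate and there are no two arrays to keep in step. Everything named here is an assumption made for illustration: the class PerTopicFilters, the topic names and the filter rules are hypothetical, and java.util.function.BiPredicate stands in for the Kafka Streams Predicate so that the sketch runs without Kafka on the classpath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical sketch: one filter per input topic, stored in a map keyed by topic name.
class PerTopicFilters {

    // Builds the topic -> filter mapping. Topic names and rules are made up for illustration.
    public static Map<String, BiPredicate<String, String>> buildFilters() {
        Map<String, BiPredicate<String, String>> filters = new LinkedHashMap<>();
        filters.put("topic-a", (key, value) -> value != null && !value.isEmpty());
        filters.put("topic-b", (key, value) -> value != null && value.startsWith("ndm"));
        return filters;
    }

    // Applies the filter registered for the given topic; a topic without a filter is an error.
    public static boolean passes(Map<String, BiPredicate<String, String>> filters,
                                 String topic, String key, String value) {
        BiPredicate<String, String> filter = filters.get(topic);
        if (filter == null) {
            throw new IllegalArgumentException("No filter registered for topic: " + topic);
        }
        return filter.test(key, value);
    }

    public static void main(String[] args) {
        Map<String, BiPredicate<String, String>> filters = buildFilters();
        // In the real application, the loop body would be the place where
        // the stream for this topic is created and its filter is attached.
        for (String topic : filters.keySet()) {
            System.out.println(topic + " -> " + passes(filters, topic, "k", "ndm-record"));
        }
    }
}
```

With such a map, the loop over input topics stays, but a topic and its filter can no longer get out of sync, because a missing entry fails immediately instead of silently shifting indices.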

If you have a better idea, please drop me a hint :-).

And I am happy to test the new release. I will send you feedback by next 
Wednesday. Is that OK for you?


> Balancer is dancing with KStream all the time, and due to that Kafka cannot 
> work :-)
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5530
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5530
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.1
>         Environment: Linux, Windows
>            Reporter: Seweryn Habdank-Wojewodzki
>         Attachments: streamer-2.zip, streamer.zip
>
>
> Dears,
> There are problems with the balancer in KStreams when _num.stream.threads_ is 
> greater than 1 and the number of input topics is greater than 1.
> My setup in the code is more or less as follows:
> {code:java}
> // loop over the inTopicName(s) {
> KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
> stringInput.filter( streamFilter::passOrFilterMessages ).map( ndmNormalizer ).to( outTopicName );
> // } end of loop
> streams = new KafkaStreams( kBuilder, streamsConfig );
> streams.cleanUp();
> streams.start();
> {code}
> And if *_num.stream.threads=4_* and there are 2 or more input topics, but 
> fewer than num.stream.threads, then the complete application startup is 
> totally self-blocked: it writes endless strange things to the log and never 
> starts.
> Even more problematic is when the number of topics is higher than 
> _num.stream.threads_, which I commented on in *KAFKA-5167 streams task gets 
> stuck after re-balance due to LockException*.
> I am attaching logs for two scenarios:
>  * when: 1 < num.stream.threads < number of topics (KAFKA-5167)
>  * when: 1 < number of topics < num.stream.threads (this ticket).
> I can fully reproduce the behaviour. I even found a workaround for it, but it 
> is not desired. When _num.stream.threads=1_ then all works fine :-(.
> {code:bash}
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-3] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 2.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2701
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2701
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread 
> [StreamThread-1] Constructed client metadata 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, 
> consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-ab798efe-16a6-4686-bdee-ccd50c937cd7],
>  state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) 
> prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member 
> subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-1] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2702
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-3] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-3] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-3] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-3] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-2] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-4] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-2] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-4] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-2] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-4] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-4] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-2] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread 
> [StreamThread-2] Constructed client metadata 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, 
> consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-3-consumer-16274860-9a0f-4df9-8af3-10f4c3c23d50,
>  
> stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-4-consumer-be7bc520-7174-4d6e-9258-9761b6c45bd9,
>  
> stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-2-consumer-401f1542-c311-4b1f-8f4e-72d6ade12583],
>  state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) 
> prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}} from the member 
> subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-2] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-2] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-2] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2703
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2703
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2703
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-4] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-2] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread 
> [StreamThread-1] Constructed client metadata 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, 
> consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-b35886f7-0525-458b-9b3e-8856554d0afb],
>  state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) 
> prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member 
> subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-1] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2704
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-2] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-4] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-2] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-4] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-2] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-4] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-2] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-4] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously 
> assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-3] 
> partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-3] 
> Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-3] 
> Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-3] 
> Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:290 - stream-thread 
> [StreamThread-2] Constructed client metadata 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, 
> consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-3-consumer-142cd5c5-a52d-494a-a8be-ee1f9ae831e2,
>  
> stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-2-consumer-f8a93346-c322-4e9e-ab38-c9a9eb4a9fa4,
>  
> stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-4-consumer-0726705d-c88f-4ad2-81c0-9ab02175b53e],
>  state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) 
> prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}} from the member 
> subscriptions.
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-2] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:630 - stream-thread 
> [StreamThread-2] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:466 - stream-thread 
> [StreamThread-2] Assigned tasks to clients as 
> {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) 
> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}.
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2705
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2705
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group 
> stream with generation 2705
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-4] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned 
> partitions [] for group stream
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-3] 
> New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-2] 
> New partitions [[]] assigned at the end of consumer rebalance. 
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
