[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-21 Thread Flower.min (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16797927#comment-16797927
 ] 

Flower.min commented on KAFKA-8106:
---

_*Even if we skip decompressing the key and value of each record, the offset and 
timestamp can still be validated.*_

We recommend removing the unnecessary decompression step when validating 
compressed messages, provided all of the following conditions are met:
*1. The compression type configured on the Kafka brokers and the topic matches 
the compression type used by the producer.*
*2. The magic value in use is above 1.*
*3. No format conversion or value overwriting is required for the compressed 
messages.*
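
A minimal sketch of the idea follows. It is illustrative only (it uses the public 
RecordBatch accessors rather than the actual LogValidator code): the batch header 
already carries the CRC, offsets, and max timestamp, so these checks do not 
require inflating the compressed records inside the batch.
{code:java}
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

public class ShallowValidationSketch {
    // Illustrative sketch, not the actual LogValidator change: validate
    // batch-level metadata without decompressing the records inside the batch.
    static boolean shallowValid(MemoryRecords records, long nowMs) {
        for (RecordBatch batch : records.batches()) {
            if (!batch.isValid())             // CRC over the still-compressed payload
                return false;
            if (batch.baseOffset() < 0)       // offset sanity check from the batch header
                return false;
            if (batch.maxTimestamp() > nowMs) // timestamp check from the batch header
                return false;
        }
        return true;
    }
}
{code}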

> Remove unnecessary decompression operation when logValidator  do validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
> Environment: Server : 
> cpu:2*16 ; 
> MemTotal : 256G;
> Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection ; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
>       We ran performance tests against Kafka in the specific scenario described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. We then started many producer processes that 
> sent large volumes of messages to one of the topics in each test run.
> *_Specific Scenario_*
>   
>  *_1. Main config of Kafka_*  
>  # Broker config: num.network.threads=6; num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartitions: 50~2000
>  # Size of a single message: 1024B
>  
>  *_2. Config of KafkaProducer_* 
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3. The best result of the performance testing_*  
> ||Network inflow rate||CPU Used (%)||Disk write speed||Production throughput||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4. Observation and question_*
>     _The CPU usage reaches its upper limit, but the server's network bandwidth 
> is far from its upper limit. *We want to understand what is consuming so much 
> CPU time, and we want to improve performance and reduce the CPU usage of the 
> Kafka server.*_
>   
>  _*5. Analysis*_
>         We analyzed a JFR (Java Flight Recorder) recording of the Kafka server 
> taken during the performance test. The hot-spot methods were 
> *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and 
> *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. 
> Inspecting the thread stacks also showed most of the CPU being occupied by many 
> threads busy decompressing messages. We then read the Kafka source code.
>        LogValidator uses a double-layer nested iterator when it validates every 
> record, and each message is decompressed while traversing every RecordBatch 
> iterator. This per-message decompression consumes CPU and hurts overall 
> performance. _*Every message is decompressed only to obtain the total size in 
> bytes of a record and the size in bytes of the record body, even when the magic 
> value in use is above 1 and no format conversion or value overwriting is 
> required for the compressed messages. This is detrimental to performance in 
> common usage scenarios.*_ Therefore, we suggest *_removing the unnecessary 
> decompression operation_* when validating compressed messages if the magic 
> value in use is above 1 and no format conversion or value overwriting is 
> required.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7989) Flaky Test RequestQuotaTest#testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated

2019-03-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16797963#comment-16797963
 ] 

ASF GitHub Bot commented on KAFKA-7989:
---

rajinisivaram commented on pull request #6482: KAFKA-7989: RequestQuotaTest 
should wait for quota config change before running tests
URL: https://github.com/apache/kafka/pull/6482
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test 
> RequestQuotaTest#testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated
> -
>
> Key: KAFKA-7989
> URL: https://issues.apache.org/jira/browse/KAFKA-7989
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Anna Povzner
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/27/]
> {quote}java.util.concurrent.ExecutionException: java.lang.AssertionError: 
> Throttle time metrics for consumer quota not updated: 
> small-quota-consumer-client at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
> java.util.concurrent.FutureTask.get(FutureTask.java:206) at 
> kafka.server.RequestQuotaTest.$anonfun$waitAndCheckResults$1(RequestQuotaTest.scala:415)
>  at scala.collection.immutable.List.foreach(List.scala:392) at 
> scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
>  at 
> scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
>  at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47) at 
> kafka.server.RequestQuotaTest.waitAndCheckResults(RequestQuotaTest.scala:413) 
> at 
> kafka.server.RequestQuotaTest.testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(RequestQuotaTest.scala:134){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7989) Flaky Test RequestQuotaTest#testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated

2019-03-21 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7989.
---
Resolution: Fixed
  Reviewer: Rajini Sivaram

> Flaky Test 
> RequestQuotaTest#testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated
> -
>
> Key: KAFKA-7989
> URL: https://issues.apache.org/jira/browse/KAFKA-7989
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0
>Reporter: Matthias J. Sax
>Assignee: Anna Povzner
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/27/]
> {quote}java.util.concurrent.ExecutionException: java.lang.AssertionError: 
> Throttle time metrics for consumer quota not updated: 
> small-quota-consumer-client at 
> java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
> java.util.concurrent.FutureTask.get(FutureTask.java:206) at 
> kafka.server.RequestQuotaTest.$anonfun$waitAndCheckResults$1(RequestQuotaTest.scala:415)
>  at scala.collection.immutable.List.foreach(List.scala:392) at 
> scala.collection.generic.TraversableForwarder.foreach(TraversableForwarder.scala:38)
>  at 
> scala.collection.generic.TraversableForwarder.foreach$(TraversableForwarder.scala:38)
>  at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:47) at 
> kafka.server.RequestQuotaTest.waitAndCheckResults(RequestQuotaTest.scala:413) 
> at 
> kafka.server.RequestQuotaTest.testResponseThrottleTimeWhenBothFetchAndRequestQuotasViolated(RequestQuotaTest.scala:134){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8123) Flaky Test RequestQuotaTest#testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated

2019-03-21 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8123.
---
   Resolution: Duplicate
Fix Version/s: 2.2.1

> Flaky Test 
> RequestQuotaTest#testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated
>  
> 
>
> Key: KAFKA-8123
> URL: https://issues.apache.org/jira/browse/KAFKA-8123
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Anna Povzner
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3474/tests]
> {quote}java.util.concurrent.ExecutionException: java.lang.AssertionError: 
> Throttle time metrics for produce quota not updated: Client 
> small-quota-producer-client apiKey PRODUCE requests 1 requestTime 
> 0.015790873650539786 throttleTime 1000.0
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:206)
> at 
> kafka.server.RequestQuotaTest$$anonfun$waitAndCheckResults$1.apply(RequestQuotaTest.scala:423)
> at 
> kafka.server.RequestQuotaTest$$anonfun$waitAndCheckResults$1.apply(RequestQuotaTest.scala:421)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
> at 
> kafka.server.RequestQuotaTest.waitAndCheckResults(RequestQuotaTest.scala:421)
> at 
> kafka.server.RequestQuotaTest.testResponseThrottleTimeWhenBothProduceAndRequestQuotasViolated(RequestQuotaTest.scala:130){quote}
> STDOUT
> {quote}[2019-03-18 21:42:16,637] ERROR [KafkaApi-0] Error when handling 
> request: clientId=unauthorized-CONTROLLED_SHUTDOWN, correlationId=1, 
> api=CONTROLLED_SHUTDOWN, body=\{broker_id=0,broker_epoch=9223372036854775807} 
> (kafka.server.KafkaApis:76)
> org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
> Request(processor=2, connectionId=127.0.0.1:42118-127.0.0.1:47612-1, 
> session=Session(User:Unauthorized,/127.0.0.1), 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> buffer=null) is not authorized.
> [2019-03-18 21:42:16,655] ERROR [KafkaApi-0] Error when handling request: 
> clientId=unauthorized-STOP_REPLICA, correlationId=1, api=STOP_REPLICA, 
> body=\{controller_id=0,controller_epoch=2147483647,broker_epoch=9223372036854775807,delete_partitions=true,partitions=[{topic=topic-1,partition_ids=[0]}]}
>  (kafka.server.KafkaApis:76)
> org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
> Request(processor=0, connectionId=127.0.0.1:42118-127.0.0.1:47614-2, 
> session=Session(User:Unauthorized,/127.0.0.1), 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> buffer=null) is not authorized.
> [2019-03-18 21:42:16,657] ERROR [KafkaApi-0] Error when handling request: 
> clientId=unauthorized-LEADER_AND_ISR, correlationId=1, api=LEADER_AND_ISR, 
> body=\{controller_id=0,controller_epoch=2147483647,broker_epoch=9223372036854775807,topic_states=[{topic=topic-1,partition_states=[{partition=0,controller_epoch=2147483647,leader=0,leader_epoch=2147483647,isr=[0],zk_version=2,replicas=[0],is_new=true}]}],live_leaders=[\{id=0,host=localhost,port=0}]}
>  (kafka.server.KafkaApis:76)
> org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
> Request(processor=1, connectionId=127.0.0.1:42118-127.0.0.1:47616-2, 
> session=Session(User:Unauthorized,/127.0.0.1), 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> buffer=null) is not authorized.
> [2019-03-18 21:42:16,668] ERROR [KafkaApi-0] Error when handling request: 
> clientId=unauthorized-UPDATE_METADATA, correlationId=1, api=UPDATE_METADATA, 
> body=\{controller_id=0,controller_epoch=2147483647,broker_epoch=9223372036854775807,topic_states=[{topic=topic-1,partition_states=[{partition=0,controller_epoch=2147483647,leader=0,leader_epoch=2147483647,isr=[0],zk_version=2,replicas=[0],offline_replicas=[]}]}],live_brokers=[\{id=0,end_points=[{port=0,host=localhost,listener_name=PLAINTEXT,security_protocol_type=0}],rack=null}]}
>  (kafka.server.KafkaApis:76)
> org.apache.kafka.common.errors.ClusterAuthorizationException: Request 
> Request(processor=2, connectionId=127.0.0.1:42118-127.0.0.1:47618-2, 
> session=Session(User:Unauthorized,/127.0.0.1), 
> listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, 
> buffer=null) is not authorized.
> [2019-03-18 21:42:16,725] ERROR [KafkaApi-0] Error when handling request: 
> clientId=unauthorized-STOP_REPLICA, correlation

[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-03-21 Thread Andrew Klopper (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798017#comment-16798017
 ] 

Andrew Klopper commented on KAFKA-7895:
---

Hi

I am still seeing this behaviour using the 2.2.0 Maven artifacts from 
[https://repository.apache.org/content/groups/staging]

With the topology below, I invariably get emissions of previously emitted 
windows for the same key on restart of my streams application, and sometimes 
the re-emitted windows have earlier timestamps and aggregated data that is 
consistent with the latter part of the window being lost (i.e., state seems to 
be resetting to an earlier version):
{code:java}
return sourceStream
.groupByKey()

.windowedBy(TimeWindows.of(rollupInterval).grace(config.getGracePeriodDuration()))
.aggregate(
() -> rollupFactory.createRollup(windowDuration),
aggregator,
Materialized.>as(outputTopic + 
"_state")
.withValueSerde(rollupSerde)
)
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
.withName(outputTopic + "_suppress_state"))
.toStream((stringWindowed, rollup) -> stringWindowed.key());
{code}
 I have tried disabling caching using:
{code:java}
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
{code}
 but this does not appear to make a difference.

Regards

Andrew

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-03-21 Thread Andrew Klopper (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798017#comment-16798017
 ] 

Andrew Klopper edited comment on KAFKA-7895 at 3/21/19 11:38 AM:
-

Hi

I am still seeing this behaviour using the 2.2.0 Maven artifacts from 
[https://repository.apache.org/content/groups/staging]

With the topology below, I invariably get emissions of previously emitted 
windows for the same key on restart of my streams application, and sometimes 
the re-emitted windows have earlier timestamps and aggregated data that is 
consistent with the latter part of the window being lost (i.e., state seems to 
be resetting to an earlier version):
{code:java}
return sourceStream
.groupByKey()

.windowedBy(TimeWindows.of(rollupInterval).grace(config.getGracePeriodDuration()))
.aggregate(
() -> rollupFactory.createRollup(windowDuration),
aggregator,
Materialized.>as(outputTopic + 
"_state")
.withValueSerde(rollupSerde)
)
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
.withName(outputTopic + "_suppress_state"))
.toStream((stringWindowed, rollup) -> stringWindowed.key());
{code}
 I have tried disabling caching using:
{code:java}
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
{code}
 but this does not appear to make a difference.

There also seems to be no difference in behaviour between 2.2.0 and 2.1.1.

Regards

Andrew


was (Author: andrewrk):
Hi

I am still seeing this behaviour using the 2.2.0 Maven artifacts from 
[https://repository.apache.org/content/groups/staging]

With the topology below, I invariably get emissions of previously emitted 
windows for the same key on restart of my streams application, and sometimes 
the re-emitted windows have earlier timestamps and aggregated data that is 
consistent with the latter part of the window being lost (i.e., state seems to 
be resetting to an earlier version):
{code:java}
return sourceStream
.groupByKey()

.windowedBy(TimeWindows.of(rollupInterval).grace(config.getGracePeriodDuration()))
.aggregate(
() -> rollupFactory.createRollup(windowDuration),
aggregator,
Materialized.>as(outputTopic + 
"_state")
.withValueSerde(rollupSerde)
)
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())
.withName(outputTopic + "_suppress_state"))
.toStream((stringWindowed, rollup) -> stringWindowed.key());
{code}
 I have tried disabling caching using:
{code:java}
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
{code}
 but this does not appear to make a difference.

Regards

Andrew

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any ex

[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-03-21 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798210#comment-16798210
 ] 

Lee Dongjin commented on KAFKA-7996:


[~bbejeck] Great. I will prepare the KIP. Thanks for your feedback!

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
> I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
> [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J Sax [vor 3 Tagen]
> I just checked the code, and yes, we don't provide a timeout for the producer 
> on close()...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7819) Trogdor - Improve RoundTripWorker

2019-03-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798273#comment-16798273
 ] 

ASF GitHub Bot commented on KAFKA-7819:
---

cmccabe commented on pull request #6187: KAFKA-7819: Improve RoundTripWorker
URL: https://github.com/apache/kafka/pull/6187
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Improve RoundTripWorker
> -
>
> Key: KAFKA-7819
> URL: https://issues.apache.org/jira/browse/KAFKA-7819
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> Trogdor's RoundTripWorker task has a couple of shortcomings:
>  * Consumer GroupID is hardcoded and consumers use `KafkaConsumer#assign()`: 
> [https://github.com/apache/kafka/blob/12947f4f944955240fd14ce8b75fab5464ea6808/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java#L314]
> Leaving you unable to run two separate instances of this worker on the same 
> partition in the same cluster, as the consumers would overwrite each other's 
> commits. It's probably better to add the task ID to the consumer group
>  * the task spec's `maxMessages` [is an 
> integer|https://github.com/apache/kafka/blob/12947f4f944955240fd14ce8b75fab5464ea6808/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java#L39],
>  leaving you unable to schedule long-winded tasks
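
For illustration, a hypothetical sketch of the first suggestion above (taskId and 
bootstrapServers are assumed inputs, not actual RoundTripWorker fields): deriving 
the consumer group id from the Trogdor task id, so that two workers reading the 
same partitions no longer overwrite each other's offset commits.
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class UniqueGroupIdSketch {
    // Hypothetical sketch only: build consumer properties whose group id
    // embeds the Trogdor task id, giving each worker instance its own group.
    static Properties consumerProps(String taskId, String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "round-trip-" + taskId);
        return props;
    }
}
{code}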



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8142) Kafka Streams fails with NPE if records contains null-value in header

2019-03-21 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8142:
--

Assignee: Matthias J. Sax

> Kafka Streams fails with NPE if records contains null-value in header
> -
>
> Key: KAFKA-8142
> URL: https://issues.apache.org/jira/browse/KAFKA-8142
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> [2019-03-14 13:14:49,756] ERROR stream-thread 
> [-2-1-1f30b4e6-1204-4aa7-9426-a395ab06ad64-StreamThread-2] 
> Failed to process stream task 9_2 due to the following error: 
> (org.apache.kafka.str
> eams.processor.internals.AssignedStreamsTasks)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.processor.internals.ProcessorRecordContext.sizeBytes(ProcessorRecordContext.java:93)
> at 
> org.apache.kafka.streams.state.internals.ContextualRecord.sizeBytes(ContextualRecord.java:42)
> at 
> org.apache.kafka.streams.state.internals.LRUCacheEntry.<init>(LRUCacheEntry.java:53)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:160)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:36)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:114)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:124)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8142) Kafka Streams fails with NPE if records contains null-value in header

2019-03-21 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8142:
--

 Summary: Kafka Streams fails with NPE if records contains 
null-value in header
 Key: KAFKA-8142
 URL: https://issues.apache.org/jira/browse/KAFKA-8142
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.1
Reporter: Matthias J. Sax


[2019-03-14 13:14:49,756] ERROR stream-thread 
[-2-1-1f30b4e6-1204-4aa7-9426-a395ab06ad64-StreamThread-2] Failed 
to process stream task 9_2 due to the following error: (org.apache.kafka.str
eams.processor.internals.AssignedStreamsTasks)
java.lang.NullPointerException
at 
org.apache.kafka.streams.processor.internals.ProcessorRecordContext.sizeBytes(ProcessorRecordContext.java:93)
at 
org.apache.kafka.streams.state.internals.ContextualRecord.sizeBytes(ContextualRecord.java:42)
at 
org.apache.kafka.streams.state.internals.LRUCacheEntry.<init>(LRUCacheEntry.java:53)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:160)
at 
org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:36)
at 
org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:114)
at 
org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:124)
at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
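
For illustration, a hedged sketch of the kind of input that triggers this path 
(topic and header names are placeholders): a record whose header carries a null 
value, which later fails the size computation in ProcessorRecordContext.sizeBytes().
{code:java}
import org.apache.kafka.clients.producer.ProducerRecord;

public class NullHeaderValueExample {
    public static void main(final String[] args) {
        // Placeholder topic, key, value and header name; the point is the null
        // header value, which Kafka Streams later cannot size when building the
        // cache entry, producing the NullPointerException shown above.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("input-topic", "key", "value");
        record.headers().add("trace-id", null); // header with a null value
        System.out.println(record);
    }
}
{code}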



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8142) Kafka Streams fails with NPE if records contains null-value in header

2019-03-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798309#comment-16798309
 ] 

ASF GitHub Bot commented on KAFKA-8142:
---

mjsax commented on pull request #6484: KAFKA-8142: Fix NPE for nulls in Headers
URL: https://github.com/apache/kafka/pull/6484
 
 
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams fails with NPE if records contains null-value in header
> -
>
> Key: KAFKA-8142
> URL: https://issues.apache.org/jira/browse/KAFKA-8142
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> [2019-03-14 13:14:49,756] ERROR stream-thread 
> [-2-1-1f30b4e6-1204-4aa7-9426-a395ab06ad64-StreamThread-2] 
> Failed to process stream task 9_2 due to the following error: 
> (org.apache.kafka.str
> eams.processor.internals.AssignedStreamsTasks)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.processor.internals.ProcessorRecordContext.sizeBytes(ProcessorRecordContext.java:93)
> at 
> org.apache.kafka.streams.state.internals.ContextualRecord.sizeBytes(ContextualRecord.java:42)
> at 
> org.apache.kafka.streams.state.internals.LRUCacheEntry.<init>(LRUCacheEntry.java:53)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:160)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:36)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:114)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:124)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8142) Kafka Streams fails with NPE if record contains null-value in header

2019-03-21 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8142:
---
Summary: Kafka Streams fails with NPE if record contains null-value in 
header  (was: Kafka Streams fails with NPE if records contains null-value in 
header)

> Kafka Streams fails with NPE if record contains null-value in header
> 
>
> Key: KAFKA-8142
> URL: https://issues.apache.org/jira/browse/KAFKA-8142
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> [2019-03-14 13:14:49,756] ERROR stream-thread 
> [-2-1-1f30b4e6-1204-4aa7-9426-a395ab06ad64-StreamThread-2] 
> Failed to process stream task 9_2 due to the following error: 
> (org.apache.kafka.str
> eams.processor.internals.AssignedStreamsTasks)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.processor.internals.ProcessorRecordContext.sizeBytes(ProcessorRecordContext.java:93)
> at 
> org.apache.kafka.streams.state.internals.ContextualRecord.sizeBytes(ContextualRecord.java:42)
> at 
> org.apache.kafka.streams.state.internals.LRUCacheEntry.<init>(LRUCacheEntry.java:53)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:160)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:36)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:114)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:124)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8042) Kafka Streams creates many segment stores on state restore

2019-03-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798432#comment-16798432
 ] 

Matthias J. Sax commented on KAFKA-8042:


Is this different to KAFKA-7934 or a duplicate?

> Kafka Streams creates many segment stores on state restore
> --
>
> Key: KAFKA-8042
> URL: https://issues.apache.org/jira/browse/KAFKA-8042
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Adrian McCague
>Priority: Major
> Attachments: StateStoreSegments-StreamsConfig.txt
>
>
> Note that this is from the perspective of one instance of an application, 
> where there are 8 instances total, with partition count 8 for all topics and 
> of course stores. Standby replicas = 1.
> In the process there are multiple instances of {{KafkaStreams}} so the below 
> detail is from one of these.
> h2. Actual Behaviour
> During state restore of an application, many segment stores are created (I am 
> using MANIFEST files as a marker since they preallocate 4MB each). As can be 
> seen this topology has 5 joins - which is the extent of its state.
> {code:java}
> bash-4.2# pwd
> /data/fooapp/0_7
> bash-4.2# for dir in $(find . -maxdepth 1 -type d); do echo "${dir}: $(find 
> ${dir} -type f -name 'MANIFEST-*' -printf x | wc -c)"; done
> .: 8058
> ./KSTREAM-JOINOTHER-25-store: 851
> ./KSTREAM-JOINOTHER-40-store: 819
> ./KSTREAM-JOINTHIS-24-store: 851
> ./KSTREAM-JOINTHIS-29-store: 836
> ./KSTREAM-JOINOTHER-35-store: 819
> ./KSTREAM-JOINOTHER-30-store: 819
> ./KSTREAM-JOINOTHER-45-store: 745
> ./KSTREAM-JOINTHIS-39-store: 819
> ./KSTREAM-JOINTHIS-44-store: 685
> ./KSTREAM-JOINTHIS-34-store: 819
> There are many (x800 as above) of these segment files:
> ./KSTREAM-JOINOTHER-25-store.155146629
> ./KSTREAM-JOINOTHER-25-store.155155902
> ./KSTREAM-JOINOTHER-25-store.155149269
> ./KSTREAM-JOINOTHER-25-store.155154879
> ./KSTREAM-JOINOTHER-25-store.155169861
> ./KSTREAM-JOINOTHER-25-store.155153064
> ./KSTREAM-JOINOTHER-25-store.155148444
> ./KSTREAM-JOINOTHER-25-store.155155671
> ./KSTREAM-JOINOTHER-25-store.155168673
> ./KSTREAM-JOINOTHER-25-store.155159565
> ./KSTREAM-JOINOTHER-25-store.155175735
> ./KSTREAM-JOINOTHER-25-store.155168574
> ./KSTREAM-JOINOTHER-25-store.155163525
> ./KSTREAM-JOINOTHER-25-store.155165241
> ./KSTREAM-JOINOTHER-25-store.155146662
> ./KSTREAM-JOINOTHER-25-store.155178177
> ./KSTREAM-JOINOTHER-25-store.155158740
> ./KSTREAM-JOINOTHER-25-store.155168145
> ./KSTREAM-JOINOTHER-25-store.155166231
> ./KSTREAM-JOINOTHER-25-store.155172171
> ./KSTREAM-JOINOTHER-25-store.155175075
> ./KSTREAM-JOINOTHER-25-store.155163096
> ./KSTREAM-JOINOTHER-25-store.155161512
> ./KSTREAM-JOINOTHER-25-store.155179233
> ./KSTREAM-JOINOTHER-25-store.155146266
> ./KSTREAM-JOINOTHER-25-store.155153691
> ./KSTREAM-JOINOTHER-25-store.155159235
> ./KSTREAM-JOINOTHER-25-store.155152734
> ./KSTREAM-JOINOTHER-25-store.155160687
> ./KSTREAM-JOINOTHER-25-store.155174415
> ./KSTREAM-JOINOTHER-25-store.155150820
> ./KSTREAM-JOINOTHER-25-store.155148642
> ... etc
> {code}
> Once re-balancing and state restoration is complete - the redundant segment 
> files are deleted and the segment count drops to 508 total (where the above 
> mentioned state directory is one of many).
> We have seen the number of these segment stores grow to as many as 15000 over 
> the baseline 508 which can fill smaller volumes. *This means that a state 
> volume that would normally have ~300MB total disk usage would use in excess 
> of 30GB during rebalancing*, mostly preallocated MANIFEST files.
> h2. Expected Behaviour
> For this particular application we expect 508 segment folders total to be 
> active and existing throughout rebalancing. Give or take migrated tasks that 
> are subject to the {{state.cleanup.delay.ms}}.
> h2. Preliminary investigation
> * This does not appear to be the case in v1.1.0. With our application the 
> number of state directories only grows to 670 (over the base line 508)
> * The MANIFEST files were not preallocated to 4MB in v1.1.0 they are now in 
> v2.1.x, this appears to be expected RocksDB behaviour, but exacerbates the 
> many segment stores.
> * Suspect https://github.com/apache/kafka/pull/5253 to be the source of this 
> change of behaviour.
> A workaround is to use {{rocksdb

[jira] [Commented] (KAFKA-8042) Kafka Streams creates many segment stores on state restore

2019-03-21 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798477#comment-16798477
 ] 

Sophie Blee-Goldman commented on KAFKA-8042:


I think KAFKA-7934 would certainly fix this issue, but it seems like an 
independent matter to address why all these segments exist at the same time 
throughout rebalancing (rather than being created and expired one at a time).

 

That said, I was checking out the 2.1 branch and it seems like this shouldn't 
be an issue anymore? During restore we first fast-forward through all records 
to get the max timestamp, and then only create segments that are within a 
retention period of this observed stream time.

> Kafka Streams creates many segment stores on state restore
> --
>
> Key: KAFKA-8042
> URL: https://issues.apache.org/jira/browse/KAFKA-8042
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Adrian McCague
>Priority: Major
> Attachments: StateStoreSegments-StreamsConfig.txt
>
>
> Note that this is from the perspective of one instance of an application, 
> where there are 8 instances total, with partition count 8 for all topics and 
> of course stores. Standby replicas = 1.
> In the process there are multiple instances of {{KafkaStreams}} so the below 
> detail is from one of these.
> h2. Actual Behaviour
> During state restore of an application, many segment stores are created (I am 
> using MANIFEST files as a marker since they preallocate 4MB each). As can be 
> seen this topology has 5 joins - which is the extent of its state.
> {code:java}
> bash-4.2# pwd
> /data/fooapp/0_7
> bash-4.2# for dir in $(find . -maxdepth 1 -type d); do echo "${dir}: $(find 
> ${dir} -type f -name 'MANIFEST-*' -printf x | wc -c)"; done
> .: 8058
> ./KSTREAM-JOINOTHER-25-store: 851
> ./KSTREAM-JOINOTHER-40-store: 819
> ./KSTREAM-JOINTHIS-24-store: 851
> ./KSTREAM-JOINTHIS-29-store: 836
> ./KSTREAM-JOINOTHER-35-store: 819
> ./KSTREAM-JOINOTHER-30-store: 819
> ./KSTREAM-JOINOTHER-45-store: 745
> ./KSTREAM-JOINTHIS-39-store: 819
> ./KSTREAM-JOINTHIS-44-store: 685
> ./KSTREAM-JOINTHIS-34-store: 819
> There are many (x800 as above) of these segment files:
> ./KSTREAM-JOINOTHER-25-store.155146629
> ./KSTREAM-JOINOTHER-25-store.155155902
> ./KSTREAM-JOINOTHER-25-store.155149269
> ./KSTREAM-JOINOTHER-25-store.155154879
> ./KSTREAM-JOINOTHER-25-store.155169861
> ./KSTREAM-JOINOTHER-25-store.155153064
> ./KSTREAM-JOINOTHER-25-store.155148444
> ./KSTREAM-JOINOTHER-25-store.155155671
> ./KSTREAM-JOINOTHER-25-store.155168673
> ./KSTREAM-JOINOTHER-25-store.155159565
> ./KSTREAM-JOINOTHER-25-store.155175735
> ./KSTREAM-JOINOTHER-25-store.155168574
> ./KSTREAM-JOINOTHER-25-store.155163525
> ./KSTREAM-JOINOTHER-25-store.155165241
> ./KSTREAM-JOINOTHER-25-store.155146662
> ./KSTREAM-JOINOTHER-25-store.155178177
> ./KSTREAM-JOINOTHER-25-store.155158740
> ./KSTREAM-JOINOTHER-25-store.155168145
> ./KSTREAM-JOINOTHER-25-store.155166231
> ./KSTREAM-JOINOTHER-25-store.155172171
> ./KSTREAM-JOINOTHER-25-store.155175075
> ./KSTREAM-JOINOTHER-25-store.155163096
> ./KSTREAM-JOINOTHER-25-store.155161512
> ./KSTREAM-JOINOTHER-25-store.155179233
> ./KSTREAM-JOINOTHER-25-store.155146266
> ./KSTREAM-JOINOTHER-25-store.155153691
> ./KSTREAM-JOINOTHER-25-store.155159235
> ./KSTREAM-JOINOTHER-25-store.155152734
> ./KSTREAM-JOINOTHER-25-store.155160687
> ./KSTREAM-JOINOTHER-25-store.155174415
> ./KSTREAM-JOINOTHER-25-store.155150820
> ./KSTREAM-JOINOTHER-25-store.155148642
> ... etc
> {code}
> Once re-balancing and state restoration is complete - the redundant segment 
> files are deleted and the segment count drops to 508 total (where the above 
> mentioned state directory is one of many).
> We have seen the number of these segment stores grow to as many as 15000 over 
> the baseline 508 which can fill smaller volumes. *This means that a state 
> volume that would normally have ~300MB total disk usage would use in excess 
> of 30GB during rebalancing*, mostly preallocated MANIFEST files.
> h2. Expected Behaviour
> For this particular application we expect 508 segment folders total to be 
> active and existing throughout rebalancing. Give or take migrated tasks that 
> are subject to the {{state.cleanup.delay.ms}}.
> h2. Preliminary investig

[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-03-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798528#comment-16798528
 ] 

Matthias J. Sax commented on KAFKA-7996:


How would this config and `KafkaStreams#close(timeout)` relate to each other? 
Not saying we should not add a config, just want to understand it better. What 
is the advantage compared to alternatives? Could we use 
`KafkaStreams#close(timeout)` and pass it to the internal clients instead 
(advantages/disadvantages)? Maybe those questions could also be part of a KIP 
discussion. I think I still don't fully understand the tradeoffs of the 
different approaches.
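
For reference, a hedged sketch of the existing caller-side API under discussion 
(application id, bootstrap servers, and the topology are placeholders): 
KafkaStreams#close(Duration) bounds how long shutdown waits and returns whether 
all threads stopped in time, but as reported above the timeout is currently not 
forwarded to the internal producer.
{code:java}
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class CloseTimeoutExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "close-timeout-example"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic"); // placeholder topology

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        // ... run the application ...
        final boolean closedInTime = streams.close(Duration.ofSeconds(30));
        System.out.println("Closed within timeout: " + closedInTime);
    }
}
{code}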

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
> I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
> [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J Sax [vor 3 Tagen]
> I just checked the code, and yes, we don't provide a timeout for the producer 
> on close()...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8143) Kafka-Streams GlobalStore cannot be read after application restart

2019-03-21 Thread Luke Stephenson (JIRA)
Luke Stephenson created KAFKA-8143:
--

 Summary: Kafka-Streams GlobalStore cannot be read after 
application restart
 Key: KAFKA-8143
 URL: https://issues.apache.org/jira/browse/KAFKA-8143
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.1
Reporter: Luke Stephenson


I've created a small example application which has a trivial `Processor` which 
takes messages and stores the length of the String value rather than the value 
itself.

That is, the following setup:

 
{code:java}
Topic[String, String]
Processor[String, String]
KeyValueStore[String, Long] // Note the Store persists Long values
{code}
 

The example application also has a Thread which periodically displays all 
values in the KeyValueStore.

While the application is run, I can publish values to the topic with:
{code:java}
root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property 
"parse.key=true" --property "key.separator=:" --broker-list localhost:9092 
--topic test.topic
>1:hello
>2:abc{code}
And the background Thread will report the values persisted to the key value 
store.

If the application is restarted, when attempting to read from the KeyValueStore 
it will fail.  It attempts to recover the state from the persistent RocksDB 
store which fails with:
{code:java}
org.apache.kafka.common.errors.SerializationException: Size of data received by 
LongDeserializer is not 8{code}
(Note there is no stack trace as SerializationException has disabled it.)

Debugging appears to reveal that the original data from the Topic is being 
restored rather than what was modified by the processor.

I've created a minimal example to show the issue at 
[https://github.com/lukestephenson/kafka-streams-example]
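
For illustration, a hedged sketch of the kind of topology described above (store, 
topic, and class names are placeholders, not taken from the linked example 
project): a global store fed by a processor that writes the length of each String 
value as a Long. On restart, the global store is restored directly from the topic, 
bypassing the processor, which matches the deserialization failure reported here.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreLengthExample {
    static StreamsBuilder buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.addGlobalStore(
                Stores.keyValueStoreBuilder(
                        Stores.persistentKeyValueStore("value-lengths"),
                        Serdes.String(),
                        Serdes.Long()).withLoggingDisabled(),
                "test.topic",
                Consumed.with(Serdes.String(), Serdes.String()),
                () -> new Processor<String, String>() {
                    private KeyValueStore<String, Long> store;

                    @Override
                    @SuppressWarnings("unchecked")
                    public void init(final ProcessorContext context) {
                        store = (KeyValueStore<String, Long>) context.getStateStore("value-lengths");
                    }

                    @Override
                    public void process(final String key, final String value) {
                        // Store the length of the value, not the value itself.
                        store.put(key, (long) value.length());
                    }

                    @Override
                    public void close() { }
                });
        return builder;
    }
}
{code}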



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8143) Kafka-Streams GlobalStore cannot be read after application restart

2019-03-21 Thread Luke Stephenson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Stephenson updated KAFKA-8143:
---
Description: 
I've created a small example application which has a trivial `Processor` which 
takes messages and stores the length of the String value rather than the value 
itself.

That is, the following setup:
{code:java}
Topic[String, String]
Processor[String, String]
KeyValueStore[String, Long] // Note the Store persists Long values
{code}
 

The example application also has a Thread which periodically displays all 
values in the KeyValueStore.

While the application is run, I can publish values to the topic with:
{code:java}
root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property 
"parse.key=true" --property "key.separator=:" --broker-list localhost:9092 
--topic test.topic
>1:hello
>2:abc{code}
And the background Thread will report the values persisted to the key value 
store.

If the application is restarted, when attempting to read from the KeyValueStore 
it will fail.  It attempts to recover the state from the persistent RocksDB 
store which fails with:
{code:java}
org.apache.kafka.common.errors.SerializationException: Size of data received by 
LongDeserializer is not 8{code}
(Note there is no stack trace as SerializationException has disabled it.)

Debugging appears to reveal that the original data from the Topic is being 
restored rather than what was modified by the processor.

I've created a minimal example to show the issue at 
[https://github.com/lukestephenson/kafka-streams-example]

  was:
I've created a small example application which has a trivial `Processor` which 
takes messages and stores the length of the String value rather than the value 
itself.

That is, the following setup:

 
{code:java}
Topic[String, String]
Processor[String, String]
KeyValueStore[String, Long] // Note the Store persists Long values
{code}
 

The example application also has a Thread which periodically displays all 
values in the KeyValueStore.

While the application is run, I can publish values to the topic with:
{code:java}
root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property 
"parse.key=true" --property "key.separator=:" --broker-list localhost:9092 
--topic test.topic
>1:hello
>2:abc{code}
And the background Thread will report the values persisted to the key value 
store.

If the application is restarted, when attempting to read from the KeyValueStore 
it will fail.  It attempts to recover the state from the persistent RocksDB 
store which fails with:
{code:java}
org.apache.kafka.common.errors.SerializationException: Size of data received by 
LongDeserializer is not 8{code}
(Note there is no stack trace as SerializationException has disabled it.)

Debugging appears to reveal that the original data from the Topic is being 
restored rather than what was modified by the processor.

I've created a minimal example to show the issue at 
[https://github.com/lukestephenson/kafka-streams-example]


> Kafka-Streams GlobalStore cannot be read after application restart
> --
>
> Key: KAFKA-8143
> URL: https://issues.apache.org/jira/browse/KAFKA-8143
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Luke Stephenson
>Priority: Major
>
> I've created a small example application which has a trivial `Processor` 
> which takes messages and stores the length of the String value rather than 
> the value itself.
> That is, the following setup:
> {code:java}
> Topic[String, String]
> Processor[String, String]
> KeyValueStore[String, Long] // Note the Store persists Long values
> {code}
>  
> The example application also has a Thread which periodically displays all 
> values in the KeyValueStore.
> While the application is run, I can publish values to the topic with:
> {code:java}
> root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property 
> "parse.key=true" --property "key.separator=:" --broker-list localhost:9092 
> --topic test.topic
> >1:hello
> >2:abc{code}
> And the background Thread will report the values persisted to the key value 
> store.
> If the application is restarted, when attempting to read from the 
> KeyValueStore it will fail.  It attempts to recover the state from the 
> persistent RocksDB store which fails with:
> {code:java}
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by LongDeserializer is not 8{code}
> (Note there is no stack trace as SerializationException has disabled it.)
> Debugging appears to reveal that the original data from the Topic is being 
> restored rather than what was modified by the processor.
> I've created a minimal example to show the issue at 
> [https://github.com/lukestephenson/kafka-streams-example]



--
This message was sen

[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-03-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798572#comment-16798572
 ] 

Matthias J. Sax commented on KAFKA-7965:


[https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/69/tests]

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8143) Kafka-Streams GlobalStore cannot be read after application restart

2019-03-21 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798573#comment-16798573
 ] 

Tommy Becker commented on KAFKA-8143:
-

I think this is possibly due to 
https://issues.apache.org/jira/browse/KAFKA-7663, which renders 
addGlobalStore() useless for its intended purpose.

> Kafka-Streams GlobalStore cannot be read after application restart
> --
>
> Key: KAFKA-8143
> URL: https://issues.apache.org/jira/browse/KAFKA-8143
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Luke Stephenson
>Priority: Major
>
> I've created a small example application which has a trivial `Processor` 
> which takes messages and stores the length of the String value rather than 
> the value itself.
> That is, the following setup:
> {code:java}
> Topic[String, String]
> Processor[String, String]
> KeyValueStore[String, Long] // Note the Store persists Long values
> {code}
>  
> The example application also has a Thread which periodically displays all 
> values in the KeyValueStore.
> While the application is run, I can publish values to the topic with:
> {code:java}
> root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property 
> "parse.key=true" --property "key.separator=:" --broker-list localhost:9092 
> --topic test.topic
> >1:hello
> >2:abc{code}
> And the background Thread will report the values persisted to the key value 
> store.
> If the application is restarted, when attempting to read from the 
> KeyValueStore it will fail.  It attempts to recover the state from the 
> persistent RocksDB store which fails with:
> {code:java}
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by LongDeserializer is not 8{code}
> (Note there is no stack trace as SerializationException has disabled it.)
> Debugging appears to reveal that the original data from the Topic is being 
> restored rather than what was modified by the processor.
> I've created a minimal example to show the issue at 
> [https://github.com/lukestephenson/kafka-streams-example]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-03-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798577#comment-16798577
 ] 

Matthias J. Sax commented on KAFKA-7965:


[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/392/tests]

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for the `2.2` release, I am creating tickets for 
> all observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7243) Add unit integration tests to validate metrics in Kafka Streams

2019-03-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798580#comment-16798580
 ] 

ASF GitHub Bot commented on KAFKA-7243:
---

guozhangwang commented on pull request #6080: KAFKA-7243: Add unit integration 
tests to validate metrics in Kafka Streams
URL: https://github.com/apache/kafka/pull/6080
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add unit integration tests to validate metrics in Kafka Streams
> ---
>
> Key: KAFKA-7243
> URL: https://issues.apache.org/jira/browse/KAFKA-7243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: integration-test, newbie++
>
> We should add an integration test for Kafka Streams that validates:
> 1. After a streams application is started, all metrics from the different 
> levels (thread, task, processor, store, cache) are correctly created and 
> display recorded values.
> 2. When a streams application is shut down, all metrics are correctly 
> de-registered and removed.
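
As a rough illustration only (the class, topic, and application names below are my assumptions, not part of this ticket or its patch), the registration half of such a check can be sketched against the public KafkaStreams#metrics() API; the real integration test would start an embedded cluster and assert per metric level rather than just checking non-emptiness:

{code:java}
import static org.junit.Assert.assertFalse;

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Test;

public class StreamsMetricsSmokeTest {

    @Test
    public void shouldRegisterAndDeregisterMetrics() {
        // Assumed broker address and topic names; the real integration test
        // would start an embedded Kafka cluster and create the topics itself.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "metrics-validation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("metrics-test-input").to("metrics-test-output");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // 1. While running, metrics should be registered and expose values.
        final Map<MetricName, ? extends Metric> running = streams.metrics();
        assertFalse(running.isEmpty());

        streams.close();

        // 2. After shutdown, streams-specific metrics should be de-registered.
        //    The real test would assert this per level (thread, task, processor,
        //    store, cache); the "stream-thread" group prefix here is an assumption.
        final long remaining = streams.metrics().keySet().stream()
            .filter(name -> name.group().startsWith("stream-thread"))
            .count();
        System.out.println("stream-thread metrics remaining after close: " + remaining);
    }
}
{code}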



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8143) Kafka-Streams GlobalStore cannot be read after application restart

2019-03-21 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8143.

Resolution: Duplicate

Thanks for reporting this [~lukestephenson]! Closing this as a duplicate. It's 
not a bug, but a limitation in the design.

The processor of a global store is supposed to only load the data unmodified; 
actual processing is not supported at the moment.
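
To make the limitation concrete, here is a minimal sketch (the store name, types, and class name are my own assumptions, not the reporter's code) of the kind of transforming global-store processor that runs into this: it works while processing live records, but after a restart the global store is rebuilt byte-for-byte from the source topic, bypassing the processor, so the store then holds String bytes that the Long serde cannot read.

{code:java}
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical transforming processor for a global store named "lengths-store".
// Kafka Streams restores global stores directly from the topic, skipping this
// processor, which is why the restored values no longer match the store's serde.
public class LengthProcessor implements Processor<String, String> {

    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        store = (KeyValueStore<String, Long>) context.getStateStore("lengths-store");
    }

    @Override
    public void process(final String key, final String value) {
        // Transforming the value here is what the design does not support:
        // a global-store processor is expected to put records through unmodified.
        store.put(key, (long) value.length());
    }

    @Override
    public void close() { }
}
{code}

In practice that means keeping the global store's key and value types identical to the topic's, or doing any transformation before the data is written to the topic the global store is built from.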

> Kafka-Streams GlobalStore cannot be read after application restart
> --
>
> Key: KAFKA-8143
> URL: https://issues.apache.org/jira/browse/KAFKA-8143
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Luke Stephenson
>Priority: Major
>
> I've created a small example application which has a trivial `Processor` 
> which takes messages and stores the length of the String value rather than 
> the value itself.
> That is, the following setup:
> {code:java}
> Topic[String, String]
> Processor[String, String]
> KeyValueStore[String, Long] // Note the Store persists Long values
> {code}
>  
> The example application also has a Thread which periodically displays all 
> values in the KeyValueStore.
> While the application is run, I can publish values to the topic with:
> {code:java}
> root@kafka:/opt/kafka# bin/kafka-console-producer.sh --property 
> "parse.key=true" --property "key.separator=:" --broker-list localhost:9092 
> --topic test.topic
> >1:hello
> >2:abc{code}
> And the background Thread will report the values persisted to the key value 
> store.
> If the application is restarted, when attempting to read from the 
> KeyValueStore it will fail.  It attempts to recover the state from the 
> persistent RocksDB store which fails with:
> {code:java}
> org.apache.kafka.common.errors.SerializationException: Size of data received 
> by LongDeserializer is not 8{code}
> (Note there is no stack trace as SerializationException has disabled it.)
> Debugging appears to reveal that the original data from the Topic is being 
> restored rather than what was modified by the processor.
> I've created a minimal example to show the issue at 
> [https://github.com/lukestephenson/kafka-streams-example]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7243) Add unit integration tests to validate metrics in Kafka Streams

2019-03-21 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798581#comment-16798581
 ] 

Guozhang Wang commented on KAFKA-7243:
--

[~Khairy] Thanks for your contribution!

> Add unit integration tests to validate metrics in Kafka Streams
> ---
>
> Key: KAFKA-7243
> URL: https://issues.apache.org/jira/browse/KAFKA-7243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Khaireddine Rezgui
>Priority: Major
>  Labels: integration-test, newbie++
> Fix For: 2.3.0
>
>
> We should add an integration test for Kafka Streams that validates:
> 1. After a streams application is started, all metrics from the different 
> levels (thread, task, processor, store, cache) are correctly created and 
> display recorded values.
> 2. When a streams application is shut down, all metrics are correctly 
> de-registered and removed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8042) Kafka Streams creates many segment stores on state restore

2019-03-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798585#comment-16798585
 ] 

Matthias J. Sax commented on KAFKA-8042:


[~ableegoldman] From my understanding, the fast forwarding you mention is only 
done for a batch of records. If there is a larger backlog to restore, this 
would only provide a small lookahead and maybe skip over creating some 
segments. However, I believe that we need KAFKA-7934 for a proper fix to do a 
"full look ahead" that allows us to not create any old segments at all.

I am just wondering if this ticket is a duplicate of KAFKA-7934? It would be 
great if [~amccague] could confirm this. If it's not a duplicate, we should 
document the difference explicitly.

> Kafka Streams creates many segment stores on state restore
> --
>
> Key: KAFKA-8042
> URL: https://issues.apache.org/jira/browse/KAFKA-8042
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Adrian McCague
>Priority: Major
> Attachments: StateStoreSegments-StreamsConfig.txt
>
>
> Note that this is from the perspective of one instance of an application, 
> where there are 8 instances total, with partition count 8 for all topics and 
> of course stores. Standby replicas = 1.
> In the process there are multiple instances of {{KafkaStreams}} so the below 
> detail is from one of these.
> h2. Actual Behaviour
> During state restore of an application, many segment stores are created (I am 
> using MANIFEST files as a marker since they preallocate 4MB each). As can be 
> seen this topology has 5 joins - which is the extent of its state.
> {code:java}
> bash-4.2# pwd
> /data/fooapp/0_7
> bash-4.2# for dir in $(find . -maxdepth 1 -type d); do echo "${dir}: $(find 
> ${dir} -type f -name 'MANIFEST-*' -printf x | wc -c)"; done
> .: 8058
> ./KSTREAM-JOINOTHER-25-store: 851
> ./KSTREAM-JOINOTHER-40-store: 819
> ./KSTREAM-JOINTHIS-24-store: 851
> ./KSTREAM-JOINTHIS-29-store: 836
> ./KSTREAM-JOINOTHER-35-store: 819
> ./KSTREAM-JOINOTHER-30-store: 819
> ./KSTREAM-JOINOTHER-45-store: 745
> ./KSTREAM-JOINTHIS-39-store: 819
> ./KSTREAM-JOINTHIS-44-store: 685
> ./KSTREAM-JOINTHIS-34-store: 819
> There are many (x800 as above) of these segment files:
> ./KSTREAM-JOINOTHER-25-store.155146629
> ./KSTREAM-JOINOTHER-25-store.155155902
> ./KSTREAM-JOINOTHER-25-store.155149269
> ./KSTREAM-JOINOTHER-25-store.155154879
> ./KSTREAM-JOINOTHER-25-store.155169861
> ./KSTREAM-JOINOTHER-25-store.155153064
> ./KSTREAM-JOINOTHER-25-store.155148444
> ./KSTREAM-JOINOTHER-25-store.155155671
> ./KSTREAM-JOINOTHER-25-store.155168673
> ./KSTREAM-JOINOTHER-25-store.155159565
> ./KSTREAM-JOINOTHER-25-store.155175735
> ./KSTREAM-JOINOTHER-25-store.155168574
> ./KSTREAM-JOINOTHER-25-store.155163525
> ./KSTREAM-JOINOTHER-25-store.155165241
> ./KSTREAM-JOINOTHER-25-store.155146662
> ./KSTREAM-JOINOTHER-25-store.155178177
> ./KSTREAM-JOINOTHER-25-store.155158740
> ./KSTREAM-JOINOTHER-25-store.155168145
> ./KSTREAM-JOINOTHER-25-store.155166231
> ./KSTREAM-JOINOTHER-25-store.155172171
> ./KSTREAM-JOINOTHER-25-store.155175075
> ./KSTREAM-JOINOTHER-25-store.155163096
> ./KSTREAM-JOINOTHER-25-store.155161512
> ./KSTREAM-JOINOTHER-25-store.155179233
> ./KSTREAM-JOINOTHER-25-store.155146266
> ./KSTREAM-JOINOTHER-25-store.155153691
> ./KSTREAM-JOINOTHER-25-store.155159235
> ./KSTREAM-JOINOTHER-25-store.155152734
> ./KSTREAM-JOINOTHER-25-store.155160687
> ./KSTREAM-JOINOTHER-25-store.155174415
> ./KSTREAM-JOINOTHER-25-store.155150820
> ./KSTREAM-JOINOTHER-25-store.155148642
> ... etc
> {code}
> Once re-balancing and state restoration are complete, the redundant segment 
> files are deleted and the segment count drops to 508 total (where the 
> above-mentioned state directory is one of many).
> We have seen the number of these segment stores grow to as many as 15000 over 
> the baseline 508, which can fill smaller volumes. *This means that a state 
> volume that would normally have ~300MB total disk usage would use in excess 
> of 30GB during rebalancing*, mostly preallocated MANIFEST files.
> h2. Expected Behaviour
> For this particular application we expect 508 segment folders total to be 
> active and existing throughout rebalancing. Give or take migrated tasks that 
> are subject to the 

[jira] [Commented] (KAFKA-7647) Flaky test LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic

2019-03-21 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798598#comment-16798598
 ] 

Matthias J. Sax commented on KAFKA-7647:


Failed again: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3402/]

> Flaky test 
> LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic
> -
>
> Key: KAFKA-7647
> URL: https://issues.apache.org/jira/browse/KAFKA-7647
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Dong Lin
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> {code}
> kafka.log.LogCleanerParameterizedIntegrationTest >
> testCleansCombinedCompactAndDeleteTopic[3] FAILED
>     java.lang.AssertionError: Contents of the map shouldn't change
> expected:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 ->
> (354,354), 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353),
> 2 -> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 8 ->
> (348,348), 19 -> (359,359), 4 -> (344,344), 15 -> (355,355))> but
> was:<Map(0 -> (340,340), 5 -> (345,345), 10 -> (350,350), 14 -> (354,354),
> 1 -> (341,341), 6 -> (346,346), 9 -> (349,349), 13 -> (353,353), 2 ->
> (342,342), 17 -> (357,357), 12 -> (352,352), 7 -> (347,347), 3 ->
> (343,343), 18 -> (358,358), 16 -> (356,356), 11 -> (351,351), 99 ->
> (299,299), 8 -> (348,348), 19 -> (359,359), 4 -> (344,344), 15 ->
> (355,355))>
>         at org.junit.Assert.fail(Assert.java:88)
>         at org.junit.Assert.failNotEquals(Assert.java:834)
>         at org.junit.Assert.assertEquals(Assert.java:118)
>         at
> kafka.log.LogCleanerParameterizedIntegrationTest.testCleansCombinedCompactAndDeleteTopic(LogCleanerParameterizedIntegrationTest.scala:129)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log

2019-03-21 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798686#comment-16798686
 ] 

huxihx commented on KAFKA-8100:
---

Since `UNKNOWN_TOPIC_OR_PARTITION` is a retriable exception, it's hard to tell 
whether the topic is deleted permanently or just not included in the broker 
metadata cache temporarily.
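
As a stopgap on the application side, and purely my own suggestion rather than anything from this ticket: assuming log4j 1.x is the logging backend and that these warnings come from the org.apache.kafka.clients.NetworkClient logger (both assumptions), the noise can be muted by raising that logger's level. Note that this hides all metadata warnings from that class, including ones for genuinely unknown topics.

{code:java}
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class SuppressUnknownTopicWarnings {
    public static void main(String[] args) {
        // Raise the level of the client class assumed to emit the
        // UNKNOWN_TOPIC_OR_PARTITION metadata warnings. This silences all of its
        // WARN output, so it is a blunt workaround, not a fix.
        Logger.getLogger("org.apache.kafka.clients.NetworkClient").setLevel(Level.ERROR);

        // ... then build and run the consumer as in the reproduction code quoted below ...
    }
}
{code}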

> If delete expired topic, kafka consumer will keep flushing unknown_topic 
> warning in log
> ---
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1, 2.1.1
>Reporter: Shengnan YU
>Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We found 
> that when we deleted some unused topics, the logs kept flushing 
> UNKNOWN_TOPIC_EXCEPTION.
> Studying the source code of the Kafka client, we found that for the consumer, 
> topicExpiry is disabled in Metadata, which means that even after a topic is 
> deleted, the client still keeps that topic in its metadata topic list and 
> keeps fetching it from the servers.
> Is there any good way to avoid these annoying warning logs without modifying 
> Kafka's source code? (We still need the 'real' unknown-topic exceptions in the 
> logs, i.e. for topics that are genuinely unknown rather than just deleted.)
> The following code can be used to reproduce this problem (if you create 
> multiple topics such as "test1", "test2", "test3"..."testn" in the Kafka 
> cluster and then delete any one of them while the consumer is running).
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private KafkaConsumer<String, String> consumer;
>
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // reading all partitions from the beginning
>             consumer.seekToBeginning(partitions);
>         }
>     }
>
>     consumer.subscribe(Pattern.compile("^test.*$"),
>             new PartitionOffsetAssignerListener(consumer));
>
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n",
>                     record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8100) If delete expired topic, kafka consumer will keep flushing unknown_topic warning in log

2019-03-21 Thread Shengnan YU (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798695#comment-16798695
 ] 

Shengnan YU commented on KAFKA-8100:


Thanks for replying. However, if the topic is permanently deleted, the warning 
log will keep flushing. Why not make it configurable so that the topic metadata 
expires after a period?

> If delete expired topic, kafka consumer will keep flushing unknown_topic 
> warning in log
> ---
>
> Key: KAFKA-8100
> URL: https://issues.apache.org/jira/browse/KAFKA-8100
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1, 2.1.1
>Reporter: Shengnan YU
>Priority: Major
>
> Recently we used Flink to consume Kafka topics with a regex pattern. We found 
> that when we deleted some unused topics, the logs kept flushing 
> UNKNOWN_TOPIC_EXCEPTION.
> Studying the source code of the Kafka client, we found that for the consumer, 
> topicExpiry is disabled in Metadata, which means that even after a topic is 
> deleted, the client still keeps that topic in its metadata topic list and 
> keeps fetching it from the servers.
> Is there any good way to avoid these annoying warning logs without modifying 
> Kafka's source code? (We still need the 'real' unknown-topic exceptions in the 
> logs, i.e. for topics that are genuinely unknown rather than just deleted.)
> The following code can be used to reproduce this problem (if you create 
> multiple topics such as "test1", "test2", "test3"..."testn" in the Kafka 
> cluster and then delete any one of them while the consumer is running).
> {code:java}
> public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "test10");
>     props.put("enable.auto.commit", "true");
>     props.put("auto.commit.interval.ms", "1000");
>     props.put("auto.offset.reset", "earliest");
>     props.put("key.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("metadata.max.age.ms", "6");
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>
>     class PartitionOffsetAssignerListener implements ConsumerRebalanceListener {
>         private KafkaConsumer<String, String> consumer;
>
>         public PartitionOffsetAssignerListener(KafkaConsumer<String, String> kafkaConsumer) {
>             this.consumer = kafkaConsumer;
>         }
>
>         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         }
>
>         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>             // reading all partitions from the beginning
>             consumer.seekToBeginning(partitions);
>         }
>     }
>
>     consumer.subscribe(Pattern.compile("^test.*$"),
>             new PartitionOffsetAssignerListener(consumer));
>
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         for (ConsumerRecord<String, String> record : records) {
>             System.out.printf("offset = %d, key = %s, value = %s%n",
>                     record.offset(), record.key(), record.value());
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8144) Flaky Test ControllerIntegrationTest#testMetadataPropagationOnControlPlane

2019-03-21 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8144:
--

 Summary: Flaky Test 
ControllerIntegrationTest#testMetadataPropagationOnControlPlane
 Key: KAFKA-8144
 URL: https://issues.apache.org/jira/browse/KAFKA-8144
 Project: Kafka
  Issue Type: Bug
  Components: core, unit tests
Affects Versions: 2.3.0
Reporter: Matthias J. Sax
 Fix For: 2.3.0


[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3485/tests]
{quote}java.lang.AssertionError: expected:<1.0> but was:<0.0>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotEquals(Assert.java:835)
at org.junit.Assert.assertEquals(Assert.java:555)
at org.junit.Assert.assertEquals(Assert.java:685)
at 
kafka.controller.ControllerIntegrationTest.testMetadataPropagationOnControlPlane(ControllerIntegrationTest.scala:105){quote}
STDOUT
{quote}[2019-03-22 02:42:56,725] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=0] Error for partition t-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-03-22 02:43:00,875] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
fetcherId=0] Error for partition test-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-03-22 02:43:00,876] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=0] Error for partition test-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-03-22 02:43:25,090] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Error for partition t-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-03-22 02:43:32,102] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Error for partition t-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-03-22 02:43:34,073] ERROR [Controller id=0] Error completing preferred 
replica leader election for partition t-0 (kafka.controller.KafkaController:76)
kafka.common.StateChangeFailedException: Failed to elect leader for partition 
t-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy
at 
kafka.controller.PartitionStateMachine.$anonfun$doElectLeaderForPartitions$9(PartitionStateMachine.scala:390)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at 
kafka.controller.PartitionStateMachine.doElectLeaderForPartitions(PartitionStateMachine.scala:388)
at 
kafka.controller.PartitionStateMachine.electLeaderForPartitions(PartitionStateMachine.scala:315)
at 
kafka.controller.PartitionStateMachine.doHandleStateChanges(PartitionStateMachine.scala:225)
at 
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:141)
at 
kafka.controller.KafkaController.kafka$controller$KafkaController$$onPreferredReplicaElection(KafkaController.scala:649)
at 
kafka.controller.KafkaController$PreferredReplicaLeaderElection.handleProcess(KafkaController.scala:1597)
at 
kafka.controller.PreemptableControllerEvent.process(KafkaController.scala:1809)
at 
kafka.controller.PreemptableControllerEvent.process$(KafkaController.scala:1807)
at 
kafka.controller.KafkaController$PreferredReplicaLeaderElection.process(KafkaController.scala:1551)
at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:95)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:95)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)
[2019-03-22 02:43:41,232] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Error for partition t-0 at offset 0 
(kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
does not host this topic-partition.
[2019-03-22 02:43:53,465] ERROR [ReplicaStateMachine controllerId=0] Controller 
moved to another broker when moving some replicas to OfflineReplica state 
(kafka.controller.ReplicaStateMachine:76)
org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 1
[2019-03-22 02:43:53,467] INFO [ControllerEventThread controllerId=0] 
Control

[jira] [Commented] (KAFKA-8142) Kafka Streams fails with NPE if record contains null-value in header

2019-03-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16798770#comment-16798770
 ] 

ASF GitHub Bot commented on KAFKA-8142:
---

mjsax commented on pull request #6484: KAFKA-8142: Fix NPE for nulls in Headers
URL: https://github.com/apache/kafka/pull/6484
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams fails with NPE if record contains null-value in header
> 
>
> Key: KAFKA-8142
> URL: https://issues.apache.org/jira/browse/KAFKA-8142
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>
> [2019-03-14 13:14:49,756] ERROR stream-thread 
> [-2-1-1f30b4e6-1204-4aa7-9426-a395ab06ad64-StreamThread-2] 
> Failed to process stream task 9_2 due to the following error: 
> (org.apache.kafka.str
> eams.processor.internals.AssignedStreamsTasks)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.processor.internals.ProcessorRecordContext.sizeBytes(ProcessorRecordContext.java:93)
> at 
> org.apache.kafka.streams.state.internals.ContextualRecord.sizeBytes(ContextualRecord.java:42)
> at 
> org.apache.kafka.streams.state.internals.LRUCacheEntry.(LRUCacheEntry.java:53)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:160)
> at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.put(CachingWindowStore.java:36)
> at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.put(MeteredWindowStore.java:114)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate$KStreamWindowAggregateProcessor.process(KStreamWindowAggregate.java:124)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:93)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:351)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
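
For anyone trying to reproduce this: a record carrying a header whose value is null is enough to trigger the NPE once a caching store sizes the record context. A minimal producer sketch follows; the topic name, header key, and serializers are my assumptions, not taken from this ticket.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;

public class NullHeaderProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A header whose value is null; ProcessorRecordContext.sizeBytes()
            // dereferences header values, which is where the NPE is thrown when a
            // caching store (e.g. CachingWindowStore) sizes the record context.
            ProducerRecord<String, String> record =
                new ProducerRecord<>("streams-input-topic", null, "key", "value");
            record.headers().add(new RecordHeader("trace-id", (byte[]) null));
            producer.send(record);
        }
    }
}
{code}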



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)