[jira] [Commented] (KAFKA-7927) Read committed receives aborted events

2019-02-15 Thread Gabor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769059#comment-16769059
 ] 

Gabor Somogyi commented on KAFKA-7927:
--

[~huxi_2b] that's a good point, I will retest it...
As a side note, I'm most probably not the only one who will try this unsupported 
scenario, so it would be worth adding a warning that documents this.


> Read committed receives aborted events
> --
>
> Key: KAFKA-7927
> URL: https://issues.apache.org/jira/browse/KAFKA-7927
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 1.0.0
>Reporter: Gabor Somogyi
>Priority: Blocker
> Attachments: KafkaProducer.scala, consumer.log, producer.log.gz
>
>
> When a Kafka client produces ~30k events and aborts the transaction at the 
> end, a consumer can read part of the aborted messages even when 
> "isolation.level" is set to "READ_COMMITTED".
> Kafka client version: 2.0.0
> Kafka broker version: 1.0.0
> Producer:
> {code:java}
> java -jar 
> kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
> gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
> {code}
> See attached code.
> Consumer:
> {code:java}
> kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
> --from-beginning --isolation-level read_committed
> {code}
> The same behavior is seen when the consumer is re-implemented in Scala.
> The whole application can be found here: 
> https://github.com/gaborgsomogyi/kafka-semantics-tester
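For context, a minimal sketch of the abort scenario described above, written against the standard Java producer API. This is not the attached KafkaProducer.scala; the broker address, topic name and transactional.id are placeholders.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AbortingProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");       // placeholder broker
        props.put("transactional.id", "aborting-producer-1");   // placeholder id
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < 30_000; i++) {
                producer.send(new ProducerRecord<>("src-topic", "key-" + i, "value-" + i));
            }
            // Abort instead of commit: a read_committed consumer should never see these records.
            producer.abortTransaction();
        }
    }
}
{code}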



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7880) KafkaConnect should standardize worker thread name

2019-02-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769060#comment-16769060
 ] 

ASF GitHub Bot commented on KAFKA-7880:
---

FrLy commented on pull request #6273: KAFKA-7880:Naming worker thread by 
task id
URL: https://github.com/apache/kafka/pull/6273
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConnect should standardize worker thread name
> --
>
> Key: KAFKA-7880
> URL: https://issues.apache.org/jira/browse/KAFKA-7880
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.1.0
>Reporter: YeLiang
>Assignee: YeLiang
>Priority: Minor
>
> KafkaConnect creates a WorkerTask for each task assigned to it and then 
> submits the tasks to a thread pool.
> However, the 
> [Worker|https://github.com/apache/kafka/blob/2.1.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java]
>  class initializes its thread pool using a default ThreadFactory, so the 
> thread names follow the pattern pool-[0-9]\-thread\-[0-9].
> When we are running KafkaConnect and find that one of the task threads is 
> under high CPU usage, it is difficult to find out which task is under 
> high load, because when we print out the stack of KafkaConnect we only 
> see a list of threads named pool-[0-9]\-thread\-[0-9], even though we know the 
> exact pid of the high-CPU-usage thread.
> If worker threads were named like connectorName-taskId, it would be 
> very helpful.
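For illustration, a minimal sketch of how a thread pool could be given descriptive thread names via a custom ThreadFactory; the class name and the "my-connector-0" prefix are made up, and this is not the actual patch.
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactorySketch implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger(0);

    public NamedThreadFactorySketch(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Threads show up in a stack dump as e.g. "my-connector-0-1" instead of "pool-N-thread-M".
        return new Thread(r, prefix + "-" + counter.incrementAndGet());
    }

    public static void main(String[] args) {
        ExecutorService executor =
            Executors.newCachedThreadPool(new NamedThreadFactorySketch("my-connector-0"));
        executor.submit(() -> System.out.println(Thread.currentThread().getName()));
        executor.shutdown();
    }
}
{code}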



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7927) Read committed receives aborted events

2019-02-15 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769021#comment-16769021
 ] 

huxihx edited comment on KAFKA-7927 at 2/15/19 8:31 AM:


[~gsomogyi] The console consumer you were using is the Scala consumer, which 
does not support transactions, so it ignored the given isolation level. 
Could you retry the scenario using the new consumer? For example, issue 
"kafka-console-consumer --bootstrap-server 
gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 --topic src-topic 
--from-beginning --isolation-level read_committed" and resend the to-be-aborted 
messages to check that they are not visible to the consumer.


was (Author: huxi_2b):
[~gsomogyi] The console consumer you were using is the Scala consumer, which 
does not support transactions, so it ignored the given isolation level. 
Could you retry the scenario using the new consumer? For example, issue 
"kafka-console-consumer --zookeeper 
gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 --topic src-topic 
--from-beginning --isolation-level read_committed" and resend the to-be-aborted 
messages to check that they are not visible to the consumer.
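For reference, a minimal sketch of the new (Java) consumer configured as suggested above, which does honor the isolation level; the broker address, group id and topic name are placeholders.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "read-committed-check");      // placeholder group
        props.put("auto.offset.reset", "earliest");
        props.put("isolation.level", "read_committed");     // honored by the new consumer only
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("src-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Records from aborted transactions should never show up here.
                    System.out.println(record.key() + " -> " + record.value());
                }
            }
        }
    }
}
{code}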

> Read committed receives aborted events
> --
>
> Key: KAFKA-7927
> URL: https://issues.apache.org/jira/browse/KAFKA-7927
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core, producer 
>Affects Versions: 1.0.0
>Reporter: Gabor Somogyi
>Priority: Blocker
> Attachments: KafkaProducer.scala, consumer.log, producer.log.gz
>
>
> When a Kafka client produces ~30k events and aborts the transaction at the 
> end, a consumer can read part of the aborted messages even when 
> "isolation.level" is set to "READ_COMMITTED".
> Kafka client version: 2.0.0
> Kafka broker version: 1.0.0
> Producer:
> {code:java}
> java -jar 
> kafka-producer/target/kafka-producer-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
> gsomogyi-cdh5144-220cloudera2-1.gce.cloudera.com:9092 src-topic
> {code}
> See attached code.
> Consumer:
> {code:java}
> kafka-console-consumer --zookeeper localhost:2181 --topic src-topic 
> --from-beginning --isolation-level read_committed
> {code}
> The same behavior is seen when the consumer is re-implemented in Scala.
> The whole application can be found here: 
> https://github.com/gaborgsomogyi/kafka-semantics-tester



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7880) KafkaConnect should standardize worker thread name

2019-02-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769087#comment-16769087
 ] 

ASF GitHub Bot commented on KAFKA-7880:
---

FrLy commented on pull request #6273: KAFKA-7880:[WIP]Naming worker thread 
by task id
URL: https://github.com/apache/kafka/pull/6273
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConnect should standardize worker thread name
> --
>
> Key: KAFKA-7880
> URL: https://issues.apache.org/jira/browse/KAFKA-7880
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.1.0
>Reporter: YeLiang
>Assignee: YeLiang
>Priority: Minor
>
> KafkaConnect creates a WorkerTask for each task assigned to it and then 
> submits the tasks to a thread pool.
> However, the 
> [Worker|https://github.com/apache/kafka/blob/2.1.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java]
>  class initializes its thread pool using a default ThreadFactory, so the 
> thread names follow the pattern pool-[0-9]\-thread\-[0-9].
> When we are running KafkaConnect and find that one of the task threads is 
> under high CPU usage, it is difficult to find out which task is under 
> high load, because when we print out the stack of KafkaConnect we only 
> see a list of threads named pool-[0-9]\-thread\-[0-9], even though we know the 
> exact pid of the high-CPU-usage thread.
> If worker threads were named like connectorName-taskId, it would be 
> very helpful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7880) KafkaConnect should standardize worker thread name

2019-02-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769113#comment-16769113
 ] 

ASF GitHub Bot commented on KAFKA-7880:
---

FrLy commented on pull request #6275: KAFKA-7880:Naming worker thread by 
task id
URL: https://github.com/apache/kafka/pull/6275
 
 
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConnect should standardize worker thread name
> --
>
> Key: KAFKA-7880
> URL: https://issues.apache.org/jira/browse/KAFKA-7880
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.1.0
>Reporter: YeLiang
>Assignee: YeLiang
>Priority: Minor
>
> KafkaConnect creates a WorkerTask for each task assigned to it and then 
> submits the tasks to a thread pool.
> However, the 
> [Worker|https://github.com/apache/kafka/blob/2.1.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java]
>  class initializes its thread pool using a default ThreadFactory, so the 
> thread names follow the pattern pool-[0-9]\-thread\-[0-9].
> When we are running KafkaConnect and find that one of the task threads is 
> under high CPU usage, it is difficult to find out which task is under 
> high load, because when we print out the stack of KafkaConnect we only 
> see a list of threads named pool-[0-9]\-thread\-[0-9], even though we know the 
> exact pid of the high-CPU-usage thread.
> If worker threads were named like connectorName-taskId, it would be 
> very helpful.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7565) NPE in KafkaConsumer

2019-02-15 Thread Alexey Vakhrenev (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769204#comment-16769204
 ] 

Alexey Vakhrenev commented on KAFKA-7565:
-

One more data point: when we removed all the {{wakeup()}} invocations, the 
issue went away. It seems that throwing {{WakeupException}} leaves the 
{{KafkaConsumer}} in some inconsistent state.

> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
> Fix For: 2.2.0
>
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> We couldn't find a minimal reproducer, but it happens quite often in our system. 
> We use the {{pause()}} and {{wakeup()}} methods quite extensively; maybe that is 
> somehow related.
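For context, a generic sketch of the usual poll/wakeup shutdown pattern (assuming a 2.x Java client; this is not the reporter's code): another thread calls wakeup(), and the polling thread catches WakeupException and closes the consumer.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupPatternSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    WakeupPatternSketch(Properties props) {
        this.consumer = new KafkaConsumer<>(props);
    }

    void run() {
        try {
            consumer.subscribe(Collections.singletonList("some-topic"));   // placeholder topic
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(r -> System.out.println(r.value()));
            }
        } catch (WakeupException e) {
            // Expected on shutdown; rethrow if we were not actually closing.
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Called from another thread to stop the poll loop.
    void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}
{code}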



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-15 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769321#comment-16769321
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

I am using this approach only after a window aggregation step, so all the 
input events are windowed. The final output event is produced after the window duration 
+ window grace period (the check whether the time has expired is repeated multiple 
times using wall-clock punctuation). After that, I clear my state store 
accordingly. This way information is not kept forever and at most one event per 
window is emitted. Am I right?
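A rough Java sketch of the approach described above, assuming the 2.0+ Streams API; the store is simplified to a KeyValueStore (the original uses a WindowStore) and all names are made up.
{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Buffers the latest value per key and emits everything on a wall-clock punctuation,
// then clears the store so that at most one record per key is forwarded per interval.
public class WallClockEmitTransformerSketch
        implements Transformer<String, String, KeyValue<String, String>> {

    private final String storeName;
    private KeyValueStore<String, String> store;

    public WallClockEmitTransformerSketch(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.store = (KeyValueStore<String, String>) context.getStateStore(storeName);
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            final List<KeyValue<String, String>> buffered = new ArrayList<>();
            try (KeyValueIterator<String, String> iter = store.all()) {
                while (iter.hasNext()) {
                    buffered.add(iter.next());
                }
            }
            for (final KeyValue<String, String> entry : buffered) {
                context.forward(entry.key, entry.value);
                store.delete(entry.key);
            }
            context.commit();
        });
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        store.put(key, value);   // keep only the latest value per key
        return null;             // nothing is forwarded on the regular path
    }

    @Override
    public void close() { }
}
{code}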

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches the aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on KTable in 
> the future, but my implementation is quite simple and in my opinion should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1, even if I'm just saving to the store and 
> turning off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them retention time as 
> long as possible for the debugging stage. I tried to hotfix this with a 
> retry in the transform method; the state stores do reopen in the end and the 
> `put` method works, but this approach is questionable and I am not sure 
> it actually works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low 
> value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7933) KTableKTableLeftJoinTest takes an hour to finish

2019-02-15 Thread Viktor Somogyi (JIRA)
Viktor Somogyi created KAFKA-7933:
-

 Summary: KTableKTableLeftJoinTest takes an hour to finish
 Key: KAFKA-7933
 URL: https://issues.apache.org/jira/browse/KAFKA-7933
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.2.0
Reporter: Viktor Somogyi
 Attachments: jenkins-output-one-hour-test.log

PRs might time out as 
{{KTableKTableLeftJoinTest.shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions}}
 took one hour to complete.

{noformat}
11:57:45 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest > 
shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions STARTED
12:53:35 
12:53:35 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest > 
shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions PASSED
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7933) KTableKTableLeftJoinTest takes an hour to finish

2019-02-15 Thread Viktor Somogyi (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769385#comment-16769385
 ] 

Viktor Somogyi commented on KAFKA-7933:
---

So far I have only seen this in one instance:
https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2394

> KTableKTableLeftJoinTest takes an hour to finish
> 
>
> Key: KAFKA-7933
> URL: https://issues.apache.org/jira/browse/KAFKA-7933
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Viktor Somogyi
>Priority: Major
> Attachments: jenkins-output-one-hour-test.log
>
>
> PRs might time out as 
> {{KTableKTableLeftJoinTest.shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions}}
>  took one hour to complete.
> {noformat}
> 11:57:45 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions STARTED
> 12:53:35 
> 12:53:35 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions PASSED
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7933) KTableKTableLeftJoinTest takes an hour to finish

2019-02-15 Thread Viktor Somogyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi updated KAFKA-7933:
--
Priority: Major  (was: Critical)

> KTableKTableLeftJoinTest takes an hour to finish
> 
>
> Key: KAFKA-7933
> URL: https://issues.apache.org/jira/browse/KAFKA-7933
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Viktor Somogyi
>Priority: Major
> Attachments: jenkins-output-one-hour-test.log
>
>
> PRs might time out as 
> {{KTableKTableLeftJoinTest.shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions}}
>  took one hour to complete.
> {noformat}
> 11:57:45 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions STARTED
> 12:53:35 
> 12:53:35 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions PASSED
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7886) Some partitions are fully truncated during recovery when log.message.format = 0.10.2 & inter.broker.protocol >= 0.11

2019-02-15 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7886.

Resolution: Duplicate

Resolving this as a duplicate of KAFKA-7897. This will be included in 2.1.1, 
which is set to release this week. Please reopen if the issue still persists.

> Some partitions are fully truncated during recovery when log.message.format = 
> 0.10.2 & inter.broker.protocol >= 0.11
> 
>
> Key: KAFKA-7886
> URL: https://issues.apache.org/jira/browse/KAFKA-7886
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.11.0.0, 2.0.1, 2.1.0
> Environment: centos 7 
>Reporter: Hervé RIVIERE
>Priority: Major
> Attachments: broker.log
>
>
> On a cluster of Kafka 2.0.1, and brokers configured with
>  * inter.broker.protocol.format = 2.0
>  * log.message.format.version = 0.10.2
>  
> With this configuration, when a broker is restarted (clean shutdown), the 
> recovery process for some partitions does not take the high watermark into 
> account and truncates and re-downloads the full partition.
> Typically, for brokers with 500 partitions each / 5 TB of disk usage, the 
> recovery process with this configuration lasts up to 1 hour, whereas it 
> usually takes less than 10 min on the same broker when 
> (inter.broker.protocol.format = log.message.format.version).
>  Which partitions are re-downloaded seems unpredictable: after several restarts of the 
> same broker, the re-downloaded partitions are not always the same.
> Broker log, filtered for one specific partition that was re-downloaded (the 
> truncate offset 12878451349 corresponds to the log-start-offset):
>  
> {code:java}
> 2019-01-31 09:23:34,703 INFO [ProducerStateManager partition=my_topic-11] 
> Writing producer snapshot at offset 13132373966 
> (kafka.log.ProducerStateManager)
> 2019-01-31 09:25:15,245 INFO [Log partition=my_topic-11, dir=/var/lib/kafka] 
> Loading producer state till offset 13132373966 with message format version 1 
> (kafka.log.Log)
> 2019-01-31 09:25:15,245 INFO [ProducerStateManager partition=my_topic-11] 
> Writing producer snapshot at offset 13130789408 
> (kafka.log.ProducerStateManager)
> 2019-01-31 09:25:15,249 INFO [ProducerStateManager partition=my_topic-11] 
> Writing producer snapshot at offset 13131829288 
> (kafka.log.ProducerStateManager)
> 2019-01-31 09:25:15,388 INFO [ProducerStateManager partition=my_topic-11] 
> Writing producer snapshot at offset 13132373966 
> (kafka.log.ProducerStateManager)
> 2019-01-31 09:25:15,388 INFO [Log partition=my_topic-11, dir=/var/lib/kafka] 
> Completed load of log with 243 segments, log start offset 12878451349 and log 
> end offset 13132373966 in 46273 ms (kafka.log.Log)
> 2019-01-31 09:28:38,226 INFO Replica loaded for partition my_topic-11 with 
> initial high watermark 13132373966 (kafka.cluster.Replica)
> 2019-01-31 09:28:38,226 INFO Replica loaded for partition my_topic-11 with 
> initial high watermark 0 (kafka.cluster.Replica)
> 2019-01-31 09:28:38,226 INFO Replica loaded for partition my_topic-11 with 
> initial high watermark 0 (kafka.cluster.Replica)
> 2019-01-31 09:28:42,132 INFO The cleaning for partition my_topic-11 is 
> aborted and paused (kafka.log.LogCleaner)
> 2019-01-31 09:28:42,133 INFO [Log partition=my_topic-11, dir=/var/lib/kafka] 
> Truncating to offset 12878451349 (kafka.log.Log)
> 2019-01-31 09:28:42,135 INFO [Log partition=my_topic-11, dir=/var/lib/kafka] 
> Scheduling log segment [baseOffset 12879521312, size 536869342] for deletion. 
> (kafka.log.Log)
> (...)
> 2019-01-31 09:28:42,521 INFO [Log partition=my_topic-11, dir=/var/lib/kafka] 
> Scheduling log segment [baseOffset 13131829288, size 280543535] for deletion. 
> (kafka.log.Log)
> 2019-01-31 09:28:43,870 WARN [ReplicaFetcher replicaId=11, leaderId=13, 
> fetcherId=1] Truncating my_topic-11 to offset 12878451349 below high 
> watermark 13132373966 (kafka.server.ReplicaFetcherThread)
> 2019-01-31 09:29:03,703 INFO [Log partition=my_topic-11, dir=/var/lib/kafka] 
> Found deletable segments with base offsets [12878451349] due to retention 
> time 25920ms breach (kafka.log.Log)
> 2019-01-31 09:28:42,550 INFO Compaction for partition my_topic-11 is resumed 
> (kafka.log.LogManager)
> {code}
>  
> We successfully reproduced the same bug with Kafka 0.11, 2.0.1 & 2.1.0.
>  
> The same issue appears when we do a rolling restart while switching 
> log.message.format to 2.0.
> The issue disappears when all brokers have log.message.format = 2.0 & 
> inter.broker.protocol = 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7312) Transient failure in kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts

2019-02-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769540#comment-16769540
 ] 

Matthias J. Sax commented on KAFKA-7312:


Failed on `2.2`: 
https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/16/tests

Different trace
{quote}java.lang.AssertionError: Expected an exception of type 
org.apache.kafka.common.errors.TimeoutException; got type 
org.apache.kafka.common.errors.SslAuthenticationException
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1435)
at 
kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1071){quote}

> Transient failure in 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
> 
>
> Key: KAFKA-7312
> URL: https://issues.apache.org/jira/browse/KAFKA-7312
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Error Message
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
> Stacktrace
> org.junit.runners.model.TestTimedOutException: test timed out after 12 
> milliseconds
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
>   at 
> kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1345)
>   at 
> kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1080)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7933) KTableKTableLeftJoinTest takes an hour to finish

2019-02-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7933:
---
Affects Version/s: (was: 2.2.0)

> KTableKTableLeftJoinTest takes an hour to finish
> 
>
> Key: KAFKA-7933
> URL: https://issues.apache.org/jira/browse/KAFKA-7933
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Viktor Somogyi-Vass
>Priority: Major
> Attachments: jenkins-output-one-hour-test.log
>
>
> PRs might time out as 
> {{KTableKTableLeftJoinTest.shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions}}
>  took one hour to complete.
> {noformat}
> 11:57:45 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions STARTED
> 12:53:35 
> 12:53:35 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions PASSED
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7933) KTableKTableLeftJoinTest takes an hour to finish

2019-02-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7933:
---
Component/s: unit tests

> KTableKTableLeftJoinTest takes an hour to finish
> 
>
> Key: KAFKA-7933
> URL: https://issues.apache.org/jira/browse/KAFKA-7933
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Viktor Somogyi-Vass
>Priority: Major
> Attachments: jenkins-output-one-hour-test.log
>
>
> PRs might time out as 
> {{KTableKTableLeftJoinTest.shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions}}
>  took one hour to complete.
> {noformat}
> 11:57:45 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions STARTED
> 12:53:35 
> 12:53:35 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions PASSED
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7799) Fix flaky test RestServerTest.testCORSEnabled

2019-02-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769579#comment-16769579
 ] 

ASF GitHub Bot commented on KAFKA-7799:
---

avocader commented on pull request #6276: KAFKA-7799; Use httpcomponents-client 
in RestServerTest.
URL: https://github.com/apache/kafka/pull/6276
 
 
   The test 
`org.apache.kafka.connect.runtime.rest.RestServerTest#testCORSEnabled` assumes 
the Jersey client can send restricted HTTP headers (`Origin`).
   
   The Jersey client uses `sun.net.www.protocol.http.HttpURLConnection`.
   `sun.net.www.protocol.http.HttpURLConnection` drops restricted 
headers (`Host`, `Keep-Alive`, `Origin`, etc.) based on the static property 
`allowRestrictedHeaders`.
   This property is initialized in a static block by reading the Java system 
property `sun.net.http.allowRestrictedHeaders`.
   
   So, if the classloader loads `HttpURLConnection` before we set 
`sun.net.http.allowRestrictedHeaders=true`, then all subsequent changes of this 
system property won't take any effect (which happens if 
`org.apache.kafka.connect.integration.ExampleConnectIntegrationTest` is 
executed before `RestServerTest`).
   To prevent this, we have to either make sure we set 
`sun.net.http.allowRestrictedHeaders=true` as early as possible or not rely 
on this system property at all.
   
   This PR adds a test dependency on `httpcomponents-client`, which doesn't depend 
on the `sun.net.http.allowRestrictedHeaders` system property. Thus none of the 
existing tests should interfere with `RestServerTest`.
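For illustration, a minimal sketch of the kind of request httpcomponents-client 4.x can send without the restricted-header filtering done by `HttpURLConnection`; the URL and Origin value are placeholders, and this is not the code in the PR.
{code:java}
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class CorsRequestSketch {
    public static void main(String[] args) throws Exception {
        HttpGet request = new HttpGet("http://localhost:8083/connectors");   // placeholder REST URL
        // Unlike HttpURLConnection, the Origin header is sent as-is, without
        // needing -Dsun.net.http.allowRestrictedHeaders=true.
        request.setHeader("Origin", "http://example.com");                   // placeholder origin
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(request)) {
            System.out.println(response.getFirstHeader("Access-Control-Allow-Origin"));
        }
    }
}
{code}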
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix flaky test RestServerTest.testCORSEnabled
> -
>
> Key: KAFKA-7799
> URL: https://issues.apache.org/jira/browse/KAFKA-7799
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> Starting to see this failure quite a lot, locally and on jenkins:
> {code}
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled
> Failing for the past 7 builds (Since Failed#18600 )
> Took 0.7 sec.
> Error Message
> java.lang.AssertionError: expected: but was:
> Stacktrace
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.checkCORSRequest(RestServerTest.java:221)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled(RestServerTest.java:84)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:326)
>   at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
>   at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
> {code}
> If it helps, I see an uncaught exception in the stdout:
> {code}
> [2019-01-08 19:35:23,664] ERROR Uncaught exception in REST call to 
> /connector-plugins/FileStreamSource/validate 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> javax.ws.rs.NotFoundException: HTTP 404 Not Found
>   at 
> org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:274)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
>   at 
> org.glassfish.jersey.process.internal.RequestScope.runInScope(

[jira] [Commented] (KAFKA-7799) Fix flaky test RestServerTest.testCORSEnabled

2019-02-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769592#comment-16769592
 ] 

ASF GitHub Bot commented on KAFKA-7799:
---

avocader commented on pull request #6277: KAFKA-7799; Use httpcomponents-client 
in RestServerTest.
URL: https://github.com/apache/kafka/pull/6277
 
 
   The test 
`org.apache.kafka.connect.runtime.rest.RestServerTest#testCORSEnabled` assumes 
the Jersey client can send restricted HTTP headers (`Origin`).
   
   The Jersey client uses `sun.net.www.protocol.http.HttpURLConnection`.
   `sun.net.www.protocol.http.HttpURLConnection` drops restricted 
headers (`Host`, `Keep-Alive`, `Origin`, etc.) based on the static property 
`allowRestrictedHeaders`.
   This property is initialized in a static block by reading the Java system 
property `sun.net.http.allowRestrictedHeaders`.
   
   So, if the classloader loads `HttpURLConnection` before we set 
`sun.net.http.allowRestrictedHeaders=true`, then all subsequent changes of this 
system property won't take any effect (which happens if 
`org.apache.kafka.connect.integration.ExampleConnectIntegrationTest` is 
executed before `RestServerTest`).
   To prevent this, we have to either make sure we set 
`sun.net.http.allowRestrictedHeaders=true` as early as possible or not rely 
on this system property at all.
   
   This PR adds a test dependency on `httpcomponents-client`, which doesn't depend 
on the `sun.net.http.allowRestrictedHeaders` system property. Thus none of the 
existing tests should interfere with `RestServerTest`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix flaky test RestServerTest.testCORSEnabled
> -
>
> Key: KAFKA-7799
> URL: https://issues.apache.org/jira/browse/KAFKA-7799
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> Starting to see this failure quite a lot, locally and on jenkins:
> {code}
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled
> Failing for the past 7 builds (Since Failed#18600 )
> Took 0.7 sec.
> Error Message
> java.lang.AssertionError: expected: but was:
> Stacktrace
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.checkCORSRequest(RestServerTest.java:221)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled(RestServerTest.java:84)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:326)
>   at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
>   at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
> {code}
> If it helps, I see an uncaught exception in the stdout:
> {code}
> [2019-01-08 19:35:23,664] ERROR Uncaught exception in REST call to 
> /connector-plugins/FileStreamSource/validate 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> javax.ws.rs.NotFoundException: HTTP 404 Not Found
>   at 
> org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:274)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
>   at 
> org.glassfish.jersey.process.internal.RequestScope.runInSc

[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-15 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769611#comment-16769611
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

A late record, as I understand it (correct me if I'm wrong), arrives between the window 
duration end time and the window grace period end time. After that I reject all the 
late events.

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches the aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on KTable in 
> the future, but my implementation is quite simple and in my opinion should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1, even if I'm just saving to the store and 
> turning off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them retention time as 
> long as possible for the debugging stage. I tried to hotfix this with a 
> retry in the transform method; the state stores do reopen in the end and the 
> `put` method works, but this approach is questionable and I am not sure 
> it actually works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low 
> value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7933) KTableKTableLeftJoinTest takes an hour to finish

2019-02-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769638#comment-16769638
 ] 

Matthias J. Sax commented on KAFKA-7933:


Happened again: 
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/19484/consoleFull

> KTableKTableLeftJoinTest takes an hour to finish
> 
>
> Key: KAFKA-7933
> URL: https://issues.apache.org/jira/browse/KAFKA-7933
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Viktor Somogyi-Vass
>Priority: Major
> Attachments: jenkins-output-one-hour-test.log
>
>
> PRs might time out as 
> {{KTableKTableLeftJoinTest.shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions}}
>  took one hour to complete.
> {noformat}
> 11:57:45 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions STARTED
> 12:53:35 
> 12:53:35 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions PASSED
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769659#comment-16769659
 ] 

Matthias J. Sax commented on KAFKA-7882:


{quote}After that I reject all the late events.
{quote}
This implies that you cannot reprocess old data, because in that case you would reject 
everything as late until you reach the end of the topic and 
event-time matches wall-clock time again. Thus, using wall-clock time only 
works for the live run when you are at the end of the topic. If your 
application starts to lag, this might also become problematic, because you 
might reject/drop more records than you wish as "late records".

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches the aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on KTable in 
> the future, but my implementation is quite simple and in my opinion should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1, even if I'm just saving to the store and 
> turning off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them retention time as 
> long as possible for the debugging stage. I tried to hotfix this with a 
> retry in the transform method; the state stores do reopen in the end and the 
> `put` method works, but this approach is questionable and I am not sure 
> it actually works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low 
> value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-02-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769663#comment-16769663
 ] 

ASF GitHub Bot commented on KAFKA-7895:
---

vvcephei commented on pull request #6278: KAFKA-7895: Fix stream-time reckoning 
for suppress
URL: https://github.com/apache/kafka/pull/6278
 
 
   * Add suppress to system tests
   * Move stream-time reckoning from Task into Processor
   
   Even within a Task, different Processors have different perceptions
   of time, due to record caching on stores and in suppression itself,
   and in general, due to any processor logic that may hold onto
   records arbitrarily and emit them later. Thanks to this, we can't rely
   on the whole task existing in the same "instant" of stream-time. The
   solution is for each processor node that cares about stream-time to
   track it independently.
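As a rough illustration of the idea of per-processor stream-time tracking (a generic sketch, not the code in the PR):
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;

// Sketch only: a processor that keeps its own notion of stream-time as the maximum
// record timestamp it has observed, instead of relying on a shared task-level clock.
public class StreamTimeTrackingProcessorSketch extends AbstractProcessor<String, String> {

    private long observedStreamTime = Long.MIN_VALUE;

    @Override
    public void process(final String key, final String value) {
        // Stream-time only moves forward; out-of-order records do not move it back.
        observedStreamTime = Math.max(observedStreamTime, context().timestamp());

        // Any logic that depends on time (e.g. deciding whether a window is closed)
        // would use observedStreamTime rather than a task-wide stream-time.
        context().forward(key, value);
    }
}
{code}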
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  * make sure that there's some system test coverage with caching disabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7914) LDAP

2019-02-15 Thread Jordan Moore (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769766#comment-16769766
 ] 

Jordan Moore commented on KAFKA-7914:
-

I'll assume this is about having an LDAPAuthorizer in AK like Confluent has
https://docs.confluent.io/current/confluent-security-plugins/kafka/introduction.html

> LDAP
> 
>
> Key: KAFKA-7914
> URL: https://issues.apache.org/jira/browse/KAFKA-7914
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chris Bogan
>Priority: Major
>
> Entry



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769587#comment-16769587
 ] 

Matthias J. Sax commented on KAFKA-7882:


{quote}After that, I clear my state store accordingly.
{quote}
What happens if there is a late record after this step?

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches the aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on KTable in 
> the future, but my implementation is quite simple and in my opinion should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1, even if I'm just saving to the store and 
> turning off the punctuation.
> If punctuation is present, however, I sometimes get -1 as a partition value 
> in the transform method. I'm familiar with the basic docs, however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them retention time as 
> long as possible for the debugging stage. I tried to hotfix this with a 
> retry in the transform method; the state stores do reopen in the end and the 
> `put` method works, but this approach is questionable and I am not sure 
> it actually works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a low 
> value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-02-15 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769842#comment-16769842
 ] 

John Roesler commented on KAFKA-7895:
-

Hi all,

 

I'm proposing [https://github.com/apache/kafka/pull/6278] as a fix to this 
issue.

If anyone is willing to build my branch and verify if it fixes the issue for 
them, I would be grateful.

Your code reviews are also appreciated.

 

Thanks,

-John

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  * make sure that there's some system test coverage with caching disabled.
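
For the caching-disabled coverage, a minimal configuration sketch (the 
application id and bootstrap servers below are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-system-test"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
// disable the record cache so suppress is exercised without upstream caching
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
{code}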



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-02-15 Thread Yishun Guan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769862#comment-16769862
 ] 

Yishun Guan commented on KAFKA-6460:


Sounds good to me [~mjsax] and [~guozhang]. The design scope and all the use 
cases are already a bit more than I can pin down on my own, so if I am 
continuing with this story, a KIP is the way to go so that I can gather more 
opinions and suggestions.

 

From what I am seeing, what we want is to implement mock stores. So we will be 
implementing a `MockStoreFactory` that generates a 'MockxxxStoreBuilder', which 
in turn generates a 'MockxxxStoreSupplier', so are we implementing all of these 
layers? Which ones should be EasyMock mocks? A 'MockxxxStoreSupplier' should 
track its function calls; what level of detail do we want? Should it be similar 
to KeyValueStoreTestDriver's structure?

These will be mainly used with the `TopologyTestDriver` (the other usage is to 
replace the vanilla store test drivers like KeyValueStoreTestDriver), so use 
the following as an example:

 
{code:java}
@Test
public void shouldDriveGlobalStore() {
 final String storeName = "my-store";
 final String global = "global";
 final String topic = "topic";

 
topology.addGlobalStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
 Serdes.String(), Serdes.String()).withLoggingDisabled(),
 global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", 
define(new StatefulProcessor(storeName)));

 driver = new TopologyTestDriver(topology, props);
 final KeyValueStore<String, String> globalStore = 
driver.getKeyValueStore(storeName);
 driver.pipeInput(recordFactory.create(topic, "key1", "value1"));
 driver.pipeInput(recordFactory.create(topic, "key2", "value2"));
 assertEquals("value1", globalStore.get("key1"));
 assertEquals("value2", globalStore.get("key2"));
}{code}
From what Guozhang is saying, even though `Topology` takes an 
'inMemoryKeyValueStore', we need to have something under the hood of 
`TopologyTestDriver` that will iterate through the stores in 'Topology' and 
replace them with a 'MockStoreFactory.mockedKeyValueStore(...', is that true?

I agree, more could be discussed through a KIP, I will prepare one soon.
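
As a rough illustration of the "object created from EasyMock, set up with the 
expected calls" idea (a sketch only; the actual MockStoreFactory / 
MockxxxStoreBuilder API is exactly what the KIP would need to define):
{code:java}
import org.apache.kafka.streams.state.KeyValueStore;
import org.easymock.EasyMock;
import org.junit.Test;

public class MockStoreSketchTest {

    @Test
    @SuppressWarnings("unchecked")
    public void shouldTrackExpectedPuts() {
        // mock a key-value store and record the calls the code under test is expected to make
        final KeyValueStore<String, String> store = EasyMock.createMock(KeyValueStore.class);
        EasyMock.expect(store.name()).andStubReturn("my-store");
        store.put("key1", "value1");
        EasyMock.expectLastCall().once();
        EasyMock.replay(store);

        // ... exercise the processor or DSL operator against the mock here ...
        store.put("key1", "value1"); // stand-in for the code under test

        EasyMock.verify(store); // fails the test if the expected put() never happened
    }
}
{code}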

> Add mocks for state stores used in Streams unit testing
> ---
>
> Key: KAFKA-6460
> URL: https://issues.apache.org/jira/browse/KAFKA-6460
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: newbie++
>
> We'd like to use mocks for different types of state stores: kv, window, 
> session that can be used to record the number of expected put / get calls 
> used in the DSL operator unit testing. This involves implementing the two 
> interfaces {{StoreSupplier}} and {{StoreBuilder}} that can return a object 
> created from, say, EasyMock, and the object can then be set up with the 
> expected calls.
> In addition, we should also add a mock record collector which can be returned 
> from the mock processor context so that with logging enabled store, users can 
> also validate if the changes have been forwarded to the changelog as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2019-02-15 Thread Peter Davis (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769912#comment-16769912
 ] 

Peter Davis commented on KAFKA-7088:


Encountered this bug today with a 0.11.0 client.  Similar stack trace and 
symptoms.  Caused a major issue in production.

In 0.11.0 there is no timeout for this await().  A timeout was added by 
KAFKA-6446 in 2.x.  KAFKA-6446 also appears to fix the deadlock – instead an 
error would be thrown. (Fixed/dup?)

KAFKA-6446 also suggests the root cause – *the broker is either unavailable or 
has stopped responding to InitProducerIdRequests.*
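
For clients on releases where initTransactions() can block indefinitely, a 
client-side guard along these lines can at least bound the wait (the executor 
approach, the 60-second bound, and the method name are illustrative, not part 
of KAFKA-6446):
{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;

// bound initTransactions() ourselves instead of letting it block forever
static void guardedInitTransactions(final Producer<byte[], byte[]> producer) throws Exception {
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        executor.submit(producer::initTransactions).get(60, TimeUnit.SECONDS);
    } catch (final TimeoutException e) {
        producer.close(0, TimeUnit.MILLISECONDS); // abandon the stuck producer instance
        throw new IllegalStateException("initTransactions() did not complete in time", e);
    } finally {
        executor.shutdownNow();
    }
}
{code}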

A very interesting error on the broker immediately preceded the outage – 
*"failed: this should not happen"* followed by a transaction metadata error.  
Eventually, we identified that this particular broker was in a bad state, 
restarted it, and the problem immediately resolved.  The broker is running CP 
4.1.2, which is Apache Kafka 1.1.1-cp1 (not sure what all is in the -cp1 
patch).  It looks like this particular tx metadata error is thrown when 
validating the producer epoch, which happens when either the broker or the 
producer times out.  This particular broker is on a VM, and we've seen pauses 
and performance issues in the past that can push a broker into a bad state 
after timing out (unrelated but illustrative example: ).
{quote}{{February 15th 2019, 04:32:15.538 xx_broker_xxx.example.com 
[2019-02-15 12:32:08,191] ERROR 
TransactionMetadata(transactionalId=xx_group_xxx--0_1, 
producerId=106000, producerEpoch=427, txnTimeoutMs=6, state=CompleteCommit, 
pendingState=Some(Ongoing), topicPartitions=Set(), 
txnStartTimestamp=1550233928711, txnLastUpdateTimestamp=1550233927976)'s 
transition to TxnTransitMetadata(producerId=106000, producerEpoch=427, 
txnTimeoutMs=6, txnState=Ongoing, 
topicPartitions=Set(xx_topic_xxx-1), txnStartTimestamp=1550233928001, 
txnLastUpdateTimestamp=1550233928001) +*failed: this should not happen*+ 
(kafka.coordinator.transaction.TransactionMetadata)}}{{February 15th 2019, 
04:32:15.538 xx_broker_xxx.example.com [2019-02-15 12:32:08,239] ERROR 
[KafkaApi-1] Error when handling request 
\{replica_id=103,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=647691730,epoch=6210667,topics=[{topic=__transaction_state,partitions=[{partition=25,fetch_offset=32613278,log_start_offset=0,max_bytes=10485760}]}],forgetten_topics_data=[]}
 (kafka.server.KafkaApis)}}
{{java.lang.IllegalStateException: TransactionalId xx_group_xxx-0_1 
*failed transition to state* TxnTransitMetadata(producerId=106000, 
producerEpoch=427, txnTimeoutMs=6, txnState=Ongoing, 
topicPartitions=Set(xx_topic_xxx-1), txnStartTimestamp=1550233928001, 
txnLastUpdateTimestamp=1550233928001) due to unexpected metadata}}
{{ at 
kafka.coordinator.transaction.TransactionMetadata.throwStateTransitionFailure(TransactionMetadata.scala:390)}}
{{ at 
kafka.coordinator.transaction.TransactionMetadata.*completeTransitionTo(TransactionMetadata.scala:326)*}}
{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply$mcV$sp(TransactionStateManager.scala:534)}}
{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply(TransactionStateManager.scala:526)}}
{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply(TransactionStateManager.scala:526)}}
{{ at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)}}
{{ at 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)}}
{{ at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1(TransactionStateManager.scala:525)}}
{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$11.apply(TransactionStateManager.scala:620)}}
{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$11.apply(TransactionStateManager.scala:620)}}
{{ at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)}}
{{ at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)}}
{{ at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)}}
{{ at 
kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)}}
{{ at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)}}
{{ at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)}}
{{ at 
kafka.server.ReplicaManager.tryCompleteDelayedProduce(ReplicaManager.scala:289)}}
{{

[jira] [Commented] (KAFKA-7884) Docs for message.format.version and log.message.format.version show invalid (corrupt?) "valid values"

2019-02-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7884?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769916#comment-16769916
 ] 

ASF GitHub Bot commented on KAFKA-7884:
---

hachikuji commented on pull request #6209: KAFKA-7884: Docs for 
message.format.version and log.message.format.version show invalid (corrupt?) 
"valid values"
URL: https://github.com/apache/kafka/pull/6209
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Docs for message.format.version and log.message.format.version show invalid 
> (corrupt?) "valid values"
> -
>
> Key: KAFKA-7884
> URL: https://issues.apache.org/jira/browse/KAFKA-7884
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: James Cheng
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.2.0
>
>
> In the docs for message.format.version and log.message.format.version, the 
> list of valid values is
>  
> {code:java}
> kafka.api.ApiVersionValidator$@56aac163 
> {code}
>  
> It appears it's simply doing a .toString on the class/instance.
> At a minimum, we should remove this java-y-ness.
> Even better, it should show all the valid values.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7933) KTableKTableLeftJoinTest takes an hour to finish

2019-02-15 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-7933:
--

Assignee: Matthias J. Sax

> KTableKTableLeftJoinTest takes an hour to finish
> 
>
> Key: KAFKA-7933
> URL: https://issues.apache.org/jira/browse/KAFKA-7933
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Viktor Somogyi-Vass
>Assignee: Matthias J. Sax
>Priority: Major
> Attachments: jenkins-output-one-hour-test.log
>
>
> PRs might time out as 
> {{KTableKTableLeftJoinTest.shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions}}
>  took one hour to complete.
> {noformat}
> 11:57:45 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions STARTED
> 12:53:35 
> 12:53:35 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions PASSED
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7933) KTableKTableLeftJoinTest takes an hour to finish

2019-02-15 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769918#comment-16769918
 ] 

Matthias J. Sax commented on KAFKA-7933:


One more: 
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/19503/consoleFull

> KTableKTableLeftJoinTest takes an hour to finish
> 
>
> Key: KAFKA-7933
> URL: https://issues.apache.org/jira/browse/KAFKA-7933
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Viktor Somogyi-Vass
>Priority: Major
> Attachments: jenkins-output-one-hour-test.log
>
>
> PRs might time out as 
> {{KTableKTableLeftJoinTest.shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions}}
>  took one hour to complete.
> {noformat}
> 11:57:45 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions STARTED
> 12:53:35 
> 12:53:35 org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoinTest 
> > shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions PASSED
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7088) Kafka streams thread waits infinitely on transaction init

2019-02-15 Thread Peter Davis (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16769912#comment-16769912
 ] 

Peter Davis edited comment on KAFKA-7088 at 2/16/19 12:58 AM:
--

Encountered this bug today with a 0.11.0 client.  Similar stack trace and 
symptoms.  Caused a major issue in production.

In 0.11.0 there is no timeout for this await().  A timeout was added by 
KAFKA-6446 in 2.x.  KAFKA-6446 also appears to fix the deadlock – instead an 
error would be thrown. (Fixed/dup?)

KAFKA-6446 also suggests the root cause – *the broker is either unavailable or 
has stopped responding to InitProducerIdRequests.*

A very interesting error on the broker immediately preceded the outage – 
*"failed: this should not happen"* followed by a transaction metadata error.  
Eventually, we identified that this particular broker was in a bad state, 
restarted it, and the problem immediately resolved.  The broker is running CP 
4.1.2, which is Apache Kafka 1.1.1-cp1 (not sure what all is in the -cp1 
patch).  It looks like this particular tx metadata error is thrown when 
validating the producer epoch, which happens when either the broker or the 
producer times out.  This particular broker is on a VM, and we've seen pauses 
and performance issues in the past that can push a broker into a bad state 
after timing out.  (We have never seen an issue with the Transaction 
coordinator before, but plenty of issues with ZooKeeper and Group Coordinators 
on VMs.)
{quote}{{February 15th 2019, 04:32:15.538 xx_broker_xxx.example.com 
[2019-02-15 12:32:08,191] ERROR 
TransactionMetadata(transactionalId=xx_group_xxx--0_1, 
producerId=106000, producerEpoch=427, txnTimeoutMs=6, state=CompleteCommit, 
pendingState=Some(Ongoing), topicPartitions=Set(), 
txnStartTimestamp=1550233928711, txnLastUpdateTimestamp=1550233927976)'s 
transition to TxnTransitMetadata(producerId=106000, producerEpoch=427, 
txnTimeoutMs=6, txnState=Ongoing, 
topicPartitions=Set(xx_topic_xxx-1), txnStartTimestamp=1550233928001, 
txnLastUpdateTimestamp=1550233928001) +*failed: this should not happen*+ 
(kafka.coordinator.transaction.TransactionMetadata)}}{{February 15th 2019, 
04:32:15.538 xx_broker_xxx.example.com [2019-02-15 12:32:08,239] ERROR 
[KafkaApi-1] Error when handling request 
\{replica_id=103,max_wait_time=500,min_bytes=1,max_bytes=10485760,isolation_level=0,session_id=647691730,epoch=6210667,topics=[{topic=__transaction_state,partitions=[{partition=25,fetch_offset=32613278,log_start_offset=0,max_bytes=10485760}]}],forgetten_topics_data=[]}
 (kafka.server.KafkaApis)}}
 {{java.lang.IllegalStateException: TransactionalId xx_group_xxx-0_1 
*failed transition to state* TxnTransitMetadata(producerId=106000, 
producerEpoch=427, txnTimeoutMs=6, txnState=Ongoing, 
topicPartitions=Set(xx_topic_xxx-1), txnStartTimestamp=1550233928001, 
txnLastUpdateTimestamp=1550233928001) due to unexpected metadata}}
 \{{ at 
kafka.coordinator.transaction.TransactionMetadata.throwStateTransitionFailure(TransactionMetadata.scala:390)}}
 {{ at 
kafka.coordinator.transaction.TransactionMetadata.*completeTransitionTo(TransactionMetadata.scala:326)*}}
 \{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply$mcV$sp(TransactionStateManager.scala:534)}}
 \{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply(TransactionStateManager.scala:526)}}
 \{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1$1.apply(TransactionStateManager.scala:526)}}
 \{{ at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)}}
 \{{ at 
kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)}}
 \{{ at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$updateCacheCallback$1(TransactionStateManager.scala:525)}}
 \{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$11.apply(TransactionStateManager.scala:620)}}
 \{{ at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$appendTransactionToLog$1$$anonfun$apply$mcV$sp$11.apply(TransactionStateManager.scala:620)}}
 \{{ at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)}}
 \{{ at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)}}
 \{{ at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)}}
 \{{ at 
kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)}}
 \{{ at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala

[jira] [Resolved] (KAFKA-7884) Docs for message.format.version and log.message.format.version show invalid (corrupt?) "valid values"

2019-02-15 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7884.

Resolution: Fixed

> Docs for message.format.version and log.message.format.version show invalid 
> (corrupt?) "valid values"
> -
>
> Key: KAFKA-7884
> URL: https://issues.apache.org/jira/browse/KAFKA-7884
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: James Cheng
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.2.0
>
>
> In the docs for message.format.version and log.message.format.version, the 
> list of valid values is
>  
> {code:java}
> kafka.api.ApiVersionValidator$@56aac163 
> {code}
>  
> It appears it's simply doing a .toString on the class/instance.
> At a minimum, we should remove this java-y-ness.
> Even better, it should show all the valid values.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7932) Streams needs to handle new Producer exceptions

2019-02-15 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7932:

Affects Version/s: 2.0.0
   2.0.1
   2.1.0
   2.1.1

> Streams needs to handle new Producer exceptions
> ---
>
> Key: KAFKA-7932
> URL: https://issues.apache.org/jira/browse/KAFKA-7932
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1
>Reporter: John Roesler
>Priority: Critical
> Fix For: 2.3.0
>
>
> Following on KAFKA-7763, Streams needs to handle the new behavior.
> See also https://github.com/apache/kafka/pull/6066
> Streams code (StreamTask.java) needs to be modified to handle the new 
> exception.
> Also, from another upstream change, `initTxn` can now throw TimeoutException as 
> well: the default `MAX_BLOCK_MS_CONFIG` in the producer is 60 seconds, so I 
> think just wrapping it as a StreamsException should be reasonable, similar to 
> what we do for `producer#send`'s TimeoutException 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L220-L225]
>  ).
>  
> Note we need to handle this in three functions: init/commit/abortTxn.
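
Roughly, the proposed wrapping would look like the sketch below (illustrative 
only; the real change lives in StreamTask and the surrounding Streams 
internals, and the log prefix parameter is a placeholder):
{code:java}
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.StreamsException;

void initTxn(final Producer<byte[], byte[]> producer, final String logPrefix) {
    try {
        // may block for up to max.block.ms and then throw a TimeoutException
        producer.initTransactions();
    } catch (final TimeoutException e) {
        throw new StreamsException(logPrefix + "Timed out while initializing transactions", e);
    }
}
{code}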



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-02-15 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7895:

Description: 
Hi, We are using kstreams to get the aggregated counts per vendor(key) within a 
specified window.

Here's how we configured the suppress operator to emit one final record per 
key/window.
{code:java}
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
 .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
 .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
 .suppress(Suppressed.untilWindowCloses(unbounded()));
{code}
But we are getting more than one record for the same key/window as shown below.
{code:java}
[KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
[KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
[KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
[KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
[KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
[KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
[KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
[KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
Could you please take a look?

Thanks

 

 

Added by John:

Acceptance Criteria:
 * add suppress to system tests, such that it's exercised with crash/shutdown 
recovery, rebalance, etc.
 ** https://github.com/apache/kafka/pull/6278
 * make sure that there's some system test coverage with caching disabled.
 * test with tighter time bounds with windows of say 30 seconds and use system 
time without adding any extra time for verification

  was:
Hi, We are using kstreams to get the aggregated counts per vendor(key) within a 
specified window.

Here's how we configured the suppress operator to emit one final record per 
key/window.
{code:java}
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
 .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
 .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
 .suppress(Suppressed.untilWindowCloses(unbounded()));
{code}
But we are getting more than one record for the same key/window as shown below.
{code:java}
[KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
[KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
[KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
[KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
[KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
[KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
[KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
[KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
Could you please take a look?

Thanks

 

 

Added by John:

Acceptance Criteria:
 * add suppress to system tests, such that it's exercised with crash/shutdown 
recovery, rebalance, etc.
 * make sure that there's some system test coverage with caching disabled.


> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** https://github.com/apache/

[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-15 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16770005#comment-16770005
 ] 

John Roesler commented on KAFKA-7882:
-

My mistake [~nijo], I made an incorrect assumption about your use case based on 
a misreading of how you were using wall-clock time.

If you really do need strictly final results, then `suppress` is what you want 
to use, and you may want to delay going to production until 
[https://github.com/apache/kafka/pull/6278] is merged and released (should be 
soon, hopefully).

 

As your conversation with Matthias reflects, you can guarantee final results 
with wall-clock-based punctuation, but you have to be careful, do proper 
book-keeping, and there are some limitations. Stream-time is more natural.
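
For contrast, the stream-time flavour of such a punctuation, which fires as 
event time advances rather than with the server clock (a fragment only; 
`context` is the ProcessorContext from the transformer's init(), and the 
60-second interval is arbitrary):
{code:java}
context.schedule(60_000L, PunctuationType.STREAM_TIME, timestamp -> {
    // emit and clean up only windows whose end (plus grace) is <= timestamp
});
{code}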

As I learned in [https://github.com/apache/kafka/pull/6278], it's quite hard to 
get this exactly right, and there are subtle edge conditions to consider.

 

-John

 

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming upcoming records. To ensure only one record of the same key and 
> the window comes to an aggregate I have created a custom Transformer (I know 
> something similar is going to be introduced with suppress method on KTable in 
> the future, but my implementation is quite simple and imo should work 
> correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
> val partition = context.partition() 
> if (partition != -1) store.put(key.key(), (value, partition), 
> key.window().start()) 
> else logger.warn(s"-1 partition")
> null //Ensuring no 1:1 forwarding, context.forward and commit logic is in the 
> punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1, even if I'm only saving to the store and 
> have turned punctuation off.
> If punctuation is enabled, however, I sometimes get -1 as the partition value 
> in the transform method. I'm familiar with the basic docs, but I haven't 
> found anything there that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them as long a 
> retention time as possible for the debugging stage. I tried to hotfix the 
> problem by retrying inside the transform method (the state stores do reopen 
> eventually and the `put` call then works), but this approach is questionable 
> and I am not convinced it actually works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a 
> low value? If I understand correctly, spilling to disk then happens more 
> frequently; could that be what causes the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)