[jira] [Commented] (KAFKA-9450) Decouple inner state flushing from committing

2020-02-13 Thread Navinder Brar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036093#comment-17036093
 ] 

Navinder Brar commented on KAFKA-9450:
--

Sure, [~ableegoldman]. I will create a ticket. I checked with RocksDB: they will 
not cherry-pick to the 5.x version, so I guess we will have to wait.

> Decouple inner state flushing from committing
> -
>
> Key: KAFKA-9450
> URL: https://issues.apache.org/jira/browse/KAFKA-9450
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> When EOS is turned on, the commit interval is set quite low (100ms) and all 
> the store layers are flushed during a commit. This is necessary for 
> forwarding records in the cache to the changelog, but unfortunately it also 
> forces RocksDB to flush the current memtable before it's full. The result is 
> a large number of small writes to disk, losing the benefits of batching, and 
> a large number of very small L0 files that are likely to slow compaction.
> Since we have to delete the stores and recreate them from scratch anyway 
> during an unclean shutdown with EOS, we may as well skip flushing the 
> innermost StateStore during a commit and only do so during a graceful 
> shutdown, before a rebalance, etc. This is currently blocked on a refactoring 
> of the state store layers to allow decoupling the flush of the caching layer 
> from the actual state store.
> Note that this is especially problematic with EOS due to the necessarily low 
> commit interval, but it still hurts even with at-least-once and a much larger 
> commit interval. 
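For context, a minimal sketch of the configuration knobs in play here (it only restates the description above; the 100ms value is the EOS default):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Enabling EOS implicitly lowers the default commit interval to 100ms, and
// today every commit flushes all store layers, including the innermost
// RocksDB store's memtable.
final Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); // EOS default; 30000ms otherwise
{code}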



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-13 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036099#comment-17036099
 ] 

Bruno Cadonna commented on KAFKA-9533:
--

Thank you for picking this up!

Ad 1) We do not support backward compatibility for bugs. The documentation is 
clear; we just failed to implement it correctly.

Ad 2) If you want to try to simplify the code with the adapter, as in 
{{transform()}} and {{flatTransform()}}, you are very welcome to do so. For a 
smoother review process, I would like to ask you to create two PRs: one for the 
refactoring and one for the bug fix.

Are you able to assign the ticket to yourself?

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]).
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}
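A minimal sketch of the documented behavior would guard the forward. Here {{V1}} stands for the transformer's result type and the variable names are illustrative; this is not the actual patch:
{code:java}
 @Override
 public void process(final K key, final V value) {
     final V1 newValue = valueTransformer.transform(key, value);
     // Per the KStream#transformValues JavaDocs, a null result must not be
     // forwarded downstream.
     if (newValue != null) {
         context.forward(key, newValue);
     }
 }
{code}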



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9280) Duplicate messages are observed in ACK mode ALL

2020-02-13 Thread VIkram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036125#comment-17036125
 ] 

VIkram commented on KAFKA-9280:
---

Will the consumer FetchRequest\{from=1000} get served?

> Duplicate messages are observed in ACK mode ALL
> ---
>
> Key: KAFKA-9280
> URL: https://issues.apache.org/jira/browse/KAFKA-9280
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: VIkram
>Priority: Major
>
> In ack mode ALL, the leader sends the message to the consumer even before 
> receiving the acknowledgements from the other replicas. This can lead to 
> *+duplicate messages+*.
>  
> Setup details (see the reproduction sketch after the list):
>  * 1 zookeeper, 5 brokers
>  * Producer: Synchronous
>  * Topic: 1 partition, replication factor - 3, min isr - 2
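For reference, a sketch of the reported topic setup using the AdminClient (the broker address is an assumption; the topic name is taken from the logs below):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTestTopic {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 3, min isr 2, as in the report.
            final NewTopic topic = new NewTopic("t229", 1, (short) 3)
                    .configs(Collections.singletonMap("min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
{code}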
>  
> Say First replica (Leader), Second replica and Third replica are the three 
> replicas of the topic.
>  
> *Sequence of events:*
> a) All brokers are up and running.
> b) Clients started running.
> c) Kill second replica of the topic.
> d) Kill the third replica. Now min isr will not be satisfied.
> e) Bring up third replica. Min isr will be satisfied.
>  
> *Breakdown of step 'd':*
>  # Just before the producer sends the next message, the third replica is 
> killed with kill -9 (the leader takes ~5 sec to detect that the broker is 
> down).
>  # The producer sends a message to the leader.
>  # Before the leader knows that the third replica is down, it accepts the 
> message from the producer.
>  # The leader forwards the message to the third replica.
>  # Before receiving an ACK from the third replica, the leader sends the 
> message to the consumer.
>  # The leader doesn't get an ACK from the third replica.
>  # The leader now detects that the third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
>  # The leader now stops accepting messages from the producer.
>  # Since the producer didn't get an ACK from the leader, it throws 
> TIMEOUT_EXCEPTION after the timeout (2 min in our case).
>  # So far, the producer believes that the message was not received by the 
> leader, whereas the consumer actually received it.
>  # The producer now retries sending the same message. (In our application it 
> is the next integer we send.)
>  # When the second/third replica is back up, the leader accepts the message 
> and sends the same message to the consumer, *thus sending duplicates.*
>  
>  
> *Logs:*
>  # 2-3 seconds before the producer sends the next message, the third replica 
> is killed with kill -9 (the leader takes ~5 sec to detect that the broker is 
> down).
> {{{
> > kill -9 49596
> }}}
>  # The producer sends a message to the leader.
> {{{
> [20190327 11:02:48.231 EDT (main-Terminating-1) Will send message: 
> ProducerRecord(topic=t229, partition=null, headers=RecordHeaders(headers = 
> [], isReadOnly = false), key=null, value=p229-4, timestamp=null)
> }}}
>  # Before the leader knows that the third replica is down, it accepts the 
> message from the producer.
>  # The leader forwards the message to the third replica.
>  # Before receiving an ACK from the third replica, the leader sends the 
> message to the consumer.
> {{{
> Received MessageRecord: ConsumerRecord(topic = t229, partition = 0, 
> leaderEpoch = 1, offset = 3, CreateTime = 1553698968232, serialized key size 
> = -1, serialized value size = 6, headers = RecordHeaders(headers = [], 
> isReadOnly = false), key = null, value = p229-4)
> }}}
>  # The leader doesn't get an ACK from the third replica.
>  # The leader now detects that the third replica is down and throws 
> NOT_ENOUGH_REPLICAS_EXCEPTION.
> {{{
> [2019-03-27 11:02:53,541] ERROR [ReplicaManager broker=0] Error processing 
> append operation on partition t229-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of the 
> current ISR Set(0) is insufficient to satisfy the min.isr requirement of 2 
> for partition t229-0
> }}}
>  # The leader now stops accepting messages from the producer.
>  # Since the producer didn't get an ACK from the leader, it throws 
> TIMEOUT_EXCEPTION after the timeout (2 min in our case).
> {{{
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> t229-0:12 ms has passed since batch creation
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
>     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 
> record(s) for t229-0:12 ms has passed since batch creation
> }}}
>  # So far, the producer believes that the message was not received by the 
> leader, whereas the consumer actually received it.
>  # Now producer re

[jira] [Created] (KAFKA-9546) Make FileStreamSourceTask extendable with generic streams

2020-02-13 Thread Csaba Galyo (Jira)
Csaba Galyo created KAFKA-9546:
--

 Summary: Make FileStreamSourceTask extendable with generic streams
 Key: KAFKA-9546
 URL: https://issues.apache.org/jira/browse/KAFKA-9546
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Csaba Galyo


Use case: I want to read a ZIP-compressed text file with a file connector and 
send it to Kafka.

Currently, we have FileStreamSourceConnector, which reads a \n-delimited text 
file. This connector always returns a task of type FileStreamSourceTask.

The FileStreamSourceTask reads from stdin or opens a file InputStream. The 
issue with this approach is that the input needs to be a plain text file; 
otherwise it won't work.

The code should be modified so that users can change the default InputStream 
to, e.g., a ZipInputStream or any other format. The code is currently written 
in such a way that it's not possible to extend it; we cannot use a different 
input stream.

See example here where the code got copy-pasted just so it could read from a 
ZstdInputStream (which reads ZSTD compressed files): 
[https://github.com/gcsaba2/kafka-zstd/tree/master/src/main/java/org/apache/kafka/connect/file]

 

I suggest 2 changes:
 # FileStreamSourceConnector should be extendable to return tasks of different 
types. These task types would be specified by the user through the 
configuration map.
 # FileStreamSourceTask should be modified so that it can be extended and child 
classes can define different input streams (see the sketch below).
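For illustration, a minimal sketch of suggestion 2, assuming a hypothetical overridable {{createInputStream}} hook (no such method exists today; it is exactly the kind of extension point being proposed):
{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.ZipInputStream;

import org.apache.kafka.connect.file.FileStreamSourceTask;

// Hypothetical: if FileStreamSourceTask exposed a stream-factory hook, a
// compressed-file task would reduce to a few lines, reusing all of the
// existing line-splitting and offset-tracking logic.
public class ZipFileStreamSourceTask extends FileStreamSourceTask {
    @Override
    protected InputStream createInputStream(final InputStream raw) throws IOException {
        return new ZipInputStream(raw);
    }
}
{code}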



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9547) Kafka transaction - skips one offset when the application stops and is started again

2020-02-13 Thread Rumel (Jira)
Rumel created KAFKA-9547:


 Summary: Kafka transaction - skips one offset when the application 
stops and is started again
 Key: KAFKA-9547
 URL: https://issues.apache.org/jira/browse/KAFKA-9547
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.4.0
 Environment: I am using kafka-clients 2.4.0 and 
wurstmeister/kafka:2.12-2.3.0
Reporter: Rumel


To be fair, I have tested this with a normal Kafka producer, without the 
transaction scheme, and it does not skip any offset no matter how many times I 
rerun the ProducerTest.
{code:scala}
import java.util.Properties

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerTest extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")
    props.put("retries", "3")

    // Plain (non-transactional) producer: offsets are assigned contiguously.
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("zxc", "key", "value")
    val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
    producer.send(record)
    producer.send(record2)
    producer.send(record3)
    Thread.sleep(3000)
  }
}{code}
But when I enable transactions on the producer, it skips one offset whenever the 
ProducerTestWithTransaction application is rerun. For example, when I first 
started it the records got offsets 0, 1, 2; after a rerun they get 4, 5, 6, 
skipping 3, and so on.
{code:scala}
import java.util.Properties

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.ProducerFencedException

object ProducerTestWithTransaction extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("enable.idempotence", "true")
    props.put("transactional.id", "alona")
    props.put("acks", "all")
    props.put("retries", "3")

    // Transactional producer: each commit/abort writes a control marker,
    // which also occupies an offset in the partition.
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("wew", "key", "value")
    val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
    producer.initTransactions()
    try {
      producer.beginTransaction()
      producer.send(record)
      producer.send(record2)
      producer.send(record3)
      producer.commitTransaction()
    } catch {
      case _: ProducerFencedException => producer.close()
      case _: Exception => producer.abortTransaction()
    }
  }
}{code}
Please enlighten me as to why this is happening. Is this the standard behavior 
when using transactions? Is there a workaround to avoid skipping an offset? 
Thanks!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-13 Thread Michael Viamari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036291#comment-17036291
 ] 

Michael Viamari commented on KAFKA-9533:


Ok. Great. I'll address the adaptor code separately if necessary.

I cannot yet assign this to myself.

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]).
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9319) Run some system tests using TLSv1.3

2020-02-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036295#comment-17036295
 ] 

ASF GitHub Bot commented on KAFKA-9319:
---

nizhikov commented on pull request #8106: KAFKA-9319: Fix generation of CA 
certificate for system tests.
URL: https://github.com/apache/kafka/pull/8106
 
 
   I ran the system tests to check that we can enable only `TLSv1.3` by 
default.
   
   I've found two issues:
   
 1. The CA certificate that is generated in `security_config.py` can't be 
validated by OpenJDK 11; therefore, tests with SSL enabled fail. (The error 
message is "TrustAnchor with subject "CN=SystemTestCA" is not a CA certificate".)
 2. The actual stack trace of the failure is hidden when the `ConfigException` 
stack trace is printed.
   
   This PR fixes those 2 issues:
  * The `-ext bc=ca:true` param for `keytool` is added.
  * The SSL validation exception is printed in the error log.
   
   [Keytool 
documentation](https://docs.oracle.com/en/java/javase/11/tools/keytool.html)
   
   >Supported Named Extensions
   > The keytool command supports these named extensions. The names aren't 
case-sensitive.
   > BC or BasicConstraints
   > Values:
   > The full form is ca:{true|false}[,pathlen:len] or len, which is short for 
ca:true,pathlen:len.
   > When len is omitted, the resulting value is ca:true. 
   
   Command to run the tests (openjdk11 used):
   ```
   export tests="tests/kafkatest/tests/connect/connect_distributed_test.py"
   TC_PATHS="$tests" bash tests/docker/run_tests.sh
   ```
   
   java version in a docker container:
   ```
   [nizhikov@sbt-qa-01 kafka]$ docker exec -it ducker04 bash
   ducker@ducker04:/$ java -version
   openjdk version "11.0.6" 2020-01-14
   OpenJDK Runtime Environment 18.9 (build 11.0.6+10)
   OpenJDK 64-Bit Server VM 18.9 (build 11.0.6+10, mixed mode)
   ```
   
   Exception in tests *without* new `ext` parameter:
   
   ```
   [2020-02-13 10:17:46,244] DEBUG Created SSL context with keystore 
SecurityStore(path=/mnt/security/test.keystore.jks, modificationTime=Thu Feb 13 
10:17:43 UTC 2020), truststore 
SecurityStore(path=/mnt/security/test.truststore.jks, modificationTime=Thu Feb 
13 10:17:41 UTC 2020), provider SunJSSE. 
(org.apache.kafka.common.security.ssl.SslEngineBuilder)
   javax.net.ssl.SSLHandshakeException: PKIX path validation failed: 
sun.security.validator.ValidatorException: TrustAnchor with subject 
"CN=SystemTestCA" is not a CA certificate
   at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
   at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:320)
   at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:263)
   at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:258)
   at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1332)
   at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1207)
   at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1150)
   at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
   at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
   at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
   at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1048)
   at java.base/java.security.AccessController.doPrivileged(Native Method)
   at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:995)
   at org.apache.kafka.common.security.ssl.SslFactory$SslEngineValidator.handshake(SslFactory.java:360)
   at org.apache.kafka.common.security.ssl.SslFactory$SslEngineValidator.validate(SslFactory.java:301)
   at org.apache.kafka.common.security.ssl.SslFactory$SslEngineValidator.validate(SslFactory.java:282)
   at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:98)
   at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:168)
   at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:157)
   at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:97)
   at kafka.network.Processor.<init>(SocketServer.scala:724)
   at kafka.network.SocketServer.newProcessor(SocketServer.scala:367)
   at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:252)
   at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:251)
   at kafka.network.SocketServer

[jira] [Commented] (KAFKA-9319) Run some system tests using TLSv1.3

2020-02-13 Thread Nikolay Izhikov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036297#comment-17036297
 ] 

Nikolay Izhikov commented on KAFKA-9319:


Hello, [~rsivaram].

I found two issues with the system tests when only TLSv1.3 is enabled:

1. The CA certificate that is generated in `security_config.py` can't be 
validated by OpenJDK 11; therefore, tests with SSL enabled fail. (The error 
message is "TrustAnchor with subject "CN=SystemTestCA" is not a CA certificate".)
2. The actual stack trace of the failure is hidden when the `ConfigException` 
stack trace is printed.

I fixed both of them and raised a PR: https://github.com/apache/kafka/pull/8106
A detailed explanation of the issues is in the PR description.

Can you please take a look?



> Run some system tests using TLSv1.3
> ---
>
> Key: KAFKA-9319
> URL: https://issues.apache.org/jira/browse/KAFKA-9319
> Project: Kafka
>  Issue Type: Test
>Reporter: Rajini Sivaram
>Assignee: Nikolay Izhikov
>Priority: Major
> Fix For: 2.5.0
>
>
> KAFKA-7251 enables TLSv1.3 for Kafka. We should get some system tests to run 
> using TLSv1.3. Since TLSv1.3 is only supported from Java 11 onwards, we need 
> a system test build that runs with JDK 11 to enable these tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9455) Consider using TreeMap for in-memory stores of Streams

2020-02-13 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9455?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036300#comment-17036300
 ] 

highluck commented on KAFKA-9455:
-

[~guozhang]

I'm not sure I understand.

I'm trying to replace the existing InMemoryWindowStore with a TreeMap-based 
WindowStore plus a JoinWindowStore:

single-point query -> WindowStore with TreeMap
range query -> JoinWindowStore

What do you think? If that's not what you meant, please give me a hint.

 

> Consider using TreeMap for in-memory stores of Streams
> --
>
> Key: KAFKA-9455
> URL: https://issues.apache.org/jira/browse/KAFKA-9455
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: highluck
>Priority: Major
>  Labels: newbie++
>
> From [~ableegoldman]: It's worth noting that it might be a good idea to 
> switch to TreeMap for different reasons. Right now the ConcurrentSkipListMap 
> allows us to safely perform range queries without copying over the entire 
> keyset, but the performance on point queries seems to scale noticeably worse 
> with the number of unique keys. Point queries are used by aggregations while 
> range queries are used by windowed joins, but of course both are available 
> within the PAPI and for interactive queries so it's hard to say which we 
> should prefer. Maybe rather than make that tradeoff we should have one 
> version for efficient range queries (a "JoinWindowStore") and one for 
> efficient point queries ("AggWindowStore") - or something. I know we've had 
> similar thoughts for a different RocksDB store layout for Joins (although I 
> can't find that ticket anywhere..), it seems like the in-memory stores could 
> benefit from a special "Join" version as well cc/ Guozhang Wang
> Here are some random thoughts:
> 1. For Kafka Streams processing logic (i.e. without IQ), it's better to make 
> all processing logic rely on point queries rather than range queries. 
> Right now the only processors that use range queries are, as mentioned above, 
> the windowed stream-stream joins. I think we should consider using a different 
> window implementation for this (and as a result also get rid of the 
> retainDuplicates flag) to refactor the windowed stream-stream join operation.
> 2. With 1), range queries would only be exposed via IQ. Depending on their 
> usage frequency, I think it makes a lot of sense to optimize for single-point 
> queries. Of course, even without step 1) we should still consider using a 
> TreeMap for the windowed in-memory stores to get a better scaling effect.
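For illustration, the tradeoff in plain Java collections terms (a standalone sketch, not actual store code):
{code:java}
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class MapTradeoffSketch {
    public static void main(final String[] args) {
        final NavigableMap<Long, String> skipList = new ConcurrentSkipListMap<>();
        final NavigableMap<Long, String> treeMap = new TreeMap<>();
        for (long k = 0; k < 100; k++) {
            skipList.put(k, "v" + k);
            treeMap.put(k, "v" + k);
        }

        // Point query: both are O(log n), but in practice the TreeMap tends
        // to scale better as the number of unique keys grows.
        System.out.println(treeMap.get(42L));

        // Range query: the skip list yields a view that is safe to iterate
        // under concurrent writes without copying the keyset; a TreeMap
        // subMap view is not safe while another thread modifies the map.
        for (final Map.Entry<Long, String> e : skipList.subMap(10L, true, 20L, true).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
{code}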



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2020-02-13 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036321#comment-17036321
 ] 

highluck commented on KAFKA-7499:
-

[~jbfletch], [~mjsax]

Is this ticket in progress?

If not, can I proceed?

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: jbfletch
>Priority: Major
>  Labels: beginner, kip, newbie
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens within Kafka Streams itself, before the data 
> is handed to the producer, and the producer uses `byte[]/byte[]` 
> key-value-pair types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exceptions, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]
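For context, a handler under the existing KIP-210 interface looks roughly like the sketch below; note that it only runs for errors raised in the producer callback, which is exactly the gap described above. The class name and logging are illustrative. Such a handler is registered via the {{default.production.exception.handler}} config.
{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Only sees errors raised in the producer callback; a record that fails
// during serialization never reaches this handler today.
public class SkipAndLogExceptionHandler implements ProductionExceptionHandler {
    @Override
    public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
        System.err.println("Dropping record for topic " + record.topic() + ": " + exception);
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) { }
}
{code}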



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9517) KTable Joins Without Materialized Argument Yield Results That Further Joins NPE On

2020-02-13 Thread Paul Snively (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036329#comment-17036329
 ] 

Paul Snively commented on KAFKA-9517:
-

John,

Thanks so much for your prompt attention, attention to detail, and pointers to 
the relevant issues and PRs. I'm glad to hear they've been merged and look 
forward to the 2.4.1 release when the last blocker is resolved. We'll continue 
pushing on our specific use-cases for the remainder of the week and let you 
know ASAP if anything continues to arise.

> KTable Joins Without Materialized Argument Yield Results That Further Joins 
> NPE On
> --
>
> Key: KAFKA-9517
> URL: https://issues.apache.org/jira/browse/KAFKA-9517
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Paul Snively
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.5.0, 2.4.1
>
> Attachments: test.tar.xz
>
>
> The `KTable` API implemented 
> [here|https://github.com/apache/kafka/blob/2.4.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842-L844]
>  calls `doJoinOnForeignKey` with an argument of 
> `Materialized.with(null, null)`, as apparently do several other APIs. As the 
> comment spanning [these lines|#L1098-L1099] makes clear, the result is a 
> `KTable` whose `valueSerde` (as a `KTableImpl`) is `null`. Therefore, 
> attempts to `join` etc. on the resulting `KTable` fail with a 
> `NullPointerException`.
> While there is an obvious workaround—explicitly construct the required 
> `Materialized` and use the APIs that take it as an argument—I have to admit I 
> find the existence of public APIs with this sort of bug, particularly when 
> the bug is literally documented as a comment in the source code, astonishing 
> to the point of incredulity. It calls the quality and trustworthiness of 
> Kafka Streams into serious question, and if a resolution is not forthcoming 
> within a week, we will be left with no other option but to consider technical 
> alternatives.
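The workaround in code, as a sketch; the tables, value types, and {{enrichedOrderSerde}} here are hypothetical, not taken from the report:
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

// Supply Materialized with explicit serdes instead of relying on the join
// overload that defaults to Materialized.with(null, null).
// `orders`, `customers`, `EnrichedOrder`, and `enrichedOrderSerde` are hypothetical.
final KTable<String, EnrichedOrder> enriched = orders.join(
    customers,
    order -> order.customerId,                       // foreign-key extractor
    (order, customer) -> new EnrichedOrder(order, customer),
    Materialized.with(Serdes.String(), enrichedOrderSerde));
{code}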



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9548) RemoteStorageManager and RemoteLogMetadataManager interfaces.

2020-02-13 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-9548:
-

 Summary: RemoteStorageManager and RemoteLogMetadataManager 
interfaces.
 Key: KAFKA-9548
 URL: https://issues.apache.org/jira/browse/KAFKA-9548
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Reporter: Satish Duggana






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9549) Local storage implementations for RSM and RLMM which can be used in tests.

2020-02-13 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-9549:
-

 Summary: Local storage implementations for RSM and RLMM which can 
be used in tests.
 Key: KAFKA-9549
 URL: https://issues.apache.org/jira/browse/KAFKA-9549
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Alexandre Dupriez






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9548) RemoteStorageManager and RemoteLogMetadataManager interfaces.

2020-02-13 Thread Satish Duggana (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Satish Duggana reassigned KAFKA-9548:
-

Assignee: Satish Duggana

> RemoteStorageManager and RemoteLogMetadataManager interfaces.
> -
>
> Key: KAFKA-9548
> URL: https://issues.apache.org/jira/browse/KAFKA-9548
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Satish Duggana
>Assignee: Satish Duggana
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-9533:
--

Assignee: Michael Viamari

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Assignee: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]).
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-13 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036369#comment-17036369
 ] 

Matthias J. Sax commented on KAFKA-9533:


[~mviamari] Thanks for picking up this ticket – I added you to the list of 
contributors and you can now self-assign tickets.

> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]).
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9550) RemoteLogManager implementation

2020-02-13 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-9550:
-

 Summary: RemoteLogManager implementation 
 Key: KAFKA-9550
 URL: https://issues.apache.org/jira/browse/KAFKA-9550
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Satish Duggana


Implementation of RLM as mentioned in the HLD section of KIP-405

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP-405:KafkaTieredStorage-High-leveldesign]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9551) Alternate WindowKeySchema Implementations

2020-02-13 Thread Michael Viamari (Jira)
Michael Viamari created KAFKA-9551:
--

 Summary: Alternate WindowKeySchema Implementations
 Key: KAFKA-9551
 URL: https://issues.apache.org/jira/browse/KAFKA-9551
 Project: Kafka
  Issue Type: Improvement
Reporter: Michael Viamari


Currently, the {{WindowKeySchema}} used by all {{WindowStore}} implementations 
serializes the key with window information as {{keyBytes + timestampBytes + 
seqNumByte}}. This is optimal for iterations and queries that have a fixed key 
with a variable time window (which I think is leveraged in KStream-KStream join 
windows).

 

In cases where the time-window is fixed, but the key range is variable, there 
is a significant overhead for iteration: all time-windows for a given key must 
be traversed. The iteration only uses 1 out of every N keys, where N is the 
number of windows.

A key serialization format that is structured as {{timestampBytes}} + 
{{keyBytes + seqNumByte}} would be much more efficient when iterating over keys 
in a fixed window.

Implementing a custom {{KeySchema}} is not easy at the moment. Currently, 
{{WindowKeySchema}} is instantiated when supplying a RocksDB instance in 
{{RocksDbWindowBytesStoreSupplier}}, but most/all other references to a 
{{KeySchema}} use static functions on {{WindowKeySchema}}. This makes 
supporting an alternate {{KeySchema}} very challenging. Additionally, making a 
custom implementation of {{KeySchema}} is complicated by the fact that although 
{{RocksDBSegmentedBytesStore.KeySchema}} is a public interface, the interface 
depends on {{HasNextCondition}} which is package-private to 
{{org.apache.kafka.streams.state.internals}}.
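Spelled out as a sketch (illustrative only; the real {{WindowKeySchema}} also fixes the exact widths and byte ordering):
{code:java}
import java.nio.ByteBuffer;

public class WindowKeyLayouts {
    // Current layout: all windows of one key are adjacent, so fixed-key /
    // variable-time scans are cheap.
    static byte[] keyFirst(final byte[] key, final long timestamp, final int seqNum) {
        return ByteBuffer.allocate(key.length + Long.BYTES + Integer.BYTES)
                .put(key).putLong(timestamp).putInt(seqNum).array();
    }

    // Proposed alternative: all keys of one window are adjacent, so
    // fixed-window / variable-key scans stop skipping N-1 of every N entries.
    static byte[] timeFirst(final byte[] key, final long timestamp, final int seqNum) {
        return ByteBuffer.allocate(Long.BYTES + key.length + Integer.BYTES)
                .putLong(timestamp).put(key).putInt(seqNum).array();
    }
}
{code}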



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9551) Alternate WindowKeySchema Implementations

2020-02-13 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9551:
---
Component/s: streams

> Alternate WindowKeySchema Implementations
> -
>
> Key: KAFKA-9551
> URL: https://issues.apache.org/jira/browse/KAFKA-9551
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Viamari
>Priority: Minor
>
> Currently, the {{WindowKeySchema}} used by all {{WindowStore}} 
> implementations serializes the key with window information as {{keyBytes + 
> timestampBytes + seqNumByte}}. This is optimal for iterations and queries 
> that have a fixed key with a variable time window (which I think is leveraged 
> in KStream-KStream join windows).
>  
> In cases where the time-window is fixed, but the key range is variable, there 
> is a significant overhead for iteration: all time-windows for a given key 
> must be traversed. The iteration only uses 1 out of every N keys, where N is 
> the number of windows.
> A key serialization format that is structured as {{timestampBytes}} + 
> {{keyBytes + seqNumByte}} would be much more efficient when iterating over 
> keys in a fixed window.
> Implementing a custom {{KeySchema}} is not easy at the moment. Currently, 
> {{WindowKeySchema}} is instantiated when supplying a RocksDB instance in 
> {{RocksDbWindowBytesStoreSupplier}}, but most/all other references to a 
> {{KeySchema}} use static functions on {{WindowKeySchema}}. This makes 
> supporting an alternate {{KeySchema}} very challenging. Additionally, making 
> a custom implementation of {{KeySchema}} is complicated by the fact that 
> although {{RocksDBSegmentedBytesStore.KeySchema}} is a public interface, the 
> interface depends on {{HasNextCondition}} which is package-private to 
> {{org.apache.kafka.streams.state.internals}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2020-02-13 Thread jbfletch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036373#comment-17036373
 ] 

jbfletch commented on KAFKA-7499:
-

It's in progress 🙂 but thanks for checking in 

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: jbfletch
>Priority: Major
>  Labels: beginner, kip, newbie
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens within Kafka Streams itself, before the data 
> is handed to the producer, and the producer uses `byte[]/byte[]` 
> key-value-pair types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exceptions, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9533) ValueTransform forwards `null` values

2020-02-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036381#comment-17036381
 ] 

ASF GitHub Bot commented on KAFKA-9533:
---

mviamari commented on pull request #8108: KAFKA-9533: ValueTransform forwards 
`null` values
URL: https://github.com/apache/kafka/pull/8108
 
 
   Fixes a bug where `KStream#transformValues` would forward null values from 
the provided `ValueTransformer#transform` operation.
   
   A test, `shouldNotForwardNullTransformValuesWithValueTransformerWithKey`, was 
added for verification. A parallel test for the non-key ValueTransformer was not 
added, as they share the same code path.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ValueTransform forwards `null` values
> -
>
> Key: KAFKA-9533
> URL: https://issues.apache.org/jira/browse/KAFKA-9533
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Viamari
>Assignee: Michael Viamari
>Priority: Minor
>
> According to the documentation for `KStream#transformValues`, nulls returned 
> from `ValueTransformer#transform` are not forwarded. (see 
> [KStream#transformValues|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-]).
> However, this does not appear to be the case. In 
> `KStreamTransformValuesProcessor#process` the result of the transform is 
> forwarded directly.
> {code:java}
>  @Override
>  public void process(final K key, final V value) {
>  context.forward(key, valueTransformer.transform(key, value));
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2020-02-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036391#comment-17036391
 ] 

ASF GitHub Bot commented on KAFKA-9545:
---

abbccdda commented on pull request #8109: KAFKA-9545: Fix subscription bugs 
from Stream refactoring
URL: https://github.com/apache/kafka/pull/8109
 
 
   This PR fixes two bugs related to the Streams refactoring:
   
   1. The subscribed topics are not updated correctly when a topic gets removed 
from the broker. 
   2. The `remainingPartitions` computation doesn't account for one task having 
a pattern subscription matching multiple topics.
   
   The bugs are exposed by `testRegexMatchesTopicsAWhenDeleted`, which can be 
used to verify that the fix works.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Boyang Chen
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8616) Replace ApiVersionsRequest request/response with automated protocol

2020-02-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036407#comment-17036407
 ] 

ASF GitHub Bot commented on KAFKA-8616:
---

abbccdda commented on pull request #7052: KAFKA-8616: Replace 
ApiVersionsRequest request/response with automated protocol
URL: https://github.com/apache/kafka/pull/7052
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Replace ApiVersionsRequest request/response with automated protocol
> ---
>
> Key: KAFKA-8616
> URL: https://issues.apache.org/jira/browse/KAFKA-8616
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9106) metrics exposed via JMX should be configurable

2020-02-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036408#comment-17036408
 ] 

ASF GitHub Bot commented on KAFKA-9106:
---

cmccabe commented on pull request #7674: KAFKA-9106 make metrics exposed via 
jmx configurable
URL: https://github.com/apache/kafka/pull/7674
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> metrics exposed via JMX should be configurable
> -
>
> Key: KAFKA-9106
> URL: https://issues.apache.org/jira/browse/KAFKA-9106
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
>
> Kafka exposes a very large number of metrics, all of which are always visible 
> in JMX by default. On large clusters with many partitions, this may result in 
> tens of thousands of MBeans being registered, which can lead to timeouts with 
> some popular monitoring agents that rely on listing JMX metrics via RMI.
> Making the set of JMX-visible metrics configurable would allow operators to 
> decide on the set of critical metrics to collect and work around limitations 
> of JMX in those cases.
> Corresponding KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9552) Stream should handle OutOfSequence exception thrown from Producer

2020-02-13 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-9552:
--

 Summary: Stream should handle OutOfSequence exception thrown from 
Producer
 Key: KAFKA-9552
 URL: https://issues.apache.org/jira/browse/KAFKA-9552
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.5.0
Reporter: Boyang Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9552) Stream should handle OutOfSequence exception thrown from Producer

2020-02-13 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9552:
---
Description: 
As of today the stream thread could die from OutOfSequence error:
[2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
[2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) [2020-02-12 
15:14:35,185] ERROR 
[stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] 
stream-thread 
[stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] Failed 
to commit stream task 3_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
[2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
org.apache.kafka.streams.errors.StreamsException: task [3_2] Abort sending 
since an error caught with a previous record (timestamp 1581484094825) to topic 
stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-49-changelog due to 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)
at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1353)
Although this is a fatal exception for the Producer, Streams should treat it as 
an opportunity to reinitialize by doing a rebalance, instead of killing the 
computation resource.

> Stream should handle OutOfSequence exception thrown from Producer
> -
>
> Key: KAFKA-9552
> URL: https://issues.apache.org/jira/browse/KAFKA-9552
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Priority: Major
>
> As of today the stream thread could die from OutOfSequence error:
> [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) [2020-02-12 
> 15:14:35,185] ERROR 
> [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] 
> stream-thread 
> [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] Failed 
> to commit stream task 3_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
> [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
> org.apache.kafka.streams.errors.StreamsException: task [3_2] Abort sending 
> since an error caught with a previous record (timestamp 1581484094825) to 
> topic stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-49-changelog due 
> to org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1353)
> Although this is a fatal exception for the Producer, Streams should treat it 
> as an opportunity to reinitialize by doing a rebalance, instead of killing 
> the computation resource.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9552) Stream should handle OutOfSequence exception thrown from Producer

2020-02-13 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9552:
---
Description: 
As of today the stream thread could die from OutOfSequence error:
{code:java}
 [2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
 [2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) [2020-02-12 
15:14:35,185] ERROR 
[stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] 
stream-thread 
[stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] Failed 
to commit stream task 3_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
 [2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
org.apache.kafka.streams.errors.StreamsException: task [3_2] Abort sending 
since an error caught with a previous record (timestamp 1581484094825) to topic 
stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-49-changelog due to 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
 at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)
 at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1353)
{code}

 Although this is a fatal exception for the Producer, Streams should treat it 
as an opportunity to reinitialize by doing a rebalance, instead of killing the 
computation resource.

  was:
As of today the stream thread could die from OutOfSequence error:
[2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
[2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) [2020-02-12 
15:14:35,185] ERROR 
[stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] 
stream-thread 
[stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] Failed 
to commit stream task 3_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
[2020-02-12T07:14:35-08:00] 
(streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
org.apache.kafka.streams.errors.StreamsException: task [3_2] Abort sending 
since an error caught with a previous record (timestamp 1581484094825) to topic 
stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-49-changelog due to 
org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received 
an out of order sequence number.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)
at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1353)
Although this is a fatal exception for the Producer, Streams should treat it as 
an opportunity to reinitialize by doing a rebalance, instead of killing the 
computation resource.


> Stream should handle OutOfSequence exception thrown from Producer
> -
>
> Key: KAFKA-9552
> URL: https://issues.apache.org/jira/browse/KAFKA-9552
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Priority: Major
>
> As of today the stream thread could die from OutOfSequence error:
> {code:java}
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) [2020-02-12 
> 15:14:35,185] ERROR 
> [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] 
> stream-thread 
> [stream-soak-test-546f8754-5991-4d62-8565-dbe98d51638e-StreamThread-1] Failed 
> to commit stream task 3_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
>  [2020-02-12T07:14:35-08:00] 
> (streams-soak-2-5-eos_soak_i-03f89b1e566ac95cc_streamslog) 
> 

[jira] [Commented] (KAFKA-9206) Consumer should handle `CORRUPT_MESSAGE` error code in fetch response

2020-02-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036473#comment-17036473
 ] 

ASF GitHub Bot commented on KAFKA-9206:
---

agam commented on pull request #8111: KAFKA-9206: log a warning when 
encountering CORRUPT_MESSAGE
URL: https://github.com/apache/kafka/pull/8111
 
 
   - If the completed fetch has an error code signifying a _corrupt message_, 
log a specific warning for this (instead of the current generic `Unexpected 
error code` message).
   - Also added a test that triggers the warning and verifies it is logged.
   
   - Jira: https://issues.apache.org/jira/browse/KAFKA-9206
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consumer should handle `CORRUPT_MESSAGE` error code in fetch response
> -
>
> Key: KAFKA-9206
> URL: https://issues.apache.org/jira/browse/KAFKA-9206
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Agam Brahma
>Priority: Major
>
> This error code is possible, for example, when the broker scans the log to 
> find the fetch offset after the index lookup. Currently this results in a 
> slightly obscure message such as the following:
> {code:java}
> java.lang.IllegalStateException: Unexpected error code 2 while fetching 
> data{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8507) Support --bootstrap-server in all command line tools

2020-02-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036528#comment-17036528
 ] 

ASF GitHub Bot commented on KAFKA-8507:
---

hachikuji commented on pull request #8023: KAFKA-8507 kip 499 Unify connection 
name flag for command line tool
URL: https://github.com/apache/kafka/pull/8023
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support --bootstrap-server in all command line tools
> 
>
> Key: KAFKA-8507
> URL: https://issues.apache.org/jira/browse/KAFKA-8507
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 2.5.0
>Reporter: Jason Gustafson
>Assignee: Mitchell
>Priority: Major
>  Labels: pull-request-available
>
> This is an unambitious initial move toward standardizing the command line 
> tools. We have favored the name {{\-\-bootstrap-server}} in all new tools 
> since it matches the config {{bootstrap.servers}} used by all clients. Some 
> older commands use {{\-\-broker-list}} or {{\-\-bootstrap-servers}} and maybe 
> other exotic variations. We should support {{\-\-bootstrap-server}} in all 
> commands and deprecate the other options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9290) Update IQ related JavaDocs

2020-02-13 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036566#comment-17036566
 ] 

ASF GitHub Bot commented on KAFKA-9290:
---

highluck commented on pull request #8114: KAFKA-9290: Update IQ related JavaDocs
URL: https://github.com/apache/kafka/pull/8114
 
 
   Update IQ related JavaDocs
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Update IQ related JavaDocs
> --
>
> Key: KAFKA-9290
> URL: https://issues.apache.org/jira/browse/KAFKA-9290
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: highluck
>Priority: Minor
>  Labels: beginner, newbie
>
> In Kafka 2.1.0 we deprecated a couple of methods (KAFKA-7277) to pass 
> timestamps into the IQ API via Duration/Instant parameters instead of plain 
> longs.
> In Kafka 2.3.0 we introduced TimestampedXxxStores (KAFKA-3522) and allow IQ 
> to return the stored timestamp.
> However, we never updated our JavaDocs that contain code snippets to 
> illustrate how a local store can be queried. For example 
> `KGroupedStream#count(Materialized)` 
> ([https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java#L116-L122]):
>  
> {code:java}
> * {@code
> * KafkaStreams streams = ... // counting words
> * String queryableStoreName = "storeName"; // the store name should be the name of the store as defined by the Materialized instance
> * ReadOnlyKeyValueStore<String, Long> localStore =
> *     streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
> * String key = "some-word";
> * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
> * }
> {code}
> We should update all JavaDocs to use `TimestampedXxxStore` and the new 
> Duration/Instant methods in all those code snippets.
>  
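
For example, a sketch of what the updated snippet could look like with the 
timestamped store type introduced by KAFKA-3522 (same query, but the returned 
value now carries the timestamp of the last update):
{code:java}
KafkaStreams streams = ... // counting words
String queryableStoreName = "storeName"; // as defined by the Materialized instance
ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> localStore =
    streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>timestampedKeyValueStore());
String key = "some-word";
ValueAndTimestamp<Long> countForWord = localStore.get(key); // value plus timestamp of the last update
{code}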



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9254) Updating Kafka Broker configuration dynamically twice reverts log configuration to default

2020-02-13 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9254:
---
Affects Version/s: 1.1.0

> Updating Kafka Broker configuration dynamically twice reverts log 
> configuration to default
> --
>
> Key: KAFKA-9254
> URL: https://issues.apache.org/jira/browse/KAFKA-9254
> Project: Kafka
>  Issue Type: Bug
>  Components: config, log, replication
>Affects Versions: 1.1.0, 2.0.1, 2.1.1, 2.2.2, 2.4.0, 2.3.1
>Reporter: fenghong
>Assignee: huxihx
>Priority: Critical
> Fix For: 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.1
>
>
> We are engineers at Huobi and have encountered a Kafka bug: modifying 
> DynamicBrokerConfig more than two times invalidates unrelated topic-level 
> configuration.
> The bug can be reproduced as follows:
>  # Set the Kafka broker config min.insync.replicas=3 in server.properties
>  # Create topic test-1 and set its topic-level config min.insync.replicas=2
>  # Dynamically modify the configuration twice as shown below
> {code:java}
> bin/kafka-configs.sh --bootstrap-server xxx:9092 --entity-type brokers 
> --entity-default --alter --add-config log.message.timestamp.type=LogAppendTime
> bin/kafka-configs.sh --bootstrap-server xxx:9092 --entity-type brokers 
> --entity-default --alter --add-config log.retention.ms=60480
> {code}
>  # Stop a Kafka server and observe the exception shown below:
>  org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync 
> replicas for partition test-1-0 is [2], below required minimum [3]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9254) Updating Kafka Broker configuration dynamically twice reverts log configuration to default

2020-02-13 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9254:
---
Fix Version/s: 1.1.2

> Updating Kafka Broker configuration dynamically twice reverts log 
> configuration to default
> --
>
> Key: KAFKA-9254
> URL: https://issues.apache.org/jira/browse/KAFKA-9254
> Project: Kafka
>  Issue Type: Bug
>  Components: config, log, replication
>Affects Versions: 1.1.0, 2.0.1, 2.1.1, 2.2.2, 2.4.0, 2.3.1
>Reporter: fenghong
>Assignee: huxihx
>Priority: Critical
> Fix For: 1.1.2, 2.0.2, 2.1.2, 2.2.3, 2.3.2, 2.4.1
>
>
> We are engineers at Huobi and have encountered a Kafka bug: modifying 
> DynamicBrokerConfig more than two times invalidates unrelated topic-level 
> configuration.
> The bug can be reproduced as follows:
>  # Set the Kafka broker config min.insync.replicas=3 in server.properties
>  # Create topic test-1 and set its topic-level config min.insync.replicas=2
>  # Dynamically modify the configuration twice as shown below
> {code:java}
> bin/kafka-configs.sh --bootstrap-server xxx:9092 --entity-type brokers 
> --entity-default --alter --add-config log.message.timestamp.type=LogAppendTime
> bin/kafka-configs.sh --bootstrap-server xxx:9092 --entity-type brokers 
> --entity-default --alter --add-config log.retention.ms=60480
> {code}
>  # Stop a Kafka server and observe the exception shown below:
>  org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync 
> replicas for partition test-1-0 is [2], below required minimum [3]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7644) Worker Re balance enhancements

2020-02-13 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036603#comment-17036603
 ] 

Konstantine Karantasis commented on KAFKA-7644:
---

This ticket is similar to [#KAFKA-5505] and has now been addressed, after 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-415:+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]
 was implemented.

Closing as a duplicate of [#KAFKA-5505], which is now fixed.

> Worker Re balance enhancements
> --
>
> Key: KAFKA-7644
> URL: https://issues.apache.org/jira/browse/KAFKA-7644
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: satya
>Priority: Major
>
> Currently the Kafka Connect distributed worker triggers a rebalance any time 
> a new connector/task is added, irrespective of whether the added connector 
> is a source connector or a sink connector. 
> My understanding has been that the worker rebalance should be identical to a 
> consumer group rebalance. That said, should not source connectors be immune 
> to the rebalance?
> Are we not supposed to use source connectors with distributed workers?
> It does appear to me there is some caveat in the way the worker rebalance 
> works, and it needs enhancement to not trigger unwanted rebalances that 
> cause restarts of all tasks etc.
> Kafka connectors should have a way to not restart and to stay with the 
> existing partition assignment if the rebalance trigger is related to a 
> different connector
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7644) Worker Re balance enhancements

2020-02-13 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-7644.
---
Resolution: Duplicate

> Worker Re balance enhancements
> --
>
> Key: KAFKA-7644
> URL: https://issues.apache.org/jira/browse/KAFKA-7644
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: satya
>Priority: Major
>
> Currently the Kafka Connect distributed worker triggers a rebalance any time 
> a new connector/task is added, irrespective of whether the added connector 
> is a source connector or a sink connector. 
> My understanding has been that the worker rebalance should be identical to a 
> consumer group rebalance. That said, should not source connectors be immune 
> to the rebalance?
> Are we not supposed to use source connectors with distributed workers?
> It does appear to me there is some caveat in the way the worker rebalance 
> works, and it needs enhancement to not trigger unwanted rebalances that 
> cause restarts of all tasks etc.
> Kafka connectors should have a way to not restart and to stay with the 
> existing partition assignment if the rebalance trigger is related to a 
> different connector
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9446) Integration test library should provide utilities to assert connector state

2020-02-13 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036609#comment-17036609
 ] 

Konstantine Karantasis commented on KAFKA-9446:
---

This is mostly addressed by: 

[https://github.com/apache/kafka/pull/8055]

Additional methods can be added as needed to the integration utils. 
The class that now contains the assertions and the checks for integration tests 
is

{{org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions}}

Given this class contains similar methods to the ones described above, such as:

{code:java}
assertAtLeastNumWorkersAreUp
assertConnectorAndAtLeastNumTasksAreRunning
assertConnectorAndExactlyNumTasksAreRunning
assertConnectorIsRunningAndTasksHaveFailed
{code}
and some more, maybe we can close this ticket. 
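
For instance, a typical wait in an integration test looks like this (a sketch, 
assuming an already-started {{EmbeddedConnectCluster}} named {{connect}} and a 
connector named "my-connector"):
{code:java}
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(
    "my-connector", 2, "Connector or its tasks did not start running in time");
{code}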

> Integration test library should provide utilities to assert connector state
> ---
>
> Key: KAFKA-9446
> URL: https://issues.apache.org/jira/browse/KAFKA-9446
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Minor
>
> The integration testing library for Connect could be expanded to include 
> methods for verifying the state of connectors. Some possibilities are:
>  
> {{public boolean connectorIsRunning(String connector);}}
> {{public boolean connectorIsFailed(String connector);}}
> {{public boolean connectorIsRunningWithTasks(String connector, int 
> numTasks);}}
> {{public boolean connectorIsFailedWithTasksRunning(String connector, int 
> numTasks);}}
> {{public boolean connectorAndTasksAreFailed(String connector, int numTasks);}}
>  
> These could be used in conjunction with the various 
> [waitForCondition|https://github.com/apache/kafka/blob/6d87c12729ac6dc9d39949c931fad4c45c6af841/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L335-L372]
>  methods to easily wait for connectors to be started, failed, etc. during 
> tests.
>  
> Functionality like this is already present in some integration tests, but is 
> implemented on a per-test basis instead of as part of the integration testing 
> library itself.
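
To illustrate the proposal, the combination could look roughly like this (a 
sketch; connectorIsRunning is the proposed method from this ticket, not 
existing API):
{code:java}
TestUtils.waitForCondition(
    () -> connect.connectorIsRunning("my-connector"),
    30_000, // max wait in ms
    "Connector did not reach the RUNNING state in time");
{code}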



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9446) Integration test library should provide utilities to assert connector state

2020-02-13 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036609#comment-17036609
 ] 

Konstantine Karantasis edited comment on KAFKA-9446 at 2/14/20 12:39 AM:
-

This is mostly addressed by: 

[https://github.com/apache/kafka/pull/8055]

Additional methods can be added as needed to the integration utils. 
The class that now contains the assertions and the checks for integration tests 
is

{{org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions}}

Given this class contains similar methods to the ones described above, such as:

{code:java}
assertAtLeastNumWorkersAreUp
assertConnectorAndAtLeastNumTasksAreRunning
assertConnectorAndExactlyNumTasksAreRunning
assertConnectorIsRunningAndTasksHaveFailed
assertConnectorAndTasksAreStopped
{code}
and some more, maybe we can close this ticket. 


was (Author: kkonstantine):
This is mostly addressed by: 

[https://github.com/apache/kafka/pull/8055]

Additional methods can be added as needed to the integration utils. 
The class that now contains the assertions and the checks for integration tests 
is

{{org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions}}

Given this class contains similar methods to the ones described above, such as:

{code:java}
assertAtLeastNumWorkersAreUp
assertConnectorAndAtLeastNumTasksAreRunning
assertConnectorAndExactlyNumTasksAreRunning
assertConnectorIsRunningAndTasksHaveFailed
{code}
and some more, maybe we can close this ticket. 

> Integration test library should provide utilities to assert connector state
> ---
>
> Key: KAFKA-9446
> URL: https://issues.apache.org/jira/browse/KAFKA-9446
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Minor
>
> The integration testing library for Connect could be expanded to include 
> methods for verifying the state of connectors. Some possibilities are:
>  
> {{public boolean connectorIsRunning(String connector);}}
> {{public boolean connectorIsFailed(String connector);}}
> {{public boolean connectorIsRunningWithTasks(String connector, int 
> numTasks);}}
> {{public boolean connectorIsFailedWithTasksRunning(String connector, int 
> numTasks);}}
> {{public boolean connectorAndTasksAreFailed(String connector, int numTasks);}}
>  
> These could be used in conjunction with the various 
> [waitForCondition|https://github.com/apache/kafka/blob/6d87c12729ac6dc9d39949c931fad4c45c6af841/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L335-L372]
>  methods to easily wait for connectors to be started, failed, etc. during 
> tests.
>  
> Functionality like this is already present in some integration tests, but is 
> implemented on a per-test basis instead of as part of the integration testing 
> library itself.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9446) Integration test library should provide utilities to assert connector state

2020-02-13 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036610#comment-17036610
 ] 

Chris Egerton commented on KAFKA-9446:
--

Sounds good. Thanks for the improvements, [~kkonstantine]

> Integration test library should provide utilities to assert connector state
> ---
>
> Key: KAFKA-9446
> URL: https://issues.apache.org/jira/browse/KAFKA-9446
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Minor
>
> The integration testing library for Connect could be expanded to include 
> methods for verifying the state of connectors. Some possibilities are:
>  
> {{public boolean connectorIsRunning(String connector);}}
> {{public boolean connectorIsFailed(String connector);}}
> {{public boolean connectorIsRunningWithTasks(String connector, int 
> numTasks);}}
> {{public boolean connectorIsFailedWithTasksRunning(String connector, int 
> numTasks);}}
> {{public boolean connectorAndTasksAreFailed(String connector, int numTasks);}}
>  
> These could be used in conjunction with the various 
> [waitForCondition|https://github.com/apache/kafka/blob/6d87c12729ac6dc9d39949c931fad4c45c6af841/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L335-L372]
>  methods to easily wait for connectors to be started, failed, etc. during 
> tests.
>  
> Functionality like this is already present in some integration tests, but is 
> implemented on a per-test basis instead of as part of the integration testing 
> library itself.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9446) Integration test library should provide utilities to assert connector state

2020-02-13 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-9446.
--
Resolution: Fixed

> Integration test library should provide utilities to assert connector state
> ---
>
> Key: KAFKA-9446
> URL: https://issues.apache.org/jira/browse/KAFKA-9446
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Minor
>
> The integration testing library for Connect could be expanded to include 
> methods for verifying the state of connectors. Some possibilities are:
>  
> {{public boolean connectorIsRunning(String connector);}}
> {{public boolean connectorIsFailed(String connector);}}
> {{public boolean connectorIsRunningWithTasks(String connector, int 
> numTasks);}}
> {{public boolean connectorIsFailedWithTasksRunning(String connector, int 
> numTasks);}}
> {{public boolean connectorAndTasksAreFailed(String connector, int numTasks);}}
>  
> These could be used in conjunction with the various 
> [waitForCondition|https://github.com/apache/kafka/blob/6d87c12729ac6dc9d39949c931fad4c45c6af841/clients/src/test/java/org/apache/kafka/test/TestUtils.java#L335-L372]
>  methods to easily wait for connectors to be started, failed, etc. during 
> tests.
>  
> Functionality like this is already present in some integration tests, but is 
> implemented on a per-test basis instead of as part of the integration testing 
> library itself.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5866) Let source/sink task to finish their job before exit

2020-02-13 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036622#comment-17036622
 ] 

Konstantine Karantasis commented on KAFKA-5866:
---

Returning here to add another counterargument. After 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect
 the Connect workers will start keeping track of the set of topics that a 
connector is actively using. But when such a connector gets deleted, this 
information should be deleted as well. 

Allowing connectors to produce records to topics after they've been stopped 
would complicate things in many cases (as, for example, with KIP-558) and 
would muddy the programming model for Kafka Connect and its connectors. 

> Let source/sink task to finish their job before exit
> 
>
> Key: KAFKA-5866
> URL: https://issues.apache.org/jira/browse/KAFKA-5866
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> My case is about reading files. When a task stops for a rebalance or for 
> another reason, I want to let it at least read the file to the end.
> I found that the flag 
> {code:java}
> WorkerTask#stopping
> {code}
>  is set to true and only then 
> {code:java}
> SourceTask.stop()
> {code}
>  is called. This stopping flag prevents WorkerSourceTask from further 
> ingestion (exit from 
> {code:java}
> while (!isStopped())
> {code}).
> Is it possible to let the task decide to keep working for some time, and 
> possibly produce more records, from the moment stop() is called on a 
> rebalance?
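
For reference, the loop shape being described is roughly the following (a 
simplified sketch, not the actual WorkerSourceTask code; isStopped() and 
sendRecords() stand in for the real flag check and dispatch logic):
{code:java}
// once stop() flips the stopping flag, isStopped() returns true and the
// loop exits, so no further records are polled from the source task
while (!isStopped()) {
    List<SourceRecord> records = task.poll();
    if (records != null)
        sendRecords(records);
}
{code}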



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-5866) Let source/sink task to finish their job before exit

2020-02-13 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-5866.
---
Resolution: Won't Do

I'm closing this ticket, as it is a quite old request and there is not a 
strong case for it. On the contrary, it seems better in many ways to continue 
disallowing records from being produced after a source task receives a stop 
signal. 

> Let source/sink task to finish their job before exit
> 
>
> Key: KAFKA-5866
> URL: https://issues.apache.org/jira/browse/KAFKA-5866
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> My case is about reading files. When a task stops for a rebalance or for 
> another reason, I want to let it at least read the file to the end.
> I found that the flag 
> {code:java}
> WorkerTask#stopping
> {code}
>  is set to true and only then 
> {code:java}
> SourceTask.stop()
> {code}
>  is called. This stopping flag prevents WorkerSourceTask from further 
> ingestion (exit from 
> {code:java}
> while (!isStopped())
> {code}).
> Is it possible to let the task decide to keep working for some time, and 
> possibly produce more records, from the moment stop() is called on a 
> rebalance?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-7381) Parameterize connector rebalancing behavior

2020-02-13 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-7381.
---
Resolution: Fixed

Marking as {{Fixed}} by [#KAFKA-5505]

> Parameterize connector rebalancing behavior
> ---
>
> Key: KAFKA-7381
> URL: https://issues.apache.org/jira/browse/KAFKA-7381
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Chen He
>Assignee: Chen He
>Priority: Major
>
> I have a question about the connector rebalancing behavior. Why don't we 
> make it optional, i.e. have a parameter that turns it on/off instead of 
> having it as a must?
>  
> We could have a parameter like "connector.rebalancing.enable" and make it 
> "true" by default. That would allow users to turn it off if they want.
>  
> There are some cases where connector rebalancing is not needed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7381) Parameterize connector rebalancing behavior

2020-02-13 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17036626#comment-17036626
 ] 

Konstantine Karantasis commented on KAFKA-7381:
---

I'll close this ticket, given that since [~rhauch]'s last comment 
[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]
 has landed as the implementation of Incremental Cooperative Rebalancing in 
Connect. Connect workers can now choose between eager and incremental 
cooperative rebalancing, and these load-balancing options should cover the 
vast majority of use cases that require a Connect cluster to be deployed. It's 
worth saying that it is hard to see how load balancing could be completely 
disabled if a distributed system needs to be fault tolerant in the presence of 
failures on worker nodes.
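
For reference, the choice is exposed through the distributed worker 
configuration (values and default as defined by KIP-415):
{code:java}
# 'compatible' enables incremental cooperative rebalancing once all workers
# support it; 'eager' keeps the old stop-the-world behavior
connect.protocol=compatible
# how long a missing worker's assignments are kept before being
# redistributed (KIP-415 default: 5 minutes)
scheduled.rebalance.max.delay.ms=300000
{code}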

> Parameterize connector rebalancing behavior
> ---
>
> Key: KAFKA-7381
> URL: https://issues.apache.org/jira/browse/KAFKA-7381
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Chen He
>Assignee: Chen He
>Priority: Major
>
> I have a question about the connector rebalancing behavior. Why don't we 
> make it optional, i.e. have a parameter that turns it on/off instead of 
> having it as a must?
>  
> We could have a parameter like "connector.rebalancing.enable" and make it 
> "true" by default. That would allow users to turn it off if they want.
>  
> There are some cases where connector rebalancing is not needed. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9553) Transaction state loading metric does not count total loading time

2020-02-13 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9553:
--

 Summary: Transaction state loading metric does not count total 
loading time
 Key: KAFKA-9553
 URL: https://issues.apache.org/jira/browse/KAFKA-9553
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Agam Brahma


KIP-484 added a metric to track total loading time for internal topics: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-484%3A+Expose+metrics+for+group+and+transaction+metadata+loading+duration.
 The value appears to be updated incorrectly in TransactionStateManager: 
rather than recording the total loading time, it records a separate 
measurement after every read from the log.
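
In rough pseudocode, the bug shape is the following (illustrative names, not 
the actual TransactionStateManager code):
{code:java}
long startMs = time.milliseconds();
while (hasMoreLogData()) {   // hypothetical loop over the reads from the log
    readNextBatch();         // hypothetical read step
    // buggy: recording the elapsed time here measures each read,
    // not the whole load
}
// intended by KIP-484: record the total elapsed time once per load
partitionLoadSensor.record(time.milliseconds() - startMs);
{code}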



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8507) Support --bootstrap-server in all command line tools

2020-02-13 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-8507:
---
Affects Version/s: (was: 2.5.0)

> Support --bootstrap-server in all command line tools
> 
>
> Key: KAFKA-8507
> URL: https://issues.apache.org/jira/browse/KAFKA-8507
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Jason Gustafson
>Assignee: Mitchell
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.5.0
>
>
> This is a unambitious initial move toward standardizing the command line 
> tools. We have favored the name {{\-\-bootstrap-server}} in all new tools 
> since it matches the config {{bootstrap.server}} which is used by all 
> clients. Some older commands use {{\-\-broker-list}} or 
> {{\-\-bootstrap-servers}} and maybe other exotic variations. We should 
> support {{\-\-bootstrap-server}} in all commands and deprecate the other 
> options.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)