[jira] [Commented] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed
[ https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074389#comment-16074389 ] Umesh Chaudhary commented on KAFKA-5556: [~damianguy], I can work on this. > KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve > exception from future which hasn't failed > -- > > Key: KAFKA-5556 > URL: https://issues.apache.org/jira/browse/KAFKA-5556 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Damian Guy > > From the user list: > I have been running a streaming application on some data set. Things > usually run ok. Today I was trying to run the same application on Kafka > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After > running for quite some time, I got the following exception .. > {code} > Exception in thread "StreamThread-1" java.lang.IllegalStateException: > > Attempt to retrieve exception from future which hasn't failed > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99) > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) > > at > > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) > > at > > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} > Looks like we should check if the future is done, i.e., check the return > value from poll and retry if time is remaining and {{!future.isDone()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
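A rough sketch of the retry shape suggested above: only inspect exception()/isRetriable() once the future has actually completed, and keep polling while time remains. The interfaces below are simplified stand-ins used for illustration, not the real RequestFuture/ConsumerNetworkClient types.

{code}
// Simplified stand-ins for the real RequestFuture / ConsumerNetworkClient types.
public class CommitRetrySketch {

    interface SimpleFuture {
        boolean isDone();              // completed, successfully or not
        boolean succeeded();
        boolean isRetriable();         // only meaningful once the future has failed
        RuntimeException exception();  // only meaningful once the future has failed
    }

    interface SimpleClient {
        // Blocks for up to timeoutMs waiting for the future to complete.
        void poll(SimpleFuture future, long timeoutMs);
    }

    static void commitSync(SimpleClient client, SimpleFuture future, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        long remaining = timeoutMs;
        while (remaining > 0) {
            client.poll(future, remaining);
            if (future.isDone()) {
                if (future.succeeded())
                    return;                    // commit completed
                if (!future.isRetriable())
                    throw future.exception();  // fatal failure
                // Retriable failure: the real code would build a fresh commit
                // request (and future) here before retrying; this sketch stops.
                return;
            }
            // Future not completed yet (poll returned early): retry while time
            // remains instead of calling exception() on a future that hasn't failed.
            remaining = deadline - System.currentTimeMillis();
        }
    }
}
{code}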
[jira] [Created] (KAFKA-5557) Using a logPrefix inside the StreamPartitionAssignor
Paolo Patierno created KAFKA-5557: - Summary: Using a logPrefix inside the StreamPartitionAssignor Key: KAFKA-5557 URL: https://issues.apache.org/jira/browse/KAFKA-5557 Project: Kafka Issue Type: Improvement Components: streams Reporter: Paolo Patierno Assignee: Paolo Patierno Priority: Trivial Hi, the "stream-thread [%s]" prefix is repeated in all the logging messages inside the StreamPartitionAssignor. Using a logPrefix, as the StreamThread class does, could be better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
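As an illustration of the proposed improvement, a minimal sketch that builds the "stream-thread [%s]" prefix once and reuses it for every log call; the class and method names are made up for the example and are not the actual StreamPartitionAssignor code.

{code}
// Illustrative only: format the prefix once, reuse it everywhere.
public class PrefixedLogger {
    private final String logPrefix;

    public PrefixedLogger(String threadName) {
        // Formatted once instead of in every log statement.
        this.logPrefix = String.format("stream-thread [%s] ", threadName);
    }

    public void info(String message) {
        // The real class would call log.info(logPrefix + message) via slf4j.
        System.out.println(logPrefix + message);
    }

    public static void main(String[] args) {
        PrefixedLogger log = new PrefixedLogger("StreamThread-1");
        log.info("Constructed client metadata from the member subscriptions.");
        log.info("Assigned tasks to clients.");
    }
}
{code}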
[jira] [Commented] (KAFKA-5537) Subscribe Earliest is not working as in 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-5537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074428#comment-16074428 ] Rajini Sivaram commented on KAFKA-5537: --- [~michael.andre.pearce] I have run your test and it passes consistently for me if I increase the first sleep (before produce) from 4000 to 8000. Your logs for 0.11.0 show that you are producing before partitions are assigned. With offsets reset to "latest", this will mean that the messages produced before assignment are not consumed. From your logs for 0.11.0: {quote} 2017-06-30 23:02:15 INFO AbstractCoordinator:597 - Discovered coordinator localhost:52196 (id: 2147483646 rack: null) for group test-group. 017-06-30 23:02:15 INFO AbstractCoordinator:597 - Discovered coordinator localhost:52196 (id: 2147483646 rack: null) for group test-group. ... 017-06-30 23:02:18 TRACE KafkaProducer:740 - Sending record ProducerRecord(topic=topic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=[B@62379589, timestamp=null) with callback null to topic topic partition 0 ... 017-06-30 23:02:21 INFO ConsumerCoordinator:262 - Setting newly assigned partitions [] for group test-group 2017-06-30 23:02:21 INFO ConsumerCoordinator:262 - Setting newly assigned partitions (topic-0) for group test-group {quote} Messages are produced 4 seconds after the consumer was created. But at this point, rebalancing hasn't completed and no partitions have been assigned to the consumers. When the partitions are assigned 3 seconds later, consumers start consuming from the "latest" offset at that point, ignoring already produced messages. With {{AUTO_OFFSET_RESET_CONFIG=earliest}}, this would have worked. With lower {{group.initial.rebalance.delay.ms}}, the test works since rebalance completes sooner. When your test was waiting for 60 seconds, can you check if the wait was before producing messages when offset reset strategy is {{latest}}? > Subscribe Earliest is not working as in 0.10.2.1 > > > Key: KAFKA-5537 > URL: https://issues.apache.org/jira/browse/KAFKA-5537 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0 >Reporter: Michael Andre Pearce >Priority: Critical > Attachments: kafka_0.10.2.1.log, kafka_0.11.0.0.log, KafkaSub.java, > KafkaSubLatest.java > > > We have seen issue with subscription where auto offset when set to earliest > (and also latest) does not behave the same as in 0.10.2.1 release. > We have managed to create a repeatable test for this, which passes when > pointing to 0.10.2.1 broker. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
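To illustrate the timing issue described in the comment above, here is a minimal consumer sketch (broker address, topic and group name are assumed placeholders) that waits until the rebalance has actually assigned partitions before the test starts producing, instead of sleeping for a fixed interval.

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class WaitForAssignment {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        // With "latest", records produced before the assignment completes are skipped;
        // "earliest" avoids that, as noted in the comment above.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic"));
            // Instead of sleeping a fixed 4 seconds, poll until the rebalance has
            // actually assigned partitions, then start producing the test records.
            while (consumer.assignment().isEmpty()) {
                consumer.poll(100); // drives the group join; records are ignored here
            }
            // ... produce the test records, then consume and assert ...
        }
    }
}
{code}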
[jira] [Commented] (KAFKA-5519) Support for multiple certificates in a single keystore
[ https://issues.apache.org/jira/browse/KAFKA-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074427#comment-16074427 ] Tom Bentley commented on KAFKA-5519: Can you explain why having a keystore per component is particularly problematic, but using multiple certificates in a keystore and using aliases would make things significantly better? > Support for multiple certificates in a single keystore > -- > > Key: KAFKA-5519 > URL: https://issues.apache.org/jira/browse/KAFKA-5519 > Project: Kafka > Issue Type: New Feature > Components: security >Affects Versions: 0.10.2.1 >Reporter: Alla Tumarkin > Labels: upstream-issue > > Background > Currently, we need to have a keystore exclusive to the component with exactly > one key in it. Looking at the JSSE Reference guide, it seems like we would > need to introduce our own KeyManager into the SSLContext which selects a > configurable key alias name. > https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509KeyManager.html > has methods for dealing with aliases. > The goal here to use a specific certificate (with proper ACLs set for this > client), and not just the first one that matches. > Looks like it requires a code change to the SSLChannelBuilder -- This message was sent by Atlassian JIRA (v6.4.14#64029)
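For illustration, a sketch of the custom KeyManager approach the description points at: a delegating X509KeyManager that always returns a configured alias rather than whichever key happens to match first. This is not existing Kafka code; wiring it into the SSLContext built by the SSLChannelBuilder would be the actual change.

{code}
import java.net.Socket;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import javax.net.ssl.X509KeyManager;

// Wraps the default key manager and pins key selection to a configured alias.
public class AliasSelectingKeyManager implements X509KeyManager {
    private final X509KeyManager delegate;
    private final String alias;

    public AliasSelectingKeyManager(X509KeyManager delegate, String alias) {
        this.delegate = delegate;
        this.alias = alias;
    }

    @Override
    public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
        return alias; // always use the configured client certificate
    }

    @Override
    public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
        return alias;
    }

    @Override
    public String[] getClientAliases(String keyType, Principal[] issuers) {
        return delegate.getClientAliases(keyType, issuers);
    }

    @Override
    public String[] getServerAliases(String keyType, Principal[] issuers) {
        return delegate.getServerAliases(keyType, issuers);
    }

    @Override
    public X509Certificate[] getCertificateChain(String alias) {
        return delegate.getCertificateChain(alias);
    }

    @Override
    public PrivateKey getPrivateKey(String alias) {
        return delegate.getPrivateKey(alias);
    }
}
{code}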
[jira] [Commented] (KAFKA-5557) Using a logPrefix inside the StreamPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-5557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074480#comment-16074480 ] ASF GitHub Bot commented on KAFKA-5557: --- GitHub user ppatierno opened a pull request: https://github.com/apache/kafka/pull/3488 KAFKA-5557: Using a logPrefix inside the StreamPartitionAssignor Added logPrefix for avoiding stream thread name formatting replicated more times You can merge this pull request into a Git repository by running: $ git pull https://github.com/ppatierno/kafka kafka-5557 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3488.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3488 commit 87a7400acc5feda5828a7fa1604df28a4690c7e2 Author: ppatierno Date: 2017-07-05T09:18:02Z Added logPrefix for avoiding stream thread name formatting replicated more times > Using a logPrefix inside the StreamPartitionAssignor > > > Key: KAFKA-5557 > URL: https://issues.apache.org/jira/browse/KAFKA-5557 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Trivial > > Hi, > the "stream-thread [%s]" is replicated more times in all the logging messages > inside the StreamPartitionAssignor. Using a logPrefix like for the > StreamThread class could be better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed
[ https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5556: --- Fix Version/s: 0.10.2.1 0.11.0.1 > KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve > exception from future which hasn't failed > -- > > Key: KAFKA-5556 > URL: https://issues.apache.org/jira/browse/KAFKA-5556 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Damian Guy > Fix For: 0.10.2.1, 0.11.0.1 > > > From the user list: > I have been running a streaming application on some data set. Things > usually run ok. Today I was trying to run the same application on Kafka > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After > running for quite some time, I got the following exception .. > {code} > Exception in thread "StreamThread-1" java.lang.IllegalStateException: > > Attempt to retrieve exception from future which hasn't failed > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99) > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) > > at > > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) > > at > > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} > Looks like we should check if the future is done, i.e., check the return > value from poll and retry if time is remaining and {{!future.isDone()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5556) KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed
[ https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5556: --- Priority: Critical (was: Major) > KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve > exception from future which hasn't failed > -- > > Key: KAFKA-5556 > URL: https://issues.apache.org/jira/browse/KAFKA-5556 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Damian Guy >Priority: Critical > Fix For: 0.10.2.1, 0.11.0.1 > > > From the user list: > I have been running a streaming application on some data set. Things > usually run ok. Today I was trying to run the same application on Kafka > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After > running for quite some time, I got the following exception .. > {code} > Exception in thread "StreamThread-1" java.lang.IllegalStateException: > > Attempt to retrieve exception from future which hasn't failed > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99) > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) > > at > > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) > > at > > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} > Looks like we should check if the future is done, i.e., check the return > value from poll and retry if time is remaining and {{!future.isDone()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed
[ https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-5556: -- Assignee: Umesh Chaudhary > KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve > exception from future which hasn't failed > > > Key: KAFKA-5556 > URL: https://issues.apache.org/jira/browse/KAFKA-5556 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Damian Guy >Assignee: Umesh Chaudhary >Priority: Critical > Fix For: 0.10.2.2, 0.11.0.1 > > > From the user list: > I have been running a streaming application on some data set. Things > usually run ok. Today I was trying to run the same application on Kafka > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After > running for quite some time, I got the following exception .. > {code} > Exception in thread "StreamThread-1" java.lang.IllegalStateException: > > Attempt to retrieve exception from future which hasn't failed > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99) > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) > > at > > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) > > at > > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} > Looks like we should check if the future is done, i.e., check the return > value from poll and retry if time is remaining and {{!future.isDone()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed
[ https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5556: --- Summary: KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed (was: KafkaConsumer throws: java.lang.IllegalStateException: > Attempt to retrieve exception from future which hasn't failed) > KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve > exception from future which hasn't failed > > > Key: KAFKA-5556 > URL: https://issues.apache.org/jira/browse/KAFKA-5556 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Damian Guy >Priority: Critical > Fix For: 0.10.2.2, 0.11.0.1 > > > From the user list: > I have been running a streaming application on some data set. Things > usually run ok. Today I was trying to run the same application on Kafka > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After > running for quite some time, I got the following exception .. > {code} > Exception in thread "StreamThread-1" java.lang.IllegalStateException: > > Attempt to retrieve exception from future which hasn't failed > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99) > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) > > at > > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) > > at > > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} > Looks like we should check if the future is done, i.e., check the return > value from poll and retry if time is remaining and {{!future.isDone()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed
[ https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-5556: --- Fix Version/s: (was: 0.10.2.1) 0.10.2.2 > KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve > exception from future which hasn't failed > > > Key: KAFKA-5556 > URL: https://issues.apache.org/jira/browse/KAFKA-5556 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Damian Guy >Assignee: Umesh Chaudhary >Priority: Critical > Fix For: 0.10.2.2, 0.11.0.1 > > > From the user list: > I have been running a streaming application on some data set. Things > usually run ok. Today I was trying to run the same application on Kafka > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After > running for quite some time, I got the following exception .. > {code} > Exception in thread "StreamThread-1" java.lang.IllegalStateException: > > Attempt to retrieve exception from future which hasn't failed > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99) > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) > > at > > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) > > at > > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} > Looks like we should check if the future is done, i.e., check the return > value from poll and retry if time is remaining and {{!future.isDone()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN
zhu fangbo created KAFKA-5558: - Summary: can not connect to the unsecure port after config SASL/PLAIN Key: KAFKA-5558 URL: https://issues.apache.org/jira/browse/KAFKA-5558 Project: Kafka Issue Type: New Feature Components: clients Affects Versions: 0.10.1.1 Reporter: zhu fangbo Dear All, I followed "modifying sasl mechanism in a running cluster" to set up a cluster with one broker using SASL/PLAIN to authorize clients. Here are the configurations: server config server.properties: listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer super.users=User:admin kafka_server_jaas.conf: KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_alice="alice"; }; My producer configured with security.protocol=SASL_PLAINTEXT and a correct jaas.conf works well when I connect to the secure port (9094), but when I use the default security.protocol and connect to the unsecure port (9093), the producer cannot fetch metadata: DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] Sending metadata request {topics=[test]} to node -1 WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] Error while fetching metadata with correlation id 0 : {test=UNKNOWN_TOPIC_OR_PARTITION} DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] Sending metadata request {topics=[test]} to node 1 WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] Error while fetching metadata with correlation id 2 : {test=UNKNOWN_TOPIC_OR_PARTITION} Why can the unsecure port not be connected after configuring SASL authorization? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN
[ https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu fangbo updated KAFKA-5558: -- Issue Type: Bug (was: New Feature) > can not connect to the unsecure port after config SASL/PLAIN > > > Key: KAFKA-5558 > URL: https://issues.apache.org/jira/browse/KAFKA-5558 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: zhu fangbo > > Dear All, > I followed modifying sasl mechanism in a running cluster to set a cluster > with one broker using SASL/PLAIN to authorize client. here are configurations: > server config > server.properties: > listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094 > security.inter.broker.protocol=SASL_PLAINTEXT > sasl.mechanism.inter.broker.protocol=PLAIN > sasl.enabled.mechanisms=PLAIN > authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer > super.users=User:admin > kafka_server_jaas.conf: > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="admin" > user_admin="admin" > user_alice="alice"; > }; > My producer configured with security.protocol=SASL_PLAINTEXT and correct > jass.conf can work well when I connect to the secure port(9094),but when I > use the default security.protocol and connect to unsecure port(9093), the > producer can not fetch metadata: > DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Sending metadata request {topics=[test]} to node -1 > WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Error while fetching metadata with correlation id 0 : > {test=UNKNOWN_TOPIC_OR_PARTITION} > DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Sending metadata request {topics=[test]} to node 1 > WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Error while fetching metadata with correlation id 2 : > {test=UNKNOWN_TOPIC_OR_PARTITION} > Why the unsecure port can not be connected after config sasl authorization? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN
[ https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074537#comment-16074537 ] Rajini Sivaram commented on KAFKA-5558: --- The error indicates PLAINTEXT producer has connected successfully to a broker. So this is likely to be an authorization issue. Have you granted access to the topic for the PLAINTEXT user? The user name is ANONYMOUS. > can not connect to the unsecure port after config SASL/PLAIN > > > Key: KAFKA-5558 > URL: https://issues.apache.org/jira/browse/KAFKA-5558 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: zhu fangbo > > Dear All, > I followed modifying sasl mechanism in a running cluster to set a cluster > with one broker using SASL/PLAIN to authorize client. here are configurations: > server config > server.properties: > listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094 > security.inter.broker.protocol=SASL_PLAINTEXT > sasl.mechanism.inter.broker.protocol=PLAIN > sasl.enabled.mechanisms=PLAIN > authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer > super.users=User:admin > kafka_server_jaas.conf: > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="admin" > user_admin="admin" > user_alice="alice"; > }; > My producer configured with security.protocol=SASL_PLAINTEXT and correct > jass.conf can work well when I connect to the secure port(9094),but when I > use the default security.protocol and connect to unsecure port(9093), the > producer can not fetch metadata: > DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Sending metadata request {topics=[test]} to node -1 > WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Error while fetching metadata with correlation id 0 : > {test=UNKNOWN_TOPIC_OR_PARTITION} > DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Sending metadata request {topics=[test]} to node 1 > WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Error while fetching metadata with correlation id 2 : > {test=UNKNOWN_TOPIC_OR_PARTITION} > Why the unsecure port can not be connected after config sasl authorization? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN
[ https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074567#comment-16074567 ] zhu fangbo commented on KAFKA-5558: --- Rajini Sivaram, thanks for your advice, it really helps. I think I should read the code of KafkaServer carefully. > can not connect to the unsecure port after config SASL/PLAIN > > > Key: KAFKA-5558 > URL: https://issues.apache.org/jira/browse/KAFKA-5558 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: zhu fangbo > > Dear All, > I followed modifying sasl mechanism in a running cluster to set a cluster > with one broker using SASL/PLAIN to authorize client. here are configurations: > server config > server.properties: > listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094 > security.inter.broker.protocol=SASL_PLAINTEXT > sasl.mechanism.inter.broker.protocol=PLAIN > sasl.enabled.mechanisms=PLAIN > authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer > super.users=User:admin > kafka_server_jaas.conf: > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="admin" > user_admin="admin" > user_alice="alice"; > }; > My producer configured with security.protocol=SASL_PLAINTEXT and correct > jass.conf can work well when I connect to the secure port(9094),but when I > use the default security.protocol and connect to unsecure port(9093), the > producer can not fetch metadata: > DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Sending metadata request {topics=[test]} to node -1 > WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Error while fetching metadata with correlation id 0 : > {test=UNKNOWN_TOPIC_OR_PARTITION} > DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Sending metadata request {topics=[test]} to node 1 > WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Error while fetching metadata with correlation id 2 : > {test=UNKNOWN_TOPIC_OR_PARTITION} > Why the unsecure port can not be connected after config sasl authorization? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5556) KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed
[ https://issues.apache.org/jira/browse/KAFKA-5556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074573#comment-16074573 ] ASF GitHub Bot commented on KAFKA-5556: --- GitHub user umesh9794 opened a pull request: https://github.com/apache/kafka/pull/3489 KAFKA-5556 : KafkaConsumer.commitSync throws IllegalStateException: A… This PR makes `commitOffsetsSync` method check whether future is completed after client's poll or not. Tests: All existing tests especially "`testCommitOffsetSyncCallbackWithNonRetriableException`" is passed. Not sure if we need to add any dedicated tests for this minor change. Awaiting your review comments. You can merge this pull request into a Git repository by running: $ git pull https://github.com/umesh9794/kafka KAFKA-5556 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/3489.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3489 commit 1d44ca726026aea7bb030d5eecb5d4a197b5b0b9 Author: umesh chaudhary Date: 2017-07-05T10:45:59Z KAFKA-5556 : KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve exception from future which hasn't failed > KafkaConsumer.commitSync throws IllegalStateException: Attempt to retrieve > exception from future which hasn't failed > > > Key: KAFKA-5556 > URL: https://issues.apache.org/jira/browse/KAFKA-5556 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.1, 0.11.0.0 >Reporter: Damian Guy >Assignee: Umesh Chaudhary >Priority: Critical > Fix For: 0.10.2.2, 0.11.0.1 > > > From the user list: > I have been running a streaming application on some data set. Things > usually run ok. Today I was trying to run the same application on Kafka > (ver 0.10.2.1 on Scala 2.12) installed in a Mesos DC/OS cluster. After > running for quite some time, I got the following exception .. 
> {code} > Exception in thread "StreamThread-1" java.lang.IllegalStateException: > > Attempt to retrieve exception from future which hasn't failed > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.exception(RequestFuture.java:99) > > at > > org.apache.kafka.clients.consumer.internals.RequestFuture.isRetriable(RequestFuture.java:89) > > at > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:590) > > at > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1124) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296) > > at > > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:79) > > at > > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) > > at > > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:807) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:794) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:769) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:647) > > at > > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) > {code} > Looks like we should check if the future is done, i.e., check the return > value from poll and retry if time is remaining and {{!future.isDone()}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5558) can not connect to the unsecure port after config SASL/PLAIN
[ https://issues.apache.org/jira/browse/KAFKA-5558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhu fangbo resolved KAFKA-5558. --- Resolution: Fixed this is not a bug > can not connect to the unsecure port after config SASL/PLAIN > > > Key: KAFKA-5558 > URL: https://issues.apache.org/jira/browse/KAFKA-5558 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.1.1 >Reporter: zhu fangbo > > Dear All, > I followed modifying sasl mechanism in a running cluster to set a cluster > with one broker using SASL/PLAIN to authorize client. here are configurations: > server config > server.properties: > listeners=PLAINTEXT://:9093,SASL_PLAINTEXT://:9094 > security.inter.broker.protocol=SASL_PLAINTEXT > sasl.mechanism.inter.broker.protocol=PLAIN > sasl.enabled.mechanisms=PLAIN > authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer > super.users=User:admin > kafka_server_jaas.conf: > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="admin" > user_admin="admin" > user_alice="alice"; > }; > My producer configured with security.protocol=SASL_PLAINTEXT and correct > jass.conf can work well when I connect to the secure port(9094),but when I > use the default security.protocol and connect to unsecure port(9093), the > producer can not fetch metadata: > DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Sending metadata request {topics=[test]} to node -1 > WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Error while fetching metadata with correlation id 0 : > {test=UNKNOWN_TOPIC_OR_PARTITION} > DEBUG 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Sending metadata request {topics=[test]} to node 1 > WARN 17:18:10 kafka-producer-network-thread | producer-1 [NetworkClient] > Error while fetching metadata with correlation id 2 : > {test=UNKNOWN_TOPIC_OR_PARTITION} > Why the unsecure port can not be connected after config sasl authorization? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class
[ https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546 ] Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:03 AM: *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. was (Author: neil.avery): *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format*ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. 
> Consider removing date formatting from Segments class > - > > Key: KAFKA-5515 > URL: https://issues.apache.org/jira/browse/KAFKA-5515 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Neil Avery > Labels: performance > > Currently the {{Segments}} class uses a date when calculating the segment id > and uses {{SimpleDateFormat}} for formatting the segment id. However this is > a high volume code path and creating a new {{SimpleDateFormat}} and > formatting each segment id is expensive. We should look into removing the > date from the segment id or at a minimum use a faster alternative to > {{SimpleDateFormat}}. We should also consider keeping a lookup of existing > segments to avoid as many string operations as possible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class
[ https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546 ] Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:03 AM: *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format*ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. was (Author: neil.avery): I've taken a look at dropping SimpleDateFormat and replacing it with commons-lang3-FastDateFormat (available in project but not a dependency on this module). Microbenchmarking diffs show SDF starts at 800ms/million then hotspots down to 250ms. Interestingly FDF starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. Does a real world calendar matter? - I've knocked together a simple math alternative that break into time-slice where all months/years are equals size. The time formatting is identical but day/month will be incorrect as a result of no calendar. This gets down to 150ms pretty much straight away. (still using SDF is still used for parsing). All tests pass, system runs fine etc - but I'm not sure of the gravity of this as a possible change - will it break things - any advice or feedback? > Consider removing date formatting from Segments class > - > > Key: KAFKA-5515 > URL: https://issues.apache.org/jira/browse/KAFKA-5515 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Neil Avery > Labels: performance > > Currently the {{Segments}} class uses a date when calculating the segment id > and uses {{SimpleDateFormat}} for formatting the segment id. However this is > a high volume code path and creating a new {{SimpleDateFormat}} and > formatting each segment id is expensive. We should look into removing the > date from the segment id or at a minimum use a faster alternative to > {{SimpleDateFormat}}. 
We should also consider keeping a lookup of existing > segments to avoid as many string operations as possible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class
[ https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546 ] Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:06 AM: *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-work stats that can identify the problem? In which case switching to unix-time/flooring-to-minute would be preferred. was (Author: neil.avery): *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. 
> Consider removing date formatting from Segments class > - > > Key: KAFKA-5515 > URL: https://issues.apache.org/jira/browse/KAFKA-5515 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Neil Avery > Labels: performance > > Currently the {{Segments}} class uses a date when calculating the segment id > and uses {{SimpleDateFormat}} for formatting the segment id. However this is > a high volume code path and creating a new {{SimpleDateFormat}} and > formatting each segment id is expensive. We should look into removing the > date from the segment id or at a minimum use a faster alternative to > {{SimpleDateFormat}}. We should also consider keeping a lookup of existing > segments to avoid as many string operations as possible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class
[ https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546 ] Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:18 AM: *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-work stats that can identify that performance is really a problem? In which case switching to unix-time/flooring-to-minute would be preferred provided the upgrade-path/operational costs work. Current WIP commits can be seen here: https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637 was (Author: neil.avery): *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). 
I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-work stats that can identify the problem? In which case switching to unix-time/flooring-to-minute would be preferred. > Consider removing date formatting from Segments class > - > > Key: KAFKA-5515 > URL: https://issues.apache.org/jira/browse/KAFKA-5515 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Neil Avery > Labels: performance > > Currently the {{Segments}} class uses a date when calculating the segment id > and uses {{SimpleDateFormat}} for formatting the segment id. However this is > a high volume code path and creating a new {{SimpleDateFormat}} and > formatting each segment id is expensive. We should look into removing the > date from the segment id or at a minimum use a faster alternative to > {{SimpleDateFormat}}. We should also consider keeping a lookup of existing > segments to avoid as many string operations as possible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class
[ https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546 ] Neil Avery edited comment on KAFKA-5515 at 7/5/17 11:26 AM: *Investigation:* Taking a look at the usage shows SimpleDateFormat (SDF) is used for *parsing* Segment file names during initialisation, and *formatting* during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items. Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage hurts performance and there is a degree of caching inside both of the impls. Looking at this in a different way, "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slices where all months/years are equal size, i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real calendar is still used upon initialisation to extract segment-ids - there will be inconsistencies and likely breakage. _Note: Code can be viewed in the commit log at the bottom_ *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost of segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-world stats that can identify that performance is really a problem? If so, switch to unix-time/flooring-to-minute, provided the upgrade-path/operational costs work. Current WIP commits can be seen here: https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637 was (Author: neil.avery): *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. 
*Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-work stats that can identify that performance is really a problem? In which case switching to unix-time/flooring-to-minute would be preferred provided the upgrade-path/operational costs work. Current WIP commits can be seen here: https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637 > Consider removing date formatting from Segments class > - > > Key: KAFKA-5515 > URL: https://issues.apache.org/jira/browse/KAFKA-5515 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Neil Avery > Labels: performance > > Currently the {{Segments}} class uses a date when calculating the segment id > and uses {{SimpleDateFormat}} for formatting the segment id. However this is > a high volume code path and creating a new {{SimpleDateFormat}} and > formatting each segment id is expensive. We should look into removing the > date from the segment id or at a minimum use a faster alternative to > {{Simp
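To make the flooring idea above concrete, here is a minimal sketch (the interval, store-name prefix and name format are invented for illustration; this is not the actual Segments implementation): the segment id is derived by integer division of the record timestamp, and the segment name carries the numeric id instead of a formatted date.
{code}
// Minimal sketch of the "floor unix time" idea discussed above.
// Not the real Segments class: segmentIntervalMs and the name format are assumptions.
public class SegmentIdSketch {

    private final long segmentIntervalMs;
    private final String storeName;

    public SegmentIdSketch(String storeName, long segmentIntervalMs) {
        this.storeName = storeName;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    // Integer division floors the timestamp to the segment boundary - no Calendar, no SimpleDateFormat.
    long segmentId(long timestampMs) {
        return timestampMs / segmentIntervalMs;
    }

    // Segment name built from the numeric id; readable dates are traded away for speed.
    String segmentName(long segmentId) {
        return storeName + "-" + segmentId;
    }

    // Parsing a segment name back to an id becomes a cheap Long.parseLong instead of SimpleDateFormat.parse.
    long parseSegmentId(String segmentName) {
        return Long.parseLong(segmentName.substring(storeName.length() + 1));
    }
}
{code}
With a one-minute interval this keeps the default granularity mentioned in the comment, and finer intervals (10s, 20s, 5m) fall out of the same division without any calendar work.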
[jira] [Assigned] (KAFKA-5255) Auto generate request/response classes
[ https://issues.apache.org/jira/browse/KAFKA-5255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley reassigned KAFKA-5255: -- Assignee: Tom Bentley > Auto generate request/response classes > -- > > Key: KAFKA-5255 > URL: https://issues.apache.org/jira/browse/KAFKA-5255 > Project: Kafka > Issue Type: New Feature >Reporter: Ismael Juma >Assignee: Tom Bentley > Fix For: 0.11.1.0 > > > We should automatically generate the request/response classes from the > protocol definition. This is a major source of boilerplate, development > effort and inconsistency at the moment. If we auto-generate the classes, we > may also be able to avoid the intermediate `Struct` representation. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074714#comment-16074714 ] Evgeny Veretennikov commented on KAFKA-5503: [~hachikuji], part of the call stack for the blocking idempotent producer is: {noformat} Selector.poll(long): L321 NetworkClient.poll(long, long): L433 NetworkClientUtils.sendAndReceive(KafkaClient, ClientRequest, Time): L89 Sender.sendAndAwaitInitProducerIdRequest(): L405 Sender.maybeWaitForProducerId(): L419 Sender.run(long): L204 {noformat} So the selector blocks the thread. When we invoke the {{initiateClose()}} method, part of the call stack is: {noformat} Selector.wakeUp(): L240 NetworkClient.wakeUp(): L498 Sender.wakeup(): L675 Sender.initiateClose(): L390 {noformat} So it seems that the blocking selector will actually be woken up, and thus the sender thread won't be blocked right after a concurrent {{initiateClose()}} call. So, is this issue still relevant? Am I missing something? > Idempotent producer ignores shutdown while fetching ProducerId > -- > > Key: KAFKA-5503 > URL: https://issues.apache.org/jira/browse/KAFKA-5503 > Project: Kafka > Issue Type: Sub-task > Components: clients, core, producer >Affects Versions: 0.11.0.0 >Reporter: Jason Gustafson >Assignee: Evgeny Veretennikov > Fix For: 0.11.0.1 > > > When using the idempotent producer, we initially block the sender thread > while we attempt to get the ProducerId. During this time, a concurrent call > to close() will be ignored. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
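For readers less familiar with the wake-up mechanism under discussion, here is a tiny self-contained sketch of the same pattern using plain java.nio (this is not Kafka's Selector or NetworkClient; the thread name, flag and timeout are arbitrary): one thread blocks in select(), another sets a flag and calls wakeup() so the blocked thread can observe the flag and exit.
{code}
// Illustrative only: plain java.nio, not Kafka's network layer.
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;

public class WakeupSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        final Selector selector = Selector.open();
        final AtomicBoolean closing = new AtomicBoolean(false);

        Thread sender = new Thread(() -> {
            try {
                while (!closing.get()) {
                    // Blocks until a channel is ready, the timeout expires, or wakeup() is called.
                    selector.select(30_000L);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }, "sender-sketch");
        sender.start();

        Thread.sleep(100);

        // "initiateClose": set the flag, then wake the blocked selector so the flag is observed.
        closing.set(true);
        selector.wakeup();

        sender.join();
        selector.close();
        System.out.println("sender thread exited cleanly");
    }
}
{code}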
[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class
[ https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546 ] Neil Avery edited comment on KAFKA-5515 at 7/5/17 1:04 PM: --- *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. _Note: Code can be viewed in the commit log at the bottom_ *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-work stats that can identify that performance is really a problem? If so switch to unix-time/flooring-to-minute where the upgrade-path/operational costs work. Current WIP commits can be seen here: https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637 Note: Segment granularity currently defaults to 1Minute and as a result works with the SDFormatter. If better granularity is required i.e. 10s, 20s, 5m etc - then using a unix-long (numeric) derivative it would be possible where you roll the appropriate boundary through a floor configured function. was (Author: neil.avery): *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. 
The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. _Note: Code can be viewed in the commit log at the bottom_ *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-work stats that can identify that performance is really a problem? If so switch to unix-time/flooring-to-minute where the upgrade-path/operational costs work. Current WIP commits can be seen here: https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637 > Consider removing date formatting from Segments class > - > > Key: KAFKA-5515 > URL: https://issues.apache.org/jira/browse/KAFKA-5515 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Bill Bejeck >Assignee: Neil Avery > Labels: performance > > Currently the {{Segments}} class uses a date when calculating
[jira] [Comment Edited] (KAFKA-5515) Consider removing date formatting from Segments class
[ https://issues.apache.org/jira/browse/KAFKA-5515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16068546#comment-16068546 ] Neil Avery edited comment on KAFKA-5515 at 7/5/17 1:04 PM: --- *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. _Note: Code can be viewed in the commit log at the bottom_ *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-work stats that can identify that performance is really a problem? If so switch to unix-time/flooring-to-minute where the upgrade-path/operational costs work. *Current WIP commits can be seen here:* https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637 *Note:* Segment granularity currently defaults to 1Minute and as a result works with the SDFormatter. If better granularity is required i.e. 10s, 20s, 5m etc - then using a unix-long (numeric) derivative it would be possible where you roll the appropriate boundary through a floor configured function. was (Author: neil.avery): *Investigation:* Taking a look at the use shows SimpleDateFormat (SFD) is used for *parsing* Segment file names during initialisation, and *format* ting during runtime. I presume the suggested problem lies in the formatting. *Micro benchmark SDF* Formatting 1,000,000 items takes 250ms once hotspotting has kicked in. Per/M items (ms): [707, 572, 543, 591, 546.0, 545.0, 363.0, 250 etc] Parsing is slow - 2500ms per 1,000,000 items Commons-lang3-FastDateFormat is available in the project but not as a dependency on this particular module. FDF micro-bench starts at 400ms/million then gets down to 350ms (not very convincing). Calendar usage sucks performance and there is a degree of caching inside both of the impls. Looking at this in a different way "Segments" is a time-series slice/bucketing function to group/allocate/lookup segments etc. I've knocked together a simple math alternative that breaks into time-slice where all months/years are equals size i.e. not using a calendar - you get an approximate idea of performance: 150-200ms without hotspotting. 
The problem is that a real-calendar is still used upon initialisation extract segment-ids - there will be inconsistencies and likely breakage. _Note: Code can be viewed in the commit log at the bottom_ *Best performance* The best alternative would be to ditch calendars for parsing and formatting and to trunc/floor unix time to minutes/hours etc (at the cost a segment-filename readability). I'm not sure if there will be operational upgrade paths etc in order to make the change seamless. I think we need more specifics around the problem itself - are there real-work stats that can identify that performance is really a problem? If so switch to unix-time/flooring-to-minute where the upgrade-path/operational costs work. Current WIP commits can be seen here: https://github.com/bluemonk3y/kafka/commit/60176f037a8c1209f85597a69f1be9d833c6c637 Note: Segment granularity currently defaults to 1Minute and as a result works with the SDFormatter. If better granularity is required i.e. 10s, 20s, 5m etc - then using a unix-long (numeric) derivative it would be possible where you roll the appropriate boundary through a floor configured function. > Consider removing date formatting from Segments class > - > > Key: KAFKA-5515 > URL: https://
[jira] [Commented] (KAFKA-5500) it is impossible to have custom Login Modules for PLAIN SASL mechanism
[ https://issues.apache.org/jira/browse/KAFKA-5500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074720#comment-16074720 ] Rajini Sivaram commented on KAFKA-5500: --- Kafka's implementation of {{PlainLoginModule}} is tightly integrated with its implementation of {{PlainSaslServerProvider}} and {{PlainSaslServer}} (the server provider is loaded by the login module). At the moment, you can replace the whole server-side SASL/PLAIN implementation with your own implementation by replacing the three classes in {{org.apache.kafka.common.security.plain}} with your own implementation. As described in the docs (https://kafka.apache.org/documentation/#security_sasl_plain_production), the implementation in Kafka is provided as a sample and not suitable for use in production, [KIP-86|https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers] should improve customization. > it is impossible to have custom Login Modules for PLAIN SASL mechanism > -- > > Key: KAFKA-5500 > URL: https://issues.apache.org/jira/browse/KAFKA-5500 > Project: Kafka > Issue Type: Wish >Reporter: Anton Patrushev >Priority: Minor > > This change - > > https://github.com/apache/kafka/commit/275c5e1df237808fe72b8d9933f826949d4b5781#diff-3e86ea3ab586f9b6f920c00508a0d5bcR95 > - makes it impossible have login modules other than PlainLoginModule used > for PLAIN SASL mechanism. Could it be changed the way that doesn't use > particular login module class name? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
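Purely as an illustration of the "replace the three classes" route described above, a skeleton of what such a custom login module has to implement (the class name and option name are invented; the matching SaslServer provider and factory that the comment mentions are omitted):
{code}
// Hypothetical skeleton only - class and option names are invented,
// and the companion SaslServer provider/factory classes are not shown.
import java.util.Map;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.spi.LoginModule;

public class CustomPlainLoginModule implements LoginModule {

    @Override
    public void initialize(Subject subject, CallbackHandler callbackHandler,
                           Map<String, ?> sharedState, Map<String, ?> options) {
        // Read whatever options the JAAS entry supplies, e.g. a path to a credential store.
        Object credentialStore = options.get("credential.store.path"); // invented option name
        // ... load/validate configuration here ...
    }

    @Override
    public boolean login() {
        // Real authentication happens later in the SASL server exchange; nothing to do here.
        return true;
    }

    @Override
    public boolean commit() {
        return true;
    }

    @Override
    public boolean abort() {
        return false;
    }

    @Override
    public boolean logout() {
        return true;
    }
}
{code}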
[jira] [Assigned] (KAFKA-4249) Document how to customize GC logging options for broker
[ https://issues.apache.org/jira/browse/KAFKA-4249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evgeny Veretennikov reassigned KAFKA-4249: -- Assignee: Tom Bentley > Document how to customize GC logging options for broker > --- > > Key: KAFKA-4249 > URL: https://issues.apache.org/jira/browse/KAFKA-4249 > Project: Kafka > Issue Type: Improvement > Components: documentation >Affects Versions: 0.10.0.1 >Reporter: Jim Hoagland >Assignee: Tom Bentley > > We wanted to enable GC logging for Kafka broker and saw that you can set > GC_LOG_ENABLED=true. However, this didn't do what we wanted. For example, > the GC log will be overwritten every time the broker gets restarted. It > wasn't clear how we could do that (no documentation of it that I can find), > so I did some research by looking at the source code and did some testing and > found that KAFKA_GC_LOG_OPTS could be set with alternate JVM options prior to > starting broker. I posted my solution to StackOverflow: > > http://stackoverflow.com/questions/39854424/how-to-enable-gc-logging-for-apache-kafka-brokers-while-preventing-log-file-ove > (feel free to critique) > That solution is now public, but it seems like the Kafka documentation should > say how to do this. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-3823) QuickStart documentation is still showing MirrorMakers supports more than one consumer.config
[ https://issues.apache.org/jira/browse/KAFKA-3823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evgeny Veretennikov resolved KAFKA-3823. Resolution: Fixed Closed issue, as [~tombentley] suggested. > QuickStart documentation is still showing MirrorMakers supports more than one > consumer.config > - > > Key: KAFKA-3823 > URL: https://issues.apache.org/jira/browse/KAFKA-3823 > Project: Kafka > Issue Type: Bug > Components: website >Affects Versions: 0.9.0.0 >Reporter: Chak Lee >Priority: Minor > > On the official QuickStart documentation, the MirrorMaker section is still > showing the following example: > {code} > bin/kafka-mirror-maker.sh >--consumer.config consumer-1.properties --consumer.config > consumer-2.properties >--producer.config producer.properties --whitelist my-topic > {code} > However, the support for this is already dropped in KAFKA-1650. If you > tried to run the above script, you will get the following error: > {code} > [2016-06-10 18:35:11,201] ERROR Exception when starting mirror maker. > (kafka.tools.MirrorMaker$) > joptsimple.MultipleArgumentsForOptionException: Found multiple arguments for > option consumer.config, but you asked for only one > {code} > Please update the website's QuickStart section for MirrorMakers. Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4609) KTable/KTable join followed by groupBy and aggregate/count can result in incorrect results
[ https://issues.apache.org/jira/browse/KAFKA-4609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075005#comment-16075005 ] Damian Guy commented on KAFKA-4609: --- This was partially fixed by https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+state+stores+and+improved+semantics If you use one of the join/leftJoin/outerJoin methods that take either a {{StateStoreSupplier}} or {{queryableName}} as a param then it works. However, for the basic join/leftJoin/outerJoin method it doesn't work. In order to make it work properly we need to add another param to these join methods, {{joinSerde}}, so that we can construct the state store etc. This would require a KIP. However as we are currently discussing DSL changes to remove overloads I'd recommend we hold until we know which direction we are going. > KTable/KTable join followed by groupBy and aggregate/count can result in > incorrect results > -- > > Key: KAFKA-4609 > URL: https://issues.apache.org/jira/browse/KAFKA-4609 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.1.1, 0.10.2.0 >Reporter: Damian Guy >Assignee: Damian Guy > Labels: architecture > > When caching is enabled, KTable/KTable joins can result in duplicate values > being emitted. This will occur if there were updates to the same key in both > tables. Each table is flushed independently, and each table will trigger the > join, so you get two results for the same key. > If we subsequently perform a groupBy and then aggregate operation we will now > process these duplicates resulting in incorrect aggregated values. For > example count will be double the value it should be. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
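To make the affected pattern concrete, a minimal DSL sketch of a KTable/KTable join followed by groupBy and count (topic and store names are invented, and the signatures follow the pre-1.0 DSL as best I can tell, so treat them as assumptions rather than a definitive reproduction):
{code}
// Sketch of the pattern described in this issue; topic/store names are invented.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class JoinGroupByCountSketch {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        KTable<String, String> left = builder.table(Serdes.String(), Serdes.String(), "left-topic", "left-store");
        KTable<String, String> right = builder.table(Serdes.String(), Serdes.String(), "right-topic", "right-store");

        // Plain join: with caching enabled, an update to the same key in both tables
        // can emit the join result twice (once per table flush), as described above.
        KTable<String, String> joined = left.join(right, (l, r) -> l + "," + r);

        // The downstream groupBy/count then aggregates those duplicates, e.g. doubling the count.
        KGroupedTable<String, String> grouped = joined.groupBy((key, value) -> KeyValue.pair(value, value));
        KTable<String, Long> counts = grouped.count("counts-store");

        // Per the comment above, the join overloads that take a StateStoreSupplier or a
        // queryableName are said to avoid this; their exact signatures are not shown here.
    }
}
{code}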
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075053#comment-16075053 ] Damian Guy commented on KAFKA-5545: --- As you suggested, it would seem that the first KafkaStreams instance hasn't closed successfully as some threads are stuck. You should probably try calling {{boolean KafkaStreams.close(timeout, timeunit)}} and check the return value. If the result is {{false}} then you should probably terminate the application and restart. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. 
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-de
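A minimal sketch of the close-and-check approach suggested above (the 30-second timeout and the exit-on-failure reaction are placeholders, not recommendations):
{code}
// Sketch of the suggestion above: bound the shutdown and react if threads are stuck.
// The timeout value and the reaction (exiting the JVM) are placeholders.
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;

public class BoundedCloseSketch {
    public static void shutdown(KafkaStreams streams) {
        boolean closed = streams.close(30, TimeUnit.SECONDS);
        if (!closed) {
            // Some stream threads did not terminate in time; restarting the process is the fallback.
            System.err.println("KafkaStreams did not shut down cleanly, terminating application");
            System.exit(1);
        }
    }
}
{code}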
[jira] [Commented] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18
[ https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075056#comment-16075056 ] Matthias J. Sax commented on KAFKA-5070: I was working on https://issues.apache.org/jira/browse/KAFKA-5167 and hoping that it will cover this JIRA as well. Thoughts? > org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the > state directory: /opt/rocksdb/pulse10/0_18 > > > Key: KAFKA-5070 > URL: https://issues.apache.org/jira/browse/KAFKA-5070 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 > Environment: Linux Version >Reporter: Dhana >Assignee: Matthias J. Sax > Attachments: RocksDB_LockStateDirec.7z > > > Notes: we run two instance of consumer in two difference machines/nodes. > we have 400 partitions. 200 stream threads/consumer, with 2 consumer. > We perform HA test(on rebalance - shutdown of one of the consumer/broker), we > see this happening > Error: > 2017-04-05 11:36:09.352 WARN StreamThread:1184 StreamThread-66 - Could not > create task 0_115. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock > the state directory: /opt/rocksdb/pulse10/0_115 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:102) > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075061#comment-16075061 ] Yogesh BG commented on KAFKA-5545: -- I see. But what could be the problem in closing the streams? I don't see restarting the application as a good idea. From the log we can see some threads still polling to connect to the old ip. We should try closing those threads, right? One more thing: if I do close within the connection timeout, all goes well. But if I issue close after the connection timeout, the threads are stuck. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. 
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp
[jira] [Commented] (KAFKA-5519) Support for multiple certificates in a single keystore
[ https://issues.apache.org/jira/browse/KAFKA-5519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075164#comment-16075164 ] Alla Tumarkin commented on KAFKA-5519: -- I wouldn't call it problematic: I just imagine there are situations where multiple J2EE applications may want to import their client certificates into a single shared keystore - in order to decrease management overhead by not having to maintain multiple keystores (like managing keystore passwords, for example). > Support for multiple certificates in a single keystore > -- > > Key: KAFKA-5519 > URL: https://issues.apache.org/jira/browse/KAFKA-5519 > Project: Kafka > Issue Type: New Feature > Components: security >Affects Versions: 0.10.2.1 >Reporter: Alla Tumarkin > Labels: upstream-issue > > Background > Currently, we need to have a keystore exclusive to the component with exactly > one key in it. Looking at the JSSE Reference guide, it seems like we would > need to introduce our own KeyManager into the SSLContext which selects a > configurable key alias name. > https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509KeyManager.html > has methods for dealing with aliases. > The goal here is to use a specific certificate (with proper ACLs set for this > client), and not just the first one that matches. > Looks like it requires a code change to the SSLChannelBuilder -- This message was sent by Atlassian JIRA (v6.4.14#64029)
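To illustrate the alias-selection idea from the description, a sketch of a delegating key manager that pins one configured alias (how this would be wired into Kafka's SSL channel builder is exactly the code change the issue asks for, so that part is not shown):
{code}
// Illustrative only: a delegating key manager that always chooses a single client alias.
import java.net.Socket;
import java.security.Principal;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import javax.net.ssl.X509ExtendedKeyManager;
import javax.net.ssl.X509KeyManager;

public class AliasSelectingKeyManager extends X509ExtendedKeyManager {

    private final X509KeyManager delegate;
    private final String clientAlias;

    public AliasSelectingKeyManager(X509KeyManager delegate, String clientAlias) {
        this.delegate = delegate;
        this.clientAlias = clientAlias;
    }

    @Override
    public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
        return clientAlias; // always present the configured client certificate
    }

    @Override
    public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
        return delegate.chooseServerAlias(keyType, issuers, socket);
    }

    @Override
    public String[] getClientAliases(String keyType, Principal[] issuers) {
        return delegate.getClientAliases(keyType, issuers);
    }

    @Override
    public String[] getServerAliases(String keyType, Principal[] issuers) {
        return delegate.getServerAliases(keyType, issuers);
    }

    @Override
    public X509Certificate[] getCertificateChain(String alias) {
        return delegate.getCertificateChain(alias);
    }

    @Override
    public PrivateKey getPrivateKey(String alias) {
        return delegate.getPrivateKey(alias);
    }
}
{code}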
[jira] [Reopened] (KAFKA-5528) Error while reading topic, offset, partition info from process method
[ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reopened KAFKA-5528: > Error while reading topic, offset, partition info from process method > - > > Key: KAFKA-5528 > URL: https://issues.apache.org/jira/browse/KAFKA-5528 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Nishkam Ravi > > We are encountering an {{IllegalStateException}} while trying to access > {{context.topic()}} from process function. The code is written in Scala and > is being launched using sbt (spring isn't involved). Here's the code sketch: > {noformat} > class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], > decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], > Array[Byte]] with LazyLogging { > private var hsmClient: HSMClient = _ > override def init(processorContext: ProcessorContext): Unit = { > super.init(processorContext) > hsmClient = HSMClient(config).getOrElse(null) > } > override def process(key: Array[Byte], value: Array[Byte]): Unit = { > val topic: String = this.context.topic() > partition: Int = this.context.partition() > val offset: Long = this.context.offset() > val timestamp: Long = this.context.timestamp() > // business logic > } > } > {noformat} > The exception is thrown only for the multi-consumer case (when number of > partitions for a topic > 1 and parallelism > 1). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5528) Error while reading topic, offset, partition info from process method
[ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-5528. Resolution: Not A Bug > Error while reading topic, offset, partition info from process method > - > > Key: KAFKA-5528 > URL: https://issues.apache.org/jira/browse/KAFKA-5528 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Nishkam Ravi > > We are encountering an {{IllegalStateException}} while trying to access > {{context.topic()}} from process function. The code is written in Scala and > is being launched using sbt (spring isn't involved). Here's the code sketch: > {noformat} > class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], > decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], > Array[Byte]] with LazyLogging { > private var hsmClient: HSMClient = _ > override def init(processorContext: ProcessorContext): Unit = { > super.init(processorContext) > hsmClient = HSMClient(config).getOrElse(null) > } > override def process(key: Array[Byte], value: Array[Byte]): Unit = { > val topic: String = this.context.topic() > partition: Int = this.context.partition() > val offset: Long = this.context.offset() > val timestamp: Long = this.context.timestamp() > // business logic > } > } > {noformat} > The exception is thrown only for the multi-consumer case (when number of > partitions for a topic > 1 and parallelism > 1). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method
[ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075191#comment-16075191 ] Matthias J. Sax commented on KAFKA-5528: Added FAQ: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhydoIgetanIllegalStateExceptionwhenaccessingrecordmetadata? > Error while reading topic, offset, partition info from process method > - > > Key: KAFKA-5528 > URL: https://issues.apache.org/jira/browse/KAFKA-5528 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Nishkam Ravi > > We are encountering an {{IllegalStateException}} while trying to access > {{context.topic()}} from process function. The code is written in Scala and > is being launched using sbt (spring isn't involved). Here's the code sketch: > {noformat} > class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], > decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], > Array[Byte]] with LazyLogging { > private var hsmClient: HSMClient = _ > override def init(processorContext: ProcessorContext): Unit = { > super.init(processorContext) > hsmClient = HSMClient(config).getOrElse(null) > } > override def process(key: Array[Byte], value: Array[Byte]): Unit = { > val topic: String = this.context.topic() > partition: Int = this.context.partition() > val offset: Long = this.context.offset() > val timestamp: Long = this.context.timestamp() > // business logic > } > } > {noformat} > The exception is thrown only for the multi-consumer case (when number of > partitions for a topic > 1 and parallelism > 1). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
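Assuming the FAQ entry covers the common mistake of returning one shared Processor instance from a ProcessorSupplier (my reading, not confirmed in this thread), a sketch of the safe shape, where every call to get() builds a fresh Processor so each task owns its own context:
{code}
// Hedged sketch: assumes the problem is a Processor instance shared across tasks.
// The supplier below returns a new instance per call, so each task gets its own ProcessorContext.
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class PerTaskProcessorSupplier implements ProcessorSupplier<byte[], byte[]> {

    @Override
    public Processor<byte[], byte[]> get() {
        // A new Processor for every task - never return a shared, pre-built instance here.
        return new AbstractProcessor<byte[], byte[]>() {
            @Override
            public void process(byte[] key, byte[] value) {
                String topic = context().topic();   // safe: this task owns its own context
                long offset = context().offset();
                // ... business logic ...
            }
        };
    }
}
{code}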
[jira] [Updated] (KAFKA-5525) Streams reset tool should have same console output with or without dry-run
[ https://issues.apache.org/jira/browse/KAFKA-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-5525: - Component/s: streams > Streams reset tool should have same console output with or without dry-run > -- > > Key: KAFKA-5525 > URL: https://issues.apache.org/jira/browse/KAFKA-5525 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Minor > Fix For: 0.11.1.0 > > > Hi, > I see that the Streams reset tool provides a console output a little bit > different when you execute it using "dry-run" (so without executing any real > action) or without it. > With dry-run : > {code} > Dry run displays the actions which will be performed when running Streams > Reset Tool > Following input topics offsets will be reset to beginning (for consumer group > streams-wordcount) > Topic: streams-file-input > Done. > Deleting all internal/auto-created topics for application streams-wordcount > Topic: streams-wordcount-Counts-repartition > Topic: streams-wordcount-Counts-changelog > Done. > {code} > without dry-run : > {code} > Seek-to-beginning for input topics [streams-file-input] > Done. > Deleting all internal/auto-created topics for application streams-wordcount > Topic streams-wordcount-Counts-repartition is marked for deletion. > Note: This will have no impact if delete.topic.enable is not set to true. > Topic streams-wordcount-Counts-changelog is marked for deletion. > Note: This will have no impact if delete.topic.enable is not set to true. > Done. > {code} > I think that the version with dry-run related to show "Seek-to-beginning for > input topics [streams-file-input]" could be used even for version without > dry-run. > The output should be consistent and the only difference should be on > executing real actions or not. > I'm working on a trivial PR for a proposal. > Thanks, > Paolo -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5525) Streams reset tool should have same console output with or without dry-run
[ https://issues.apache.org/jira/browse/KAFKA-5525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075510#comment-16075510 ] ASF GitHub Bot commented on KAFKA-5525: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3443 > Streams reset tool should have same console output with or without dry-run > -- > > Key: KAFKA-5525 > URL: https://issues.apache.org/jira/browse/KAFKA-5525 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Minor > Fix For: 0.11.1.0 > > > Hi, > I see that the Streams reset tool provides a console output a little bit > different when you execute it using "dry-run" (so without executing any real > action) or without it. > With dry-run : > {code} > Dry run displays the actions which will be performed when running Streams > Reset Tool > Following input topics offsets will be reset to beginning (for consumer group > streams-wordcount) > Topic: streams-file-input > Done. > Deleting all internal/auto-created topics for application streams-wordcount > Topic: streams-wordcount-Counts-repartition > Topic: streams-wordcount-Counts-changelog > Done. > {code} > without dry-run : > {code} > Seek-to-beginning for input topics [streams-file-input] > Done. > Deleting all internal/auto-created topics for application streams-wordcount > Topic streams-wordcount-Counts-repartition is marked for deletion. > Note: This will have no impact if delete.topic.enable is not set to true. > Topic streams-wordcount-Counts-changelog is marked for deletion. > Note: This will have no impact if delete.topic.enable is not set to true. > Done. > {code} > I think that the version with dry-run related to show "Seek-to-beginning for > input topics [streams-file-input]" could be used even for version without > dry-run. > The output should be consistent and the only difference should be on > executing real actions or not. > I'm working on a trivial PR for a proposal. > Thanks, > Paolo -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5528) Error while reading topic, offset, partition info from process method
[ https://issues.apache.org/jira/browse/KAFKA-5528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075511#comment-16075511 ] Guozhang Wang commented on KAFKA-5528: -- Thanks [~mjsax] for adding the FAQ, I think this is indeed a common situation users may mistakenly configure. > Error while reading topic, offset, partition info from process method > - > > Key: KAFKA-5528 > URL: https://issues.apache.org/jira/browse/KAFKA-5528 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 >Reporter: Nishkam Ravi > > We are encountering an {{IllegalStateException}} while trying to access > {{context.topic()}} from process function. The code is written in Scala and > is being launched using sbt (spring isn't involved). Here's the code sketch: > {noformat} > class GenericProcessor[T <: ThriftStruct](serDe: ThriftStructSerDe[T], > decrypt: Boolean, config: Config) extends AbstractProcessor[Array[Byte], > Array[Byte]] with LazyLogging { > private var hsmClient: HSMClient = _ > override def init(processorContext: ProcessorContext): Unit = { > super.init(processorContext) > hsmClient = HSMClient(config).getOrElse(null) > } > override def process(key: Array[Byte], value: Array[Byte]): Unit = { > val topic: String = this.context.topic() > partition: Int = this.context.partition() > val offset: Long = this.context.offset() > val timestamp: Long = this.context.timestamp() > // business logic > } > } > {noformat} > The exception is thrown only for the multi-consumer case (when number of > partitions for a topic > 1 and parallelism > 1). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-3331) Refactor TopicCommand to make it testable and add unit tests
[ https://issues.apache.org/jira/browse/KAFKA-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075525#comment-16075525 ] Gwen Shapira commented on KAFKA-3331: - +1, that would be awesome. Since it is via a normal API, we can also add stuff like auth, use of ACLs, etc. > Refactor TopicCommand to make it testable and add unit tests > > > Key: KAFKA-3331 > URL: https://issues.apache.org/jira/browse/KAFKA-3331 > Project: Kafka > Issue Type: Wish >Affects Versions: 0.9.0.1 >Reporter: Ashish Singh >Assignee: Ashish Singh > Fix For: 0.11.1.0 > > > TopicCommand has become a functionality-packed, hard-to-read class. Adding > or changing it with confidence requires some unit tests around it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
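Assuming the "normal API" referred to here is the Java AdminClient from KIP-117 (my reading of the thread), a minimal sketch of creating a topic through it rather than through the ZooKeeper-based TopicCommand (topic name, partition count and replication factor are arbitrary examples):
{code}
// Sketch only: assumes the discussion is about the Java AdminClient (KIP-117).
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("example-topic", 3, (short) 1);
            // The broker enforces authentication/authorization (ACLs) on this request,
            // which a direct-to-ZooKeeper tool cannot do.
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
{code}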
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075541#comment-16075541 ] Guozhang Wang commented on KAFKA-5545: -- [~yogeshbelur] In your code snippet it seems you did not ever close the instance before creating the new instance and then call {{cleanUp}}, or are the {{close()}} and {{start()}} calls for the previous instance (it is hard to tell how {{setupDiscovery}} is triggered)? {code} close(); streams = new KafkaStreams(buildTopology(config), config); logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "]."); streams.cleanUp(); start(); {code} Anyways, if {{Streams.close()}} is indeed called, then the producer will be closed in that function and the inner {{Sender}} thread will be terminated and not try to connect to the broker anymore. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. 
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] >
[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075541#comment-16075541 ] Guozhang Wang edited comment on KAFKA-5545 at 7/5/17 10:34 PM: --- [~yogeshbelur] In your code snippet it seems you did not ever close the instance before creating the new instance and then call {{cleanUp}}, or are the {{close()}} and {{start()}} calls for the previous instance (it is hard to tell how {{setupDiscovery}} is triggered)? {code} close(); streams = new KafkaStreams(buildTopology(config), config); logger.info("cleaning up oldBootstrapServerString [" + oldBootstrapServerString + "]."); streams.cleanUp(); start(); {code} Anyways, if {{Streams.close()}} is indeed called, then the producer will be closed in that function and the inner {{Sender}} thread will be terminated and not try to connect to the broker anymore. But from your attached logs it does seems the thread was notified to shutdown but never existed the main loop: {code} 10:02:33.981 [pool-1-thread-1] INFO o.apache.kafka.streams.KafkaStreams - stream-client [ks_0_inst] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.987 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Informed thread to shut down 10:02:33.987 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.988 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] Informed thread to shut down 10:02:33.988 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.988 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] Informed thread to shut down 10:02:33.988 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-3] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.989 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] Informed thread to shut down 10:02:33.989 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-4] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.989 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] Informed thread to shut down 10:02:33.989 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-5] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.990 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] Informed thread to shut down 10:02:33.990 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-6] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.990 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] Informed thread to shut down 10:02:33.991 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-7] State transition from RUNNING to PENDING_SHUTDOWN. 
10:02:33.991 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] Informed thread to shut down 10:02:33.991 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-8] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.991 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] Informed thread to shut down 10:02:33.991 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-9] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.992 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] Informed thread to shut down 10:02:33.992 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-10] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.992 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] Informed thread to shut down 10:02:33.992 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-11] State transition from RUNNING to PENDING_SHUTDOWN. 10:02:33.995 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] Informed thread to shut down 10:02:33.995 [kafka-streams-close-thread] INFO o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-12] State transition from RUNNING to PENDING_SHUTDOWN
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075558#comment-16075558 ] Guozhang Wang commented on KAFKA-5545: -- Which config of the {{connection timeout}} that you are referring to? > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. 
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.clients
[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records
[ https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075624#comment-16075624 ] Ankush Puri commented on KAFKA-4936: [~damianguy] [~ijuma] Are there any plans to add this feature in a future release of Kafka? > Allow dynamic routing of output records > --- > > Key: KAFKA-4936 > URL: https://issues.apache.org/jira/browse/KAFKA-4936 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax > Labels: needs-kip > > Currently, all used output topics must be known beforehand, and thus, it's not > possible to send output records to topics in a dynamic fashion. > There have been a couple of requests for this feature and we should consider > adding it. There are many open questions, however, with regard to topic > creation and configuration (replication factor, number of partitions) etc. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-4601) Avoid duplicated repartitioning in KStream DSL
[ https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-4601: - Description:
Consider the following DSL:
{code}
Stream source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
Stream mapped = source.map(..);
KTable counts = mapped
    .groupByKey()
    .count("Counts");
KStream sink = mapped.leftJoin(counts, ..);
{code}
The resulting topology looks like this:
{code}
ProcessorTopology:
  KSTREAM-SOURCE-00: topics: [topic1] children: [KSTREAM-MAP-01]
  KSTREAM-MAP-01: children: [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
  KSTREAM-FILTER-04: children: [KSTREAM-SINK-03]
  KSTREAM-SINK-03: topic: X-Counts-repartition
  KSTREAM-FILTER-07: children: [KSTREAM-SINK-06]
  KSTREAM-SINK-06: topic: X-KSTREAM-MAP-01-repartition
ProcessorTopology:
  KSTREAM-SOURCE-08: topics: [X-KSTREAM-MAP-01-repartition] children: [KSTREAM-LEFTJOIN-09]
  KSTREAM-LEFTJOIN-09: states: [Counts]
  KSTREAM-SOURCE-05: topics: [X-Counts-repartition] children: [KSTREAM-AGGREGATE-02]
  KSTREAM-AGGREGATE-02: states: [Counts]
{code}
I.e. there are two repartition topics, one for the aggregate and one for the join, which not only introduce unnecessary overheads but also mess up the processing ordering (users expect each record to go through the aggregation first and then the join operator). In order to get the following simpler topology, users today need to manually add a {{through}} operator after {{map}} to enforce repartitioning.
{code}
Stream source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
Stream repartitioned = source.map(..).through("topic2");
KTable counts = repartitioned
    .groupByKey()
    .count("Counts");
KStream sink = repartitioned.leftJoin(counts, ..);
{code}
The resulting topology then will look like this:
{code}
ProcessorTopology:
  KSTREAM-SOURCE-00: topics: [topic1] children: [KSTREAM-MAP-01]
  KSTREAM-MAP-01: children: [KSTREAM-SINK-02]
  KSTREAM-SINK-02: topic: topic 2
ProcessorTopology:
  KSTREAM-SOURCE-03: topics: [topic 2] children: [KSTREAM-AGGREGATE-04, KSTREAM-LEFTJOIN-05]
  KSTREAM-AGGREGATE-04: states: [Counts]
  KSTREAM-LEFTJOIN-05: states: [Counts]
{code}
This kind of optimization should be automatic in Streams, which we can consider doing when extending from one-operator-at-a-time translation.
was:
Consider the following DSL:
{code}
Stream source = builder.stream(Serdes.String(), Serdes.String(), "topic1").map(..);
KTable counts = source
    .groupByKey()
    .count("Counts");
KStream sink = source.leftJoin(counts, ..);
{code}
The resulting topology looks like this:
{code}
ProcessorTopology:
  KSTREAM-SOURCE-00: topics: [topic1] children: [KSTREAM-MAP-01]
  KSTREAM-MAP-01: children: [KSTREAM-FILTER-04, KSTREAM-FILTER-07]
  KSTREAM-FILTER-04:
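For anyone trying the manual workaround above, a compilable sketch of the same pattern, assuming String keys and values; the topic names and the map/join lambdas are illustrative, and the intermediate topic must be created up front with the desired partition count:
{code}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public final class ManualRepartitionExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        KStream<String, String> source =
            builder.stream(Serdes.String(), Serdes.String(), "topic1");

        // map() marks the stream for repartitioning; through() materializes it once
        // into the pre-created intermediate topic, so the downstream aggregation and
        // the join both read from the same repartitioned data.
        KStream<String, String> repartitioned = source
            .map((key, value) -> KeyValue.pair(value, value))
            .through(Serdes.String(), Serdes.String(), "topic2");

        KTable<String, Long> counts = repartitioned
            .groupByKey(Serdes.String(), Serdes.String())
            .count("Counts");

        KStream<String, String> sink = repartitioned.leftJoin(
            counts, (value, count) -> value + ":" + (count == null ? 0L : count));
        sink.to(Serdes.String(), Serdes.String(), "output-topic");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "manual-repartition-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        new KafkaStreams(builder, props).start();
    }
}
{code}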
[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records
[ https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075691#comment-16075691 ] Matthias J. Sax commented on KAFKA-4936: No concrete roadmap atm. But if anybody wants to pick it up, just go for it :) > Allow dynamic routing of output records > --- > > Key: KAFKA-4936 > URL: https://issues.apache.org/jira/browse/KAFKA-4936 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax > Labels: needs-kip > > Currently, all used output topics must be known beforehand, and thus, it's not > possible to send output records to topics in a dynamic fashion. > There have been a couple of requests for this feature and we should consider > adding it. There are many open questions, however, with regard to topic > creation and configuration (replication factor, number of partitions) etc. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
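Until such a feature exists, every output topic has to be wired in when the topology is built. A minimal sketch of that static routing, with illustrative topic names and predicates (the topology is only built here, not started):
{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public final class StaticRoutingExample {
    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> events =
            builder.stream(Serdes.String(), Serdes.String(), "events");

        // Every output topic has to be named at topology-build time;
        // there is no way to compute the target topic per record.
        KStream<String, String>[] branches = events.branch(
            (key, value) -> value.startsWith("ERROR"),  // branch 0: errors
            (key, value) -> true);                      // branch 1: everything else

        branches[0].to(Serdes.String(), Serdes.String(), "error-events");
        branches[1].to(Serdes.String(), Serdes.String(), "other-events");
    }
}
{code}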
[jira] [Created] (KAFKA-5559) Metrics should throw if two client registers with same ID
Matthias J. Sax created KAFKA-5559: -- Summary: Metrics should throw if two client registers with same ID Key: KAFKA-5559 URL: https://issues.apache.org/jira/browse/KAFKA-5559 Project: Kafka Issue Type: Bug Components: metrics Affects Versions: 0.11.0.0 Reporter: Matthias J. Sax Assignee: Matthias J. Sax Currently, {{AppInfoParser}} only logs a WARN message when a bean is registered with an existing name. However, this should be treated as an error and the exception should be rethrown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5559) Metrics should throw if two client registers with same ID
[ https://issues.apache.org/jira/browse/KAFKA-5559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075784#comment-16075784 ] TAO XIAO commented on KAFKA-5559: - Does this PR suggest we need to use a different {{client.id}} per client when running multi-threaded in the same JVM? As per my understanding (up to 0.10.0.0), {{client.id}} is what identifies an application when quotas are enabled. If that is the case, how do we model multiple consumers in the same JVM that share the same quota? Also, many streaming frameworks, e.g. Storm and Flink, bootstrap multiple consumers in the same JVM when running in local mode (especially useful for development and debugging); how do we cope with that situation as well? > Metrics should throw if two client registers with same ID > - > > Key: KAFKA-5559 > URL: https://issues.apache.org/jira/browse/KAFKA-5559 > Project: Kafka > Issue Type: Bug > Components: metrics >Affects Versions: 0.11.0.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax > > Currently, {{AppInfoParser}} only logs a WARN message when a bean is > registered with an existing name. However, this should be treated as an error > and the exception should be rethrown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
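For context on the scenario raised above, a minimal sketch of two consumers in one JVM that deliberately share a {{client.id}}; the broker address, group and topic names are illustrative. Constructing the second consumer is what currently produces the {{AppInfoParser}} WARN about an already-registered app-info MBean, which this ticket proposes turning into an error:
{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class SharedClientIdExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "quota-group");
        // Both consumers deliberately share the same client.id so that they count
        // against the same broker-side quota.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-app");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> first = new KafkaConsumer<>(props);
        // Second registration of the app-info MBean for client.id "my-app":
        // today this only logs a WARN from AppInfoParser.
        KafkaConsumer<String, String> second = new KafkaConsumer<>(props);

        first.subscribe(Collections.singletonList("topic"));
        second.subscribe(Collections.singletonList("topic"));

        first.close();
        second.close();
    }
}
{code}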
[jira] [Commented] (KAFKA-5167) streams task gets stuck after re-balance due to LockException
[ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075856#comment-16075856 ] ASF GitHub Bot commented on KAFKA-5167: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3449 > streams task gets stuck after re-balance due to LockException > - > > Key: KAFKA-5167 > URL: https://issues.apache.org/jira/browse/KAFKA-5167 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0 >Reporter: Narendra Kumar >Assignee: Matthias J. Sax > Fix For: 0.11.0.1, 0.11.1.0 > > Attachments: BugTest.java, DebugTransformer.java, logs.txt > > > During rebalance processor node's close() method gets called two times once > from StreamThread.suspendTasksAndState() and once from > StreamThread.closeNonAssignedSuspendedTasks(). I have some instance filed > which I am closing in processor's close method. This instance's close method > throws some exception if I call close more than once. Because of this > exception, the Kafka streams does not attempt to close the statemanager ie. > task.closeStateManager(true) is never called. When a task moves from one > thread to another within same machine the task blocks trying to get lock on > state directory which is still held by unclosed statemanager and keep > throwing the below warning message: > 2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will > retry. > org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the > state directory for task 0_1 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5070) org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the state directory: /opt/rocksdb/pulse10/0_18
[ https://issues.apache.org/jira/browse/KAFKA-5070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075940#comment-16075940 ] Matthias J. Sax commented on KAFKA-5070: [~dnagarajan], [~shoyebpathan], [~kchen] I just review an old comment from above: https://issues.apache.org/jira/browse/KAFKA-5070?focusedCommentId=16002228&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16002228 This issue might also occur, if Streams cleans local state directory -- we got a similar report at the mailing list recently. Increasing the clean-up interval via {{StreamsConfig}} parameter {{state.cleanup.delay.ms}} (default is 10 minutes, ie, 60) helped to avoid the issue. Can you try this out and report back? We also just merged KAFKA-5167 into {{trunk}} and {{0.11.0}}. It might also be worth to try this out to see if it resolves the issue (in case the clean-up thing does not work). > org.apache.kafka.streams.errors.LockException: task [0_18] Failed to lock the > state directory: /opt/rocksdb/pulse10/0_18 > > > Key: KAFKA-5070 > URL: https://issues.apache.org/jira/browse/KAFKA-5070 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.0 > Environment: Linux Version >Reporter: Dhana >Assignee: Matthias J. Sax > Attachments: RocksDB_LockStateDirec.7z > > > Notes: we run two instance of consumer in two difference machines/nodes. > we have 400 partitions. 200 stream threads/consumer, with 2 consumer. > We perform HA test(on rebalance - shutdown of one of the consumer/broker), we > see this happening > Error: > 2017-04-05 11:36:09.352 WARN StreamThread:1184 StreamThread-66 - Could not > create task 0_115. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_115] Failed to lock > the state directory: /opt/rocksdb/pulse10/0_115 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:102) > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180) > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69) > at > org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
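A minimal sketch of the suggested {{state.cleanup.delay.ms}} change (the default is 600000 ms, i.e. 10 minutes); the application id, bootstrap servers and the one-hour value are illustrative:
{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public final class StreamsCleanupConfig {
    // Sketch: returns properties for a KafkaStreams app with a longer state clean-up delay.
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Default is 600000 ms (10 minutes); raise it to 1 hour so the cleaner
        // does not remove task directories while a rebalance is still settling.
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 60 * 60 * 1000L);
        return props;
    }
}
{code}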
[jira] [Commented] (KAFKA-5397) streams are not recovering from LockException during rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-5397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075943#comment-16075943 ] Matthias J. Sax commented on KAFKA-5397: [~jozi-k] KAFKA-5167 got merged into {{trunk}} and {{0.11.0}} today. Can you try this out and see if it fixes this issue? > streams are not recovering from LockException during rebalancing > > > Key: KAFKA-5397 > URL: https://issues.apache.org/jira/browse/KAFKA-5397 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 0.11.0.0 > Environment: one node setup, confluent kafka broker v3.2.0, > kafka-clients 0.11.0.0-SNAPSHOT, 5 threads for kafka-streams >Reporter: Jozef Koval > > Probably continuation of #KAFKA-5167. Portions of log: > {code} > 2017-06-07 01:17:52,435 WARN > [73e81b0b-5801-40ab-b02d-079afede6cc6-StreamThread-5] StreamTask >- task [2_0] Failed offset commits > {browser-aggregation-KSTREAM-MAP-39-repartition-0=OffsetAndMetadata{offset=4725597, > metadata=''}, > browser-aggregation-KSTREAM-MAP-52-repartition-0=OffsetAndMetadata{offset=4968164, > metadata=''}, > browser-aggregation-KSTREAM-MAP-26-repartition-0=OffsetAndMetadata{offset=2490506, > metadata=''}, > browser-aggregation-KSTREAM-MAP-65-repartition-0=OffsetAndMetadata{offset=7457795, > metadata=''}, > browser-aggregation-KSTREAM-MAP-13-repartition-0=OffsetAndMetadata{offset=530888, > metadata=''}} due to Commit cannot be completed since the group has already > rebalanced and assigned the partitions to another member. This means that the > time between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time message processing. You can address this either by increasing > the session timeout or by reducing the maximum size of batches returned in > poll() with max.poll.records. > 2017-06-07 01:17:52,436 WARN > [73e81b0b-5801-40ab-b02d-079afede6cc6-StreamThread-2] StreamTask >- task [7_0] Failed offset commits > {browser-aggregation-Aggregate-Counts-repartition-0=OffsetAndMetadata{offset=13275085, > metadata=''}} due to Commit cannot be completed since the group has already > rebalanced and assigned the partitions to another member. This means that the > time between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time message processing. You can address this either by increasing > the session timeout or by reducing the maximum size of batches returned in > poll() with max.poll.records. > 2017-06-07 01:17:52,488 WARN > [73e81b0b-5801-40ab-b02d-079afede6cc6-StreamThread-2] StreamThread >- stream-thread [73e81b0b-5801-40ab-b02d-079afede6cc6-StreamThread-2] > Failed to commit StreamTask 7_0 state: > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be > completed since the group has already rebalanced and assigned the partitions > to another member. This means that the time between subsequent calls to > poll() was longer than the configured max.poll.interval.ms, which typically > implies that the poll loop is spending too much time message processing. You > can address this either by increasing the session timeout or by reducing the > maximum size of batches returned in poll() with max.poll.records. 
> at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:792) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:738) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:798) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:778) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(C
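The commit failures quoted above come with the consumer's own tuning advice; a minimal sketch of applying it from a Streams application via the consumer-prefixed settings (the values are illustrative, not recommendations):
{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public final class StreamsPollTuning {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Consumer settings are passed through to the embedded consumers.
        // Fewer records per poll() means each loop iteration finishes sooner ...
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
        // ... and a larger max.poll.interval.ms gives slow processing more headroom
        // before the coordinator considers the member dead and rebalances.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600000);
        return props;
    }
}
{code}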
[jira] [Commented] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075945#comment-16075945 ] Yogesh BG commented on KAFKA-5545: -- Hey setupDiscovery is scheduled thread, having logic to check the ip's of broker has changed or not and then u can see the code i am calling close(), which internally calls stream.close(); You can also see the logs that the close has been triggered. If not called how shutdowns will be initiated? <> _ But from your attached logs it does seems the thread was notified to shutdown but never existed the main loop:_ You should check why shutdown didn't happen. why there are some threads still alive which were part of the previous stream instance once the close has been invoked??? Is there any way i can shutdown the stream completely without restarting the app. BTW restarting application is having its own problem, when i do restart with new broker ip threads are hung, never coming back to process the data. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. > 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. 
> org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) > [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.
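On the question of shutting the streams instance down completely before rebuilding it, a minimal sketch of a bounded close-and-restart, assuming a Streams version that has the timed {{close(timeout, unit)}} variant (0.10.2 and later); the topology-rebuilding supplier is hypothetical and stands in for the reporter's own discovery logic:
{code}
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.streams.KafkaStreams;

public final class StreamsRestarter {
    // buildStreams is hypothetical: it stands in for whatever code rebuilds the
    // topology and configuration against the newly discovered broker address.
    private final Supplier<KafkaStreams> buildStreams;

    public StreamsRestarter(Supplier<KafkaStreams> buildStreams) {
        this.buildStreams = buildStreams;
    }

    public KafkaStreams restart(KafkaStreams current) {
        // close(timeout, unit) blocks until all stream threads have exited or the
        // timeout elapses, and returns false if some threads are still alive.
        boolean closed = current.close(30, TimeUnit.SECONDS);
        if (!closed) {
            // Restarting on top of live threads risks the state-directory
            // LockException seen in the attached logs.
            throw new IllegalStateException("Streams instance did not shut down cleanly");
        }
        KafkaStreams next = buildStreams.get();
        next.start();
        return next;
    }
}
{code}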
[jira] [Comment Edited] (KAFKA-5545) Kafka Stream not able to successfully restart over new broker ip
[ https://issues.apache.org/jira/browse/KAFKA-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075945#comment-16075945 ] Yogesh BG edited comment on KAFKA-5545 at 7/6/17 5:16 AM: -- Hey setupDiscovery is scheduled thread, having logic to check the ip's of broker has changed or not and then u can see the code i am calling close(), which internally calls stream.close(); You can also see the logs that the close has been triggered. If not called how shutdowns will be initiated? <> _ But from your attached logs it does seems the thread was notified to shutdown but never existed the main loop:_ You should check why shutdown didn't happen. why there are some threads still alive which were part of the previous stream instance once the close has been invoked??? Is there any way i can shutdown the stream completely without restarting the app. BTW restarting application is having its own problem, when i do restart with new broker ip threads are hung, never coming back to process the data. was (Author: yogeshbelur): Hey setupDiscovery is scheduled thread, having logic to check the ip's of broker has changed or not and then u can see the code i am calling close(), which internally calls stream.close(); You can also see the logs that the close has been triggered. If not called how shutdowns will be initiated? <> _ But from your attached logs it does seems the thread was notified to shutdown but never existed the main loop:_ You should check why shutdown didn't happen. why there are some threads still alive which were part of the previous stream instance once the close has been invoked??? Is there any way i can shutdown the stream completely without restarting the app. BTW restarting application is having its own problem, when i do restart with new broker ip threads are hung, never coming back to process the data. > Kafka Stream not able to successfully restart over new broker ip > > > Key: KAFKA-5545 > URL: https://issues.apache.org/jira/browse/KAFKA-5545 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Yogesh BG >Priority: Critical > Attachments: kafkastreams.log > > > Hi > I have one kafka broker and one kafka stream application > initially kafka stream connected and starts processing data. Then i restart > the broker. When broker restarts new ip will be assigned. > In kafka stream i have a 5min interval thread which checks if broker ip > changed and if changed, we cleanup the stream, rebuild topology(tried with > reusing topology) and start the stream again. I end up with the following > exceptions. 
> 11:04:08.032 [StreamThread-38] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-38] Creating active task 0_5 with assigned > partitions [PR-5] > 11:04:08.033 [StreamThread-41] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-41] Creating active task 0_1 with assigned > partitions [PR-1] > 11:04:08.036 [StreamThread-34] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-34] Creating active task 0_7 with assigned > partitions [PR-7] > 11:04:08.036 [StreamThread-37] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-37] Creating active task 0_3 with assigned > partitions [PR-3] > 11:04:08.036 [StreamThread-45] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-45] Creating active task 0_0 with assigned > partitions [PR-0] > 11:04:08.037 [StreamThread-36] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-36] Creating active task 0_4 with assigned > partitions [PR-4] > 11:04:08.037 [StreamThread-43] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-43] Creating active task 0_6 with assigned > partitions [PR-6] > 11:04:08.038 [StreamThread-48] INFO o.a.k.s.p.internals.StreamThread - > stream-thread [StreamThread-48] Creating active task 0_2 with assigned > partitions [PR-2] > 11:04:09.034 [StreamThread-38] WARN o.a.k.s.p.internals.StreamThread - Could > not create task 0_5. Will retry. > org.apache.kafka.streams.errors.LockException: task [0_5] Failed to lock the > state directory for task 0_5 > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:100) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.AbstractTask.(AbstractTask.java:73) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:108) > ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na] > at > org.apache.kafka.streams.processor.internals.
[jira] [Commented] (KAFKA-5557) Using a logPrefix inside the StreamPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-5557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075952#comment-16075952 ] ASF GitHub Bot commented on KAFKA-5557: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/3488 > Using a logPrefix inside the StreamPartitionAssignor > > > Key: KAFKA-5557 > URL: https://issues.apache.org/jira/browse/KAFKA-5557 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Paolo Patierno >Assignee: Paolo Patierno >Priority: Trivial > Fix For: 0.11.1.0 > > > Hi, > the "stream-thread [%s]" is replicated more times in all the logging messages > inside the StreamPartitionAssignor. Using a logPrefix like for the > StreamThread class could be better. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-4827) Kafka connect: error with special characters in connector name
[ https://issues.apache.org/jira/browse/KAFKA-4827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076052#comment-16076052 ] Sönke Liebau commented on KAFKA-4827: - There is a few more special characters that cause similar behavior: %, ?, <, >, /, \, ... I have blacklisted the obvious ones in the fix for KAFKA-4930 but think that a larger discussion around naming conventions and what is and isn't allowed is probably necessary. > Kafka connect: error with special characters in connector name > -- > > Key: KAFKA-4827 > URL: https://issues.apache.org/jira/browse/KAFKA-4827 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.1.0 >Reporter: Aymeric Bouvet >Priority: Minor > > When creating a connector, if the connector name (and possibly other > properties) end with a carriage return, kafka-connect will create the config > but report error > {code} > cat << EOF > file-connector.json > { > "name": "file-connector\r", > "config": { > "topic": "kafka-connect-logs\r", > "tasks.max": "1", > "file": "/var/log/ansible-confluent/connect.log", > "connector.class": > "org.apache.kafka.connect.file.FileStreamSourceConnector" > } > } > EOF > curl -X POST -H "Content-Type: application/json" -H "Accept: > application/json" -d @file-connector.json localhost:8083/connectors > {code} > returns an error 500 and log the following > {code} > [2017-03-01 18:25:23,895] WARN (org.eclipse.jetty.servlet.ServletHandler) > javax.servlet.ServletException: java.lang.IllegalArgumentException: Illegal > character in path at index 27: /connectors/file-connector4 > at > org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489) > at > org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:388) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:341) > at > org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:228) > at > org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:812) > at > org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:587) > at > org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221) > at > org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127) > at > org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515) > at > org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185) > at > org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061) > at > org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141) > at > org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) > at > org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) > at org.eclipse.jetty.server.Server.handle(Server.java:499) > at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311) > at > org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:257) > at > org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544) > at > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635) > at > 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalArgumentException: Illegal character in path at > index 27: /connectors/file-connector4 > at java.net.URI.create(URI.java:852) > at > org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource.createConnector(ConnectorsResource.java:100) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81) > at > org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144) > at >
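The root cause in the trace above is the connector name being placed unescaped into a URI; a minimal sketch that reproduces the failure in isolation (the connector name is illustrative):
{code}
import java.net.URI;

public final class ConnectorNameUriCheck {
    public static void main(String[] args) {
        // A trailing carriage return, e.g. from a JSON file written with "cat << EOF",
        // is an illegal URI character, so URI.create throws IllegalArgumentException --
        // the same failure ConnectorsResource hits when it builds the connector location.
        URI.create("/connectors/" + "file-connector\r");
    }
}
{code}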