[jira] [Commented] (KAFKA-8106) Reducing the allocation and copying of ByteBuffer when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835389#comment-16835389 ] ASF GitHub Bot commented on KAFKA-8106: --- Flowermin commented on pull request #6699: KAFKA-8106:Reducing the allocation and copying of ByteBuffer when logValidator do validation(target trunk). URL: https://github.com/apache/kafka/pull/6699 We propose reducing the allocation and copying of ByteBuffer during LogValidator validation when the magic value in use is above 1 and no format conversion or value overwriting is required for compressed messages. The improved code is as follows. 1. Add a class **SimplifiedDefaultRecord** implementing the Record interface, which defines the various attributes of a message. 2. Add a function **simplifiedreadFrom**() to class **DefaultRecord**. This function does not copy data from the DataInput into a newly allocated ByteBuffer. **This reduces the allocation and copying of ByteBuffer** during LogValidator validation and therefore reduces GC frequency. We provide a simple read function that reads data from the **DataInput** without creating a ByteBuffer; of course, this operation cannot avoid decompressing the data. 3. Add functions **simplifiedIterator**() and **simplifiedCompressedIterator**() to class **DefaultRecordBatch**. These two functions return iterators over instances of **SimplifiedDefaultRecord**. 4. Modify the function **validateMessagesAndAssignOffsetsCompressed**() in class LogValidator. **After modifying the code to reduce the allocation and copying of ByteBuffer**, test performance is greatly improved and stable CPU usage stays below 60%. The following is a comparison of the two versions' test performance under the same conditions. **Result of performance testing** Main config of Kafka: Single Message: 1024B; TopicPartitions: 200; linger.ms: 1000ms. **1. Before modified code (source code):** Network inflow rate: 600M/s; CPU: 97%; production: 25,000,000 messages/s **2. After modified code (ByteBuffer allocation removed):** Network inflow rate: 1G/s; CPU: <60%; production: 41,000,000 messages/s **1. Before modified code (source code) GC:**  **2. After modified code (ByteBuffer allocation removed) GC:**  This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reducing the allocation and copying of ByteBuffer when logValidator do > validation. > > > Key: KAFKA-8106 > URL: https://issues.apache.org/jira/browse/KAFKA-8106 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1 > Environment: Server : > cpu:2*16 ; > MemTotal : 256G; > Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network > Connection ; > SSD. >Reporter: Flower.min >Assignee: Flower.min >Priority: Major > Labels: performance > > We do performance testing about Kafka in specific scenarios as > described below .We build a kafka cluster with one broker,and create topics > with different number of partitions.Then we start lots of producer processes > to send large amounts of messages to one of the topics at one testing . 
> *_Specific Scenario_* > > *_1.Main config of Kafka_* > # Main config of Kafka > server:[num.network.threads=6;num.io.threads=128;queued.max.requests=500|http://num.network.threads%3D6%3Bnum.io.threads%3D128%3Bqueued.max.requests%3D500/] > # Number of TopicPartition : 50~2000 > # Size of Single Message : 1024B > > *_2.Config of KafkaProducer_* > ||compression.type||[linger.ms|http://linger.ms/]||batch.size||buffer.memory|| > |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB| > *_3.The best result of performance testing_* > ||Network inflow rate||CPU Used (%)||Disk write speed||Performance of > production|| > |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s |23,000,000 messages/s| > *_4.Phenomenon and my doubt_* > _The upper limit of CPU usage has been reached But it does not > reach the upper limit of the bandwidth of the server network. *We are > doubtful about which cost too much CPU time and we want to Improve > performance and reduces CPU usage of Kafka server.*_ > _*5.Analysis*_ > We analysis the JFIR of Kafka server when doing performance testing > .After we checked and completed the performance test again, we located the
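To make the proposal above concrete: the change avoids copying each record body into a newly allocated ByteBuffer by reading (or skipping) the bytes straight off the DataInput during validation. The sketch below only illustrates that technique; the class and method names are invented for illustration, and the real DefaultRecord parsing reads several varint-encoded fields that are elided here.

{code:java}
import java.io.DataInput;
import java.io.IOException;

// Illustrative sketch only (hypothetical names, not the actual Kafka patch): read the
// fields needed for validation directly from the DataInput and skip the rest, so that no
// per-record ByteBuffer has to be allocated and filled.
final class StreamingRecordValidator {

    static void validateRecordBody(DataInput input, int sizeOfBodyInBytes) throws IOException {
        byte attributes = input.readByte(); // record-level attributes (currently unused by the format)
        // The real record format continues with varint-encoded timestamp/offset deltas, the key,
        // the value and headers; this sketch simply skips the remaining bytes instead of copying
        // them into a freshly allocated buffer.
        int remaining = sizeOfBodyInBytes - 1;
        while (remaining > 0) {
            int skipped = input.skipBytes(remaining);
            if (skipped <= 0) {
                throw new IOException("Unexpected end of record body");
            }
            remaining -= skipped;
        }
    }
}
{code}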
[jira] [Commented] (KAFKA-8317) ClassCastException using KTable.suppress()
[ https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835390#comment-16835390 ] Andrew commented on KAFKA-8317: --- Ok, thanks for the reply. I will try this later. What surprises me, though, is why it works fine when I don't use suppress. > ClassCastException using KTable.suppress() > -- > > Key: KAFKA-8317 > URL: https://issues.apache.org/jira/browse/KAFKA-8317 > Project: Kafka > Issue Type: Bug >Reporter: Andrew >Priority: Major > > I am trying to use `KTable.suppress()` and I am getting the following error : > {Code} > java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed > cannot be cast to java.lang.String > at > org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) > at > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95) > at > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87) > at > org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117) > {Code} > My code is as follows : > {Code} > final KTable, GenericRecord> groupTable = > groupedStream > .aggregate(lastAggregator, lastAggregator, materialized); > final KTable, GenericRecord> suppressedTable = > groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())); > // write the change-log stream to the topic > suppressedTable.toStream((k, v) -> k.key()) > .mapValues(joinValueMapper::apply) > .to(props.joinTopic()); > {Code} > The code without using `suppressedTable` works... what am I doing wrong? > Someone else has encountered the same issue : > https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380 > Slack conversation : > https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
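Since the stack trace above shows the default StringSerializer being applied to a Windowed key inside the suppression buffer, one hedged direction (not necessarily the fix suggested in the reply Andrew mentions, which is not included in this digest) is to declare the aggregation's serdes explicitly on the Materialized so that suppress() does not fall back to the application's configured default serdes. The store name, the one-minute window, and the GenericRecord serde parameter below are assumptions, not taken from the original code.

{code:java}
import java.time.Duration;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.WindowStore;

// Hedged sketch: declare key/value serdes explicitly on the windowed aggregation so that
// suppress() does not fall back to the default (String) serdes from the config.
// The store name, one-minute window and the serde/aggregator parameters are assumptions.
class SuppressSerdeSketch {
    KTable<Windowed<String>, GenericRecord> build(KGroupedStream<String, GenericRecord> groupedStream,
                                                  Serde<GenericRecord> genericRecordSerde,
                                                  Initializer<GenericRecord> initializer,
                                                  Aggregator<String, GenericRecord, GenericRecord> aggregator) {
        Materialized<String, GenericRecord, WindowStore<Bytes, byte[]>> materialized =
                Materialized.<String, GenericRecord, WindowStore<Bytes, byte[]>>as("group-agg-store")
                        .withKeySerde(Serdes.String())        // inner (un-windowed) key serde
                        .withValueSerde(genericRecordSerde);  // e.g. an Avro serde

        KTable<Windowed<String>, GenericRecord> groupTable = groupedStream
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
                .aggregate(initializer, aggregator, materialized);

        // The suppression buffer now inherits the serdes declared above instead of the defaults.
        return groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
    }
}
{code}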
[jira] [Created] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
Boquan Tang created KAFKA-8335: -- Summary: Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets Key: KAFKA-8335 URL: https://issues.apache.org/jira/browse/KAFKA-8335 Project: Kafka Issue Type: Bug Affects Versions: 2.2.0 Reporter: Boquan Tang My colleague Weichu has already sent a mail to the Kafka user mailing list about this issue, but we think it's worth having a ticket to track it. We have been using Kafka Streams with exactly-once enabled on a Kafka cluster for a while. Recently we found that the __consumer_offsets partitions grew huge. Some partitions went over 30G. This caused Kafka to take quite long to load the "__consumer_offsets" topic on startup (it loads the topic in order to become group coordinator). We dumped the __consumer_offsets segments and found that while normal offset commits are nicely compacted, transaction records (COMMIT, etc.) are all preserved. It looks like the LogCleaner keeps them all because these messages don't have a key: -- $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /003484332061.log --key-decoder-class kafka.serializer.StringDecoder 2>/dev/null | cat -v | head Dumping 003484332061.log Starting offset: 3484332061 offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006 producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 81 offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005 producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 84 ... -- Streams commits a transaction every 100ms (commit.interval.ms=100 when exactly-once is enabled), so __consumer_offsets grows really fast. Is keeping all transaction markers by design, or is this a LogCleaner bug? What would be the way to clean up the topic? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
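For context on the growth rate described above: when exactly-once is enabled, Kafka Streams lowers its default commit.interval.ms to 100, so each task commits a transaction (and thus writes COMMIT markers) roughly ten times per second. A minimal sketch of the configuration in question, with placeholder application id and bootstrap address:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

class EosConfigSketch {
    static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");    // placeholder
        // With exactly-once, Streams defaults commit.interval.ms to 100 (vs. 30000 otherwise),
        // so a transaction, and its COMMIT marker, is produced roughly 10 times per second.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}
{code}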
[jira] [Commented] (KAFKA-7817) Multiple Consumer Group Management with Regex
[ https://issues.apache.org/jira/browse/KAFKA-7817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835410#comment-16835410 ] ASF GitHub Bot commented on KAFKA-7817: --- rootex- commented on pull request #6700: KAFKA-7817 ConsumerGroupCommand Regex Feature URL: https://github.com/apache/kafka/pull/6700 *KAFKA-7817 ConsumerGroupCommand Regex Feature Description: *Add ability to select a subset of consumer groups using regex for operations: --describe, --delete and --reset-offsets* JIRA: [Multiple Consumer Group Management with Regex](https://issues.apache.org/jira/browse/KAFKA-7817) Discussion 1. [Multiple Consumer Group Management](https://www.mail-archive.com/dev@kafka.apache.org/msg93781.html) Discussion 2. [Re: ConsumerGroupCommand tool improvement?](https://www.mail-archive.com/dev@kafka.apache.org/msg90561.html) *Unit tests implemented* ### Committer Checklist (excluded from commit message) - [v] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Multiple Consumer Group Management with Regex > - > > Key: KAFKA-7817 > URL: https://issues.apache.org/jira/browse/KAFKA-7817 > Project: Kafka > Issue Type: New Feature > Components: tools >Affects Versions: 2.1.0 >Reporter: Alex Dunayevsky >Assignee: Alex Dunayevsky >Priority: Minor > > //TODO: > New feature: Provide ConsumerGroupCommand with ability to query/manage > multiple consumer groups using a single regex pattern. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8336) Enable dynamic update of client-side SSL factory in brokers
Rajini Sivaram created KAFKA-8336: - Summary: Enable dynamic update of client-side SSL factory in brokers Key: KAFKA-8336 URL: https://issues.apache.org/jira/browse/KAFKA-8336 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 2.2.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.3.0 We currently support dynamic update of server-side keystores. This allows expired certs to be updated on brokers without a rolling restart. When mutual authentication is enabled for inter-broker communication (ssl.client.auth=required), we don't currently update client-side keystores dynamically for the controller or transaction coordinator, so a broker restart (or controller change) is required for a cert update in this case. Since short-lived SSL certs are a common use case, we should enable client-side cert updates for all client connections initiated by the broker, to ensure that SSL certificate expiry can be handled with dynamic config updates on brokers for all configurations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
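For comparison, the server-side dynamic keystore update that already exists (and that this ticket extends to broker-initiated client connections) can be driven through the admin API roughly as sketched below; kafka-configs.sh performs the equivalent alteration. The listener name, broker id, and file paths are assumptions.

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch of a dynamic per-broker keystore update (the server-side case that is already
// supported); listener name "internal", broker id "0" and paths are assumptions.
class DynamicKeystoreUpdateSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config update = new Config(Arrays.asList(
                    new ConfigEntry("listener.name.internal.ssl.keystore.location", "/etc/kafka/ssl/new-keystore.jks"),
                    new ConfigEntry("listener.name.internal.ssl.keystore.password", "changeit")));
            // alterConfigs replaces the dynamic config for this broker; the broker reloads the
            // keystore without a restart. Note: persisting dynamic password configs requires
            // password.encoder.secret to be set on the broker.
            admin.alterConfigs(Collections.singletonMap(broker0, update)).all().get();
        }
    }
}
{code}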
[jira] [Commented] (KAFKA-1774) REPL and Shell Client for Admin Message RQ/RP
[ https://issues.apache.org/jira/browse/KAFKA-1774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835466#comment-16835466 ] Viktor Somogyi-Vass commented on KAFKA-1774: The POC is here: https://github.com/viktorsomogyi/kafka/tree/kafka_shell > REPL and Shell Client for Admin Message RQ/RP > - > > Key: KAFKA-1774 > URL: https://issues.apache.org/jira/browse/KAFKA-1774 > Project: Kafka > Issue Type: Sub-task >Reporter: Joe Stein >Assignee: Viktor Somogyi-Vass >Priority: Major > > We should have a REPL we can work in and execute the commands with the > arguments. With this we can do: > ./kafka.sh --shell > kafka>attach cluster -b localhost:9092; > kafka>describe topic sampleTopicNameForExample; > the command line version can work like it does now so folks don't have to > re-write all of their tooling. > kafka.sh --topics --everything the same like kafka-topics.sh is > kafka.sh --reassign --everything the same like kafka-reassign-partitions.sh > is -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8106) Reducing the allocation and copying of ByteBuffer when logValidator do validation.
[ https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835563#comment-16835563 ] ASF GitHub Bot commented on KAFKA-8106: --- ijuma commented on pull request #6476: KAFKA-8106:Reducing the allocation and copying of ByteBuffer when logValidator do validation. URL: https://github.com/apache/kafka/pull/6476 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Reducing the allocation and copying of ByteBuffer when logValidator do > validation. > > > Key: KAFKA-8106 > URL: https://issues.apache.org/jira/browse/KAFKA-8106 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1 > Environment: Server : > cpu:2*16 ; > MemTotal : 256G; > Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network > Connection ; > SSD. >Reporter: Flower.min >Assignee: Flower.min >Priority: Major > Labels: performance > > We do performance testing about Kafka in specific scenarios as > described below .We build a kafka cluster with one broker,and create topics > with different number of partitions.Then we start lots of producer processes > to send large amounts of messages to one of the topics at one testing . > *_Specific Scenario_* > > *_1.Main config of Kafka_* > # Main config of Kafka > server:[num.network.threads=6;num.io.threads=128;queued.max.requests=500|http://num.network.threads%3D6%3Bnum.io.threads%3D128%3Bqueued.max.requests%3D500/] > # Number of TopicPartition : 50~2000 > # Size of Single Message : 1024B > > *_2.Config of KafkaProducer_* > ||compression.type||[linger.ms|http://linger.ms/]||batch.size||buffer.memory|| > |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB| > *_3.The best result of performance testing_* > ||Network inflow rate||CPU Used (%)||Disk write speed||Performance of > production|| > |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s |23,000,000 messages/s| > *_4.Phenomenon and my doubt_* > _The upper limit of CPU usage has been reached But it does not > reach the upper limit of the bandwidth of the server network. *We are > doubtful about which cost too much CPU time and we want to Improve > performance and reduces CPU usage of Kafka server.*_ > _*5.Analysis*_ > We analysis the JFIR of Kafka server when doing performance testing > .After we checked and completed the performance test again, we located the > code "*ByteBuffer recordBuffer = > ByteBuffer.allocate(sizeOfBodyInBytes);*(*Class:DefaultRecord,Function:readFrom()*)” > which consumed CPU resources and caused a lot of GC .Our modified code > reduces the allocation and copying of ByteBuffer, so the test performance is > greatly improved, and the CPU's stable usage is *below 60%*. The following is > a comparison of different code test performance under the same conditions. > *Result of performance testing* > *Main config of Kafka: Single > Message:1024B;TopicPartitions:200;linger.ms:1000ms.* > | Single Message : 1024B,|Network inflow rate|CPU(%)|Messages/s| > |Source code|600M/s|97%|25,000,000| > |Modified code|1GB/s|<60%|41,660,000| > **1.Before modified code(Source code) GC:** >  > **2.After modified code(remove allocation of ByteBuffer) GC:** >  -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8195) Unstable Producer After Send Failure in Transaction
[ https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835601#comment-16835601 ] Viktor Somogyi-Vass commented on KAFKA-8195: Tried to reproduce the second flavor with your code and from the debug level logs it seems like the first transaction with won't be finished and starting the attempts of the second producer would be interpreted as concurrent transactions: {format} [2019-05-08 15:19:16,293] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-3, correlationId=1725) -- {transactional_id=txd-,transaction_timeout_ms=6},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 172.30.64.130:9091-172.30.64.130:58192-14;totalTime:0.241,requestQueueTime:0.06,localTime:0.085,remoteTime:0.0,throttleTime:0.035,responseQueueTime:0.052,sendTime:0.056,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) {format} This causes the client to reeenqueue the InitProducerId request until it times out. I guess the best would be to detect this situation in the brokers and remove that glitched "transaction" but I need to look into this more. > Unstable Producer After Send Failure in Transaction > --- > > Key: KAFKA-8195 > URL: https://issues.apache.org/jira/browse/KAFKA-8195 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.0.1, 2.2.0 >Reporter: Gary Russell >Priority: Blocker > > This journey started with [this Stack Overflow question | > https://stackoverflow.com/questions/55510898]. > I easily reproduced his issue (see my comments on his question). > My first step was to take Spring out of the picture and replicate the issue > with the native {{Producer}} apis. The following code shows the result; I > have attached logs and stack traces. > There are four methods in the test; the first performs 2 transactions, > successfully, on the same {{Producer}} instance. > The second aborts 2 transactions, successfully, on the same {{Producer}} > instance - application level failures after performing a send. > There are two flavors of the problem: > The third attempts to send 2 messages, on the same {{Producer}} that are too > large for the topic; the first aborts as expected; the second send hangs in > {{abortTransaction}} after getting a {{TimeoutException}} when attempting to > {{get}} the send metadata. See log {{hang.same.producer.log}} - it also > includes the stack trace showing the hang. > The fourth test is similar to the third but it closes the producer after the > first failure; this time, we timeout in {{initTransactions()}}. > Subsequent executions of this test get the timeout on the first attempt - > that {{transactional.id}} seems to be unusable. Removing the logs was one way > I found to get past the problem. 
> Test code > {code:java} > public ApplicationRunner runner(AdminClient client, > DefaultKafkaProducerFactory pf) { > return args -> { > try { > Map configs = new > HashMap<>(pf.getConfigurationProperties()); > int committed = testGoodTx(client, configs); > System.out.println("Successes (same producer) > committed: " + committed); > int rolledBack = testAppFailureTx(client, > configs); > System.out.println("App failures (same > producer) rolled back: " + rolledBack); > > // first flavor - hung thread in > abortTransaction() > //rolledBack = > testSendFailureTxSameProducer(client, configs); > //System.out.println("Send failures (same > producer) rolled back: " + rolledBack); > > // second flavor - timeout in initTransactions() > rolledBack = > testSendFailureTxNewProducer(client, configs); > System.out.println("Send failures (new > producer) rolled back: " + rolledBack); > } > catch (Exception e) { > e.printStackTrace(); > } > }; > } > private int testGoodTx(AdminClient client, Map configs) > throws ExecutionException { > int commits = 0; > NewTopic topic = TopicBuilder.name("so55510898a") > .partitions(1) > .replicas(1) > .build(); > createTopic(client, topic); >
[jira] [Assigned] (KAFKA-8195) Unstable Producer After Send Failure in Transaction
[ https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass reassigned KAFKA-8195: -- Assignee: Viktor Somogyi-Vass > Unstable Producer After Send Failure in Transaction > --- > > Key: KAFKA-8195 > URL: https://issues.apache.org/jira/browse/KAFKA-8195 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.0.1, 2.2.0 >Reporter: Gary Russell >Assignee: Viktor Somogyi-Vass >Priority: Blocker > > This journey started with [this Stack Overflow question | > https://stackoverflow.com/questions/55510898]. > I easily reproduced his issue (see my comments on his question). > My first step was to take Spring out of the picture and replicate the issue > with the native {{Producer}} apis. The following code shows the result; I > have attached logs and stack traces. > There are four methods in the test; the first performs 2 transactions, > successfully, on the same {{Producer}} instance. > The second aborts 2 transactions, successfully, on the same {{Producer}} > instance - application level failures after performing a send. > There are two flavors of the problem: > The third attempts to send 2 messages, on the same {{Producer}} that are too > large for the topic; the first aborts as expected; the second send hangs in > {{abortTransaction}} after getting a {{TimeoutException}} when attempting to > {{get}} the send metadata. See log {{hang.same.producer.log}} - it also > includes the stack trace showing the hang. > The fourth test is similar to the third but it closes the producer after the > first failure; this time, we timeout in {{initTransactions()}}. > Subsequent executions of this test get the timeout on the first attempt - > that {{transactional.id}} seems to be unusable. Removing the logs was one way > I found to get past the problem. 
> Test code > {code:java} > public ApplicationRunner runner(AdminClient client, > DefaultKafkaProducerFactory pf) { > return args -> { > try { > Map configs = new > HashMap<>(pf.getConfigurationProperties()); > int committed = testGoodTx(client, configs); > System.out.println("Successes (same producer) > committed: " + committed); > int rolledBack = testAppFailureTx(client, > configs); > System.out.println("App failures (same > producer) rolled back: " + rolledBack); > > // first flavor - hung thread in > abortTransaction() > //rolledBack = > testSendFailureTxSameProducer(client, configs); > //System.out.println("Send failures (same > producer) rolled back: " + rolledBack); > > // second flavor - timeout in initTransactions() > rolledBack = > testSendFailureTxNewProducer(client, configs); > System.out.println("Send failures (new > producer) rolled back: " + rolledBack); > } > catch (Exception e) { > e.printStackTrace(); > } > }; > } > private int testGoodTx(AdminClient client, Map configs) > throws ExecutionException { > int commits = 0; > NewTopic topic = TopicBuilder.name("so55510898a") > .partitions(1) > .replicas(1) > .build(); > createTopic(client, topic); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-"); > Producer producer = new > KafkaProducer<>(configs); > try { > producer.initTransactions(); > for (int i = 0; i < 2; i++) { > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898a", "foo")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > producer.commitTransaction(); > commits++; > } > } > catch (ProducerFencedException | OutOfOrderSequenceException | > AuthorizationException e) { > // We can't recover from these exceptions, so our only > option is to close the producer and exit. >
[jira] [Updated] (KAFKA-8195) Unstable Producer After Send Failure in Transaction
[ https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-8195: --- Affects Version/s: 2.3.0 > Unstable Producer After Send Failure in Transaction > --- > > Key: KAFKA-8195 > URL: https://issues.apache.org/jira/browse/KAFKA-8195 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.0.1, 2.2.0, 2.3.0 >Reporter: Gary Russell >Assignee: Viktor Somogyi-Vass >Priority: Blocker > > This journey started with [this Stack Overflow question | > https://stackoverflow.com/questions/55510898]. > I easily reproduced his issue (see my comments on his question). > My first step was to take Spring out of the picture and replicate the issue > with the native {{Producer}} apis. The following code shows the result; I > have attached logs and stack traces. > There are four methods in the test; the first performs 2 transactions, > successfully, on the same {{Producer}} instance. > The second aborts 2 transactions, successfully, on the same {{Producer}} > instance - application level failures after performing a send. > There are two flavors of the problem: > The third attempts to send 2 messages, on the same {{Producer}} that are too > large for the topic; the first aborts as expected; the second send hangs in > {{abortTransaction}} after getting a {{TimeoutException}} when attempting to > {{get}} the send metadata. See log {{hang.same.producer.log}} - it also > includes the stack trace showing the hang. > The fourth test is similar to the third but it closes the producer after the > first failure; this time, we timeout in {{initTransactions()}}. > Subsequent executions of this test get the timeout on the first attempt - > that {{transactional.id}} seems to be unusable. Removing the logs was one way > I found to get past the problem. 
> Test code > {code:java} > public ApplicationRunner runner(AdminClient client, > DefaultKafkaProducerFactory pf) { > return args -> { > try { > Map configs = new > HashMap<>(pf.getConfigurationProperties()); > int committed = testGoodTx(client, configs); > System.out.println("Successes (same producer) > committed: " + committed); > int rolledBack = testAppFailureTx(client, > configs); > System.out.println("App failures (same > producer) rolled back: " + rolledBack); > > // first flavor - hung thread in > abortTransaction() > //rolledBack = > testSendFailureTxSameProducer(client, configs); > //System.out.println("Send failures (same > producer) rolled back: " + rolledBack); > > // second flavor - timeout in initTransactions() > rolledBack = > testSendFailureTxNewProducer(client, configs); > System.out.println("Send failures (new > producer) rolled back: " + rolledBack); > } > catch (Exception e) { > e.printStackTrace(); > } > }; > } > private int testGoodTx(AdminClient client, Map configs) > throws ExecutionException { > int commits = 0; > NewTopic topic = TopicBuilder.name("so55510898a") > .partitions(1) > .replicas(1) > .build(); > createTopic(client, topic); > configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-"); > Producer producer = new > KafkaProducer<>(configs); > try { > producer.initTransactions(); > for (int i = 0; i < 2; i++) { > producer.beginTransaction(); > RecordMetadata recordMetadata = producer.send( > new > ProducerRecord<>("so55510898a", "foo")).get(10, > TimeUnit.SECONDS); > System.out.println(recordMetadata); > producer.commitTransaction(); > commits++; > } > } > catch (ProducerFencedException | OutOfOrderSequenceException | > AuthorizationException e) { > // We can't recover from these exceptions, so our only > option is to close the producer and exit. > } >
[jira] [Comment Edited] (KAFKA-8195) Unstable Producer After Send Failure in Transaction
[ https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835601#comment-16835601 ] Viktor Somogyi-Vass edited comment on KAFKA-8195 at 5/8/19 1:30 PM: Tried to reproduce the second flavor with your code and from the debug level logs it seems like the first transaction with won't be finished and starting the attempts of the second producer would be interpreted as concurrent transactions: {noformat} [2019-05-08 15:19:16,293] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-3, correlationId=1725) -- {transactional_id=txd-,transaction_timeout_ms=6},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 172.30.64.130:9091-172.30.64.130:58192-14;totalTime:0.241,requestQueueTime:0.06,localTime:0.085,remoteTime:0.0,throttleTime:0.035,responseQueueTime:0.052,sendTime:0.056,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) {noformat} This causes the client to reeenqueue the InitProducerId request until it times out. I guess the best would be to detect this situation in the brokers and remove that glitched "transaction" but I need to look into this more. was (Author: viktorsomogyi): Tried to reproduce the second flavor with your code and from the debug level logs it seems like the first transaction with won't be finished and starting the attempts of the second producer would be interpreted as concurrent transactions: {format} [2019-05-08 15:19:16,293] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-3, correlationId=1725) -- {transactional_id=txd-,transaction_timeout_ms=6},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 172.30.64.130:9091-172.30.64.130:58192-14;totalTime:0.241,requestQueueTime:0.06,localTime:0.085,remoteTime:0.0,throttleTime:0.035,responseQueueTime:0.052,sendTime:0.056,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger) {format} This causes the client to reeenqueue the InitProducerId request until it times out. I guess the best would be to detect this situation in the brokers and remove that glitched "transaction" but I need to look into this more. > Unstable Producer After Send Failure in Transaction > --- > > Key: KAFKA-8195 > URL: https://issues.apache.org/jira/browse/KAFKA-8195 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.0.1, 2.2.0 >Reporter: Gary Russell >Priority: Blocker > > This journey started with [this Stack Overflow question | > https://stackoverflow.com/questions/55510898]. > I easily reproduced his issue (see my comments on his question). > My first step was to take Spring out of the picture and replicate the issue > with the native {{Producer}} apis. The following code shows the result; I > have attached logs and stack traces. > There are four methods in the test; the first performs 2 transactions, > successfully, on the same {{Producer}} instance. > The second aborts 2 transactions, successfully, on the same {{Producer}} > instance - application level failures after performing a send. > There are two flavors of the problem: > The third attempts to send 2 messages, on the same {{Producer}} that are too > large for the topic; the first aborts as expected; the second send hangs in > {{abortTransaction}} after getting a {{TimeoutException}} when attempting to > {{get}} the send metadata. 
See log {{hang.same.producer.log}} - it also > includes the stack trace showing the hang. > The fourth test is similar to the third but it closes the producer after the > first failure; this time, we timeout in {{initTransactions()}}. > Subsequent executions of this test get the timeout on the first attempt - > that {{transactional.id}} seems to be unusable. Removing the logs was one way > I found to get past the problem. > Test code > {code:java} > public ApplicationRunner runner(AdminClient client, > DefaultKafkaProducerFactory pf) { > return args -> { > try { > Map configs = new > HashMap<>(pf.getConfigurationProperties()); > int committed = testGoodTx(client, configs); > System.out.println("Successes (same producer) > committed: " + committed); > int rolledBack = testAppFailureTx(client, > configs); > System.out.println("App failures (same > producer) rolled back: " + rolledBack); > > // first flavor - hung thread in >
[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read
[ https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] arthur updated KAFKA-7757: -- Attachment: dump.txt > Too many open files after java.io.IOException: Connection to n was > disconnected before the response was read > > > Key: KAFKA-7757 > URL: https://issues.apache.org/jira/browse/KAFKA-7757 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Pedro Gontijo >Priority: Major > Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, > fd-spike-threads.txt, kafka-allocated-file-handles.png, server.properties, > td1.txt, td2.txt, td3.txt > > > We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers) > After a while (hours) 2 brokers start to throw: > {code:java} > java.io.IOException: Connection to NN was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > File descriptors start to pile up and if I do not restart it throws "Too many > open files" and crashes. > {code:java} > ERROR Error while accepting connection (kafka.network.Acceptor) > java.io.IOException: Too many open files in system > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:460) > at kafka.network.Acceptor.run(SocketServer.scala:403) > at java.lang.Thread.run(Thread.java:748) > {code} > > After some hours the issue happens again... It has happened with all > brokers, so it is not something specific to an instance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read
[ https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835621#comment-16835621 ] arthur commented on KAFKA-7757: --- Hello, we are also facing the same issue running kafka 2.2.0: ls /proc/61020/fd | wc -l 165651 kafka-topics --version 2.2.0-cp2 (Commit:325e9879cbc6d612) link to thread dump: [^dump.txt] > Too many open files after java.io.IOException: Connection to n was > disconnected before the response was read > > > Key: KAFKA-7757 > URL: https://issues.apache.org/jira/browse/KAFKA-7757 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0 >Reporter: Pedro Gontijo >Priority: Major > Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, > fd-spike-threads.txt, kafka-allocated-file-handles.png, server.properties, > td1.txt, td2.txt, td3.txt > > > We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers) > After a while (hours) 2 brokers start to throw: > {code:java} > java.io.IOException: Connection to NN was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97) > at > kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190) > at > kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130) > at > kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129) > at scala.Option.foreach(Option.scala:257) > at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82) > {code} > File descriptors start to pile up and if I do not restart it throws "Too many > open files" and crashes. > {code:java} > ERROR Error while accepting connection (kafka.network.Acceptor) > java.io.IOException: Too many open files in system > at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) > at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) > at kafka.network.Acceptor.accept(SocketServer.scala:460) > at kafka.network.Acceptor.run(SocketServer.scala:403) > at java.lang.Thread.run(Thread.java:748) > {code} > > After some hours the issue happens again... It has happened with all > brokers, so it is not something specific to an instance. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7847) KIP-421: Automatically resolve external configurations.
[ https://issues.apache.org/jira/browse/KAFKA-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-7847: -- Fix Version/s: 2.3.0 > KIP-421: Automatically resolve external configurations. > --- > > Key: KAFKA-7847 > URL: https://issues.apache.org/jira/browse/KAFKA-7847 > Project: Kafka > Issue Type: Improvement > Components: config >Reporter: TEJAL ADSUL >Priority: Minor > Fix For: 2.3.0 > > Original Estimate: 336h > Remaining Estimate: 336h > > This proposal intends to enhance the AbstractConfig base class to support > replacing variables in configurations just prior to parsing and validation. > This simple change will make it very easy for client applications, Kafka > Connect, and Kafka Streams to use shared code to easily incorporate > externalized secrets and other variable replacements within their > configurations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
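For readers unfamiliar with the mechanism: the variable syntax comes from the ConfigProvider support added by KIP-297, and KIP-421 proposes resolving such variables directly in AbstractConfig so that plain clients and Streams get it for free. A hedged sketch of a configuration using the built-in file provider (the secrets path and key names are assumptions):

{code:java}
import java.util.Properties;

// Hedged sketch of externalized secrets via a ConfigProvider (KIP-297 syntax), which
// KIP-421 proposes to resolve automatically inside AbstractConfig. Paths and keys are assumptions.
class ExternalizedConfigSketch {
    static Properties clientProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9093");                        // placeholder
        // Register a provider under the alias "file".
        props.put("config.providers", "file");
        props.put("config.providers.file.class",
                "org.apache.kafka.common.config.provider.FileConfigProvider");
        // At parse time the ${file:...:...} reference is replaced by the value stored under the
        // key "keystore.password" in /etc/kafka/secrets.properties.
        props.put("ssl.keystore.password", "${file:/etc/kafka/secrets.properties:keystore.password}");
        return props;
    }
}
{code}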
[jira] [Created] (KAFKA-8337) Fix tests/setup.cfg to work with pytest 4.x
Kengo Seki created KAFKA-8337: - Summary: Fix tests/setup.cfg to work with pytest 4.x Key: KAFKA-8337 URL: https://issues.apache.org/jira/browse/KAFKA-8337 Project: Kafka Issue Type: Bug Components: system tests, unit tests Reporter: Kengo Seki In accordance with tests/README.md, I ran {{`python setup.py test`}} in the {{tests}} directory to execute unit tests for the system tests, but got the following error: {code} $ python setup.py test running test (snip) Using /home/sekikn/repo/kafka/tests/.eggs/docutils-0.14-py2.7.egg Searching for pytest Best match: pytest 4.4.1 Processing pytest-4.4.1-py2.7.egg (snip) builtins.Failed: [pytest] section in setup.cfg files is no longer supported, change to [tool:pytest] instead. {code} This is because [\[pytest\] section in setup.cfg has been removed in pytest 4.0|https://docs.pytest.org/en/4.1.0/deprecations.html#pytest-section-in-setup-cfg-files]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8337) Fix tests/setup.cfg to work with pytest 4.x
[ https://issues.apache.org/jira/browse/KAFKA-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835682#comment-16835682 ] ASF GitHub Bot commented on KAFKA-8337: --- sekikn commented on pull request #6701: KAFKA-8337: Fix tests/setup.cfg to work with pytest 4.x URL: https://github.com/apache/kafka/pull/6701 This PR replaces [pytest] section in tests/setup.cfg with [tool:pytest] so that the unit tests for the system tests work with pytest 4.x. I ran `python setup.py test` and confirmed it succeeded. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix tests/setup.cfg to work with pytest 4.x > --- > > Key: KAFKA-8337 > URL: https://issues.apache.org/jira/browse/KAFKA-8337 > Project: Kafka > Issue Type: Bug > Components: system tests, unit tests >Reporter: Kengo Seki >Priority: Minor > > In accordance with tests/README.md, I ran {{`python setup.py test`}} in the > {{tests}} directory to execute unit tests for the system tests, but got the > following error: > {code} > $ python setup.py test > running test > (snip) > Using /home/sekikn/repo/kafka/tests/.eggs/docutils-0.14-py2.7.egg > Searching for pytest > Best match: pytest 4.4.1 > Processing pytest-4.4.1-py2.7.egg > (snip) > builtins.Failed: [pytest] section in setup.cfg files is no longer supported, > change to [tool:pytest] instead. > {code} > This is because [\[pytest\] section in setup.cfg has been removed in pytest > 4.0|https://docs.pytest.org/en/4.1.0/deprecations.html#pytest-section-in-setup-cfg-files]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835720#comment-16835720 ] Jason Gustafson commented on KAFKA-8335: Thanks for the report. Can you provide your broker configuration? COMMIT markers should be cleaned as soon as all the data from the transaction has also been deleted. It can be delayed by as long as `delete.retention.ms` though. It might be helpful if you provide the full dump of the segment. > Log cleaner skips Transactional mark and batch record, causing unlimited > growth of __consumer_offsets > - > > Key: KAFKA-8335 > URL: https://issues.apache.org/jira/browse/KAFKA-8335 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Boquan Tang >Priority: Major > > My Colleague Weichu already sent out a mail to kafka user mailing list > regarding this issue, but we think it's worth having a ticket tracking it. > We are using Kafka Streams with exactly-once enabled on a Kafka cluster for > a while. > Recently we found that the size of __consumer_offsets partitions grew huge. > Some partition went over 30G. This caused Kafka to take quite long to load > "__consumer_offsets" topic on startup (it loads the topic in order to > become group coordinator). > We dumped the __consumer_offsets segments and found that while normal > offset commits are nicely compacted, transaction records (COMMIT, etc) are > all preserved. Looks like that since these messages don't have a key, the > LogCleaner is keeping them all: > -- > $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > /003484332061.log --key-decoder-class > kafka.serializer.StringDecoder 2>/dev/null | cat -v | head > Dumping 003484332061.log > Starting offset: 3484332061 > offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006 > producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 81 > offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005 > producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 84 > ... > -- > Streams is doing transaction commits per 100ms (commit.interval.ms=100 when > exactly-once) so the __consumer_offsets is growing really fast. > Is this (to keep all transactions) by design, or is that a bug for > LogCleaner? What would be the way to clean up the topic? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
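As a practical follow-up to the delete.retention.ms point above, the topic's effective value can be inspected with the admin API as sketched below (kafka-configs.sh reports the same information); the bootstrap address is a placeholder.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

// Sketch: read the effective delete.retention.ms of __consumer_offsets.
class TopicRetentionInspectSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "__consumer_offsets");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                    .all().get().get(topic);
            // COMMIT markers whose transaction data has already been cleaned can still linger for
            // up to this long before the cleaner removes them.
            System.out.println("delete.retention.ms = " + config.get("delete.retention.ms").value());
        }
    }
}
{code}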
[jira] [Commented] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835742#comment-16835742 ] ASF GitHub Bot commented on KAFKA-7320: --- hachikuji commented on pull request #5542: KAFKA-7320: Add consumer configuration to disable auto topic creation URL: https://github.com/apache/kafka/pull/5542 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Provide ability to disable auto topic creation in KafkaConsumer > --- > > Key: KAFKA-7320 > URL: https://issues.apache.org/jira/browse/KAFKA-7320 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Dhruvil Shah >Assignee: Dhruvil Shah >Priority: Major > > Consumers should have a configuration to control whether subscribing to > non-existent topics should automatically create the topic or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-7320. Resolution: Fixed Fix Version/s: 2.3.0 > Provide ability to disable auto topic creation in KafkaConsumer > --- > > Key: KAFKA-7320 > URL: https://issues.apache.org/jira/browse/KAFKA-7320 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: Dhruvil Shah >Assignee: Dhruvil Shah >Priority: Major > Fix For: 2.3.0 > > > Consumers should have a configuration to control whether subscribing to > non-existent topics should automatically create the topic or not. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
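For reference, the change that went into 2.3.0 adds a consumer-side switch for this behaviour. The sketch below shows how it would be used; the exact property name (allow.auto.create.topics, per KIP-361) should be checked against the released documentation, and the group and topic names are placeholders.

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Sketch of disabling broker-side auto topic creation from the consumer (new in 2.3.0).
class NoAutoCreateConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                 // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Assumed property name from KIP-361: subscribing to a non-existent topic will no
        // longer trigger auto-creation when this is false.
        props.put("allow.auto.create.topics", "false");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("existing-topic"));   // placeholder
        }
    }
}
{code}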
[jira] [Updated] (KAFKA-8326) Add List Serde
[ https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniyar Yeralin updated KAFKA-8326: --- Description: I propose _adding new ListSerializer and ListDeserializer classes as well as support for the new classes into the Serdes class. This will allow using List Serde directly from Consumers, Producers and Streams._ _List serialization and deserialization will be done through repeatedly calling a serializer/deserializer for each entry provided by passed generic T's Serde._ _For example, if you want to create List of Strings serde, you will have to declare `new ListSerde<>(Serdes.String())`, in this case serializer/deserializer of String Serde will be used to serialize/deserialize each entry in `List`._ I believe there are many use cases where List Serde could be used. Ex. [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows], [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api] For instance, aggregate grouped (by key) values together in a list to do other subsequent operations on the collection. KIP Link: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] was: I propose _adding new ListSerializer and ListDeserializer classes as well as support for the new classes into the Serdes class. This will allow using List Serde directly from Consumers, Producers and Streams._ _List serialization and deserialization will be done through repeatedly calling a serializer/deserializer for each entry provided by passed generic T's Serde._ _For example, if you want to create List of Strings serde, you will have to declare `new Serdes.ListSerde<>(Serdes.String(), Comparator.comparing(String::length))`, in this case serializer/deserializer of String Serde will be used to serialize/deserialize each entry in `List`._ I believe there are many use cases where List Serde could be used. Ex. [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows], [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api] For instance, aggregate grouped (by key) values together in a list to do other subsequent operations on the collection. KIP Link: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] > Add List Serde > - > > Key: KAFKA-8326 > URL: https://issues.apache.org/jira/browse/KAFKA-8326 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Daniyar Yeralin >Assignee: Daniyar Yeralin >Priority: Minor > Labels: kip > > I propose _adding new ListSerializer and ListDeserializer classes as well as > support for the new classes into the Serdes class. This will allow using > List Serde directly from Consumers, Producers and Streams._ > _List serialization and deserialization will be done through repeatedly > calling a serializer/deserializer for each entry provided by passed generic > T's Serde._ > _For example, if you want to create List of Strings serde, you will have to > declare `new ListSerde<>(Serdes.String())`, in this case > serializer/deserializer of String Serde will be used to serialize/deserialize > each entry in `List`._ > I believe there are many use cases where List Serde could be used. Ex. 
> [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows], > > [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api] > For instance, aggregate grouped (by key) values together in a list to do > other subsequent operations on the collection. > > KIP Link: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
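To make the "repeatedly calling a serializer for each entry" idea concrete, here is a minimal standalone serializer in the spirit of the proposal. The class name and wire format (an entry count followed by length-prefixed entries) are assumptions for illustration, not the KIP's final API; a matching deserializer would read the count and hand each length-prefixed chunk back to the inner deserializer.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Sketch (assumed name and wire format, not the KIP's final API): serialize a List<T> by
// writing an entry count and then each entry as a length-prefixed blob produced by the
// inner serializer.
public class ListOfTSerializerSketch<T> implements Serializer<List<T>> {
    private final Serializer<T> inner;

    public ListOfTSerializerSketch(Serializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, List<T> data) {
        if (data == null) {
            return null;
        }
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(data.size());                       // number of entries
            for (T entry : data) {
                byte[] entryBytes = inner.serialize(topic, entry);
                out.writeInt(entryBytes.length);             // per-entry length prefix
                out.write(entryBytes);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        inner.close();
    }
}
{code}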
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835784#comment-16835784 ] Andrew commented on KAFKA-8315: --- After a lot of investigation, we think this issue is down to the fact that we have a left stream with minute-by-minute data and a right topic with daily data. It is not clear what logic controls the rate at which records are read from left and right streams, but we believe that the right topic is being read at a rate such that that it quickly gets too far ahead of the left stream (in terms of event-time) and therefore the right stream windows are being expired before the left stream data has been read. [~vvcephei] What controls the rate that records are read from the left and right streams? Is there any guarantee that the timestamps for the records in the left and right streams are kept more-or-less in line with the records from the right stream? If not, is there any way we can somehow delay the right stream? Thanks for your help above. > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835784#comment-16835784 ] Andrew edited comment on KAFKA-8315 at 5/8/19 5:54 PM: --- After a lot of investigation, we think this issue is down to the fact that we have a left stream with minute-by-minute data and a right topic with daily data. It is not clear what logic controls the rate at which records are read from left and right streams, but we believe that the right topic is being read at a rate such that that it quickly gets too far ahead of the left stream (in terms of event-time), as there are far fewer records, and therefore the right stream windows are being expired before the left stream data has been read. [~vvcephei] What controls the rate that records are read from the left and right streams? Is there any guarantee that the timestamps for the records in the left and right streams are kept more-or-less in line with the records from the right stream? If not, is there any way we can somehow delay the right stream? Thanks for your help above. was (Author: the4thamigo_uk): After a lot of investigation, we think this issue is down to the fact that we have a left stream with minute-by-minute data and a right topic with daily data. It is not clear what logic controls the rate at which records are read from left and right streams, but we believe that the right topic is being read at a rate such that that it quickly gets too far ahead of the left stream (in terms of event-time) and therefore the right stream windows are being expired before the left stream data has been read. [~vvcephei] What controls the rate that records are read from the left and right streams? Is there any guarantee that the timestamps for the records in the left and right streams are kept more-or-less in line with the records from the right stream? If not, is there any way we can somehow delay the right stream? Thanks for your help above. > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8326) Add List Serde
[ https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniyar Yeralin updated KAFKA-8326: --- Description: _This ticket proposes adding new ListSerializer and ListDeserializer classes as well as support for the new classes into the Serdes class. This will allow using List Serde of type T directly from Consumers, Producers and Streams._ _List serialization and deserialization will be done through repeatedly calling a serializer/deserializer for each entry provided by passed generic T's Serde. For example, if you want to create List of Strings serde, then serializer/deserializer of StringSerde will be used to serialize/deserialize each entry in `List`._ I believe there are many use cases where List Serde could be used. Ex. [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows], [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api] For instance, aggregate grouped (by key) values together in a list to do other subsequent operations on the collection. KIP Link: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] was: I propose _adding new ListSerializer and ListDeserializer classes as well as support for the new classes into the Serdes class. This will allow using List Serde directly from Consumers, Producers and Streams._ _List serialization and deserialization will be done through repeatedly calling a serializer/deserializer for each entry provided by passed generic T's Serde._ _For example, if you want to create List of Strings serde, you will have to declare `new ListSerde<>(Serdes.String())`, in this case serializer/deserializer of String Serde will be used to serialize/deserialize each entry in `List`._ I believe there are many use cases where List Serde could be used. Ex. [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows], [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api] For instance, aggregate grouped (by key) values together in a list to do other subsequent operations on the collection. KIP Link: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] > Add List Serde > - > > Key: KAFKA-8326 > URL: https://issues.apache.org/jira/browse/KAFKA-8326 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Daniyar Yeralin >Assignee: Daniyar Yeralin >Priority: Minor > Labels: kip > > _This ticket proposes adding new ListSerializer and ListDeserializer classes > as well as support for the new classes into the Serdes class. This will allow > using List Serde of type T directly from Consumers, Producers and Streams._ > _List serialization and deserialization will be done through repeatedly > calling a serializer/deserializer for each entry provided by passed generic > T's Serde. For example, if you want to create List of Strings serde, then > serializer/deserializer of StringSerde will be used to serialize/deserialize > each entry in `List`._ > I believe there are many use cases where List Serde could be used. Ex. 
> [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows], > > [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api] > For instance, aggregate grouped (by key) values together in a list to do > other subsequent operations on the collection. > KIP Link: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
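[Editor's note] For readers unfamiliar with KIP-466, the per-entry delegation described above can be sketched in a few lines. The class names and the wire format below (an entry count followed by length-prefixed entries) are illustrative assumptions only, not necessarily the layout the KIP ultimately specifies.
{code:java}
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public final class ListSerdeSketch {

    /** Serializes a List<T> by delegating each entry to the inner type's serializer. */
    public static final class ListSerializer<T> implements Serializer<List<T>> {
        private final Serializer<T> inner;
        public ListSerializer(Serializer<T> inner) { this.inner = inner; }
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }
        @Override public byte[] serialize(String topic, List<T> list) {
            if (list == null) return null;
            List<byte[]> entries = new ArrayList<>(list.size());
            int size = 4;                                  // 4 bytes for the entry count
            for (T entry : list) {
                byte[] bytes = inner.serialize(topic, entry);
                entries.add(bytes);
                size += 4 + bytes.length;                  // length prefix + payload
            }
            ByteBuffer buf = ByteBuffer.allocate(size).putInt(entries.size());
            for (byte[] bytes : entries) buf.putInt(bytes.length).put(bytes);
            return buf.array();
        }
        @Override public void close() { }
    }

    /** Deserializes a List<T> by delegating each length-prefixed entry to the inner deserializer. */
    public static final class ListDeserializer<T> implements Deserializer<List<T>> {
        private final Deserializer<T> inner;
        public ListDeserializer(Deserializer<T> inner) { this.inner = inner; }
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }
        @Override public List<T> deserialize(String topic, byte[] data) {
            if (data == null) return null;
            ByteBuffer buf = ByteBuffer.wrap(data);
            int count = buf.getInt();
            List<T> list = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                byte[] bytes = new byte[buf.getInt()];
                buf.get(bytes);
                list.add(inner.deserialize(topic, bytes));
            }
            return list;
        }
        @Override public void close() { }
    }

    public static <T> Serde<List<T>> listSerde(Serde<T> inner) {
        return Serdes.serdeFrom(new ListSerializer<>(inner.serializer()),
                                new ListDeserializer<>(inner.deserializer()));
    }

    public static void main(String[] args) {
        Serde<List<String>> serde = listSerde(Serdes.String());
        byte[] bytes = serde.serializer().serialize("topic", Arrays.asList("a", "bb"));
        System.out.println(serde.deserializer().deserialize("topic", bytes)); // prints [a, bb]
    }
}
{code}
Such a serde could then be passed wherever a Serde is accepted today, e.g. in Consumed/Produced or Materialized, which is the convenience the KIP aims to provide out of the box.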
[jira] [Created] (KAFKA-8338) Improve consumer offset expiration logic to take subscription into account
Gwen Shapira created KAFKA-8338: --- Summary: Improve consumer offset expiration logic to take subscription into account Key: KAFKA-8338 URL: https://issues.apache.org/jira/browse/KAFKA-8338 Project: Kafka Issue Type: Improvement Reporter: Gwen Shapira Currently, we expire consumer offsets for a group after the group is considered gone. There is a case where the consumer group still exists, but is now subscribed to different topics. In that case, the offsets of the old topics will never expire and if lag is monitored, the monitors will show ever-growing lag on those topics. We need to improve the logic to expire the consumer offsets if the consumer group didn't subscribe to specific topics/partitions for enough time. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8048) remove KafkaMbean when network close
[ https://issues.apache.org/jira/browse/KAFKA-8048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8048. Resolution: Duplicate > remove KafkaMbean when network close > > > Key: KAFKA-8048 > URL: https://issues.apache.org/jira/browse/KAFKA-8048 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: limeng >Priority: Critical > Fix For: 2.2.2 > > > the broker server will be oom when > * a large number of clients frequently close and reconnect > * the clientId changes every time when reconnect,that gives rise to too much > kafkaMbean in broker > the reason is that broker forget to remove kafkaMbean when detect connection > closes. > h2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8049) remove KafkaMbean when network close
[ https://issues.apache.org/jira/browse/KAFKA-8049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8049. Resolution: Duplicate > remove KafkaMbean when network close > > > Key: KAFKA-8049 > URL: https://issues.apache.org/jira/browse/KAFKA-8049 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: limeng >Priority: Critical > Fix For: 2.2.2 > > > the broker server will be oom when > * a large number of clients frequently close and reconnect > * the clientId changes every time when reconnect,that gives rise to too much > kafkaMbean in broker > the reason is that broker forget to remove kafkaMbean when detect connection > closes. > h2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8050) remove KafkaMbean when network close
[ https://issues.apache.org/jira/browse/KAFKA-8050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8050. Resolution: Duplicate > remove KafkaMbean when network close > > > Key: KAFKA-8050 > URL: https://issues.apache.org/jira/browse/KAFKA-8050 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: limeng >Priority: Critical > Fix For: 2.2.2 > > > the broker server will be oom when > * a large number of clients frequently close and reconnect > * the clientId changes every time when reconnect,that gives rise to too much > kafkaMbean in broker > the reason is that broker forget to remove kafkaMbean when detect connection > closes. > h2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8051) remove KafkaMbean when network close
[ https://issues.apache.org/jira/browse/KAFKA-8051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-8051. Resolution: Duplicate > remove KafkaMbean when network close > > > Key: KAFKA-8051 > URL: https://issues.apache.org/jira/browse/KAFKA-8051 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2 >Reporter: limeng >Priority: Critical > Fix For: 2.2.2 > > > the broker server will be oom when > * a large number of clients frequently close and reconnect > * the clientId changes every time when reconnect,that gives rise to too much > kafkaMbean in broker > the reason is that broker forget to remove kafkaMbean when detect connection > closes. > h2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835832#comment-16835832 ] John Roesler commented on KAFKA-8315: - Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your investigation is progressing. Last question first: Streams should choose to consume from the left or right based on which one has the lower timestamp in the next record, so I would not expect one side to "run ahead" of the other. There's one caveat, that when one side is being produced more slowly, Streams won't just wait indefinitely for the next data, but instead just process the side that does have data. This is controlled by the "max idle ms" config, but since you're processing historically, this shouldn't be your problem. Still might be worth a look. Maybe for debugging purposes, you can print out the key, value, and timestamp for each of the sides as well as in the joiner, so you can identify which side is triggering the join, and evaluate whether or not it's correctly time-ordered. If it is in fact running ahead on one side, despite what it should be doing, this would explain why you see better results with a larger grace period. To confirm, the grace period should only matter up to the maximum time skew in your stream. So, as you said, if you have two producers that each produce a full 24 hours of data, sequentially, then you should see stream time advance when the first producer writes its data, and then "freeze" while the second producer writes its (out-of-order) data. Thus, you'll want to set the grace period to keep old windows around for at least 24 hours, since you know you have to wait for that second producer's data. Finally, to answer your earlier questions, yes, each task is handling just one partition of both input topics (the same partition on the left and right). Stream Time is independently maintained for each task/partition, and it is computed simply as the highest timestamp yet observed for that partition. If you want to look at it in detail, it's tracked in org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore. Actually, you can set that class's logger to DEBUG mode and it'll print out every time it skips a record that is outside of retention. Minor point, you should not need to mess with the retention of the changelog topic. Streams sets this appropriately to preserve the same data as the store, but this is only apparent when restoring the store. The actual results of the join are served out of the state store, so only the state store's retention matters. This is what you're setting with the grace period. I hope this helps! -John > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems.
> > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
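[Editor's note] To make the advice above concrete, here is a minimal sketch of a stream-stream join that sets a 24-hour grace period via JoinWindows, which, per this ticket, is currently the way the join store's retention gets controlled since Materialized cannot be passed to join. Topic names, serdes and the window size are placeholders; max.task.idle.ms is the "max idle ms" config John mentions, and the DEBUG logging he suggests would be something like log4j.logger.org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore=DEBUG in the application's log4j configuration.
{code:java}
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;

public class GraceJoinSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "grace-join-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // The "max idle ms" knob mentioned above; only relevant when one side has no buffered data.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 0L);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("minute-data");  // placeholder topic
        KStream<String, String> right = builder.stream("daily-data");  // placeholder topic

        // One-hour join window, but keep old windows around for 24 hours of out-of-order
        // data, since the second producer writes its whole day after the first one.
        left.join(right,
                  (l, r) -> l + "|" + r,
                  JoinWindows.of(Duration.ofHours(1)).grace(Duration.ofHours(24)),
                  Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to("joined-output");                                      // placeholder topic

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}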
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835832#comment-16835832 ] John Roesler edited comment on KAFKA-8315 at 5/8/19 7:17 PM: - Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your investigation is progressing. Last question first: Streams should choose to consume from the left or right based on which one has the lower timestamp in the next record, so I would not expect one side to "run ahead" of the other. There's one caveat, that when one side is being produced more slowly, Streams won't just wait indefinitely for the next data, but instead just process the side that does have data. This is controlled by the "max idle ms" config, but since you're processing historically, this shouldn't be your problem. Still might be worth a look. Maybe for debugging purposes, you can print out the key, value, and timestamp for each of the sides as well as in the joiner, so you can identify which side is triggering the join, and evaluate whether or not it's correctly time-ordered. If it is in fact running ahead on one side, despite what it should be doing, this would explain why you see better results with a larger grace period. To confirm, the grace period should only matter up to the maximum time skew in your stream. So, as you said, if you have two producers that each produce a full 24 hours of data, sequentially, then you should see stream time advance when the first producer writes its data, and then "freeze" while the second producer writes its (out-of-order) data. Thus, you'll want to set the grace period to keep old windows around for at least 24 hours, since you know you have to wait for that second producer's data. Finally, to answer your earlier questions, yes, each task is handling just one partition of both input topics (the same partition on the left and right). Stream Time is independently maintained for each task/partition, and it is computed simply as the highest timestamp yet observed for that partition. If you want to look at it in detail, it's tracked in org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore. Actually, you can set that class's logger to DEBUG mode and it'll print out every time it skips a record that is outside of retention. Minor point, you should not need to mess with the retention of the changelog topic. Streams sets this appropriately to preserve the same data as the store, but this is only apparent when restoring the store. The actual results of the join are served out of the state store, so only the state store's retention matters. This is what you're setting with the grace period. I hope this helps! -John was (Author: vvcephei): Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your investigation is progressing. Last question first: Sterams should choose to consume from the left or right based on which one has the lower timestamp in the next record, so I would not expect one side to "run ahead" of the other. There's one caveat, that when one side is being produced more slowly, Streams won't just wait indefinitely for the next data, but instead just process the side that does have data. This is controled by the "max idle ms" config, but since you're processing historically, this shouldn't be your problem. Still might be worth a look.
Maybe for debugging purposes, you can print out the key, value, and timestamp for each of the sides as well as in the joiner, so you can identify which side is triggering the join, and evaluate whether or not it's correctly time-ordered. If it is in fact running ahead on one side, despite what it should be doing, this would explain why you see better results with a larger grace period. To confirm, the grace period should only matter up to the maximum time skew in your stream. So, as you said, if you have two producers that each produce a full 24 hours of data, sequentially, then you should see stream time advance when the first producer writes its data, and then "freeze" while the second producer writes its (out-of-order) data. Thus, you'll want to set the grace period to keep old windows around for at least 24 hours, since you know you have to wait for that second producer's data. Finally, to answer your earlier questions, yes, each task is handling just one partition of both input topics (the same partition on the left and right). Stream Time is independently maintained for each task/partition, and it is computed simply as the highest timestamp yet observed for that partition. If you want to look at it in detail, it's tracked in org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore . Actually, you can set that class's logger to DEBUG mode and it'll print out every time it skips a record that is outside of retention. Minor point, you sh
[jira] [Created] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition
tdp created KAFKA-8339: -- Summary: At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition Key: KAFKA-8339 URL: https://issues.apache.org/jira/browse/KAFKA-8339 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.0.1 Reporter: tdp We have hit a race condition several times now between the StreamThread committing its offsets for a task before the task has fully processed the record through the topology. Consider part of a topology that looks like this: TOPIC T1 -> KSTREAM-SOURCE-NODE1 > KSTREAM-TRANSFORMVALUES-NODE1 > KSTREAM-FILTER-NODE1 > KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 -> TOPIC T2 Records are committed to topic T1. KSTREAM-SOURCE-NODE1 consumes these records from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates these records using a local state store. KSTREAM-TRANSFORMVALUES-NODE1 returns null if not all necessary records from topic T1 have been consumed yet or an object representing an aggregation of records if all necessary records from topic T1 have been consumed. KSTREAM-FILTER-NODE1 then filters out anything that is null. Only an aggregation of records is passed to the KSTREAM-MAPVALUES-NODE1 node. KSTREAM-MAPVALUES-NODE1 then maps the aggregation of records into another object type. KSTREAM-SINK-NODE1 then attempts to produce this other object to topic T2. The race condition occurs when the stream thread commits its offsets for topic T1 after it consumes some or all of the necessary records from topic T1 for an aggregation but before it gets the failure response back from the async produce kicked off by KSTREAM-SINK-NODE1. We are running with a LogAndFailExceptionHandler, so when the stream thread tries to commit the next time it fails and the stream thread shuts itself down. The stream task is then reassigned to another stream thread, which reads the offsets previously committed by the original stream thread. That means the new stream thread's KSTREAM-SOURCE-NODE1 will never be able to consume the messages required for the aggregation and the KSTREAM-SINK-NODE1 will never end up producing the required records to topic T2. This is why it seems the at-least-once delivery guarantee is not met - KSTREAM-SINK-NODE1 never successfully processed records and the stream application continued on past it. Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10, which increases the likelihood of occurrence of the issue when all retries fail since it widens the window at which the async offset commit can occur before the produce record request is marked as failed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
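[Editor's note] For orientation, the topology shape described above can be reproduced with the Streams DSL roughly as follows. All names, value types and the "three records make a full aggregation" rule are placeholders standing in for the reporter's actual logic; the point is only where the state store, the null filter and the sink sit relative to each other.
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class AggregationTopologySketch {

    static final String STORE = "agg-store"; // placeholder store name

    /** KSTREAM-TRANSFORMVALUES-NODE1: buffers sub-aggregations in the store, returns null until "full". */
    static final class AggregatingTransformer implements ValueTransformer<String, String> {
        private KeyValueStore<String, String> store;
        @SuppressWarnings("unchecked")
        @Override public void init(ProcessorContext context) {
            store = (KeyValueStore<String, String>) context.getStateStore(STORE);
        }
        @Override public String transform(String value) {
            String partial = store.get("agg-key");
            String updated = partial == null ? value : partial + "," + value;
            if (updated.split(",").length < 3) {   // aggregation not yet complete
                store.put("agg-key", updated);
                return null;                       // dropped by KSTREAM-FILTER-NODE1
            }
            store.delete("agg-key");               // the deletion discussed later in the comments
            return updated;                        // full aggregate continues towards the sink
        }
        @Override public void close() { }
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE), Serdes.String(), Serdes.String()));

        ValueTransformerSupplier<String, String> supplier = AggregatingTransformer::new;
        builder.stream("T1", Consumed.with(Serdes.String(), Serdes.String()))  // KSTREAM-SOURCE-NODE1
               .transformValues(supplier, STORE)                               // KSTREAM-TRANSFORMVALUES-NODE1
               .filter((k, v) -> v != null)                                    // KSTREAM-FILTER-NODE1
               .mapValues(v -> "mapped:" + v)                                  // KSTREAM-MAPVALUES-NODE1
               .to("T2", Produced.with(Serdes.String(), Serdes.String()));     // KSTREAM-SINK-NODE1
        return builder.build();
    }
}
{code}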
[jira] [Created] (KAFKA-8340) ServiceLoader fails when used from isolated plugin path directory
Chris Egerton created KAFKA-8340: Summary: ServiceLoader fails when used from isolated plugin path directory Key: KAFKA-8340 URL: https://issues.apache.org/jira/browse/KAFKA-8340 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Chris Egerton Under some circumstances, the {{ServiceLoader.load}} mechanism will fail when used from an isolated plugin path directory and return an incomplete (often empty) {{ServiceLoader}} instance. To replicate: * Include a {{META-INF/services/...}} file in one of the JARS located in a plugin's directory with one or more implementations of that service listed inside. For the sake of example, let's say the name of this service is {{com.example.MyService}} * Program that plugin to invoke {{ServiceLoader.load(com.example.MyService.class)}} * Start the Connect framework, making sure this plugin is included on the plugin path and that it somehow invokes the {{ServiceLoader.load(...)}} method * Observe that the services loaded by that invocation do not include the ones described in the {{META-INF/services/...}} file contained in the JAR in the plugin's directory This is because the [ServiceLoader.load(Class)|https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html#load(java.lang.Class)] method uses the current thread's context classloader to locate resources and load services. The current thread's context classloader is, in most cases, an instance of {{DelegatingClassLoader}}, which will (unless asked to locate resources corresponding to a provider-configuration file for a REST extension or config provider) simply delegate resource location to the parent and, unless asked to locate a class for a recognized plugin, also delegate class loading to the parent. Thus, none of the plugin's JARs are scanned for either provider-configuration files or for actual service classes. A viable workaround for some cases is to instead use the [ServiceLoader.load(Class, ClassLoader)|https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html#load(java.lang.Class,%20java.lang.ClassLoader)] method, specifying the current class's classloader as the second argument. This causes the plugin's {{PluginClassLoader}}, which will scan all JARs in the plugin's directory to be used to locate resources and classes. However, this may not be feasible in all cases, especially when working with external libraries that may be difficult or impossible to apply this workaround on. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
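[Editor's note] The two ServiceLoader overloads contrasted in the report look like this in practice. MyService below is a local stand-in for the com.example.MyService interface used in the description above.
{code:java}
import java.util.ServiceLoader;

public class MyServiceLookup {

    // Stand-in for the com.example.MyService interface from the report.
    public interface MyService { }

    static ServiceLoader<MyService> loadViaContextClassLoader() {
        // Problematic under plugin isolation: ServiceLoader.load(Class) uses the thread context
        // classloader (the DelegatingClassLoader), which does not scan the plugin's JARs.
        return ServiceLoader.load(MyService.class);
    }

    static ServiceLoader<MyService> loadViaPluginClassLoader() {
        // Workaround described above: pass this class's own classloader (the PluginClassLoader),
        // which does scan all JARs in the plugin's directory.
        return ServiceLoader.load(MyService.class, MyServiceLookup.class.getClassLoader());
    }
}
{code}
As the report notes, the second form only helps for code the plugin author controls; third-party libraries that call the single-argument overload internally still miss the plugin's provider-configuration files.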
[jira] [Assigned] (KAFKA-8337) Fix tests/setup.cfg to work with pytest 4.x
[ https://issues.apache.org/jira/browse/KAFKA-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kengo Seki reassigned KAFKA-8337: - Assignee: Kengo Seki > Fix tests/setup.cfg to work with pytest 4.x > --- > > Key: KAFKA-8337 > URL: https://issues.apache.org/jira/browse/KAFKA-8337 > Project: Kafka > Issue Type: Bug > Components: system tests, unit tests >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Minor > > In accordance with tests/README.md, I ran {{`python setup.py test`}} in the > {{tests}} directory to execute unit tests for the system tests, but got the > following error: > {code} > $ python setup.py test > running test > (snip) > Using /home/sekikn/repo/kafka/tests/.eggs/docutils-0.14-py2.7.egg > Searching for pytest > Best match: pytest 4.4.1 > Processing pytest-4.4.1-py2.7.egg > (snip) > builtins.Failed: [pytest] section in setup.cfg files is no longer supported, > change to [tool:pytest] instead. > {code} > This is because [\[pytest\] section in setup.cfg has been removed in pytest > 4.0|https://docs.pytest.org/en/4.1.0/deprecations.html#pytest-section-in-setup-cfg-files]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
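[Editor's note] The rename the pytest 4.x deprecation notice asks for is a one-line section header change in tests/setup.cfg; the actual option lines under the section are omitted here.
{noformat}
# tests/setup.cfg -- rejected by pytest >= 4.0
[pytest]
...

# accepted by recent pytest versions
[tool:pytest]
...
{noformat}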
[jira] [Created] (KAFKA-8341) AdminClient should retry coordinator lookup after NOT_COORDINATOR error
Jason Gustafson created KAFKA-8341: -- Summary: AdminClient should retry coordinator lookup after NOT_COORDINATOR error Key: KAFKA-8341 URL: https://issues.apache.org/jira/browse/KAFKA-8341 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Vikas Singh If a group operation (e.g. DescribeGroup) fails because the coordinator has moved, the AdminClient should lookup the coordinator before retrying the operation. Currently we will either fail or just retry anyway. This is similar in some ways to controller rediscovery after getting NOT_CONTROLLER errors. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition
[ https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16835931#comment-16835931 ] Matthias J. Sax commented on KAFKA-8339: {quote}The race condition occurs when the stream thread commits its offsets for topic T1 after it consumes some or all of the necessary records from topic T1 for an aggregation but before it gets the failure response back from the async produce kicked off by KSTREAM-SINK-NODE1. {quote} This does not sound correct. Before offsets are committed, `producer.flush()` is called and all pending in-flight request should be written. Committing offset should only happen if no error occurred during `flush()`. Can you confirm your observation? If your description is correct, we need to figure out why we still commit even if an error occurred. Also, offsets are not committed async but via `consumer.commitSync()`. {quote}LogAndFailExceptionHandler {quote} This handler is for the input path, and should only be called if there is a deserialization exception. Thus, I don't see how it is related to the other things reported here? Can you elaborate? {quote}so when the stream thread tries to commit the next time it fails and the stream thread shuts itself down. {quote} I don't see the causality? Why would committing fail? Also, what error message do you see on a failed commit? > At-least-once delivery guarantee seemingly not met due to async commit / > produce failure race condition > --- > > Key: KAFKA-8339 > URL: https://issues.apache.org/jira/browse/KAFKA-8339 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: tdp >Priority: Major > > We have hit a race condition several times now between the StreamThread > committing its offsets for a task before the task has fully processed the > record through the topology. > > Consider part of a topology that looks like this: > > TOPIC T1 -> KSTREAM-SOURCE-NODE1 > KSTREAM-TRANSFORMVALUES-NODE1 > > KSTREAM-FILTER-NODE1 > KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 -> TOPIC > T2 > > Records are committed to topic T1. KSTREAM-SOURCE-NODE1 consumes these > records from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates these records > using a local state store. KSTREAM-TRANSFORMVALUES-NODE1 returns null if not > all necessary records from topic T1 have been consumed yet or an object > representing an aggregation of records if all necessary records from topic T1 > have been consumed. KSTREAM-FILTER-NODE1 then filters out anything that is > null. Only an aggregation of records is passed to the KSTREAM-MAPVALUES-NODE1 > node. KSTREAM-MAPVALUES-NODE1 then maps the aggregation of records into > another object type. KSTREAM-SINK-NODE1 then attempts to produce this other > object to topic T2. > > The race condition occurs when the stream thread commits its offsets for > topic T1 after it consumes some or all of the necessary records from topic T1 > for an aggregation but before it gets the failure response back from the > async produce kicked off by KSTREAM-SINK-NODE1. > > We are running with a LogAndFailExceptionHandler, so when the stream thread > tries to commit the next time it fails and the stream thread shuts itself > down. The stream task is then reassigned to another stream thread, which > reads the offsets previously committed by the original stream thread. 
That > means the new stream thread's KSTREAM-SOURCE-NODE1 will never be able to > consume the messages required for the aggregation and the KSTREAM-SINK-NODE1 > will never end up producing the required records to topic T2. This is why it > seems the at-least-once delivery guarantee is not met - KSTREAM-SINK-NODE1 > never successfully processed records and the stream application continued on > past it. > Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10, which > increases the likelihood of occurrence of the issue when all retries fail > since it widens the window at which the async offset commit can occur before > the produce record request is marked as failed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
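[Editor's note] As a reading aid, the ordering Matthias describes can be sketched as below. This is an illustrative flow only, not the actual StreamTask/RecordCollector code: the point is simply that a produce failure is expected to surface via the flush, or via an exception recorded by the producer's send callback, before consumer.commitSync() is reached.
{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class CommitOrderSketch {
    static void commit(KafkaProducer<byte[], byte[]> producer,
                       KafkaConsumer<byte[], byte[]> consumer,
                       Map<TopicPartition, OffsetAndMetadata> offsets,
                       Exception firstSendException) {
        producer.flush();                      // wait for all in-flight produce requests to complete
        if (firstSendException != null) {      // recorded earlier by the producer's send callback
            throw new RuntimeException("abort commit, a produce failed", firstSendException);
        }
        consumer.commitSync(offsets);          // offsets are committed only after a clean flush
    }
}
{code}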
[jira] [Resolved] (KAFKA-8196) Replace InitProducerId request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-8196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8196. Resolution: Fixed > Replace InitProducerId request/response with automated protocol > --- > > Key: KAFKA-8196 > URL: https://issues.apache.org/jira/browse/KAFKA-8196 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Jason Gustafson >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (KAFKA-7830) Convert Kafka RPCs to use automatically generated code
[ https://issues.apache.org/jira/browse/KAFKA-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reopened KAFKA-7830: > Convert Kafka RPCs to use automatically generated code > -- > > Key: KAFKA-7830 > URL: https://issues.apache.org/jira/browse/KAFKA-7830 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > > KAFKA-7609 added a way of automatically generating code for reading and > writing Kafka RPC message types from JSON schemas. > Automatically generated code is preferrable to manually written serialization > code. > * * It is less tedious and error-prone to use than hand-written code. > * For developers writing Kafka clients in other languages, the JSON schemas > are useful in a way that the java serialization code is not. > * It will eventually be possible to automatically validate aspects of > cross-version compatibility, when using JSON message schemas. > * Once all of the RPCs are converted, we can drop using Structs in favor of > serializing directly to ByteBuffer, to reduce GC load. > This JIRA tracks converting the current hand-written message serialization > code to automatically generated serialization. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7830) Convert Kafka RPCs to use automatically generated code
[ https://issues.apache.org/jira/browse/KAFKA-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-7830. Resolution: Fixed > Convert Kafka RPCs to use automatically generated code > -- > > Key: KAFKA-7830 > URL: https://issues.apache.org/jira/browse/KAFKA-7830 > Project: Kafka > Issue Type: Improvement > Components: clients, core >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > > KAFKA-7609 added a way of automatically generating code for reading and > writing Kafka RPC message types from JSON schemas. > Automatically generated code is preferrable to manually written serialization > code. > * * It is less tedious and error-prone to use than hand-written code. > * For developers writing Kafka clients in other languages, the JSON schemas > are useful in a way that the java serialization code is not. > * It will eventually be possible to automatically validate aspects of > cross-version compatibility, when using JSON message schemas. > * Once all of the RPCs are converted, we can drop using Structs in favor of > serializing directly to ByteBuffer, to reduce GC load. > This JIRA tracks converting the current hand-written message serialization > code to automatically generated serialization. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8286) KIP-460 Admin Leader Election RPC
[ https://issues.apache.org/jira/browse/KAFKA-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-8286: -- Description: Tracking issue for implementing KIP-460. Tasks: # [Done] Design KIP # [Done] Review KIP # [Done] Approve KIP # [Done] Update RPC to support KIP # [Done] Update controller to support KIP # [Done] Create CLI command (kafka-leader-election) that implement KIP # [Done] Search and replace any usage of “preferred” in the code # Add test for command # Add test for controller functionality # Revisit all of the documentation - generate and audit the new javadocs # Deprecated since... needs to be update # Review PR # Merge PR was: Tracking issue for implementing KIP-460. Tasks: # [Done] Design KIP # [Done] Review KIP # [Done] Approve KIP # [Done] Update RPC to support KIP # [Done] Update controller to support KIP # [Done] Create CLI command (kafka-leader-election) that implement KIP # Search and replace any usage of “preferred” in the code # Add test for command # Add test for controller functionality # Revisit all of the documentation - generate and audit the new javadocs # Review PR # Merge PR > KIP-460 Admin Leader Election RPC > - > > Key: KAFKA-8286 > URL: https://issues.apache.org/jira/browse/KAFKA-8286 > Project: Kafka > Issue Type: New Feature > Components: admin, clients, core >Reporter: Jose Armando Garcia Sancio >Assignee: Jose Armando Garcia Sancio >Priority: Major > > Tracking issue for implementing KIP-460. Tasks: > # [Done] Design KIP > # [Done] Review KIP > # [Done] Approve KIP > # [Done] Update RPC to support KIP > # [Done] Update controller to support KIP > # [Done] Create CLI command (kafka-leader-election) that implement KIP > # [Done] Search and replace any usage of “preferred” in the code > # Add test for command > # Add test for controller functionality > # Revisit all of the documentation - generate and audit the new javadocs > # Deprecated since... needs to be update > # Review PR > # Merge PR -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics
Boyang Chen created KAFKA-8342: -- Summary: Admin tool to setup Kafka Stream topology (internal) topics Key: KAFKA-8342 URL: https://issues.apache.org/jira/browse/KAFKA-8342 Project: Kafka Issue Type: Bug Reporter: Boyang Chen We have seen customers who need to deploy their application to production environment but don't have access to create changelog and repartition topics. They need to ask admin team to manually create those topics before proceeding to start the actual stream job. We could add an admin tool to help them go through the process quicker by providing a command that could # Read through current stream topology # Create corresponding topics as needed, even including output topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
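[Editor's note] A rough sketch of what step 2 of such a tool could look like once the topic names have been derived from the topology (step 1). Everything below, including topic names, partition counts and replication factor, is a placeholder; deriving the changelog/repartition names (typically "<application.id>-<store>-changelog" and "...-repartition") from Topology#describe() is the part the proposed tool would actually automate.
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class StreamTopicProvisioner {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Placeholder names that would normally be derived from the application's topology.
        List<String> topics = Arrays.asList(
            "my-app-agg-store-changelog",
            "my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
            "my-app-output");

        try (AdminClient admin = AdminClient.create(props)) {
            List<NewTopic> newTopics = topics.stream()
                .map(t -> new NewTopic(t, 8, (short) 3)) // partitions / RF chosen by the admin team
                .collect(Collectors.toList());
            admin.createTopics(newTopics).all().get();   // fails if the caller lacks CREATE permission
        }
    }
}
{code}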
[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836028#comment-16836028 ] WooYoung commented on KAFKA-8311: - Hello [~bchen225242]. I'm a new contributor to the Kafka project. I want to analyze this issue and contribute to the project. Could you assign this issue to me? > Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Labels: newbie > > When stream application crashed due to underlying consumer commit timeout, we > have seen following gaps: > 1. The current timeout exception doesn't provide meaningful tuning > instructions. We should augment the error message to let user change > `default.api.timeout.ms` in order to tolerate longer reaction time. > 2. Currently we have 3 different types of consumers on KStream: > thread-consumer, global-consumer and restore-consumer. Although we don't plan > to explicitly handle this consumer timeout on stream level, we could wrap it > with more meaningful message either on consumer or stream level to let user > be aware which consumer is having trouble. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics
[ https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-8342: --- Labels: newbie (was: ) > Admin tool to setup Kafka Stream topology (internal) topics > --- > > Key: KAFKA-8342 > URL: https://issues.apache.org/jira/browse/KAFKA-8342 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > We have seen customers who need to deploy their application to production > environment but don't have access to create changelog and repartition topics. > They need to ask admin team to manually create those topics before proceeding > to start the actual stream job. We could add an admin tool to help them go > through the process quicker by providing a command that could > # Read through current stream topology > # Create corresponding topics as needed, even including output topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics
[ https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-8342: --- Issue Type: New Feature (was: Bug) > Admin tool to setup Kafka Stream topology (internal) topics > --- > > Key: KAFKA-8342 > URL: https://issues.apache.org/jira/browse/KAFKA-8342 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Priority: Major > > We have seen customers who need to deploy their application to production > environment but don't have access to create changelog and repartition topics. > They need to ask admin team to manually create those topics before proceeding > to start the actual stream job. We could add an admin tool to help them go > through the process quicker by providing a command that could > # Read through current stream topology > # Create corresponding topics as needed, even including output topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836033#comment-16836033 ] Dan Casey commented on KAFKA-7500: -- I'm excited to test this as well. We've been fighting with duplicates for a long time, and wasting a lot of resources dealing with duplicate detection and remediation. I have a few questions about the initial release. # Is this expected to work with kafka 1.1.1 clusters? This is currently the max version we can upgrade to without breaking support for vertica integration. # KIP-382 mentions that you will prefix remote topic names to avoid replication loops. Would this be configurable? I am already using a similar approach by adding the local datacenter as a suffix to my topics although it requires the producer to be datacenter aware, and additionally prevents the ability to create aggregate topics without help of app. Would be great to see options to transform destination topic names. > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Minor > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Weichu Liu updated KAFKA-8335: -- Attachment: segment.zip > Log cleaner skips Transactional mark and batch record, causing unlimited > growth of __consumer_offsets > - > > Key: KAFKA-8335 > URL: https://issues.apache.org/jira/browse/KAFKA-8335 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Boquan Tang >Priority: Major > Attachments: segment.zip > > > My Colleague Weichu already sent out a mail to kafka user mailing list > regarding this issue, but we think it's worth having a ticket tracking it. > We are using Kafka Streams with exactly-once enabled on a Kafka cluster for > a while. > Recently we found that the size of __consumer_offsets partitions grew huge. > Some partition went over 30G. This caused Kafka to take quite long to load > "__consumer_offsets" topic on startup (it loads the topic in order to > become group coordinator). > We dumped the __consumer_offsets segments and found that while normal > offset commits are nicely compacted, transaction records (COMMIT, etc) are > all preserved. Looks like that since these messages don't have a key, the > LogCleaner is keeping them all: > -- > $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > /003484332061.log --key-decoder-class > kafka.serializer.StringDecoder 2>/dev/null | cat -v | head > Dumping 003484332061.log > Starting offset: 3484332061 > offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006 > producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 81 > offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005 > producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 84 > ... > -- > Streams is doing transaction commits per 100ms (commit.interval.ms=100 when > exactly-once) so the __consumer_offsets is growing really fast. > Is this (to keep all transactions) by design, or is that a bug for > LogCleaner? What would be the way to clean up the topic? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836061#comment-16836061 ] Weichu Liu commented on KAFKA-8335: --- Hi, I uploaded a sample segment here: [^segment.zip] And here is the broker setting on our Kafka {noformat} [2019-05-07 02:00:50,472] INFO KafkaConfig values: advertised.host.name = null advertised.listeners = null advertised.port = null alter.config.policy.class.name = null alter.log.dirs.replication.quota.window.num = 11 alter.log.dirs.replication.quota.window.size.seconds = 1 authorizer.class.name = auto.create.topics.enable = true auto.leader.rebalance.enable = true background.threads = 10 broker.id = 1 broker.id.generation.enable = true broker.rack = null client.quota.callback.class = null compression.type = producer connection.failed.authentication.delay.ms = 100 connections.max.idle.ms = 60 connections.max.reauth.ms = 0 control.plane.listener.name = null controlled.shutdown.enable = true controlled.shutdown.max.retries = 3 controlled.shutdown.retry.backoff.ms = 5000 controller.socket.timeout.ms = 3 create.topic.policy.class.name = null default.replication.factor = 3 delegation.token.expiry.check.interval.ms = 360 delegation.token.expiry.time.ms = 8640 delegation.token.master.key = null delegation.token.max.lifetime.ms = 60480 delete.records.purgatory.purge.interval.requests = 1 delete.topic.enable = true fetch.purgatory.purge.interval.requests = 1000 group.initial.rebalance.delay.ms = 3000 group.max.session.timeout.ms = 30 group.max.size = 2147483647 group.min.session.timeout.ms = 6000 host.name = inter.broker.listener.name = null inter.broker.protocol.version = 2.2-IV1 kafka.metrics.polling.interval.secs = 10 kafka.metrics.reporters = [] leader.imbalance.check.interval.seconds = 300 leader.imbalance.per.broker.percentage = 10 listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL listeners = PLAINTEXT://:9092,SSL://:9093 log.cleaner.backoff.ms = 15000 log.cleaner.dedupe.buffer.size = 134217728 log.cleaner.delete.retention.ms = 8640 log.cleaner.enable = true log.cleaner.io.buffer.load.factor = 0.9 log.cleaner.io.buffer.size = 524288 log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308 log.cleaner.min.cleanable.ratio = 0.5 log.cleaner.min.compaction.lag.ms = 0 log.cleaner.threads = 1 log.cleanup.policy = [delete] log.dir = /tmp/kafka-logs log.dirs = /var/lib/kafka/data log.flush.interval.messages = 9223372036854775807 log.flush.interval.ms = null log.flush.offset.checkpoint.interval.ms = 6 log.flush.scheduler.interval.ms = 9223372036854775807 log.flush.start.offset.checkpoint.interval.ms = 6 log.index.interval.bytes = 4096 log.index.size.max.bytes = 10485760 log.message.downconversion.enable = true log.message.format.version = 2.2-IV1 log.message.timestamp.difference.max.ms = 9223372036854775807 log.message.timestamp.type = CreateTime log.preallocate = false log.retention.bytes = -1 log.retention.check.interval.ms = 30 log.retention.hours = 168 log.retention.minutes = null log.retention.ms = null log.roll.hours = 168 log.roll.jitter.hours = 0 log.roll.jitter.ms = null log.roll.ms = null log.segment.bytes = 1073741824 log.segment.delete.delay.ms = 6 max.connections.per.ip = 2147483647 max.connections.per.ip.overrides = max.incremental.fetch.session.cache.slots = 1000 message.max.bytes = 2097152 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 3 min.insync.replicas = 1 
num.io.threads = 8 num.network.threads = 3 num.partitions = 5 num.recovery.threads.per.data.dir = 1 num.replica.alter.log.dirs.threads = null num.replica.fetchers = 1 offset.metadata.max.bytes = 4096 offsets.commit.required.acks = -1 offsets.commit.timeout.ms = 5000 offsets.load.buffer.size = 5242880 offsets.retention.check.interval.ms = 60 offsets.retention.minutes = 10080 offsets.topic.compression.codec = 0 offsets.topic.num.partitions = 50 offsets.topic.replication.factor = 3 offsets.topic.
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836058#comment-16836058 ] Ryanne Dolan commented on KAFKA-7500: - [~cloudfrog], glad you are taking a look. I'm looking forward to hearing about your experience. > Is this expected to work with kafka 1.1.1 clusters? Yes, I believe it will work with 0.11.0.0 or higher, but maybe you can test it to verify :) > will prefix remote topic names ... be configurable? Yes, you can implement your own ReplicationPolicy to define remote topics however you like: {code:java} replication.policy.class = my.SuffixReplicationPolicy {code} Also, MM2 doesn't care how existing source topics are named. If your topics are prefixed with their local DC (a common pattern), you can leave them as-is without breaking anything. By default you'd get topics like "dc1.topic1-dc1", so you might consider implementing a ReplicationPolicy that strips the suffix during replication so you get just "dc1.topic1". Ryanne > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Minor > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
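[Editor's note] For readers wondering what `my.SuffixReplicationPolicy` might look like: the sketch below strips a "-<cluster>" suffix and adds the "<cluster>." prefix, as Ryanne suggests. MirrorMaker 2.0 had not yet been merged when this comment was written, so the org.apache.kafka.connect.mirror.ReplicationPolicy interface and the method names used here are assumptions based on KIP-382 and should be checked against the code that actually ships.
{code:java}
package my;

import org.apache.kafka.connect.mirror.ReplicationPolicy;

public class SuffixReplicationPolicy implements ReplicationPolicy {

    // "topic1-dc1" replicated from cluster alias "dc1" becomes "dc1.topic1"
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        String suffix = "-" + sourceClusterAlias;
        String base = topic.endsWith(suffix)
                ? topic.substring(0, topic.length() - suffix.length())
                : topic;
        return sourceClusterAlias + "." + base;
    }

    // Cluster alias encoded in a remote topic name, e.g. "dc1" for "dc1.topic1"
    @Override
    public String topicSource(String topic) {
        int dot = topic.indexOf('.');
        return dot < 0 ? null : topic.substring(0, dot);
    }

    // Name of the topic on the source cluster side, e.g. "topic1" for "dc1.topic1"
    @Override
    public String upstreamTopic(String topic) {
        int dot = topic.indexOf('.');
        return dot < 0 ? null : topic.substring(dot + 1);
    }
}
{code}
The class would then be wired in with the `replication.policy.class = my.SuffixReplicationPolicy` setting quoted above.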
[jira] [Commented] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition
[ https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836065#comment-16836065 ] tdp commented on KAFKA-8339: Hi Matthias, Thanks for the quick reply! After digging into this a bit more, I think the issue is in our aggregation logic. We aggregate records and put the sub-aggregation result into the local state store. When the aggregation is "full" and the non-null value is returned by KSTREAM-TRANSFORMVALUES-NODE1, the transform node logic also deletes the sub-aggregation from the local state store. This is the flaw in our logic, since only the last record from topic T1 is consumed by the new stream thread and it does not see the sub-aggregation in the local state store since it was previously deleted. I was incorrectly assuming that the earlier records should have been re-consumed. The logs below explain in more detail. I also should have specified that we are using a customized exception handler that looks like this: {code:java} public class CustomLogAndFailExceptionHandler implements ProductionExceptionHandler, DeserializationExceptionHandler { // ... @Override public ProductionExceptionHandlerResponse handle(ProducerRecord record, Exception exception) { // ... return ProductionExceptionHandlerResponse.FAIL; } @Override public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord record, Exception exception) { // ... return DeserializationHandlerResponse.FAIL; } } {code} For clarity, here's some selected logs (with additional custom debug/info logs added): Record 1 (of 3) aggregated and sub-aggregation written to local state store but filtered out by KSTREAM-FILTER-NODE1: {noformat} 07 May 2019 14:03:10,951 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key !0d81fc45-f485-4676-901f-6c1ced7042b0 and offset 5 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 07 May 2019 14:03:10,951 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding record to task 1_10 for topic TOPIC-T1 and partition 10 with key !0d81fc45-f485-4676-901f-6c1ced7042b0 and offset 5 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 07 May 2019 14:03:10,951 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 consuming key !0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 07 May 2019 14:03:10,953 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.ProcessorNode: KSTREAM-TRANSFORMVALUES-NODE1 processing key !0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 07 May 2019 14:03:10,954 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 (StreamThread-15) Aggregator: Processed '1' of '3' messages for 'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'. 07 May 2019 14:03:10,955 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.ProcessorNode: KSTREAM-FILTER-NODE1 processing key !0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 
{noformat} Record 2 (of 3) aggregated and sub-aggregation written to local state store but filtered out by KSTREAM-FILTER-NODE1: {noformat} 07 May 2019 14:03:10,969 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.StreamThread: stream-thread [StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c and offset 6 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 07 May 2019 14:03:10,969 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding record to task 1_10 for topic TOPIC-T1 and partition 10 with key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c and offset 6 for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 07 May 2019 14:03:10,971 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 consuming key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 07 May 2019 14:03:10,974 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.ProcessorNode: KSTREAM-TRANSFORMVALUES-NODE1 processing key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1. 07 May 2019 14:03:10,978 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 (StreamThread-15) Aggregator: Processed '2' of '3' messages for 'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'. 07 May 2019 14:03:10,981 [INFO] (StreamThread-18) org.apache.kafka.streams.processor.internals.ProcessorNode: KSTREAM-FILTER-NODE1 processing key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 554a65bc-e0bd-48
[jira] [Updated] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boquan Tang updated KAFKA-8335: --- Attachment: seg_april_25.zip > Log cleaner skips Transactional mark and batch record, causing unlimited > growth of __consumer_offsets > - > > Key: KAFKA-8335 > URL: https://issues.apache.org/jira/browse/KAFKA-8335 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Boquan Tang >Priority: Major > Attachments: seg_april_25.zip, segment.zip > > > My Colleague Weichu already sent out a mail to kafka user mailing list > regarding this issue, but we think it's worth having a ticket tracking it. > We are using Kafka Streams with exactly-once enabled on a Kafka cluster for > a while. > Recently we found that the size of __consumer_offsets partitions grew huge. > Some partition went over 30G. This caused Kafka to take quite long to load > "__consumer_offsets" topic on startup (it loads the topic in order to > become group coordinator). > We dumped the __consumer_offsets segments and found that while normal > offset commits are nicely compacted, transaction records (COMMIT, etc) are > all preserved. Looks like that since these messages don't have a key, the > LogCleaner is keeping them all: > -- > $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > /003484332061.log --key-decoder-class > kafka.serializer.StringDecoder 2>/dev/null | cat -v | head > Dumping 003484332061.log > Starting offset: 3484332061 > offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006 > producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 81 > offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005 > producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 84 > ... > -- > Streams is doing transaction commits per 100ms (commit.interval.ms=100 when > exactly-once) so the __consumer_offsets is growing really fast. > Is this (to keep all transactions) by design, or is that a bug for > LogCleaner? What would be the way to clean up the topic? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836068#comment-16836068 ] Boyang Chen commented on KAFKA-8311: [~clearpal7] Thanks for interest! You need to send an email to [d...@kafka.apache.org|mailto:d...@kafka.apache.org] to get access to wiki and jira, and I will assign this ticket to you. > Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > When stream application crashed due to underlying consumer commit timeout, we > have seen following gaps: > 1. The current timeout exception doesn't provide meaningful tuning > instructions. We should augment the error message to let user change > `default.api.timeout.ms` in order to tolerate longer reaction time. > 2. Currently we have 3 different types of consumers on KStream: > thread-consumer, global-consumer and restore-consumer. Although we don't plan > to explicitly handle this consumer timeout on stream level, we could wrap it > with more meaningful message either on consumer or stream level to let user > be aware which consumer is having trouble. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8284) Enable static membership on KStream
[ https://issues.apache.org/jira/browse/KAFKA-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-8284. Resolution: Fixed > Enable static membership on KStream > --- > > Key: KAFKA-8284 > URL: https://issues.apache.org/jira/browse/KAFKA-8284 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boquan Tang updated KAFKA-8335: --- Attachment: (was: seg_april_25.zip) > Log cleaner skips Transactional mark and batch record, causing unlimited > growth of __consumer_offsets > - > > Key: KAFKA-8335 > URL: https://issues.apache.org/jira/browse/KAFKA-8335 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Boquan Tang >Priority: Major > Attachments: seg_april_25.zip, segment.zip > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-8311: -- Assignee: (was: Boyang Chen) > Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Priority: Major > Labels: newbie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836075#comment-16836075 ] Boquan Tang commented on KAFKA-8335: Hi [~hachikuji], thanks for replying. As Weichu commented, we have log.cleaner.delete.retention.ms = 8640, which is one day. To better illustrate the suspected issue, I uploaded the full segment from April 25 [^seg_april_25.zip], which is two weeks before the time it was retrieved. The log dump shows that not only is the endTxnMarker not deleted, but the record batch metadata is also retained:
{code:java}
Dumping /home/boquan/Downloads/Users/boquan/Documents/003530931566.log
Starting offset: 3530931566
baseOffset: 3530931566 lastOffset: 3530931567 count: 0 baseSequence: 0 lastSequence: 1 producerId: 4001 producerEpoch: 2539 partitionLeaderEpoch: 94 isTransactional: true isControl: false position: 0 CreateTime: 1556161832882 size: 61 magic: 2 compresscodec: NONE crc: 1683579819 isvalid: true
baseOffset: 3530931575 lastOffset: 3530931575 count: 1 baseSequence: -1 lastSequence: -1 producerId: 4001 producerEpoch: 2539 partitionLeaderEpoch: 94 isTransactional: true isControl: true position: 61 CreateTime: 1556161832899 size: 78 magic: 2 compresscodec: NONE crc: 535474521 isvalid: true
| offset: 3530931575 CreateTime: 1556161832899 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 84
baseOffset: 3530931576 lastOffset: 3530931577 count: 0 baseSequence: 0 lastSequence: 1 producerId: 3007 producerEpoch: 2516 partitionLeaderEpoch: 94 isTransactional: true isControl: false position: 139 CreateTime: 1556161832997 size: 61 magic: 2 compresscodec: NONE crc: 3760382141 isvalid: true
baseOffset: 3530931578 lastOffset: 3530931579 count: 0 baseSequence: 0 lastSequence: 1 producerId: 1004 producerEpoch: 2576 partitionLeaderEpoch: 94 isTransactional: true isControl: false position: 200 CreateTime: 1556161832998 size: 61 magic: 2 compresscodec: NONE crc: 3285369041 isvalid: true
baseOffset: 3530931580 lastOffset: 3530931581 count: 0 baseSequence: 0 lastSequence: 1 producerId: 1005 producerEpoch: 2545 partitionLeaderEpoch: 94 isTransactional: true isControl: false position: 261 CreateTime: 1556161832998 size: 61 magic: 2 compresscodec: NONE crc: 1698037918 isvalid: true
baseOffset: 3530931582 lastOffset: 3530931583 count: 0 baseSequence: 0 lastSequence: 1 producerId: 3003 producerEpoch: 2529 partitionLeaderEpoch: 94 isTransactional: true isControl: false position: 322 CreateTime: 1556161832998 size: 61 magic: 2 compresscodec: NONE crc: 3446788505 isvalid: true
baseOffset: 3530931584 lastOffset: 3530931585 count: 0 baseSequence: 0 lastSequence: 1 producerId: 3001 producerEpoch: 2486 partitionLeaderEpoch: 94 isTransactional: true isControl: false position: 383 CreateTime: 1556161832998 size: 61 magic: 2 compresscodec: NONE crc: 2245471394 isvalid: true
baseOffset: 3530931586 lastOffset: 3530931587 count: 0 baseSequence: 0 lastSequence: 1 producerId: 3006 producerEpoch: 2503 partitionLeaderEpoch: 94 isTransactional: true isControl: false position: 444 CreateTime: 1556161832999 size: 61 magic: 2 compresscodec: NONE crc: 1819109301 isvalid: true
baseOffset: 3530931588 lastOffset: 3530931588 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3007 producerEpoch: 2516 partitionLeaderEpoch: 94 isTransactional: true isControl: true position: 505 CreateTime: 1556161833001 size: 78 magic: 2 compresscodec: NONE crc: 2403915653 isvalid: true
| offset: 3530931588 CreateTime: 1556161833001 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 95
baseOffset: 3530931589 lastOffset: 3530931589 count: 1 baseSequence: -1 lastSequence: -1 producerId: 3003 producerEpoch: 2529 partitionLeaderEpoch: 94 isTransactional: true isControl: true position: 583 CreateTime: 1556161833004 size: 78 magic: 2 compresscodec: NONE crc: 4184380477 isvalid: true
| offset: 3530931589 CreateTime: 1556161833004 keysize: 4 valuesize: 6 sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 95
{code}
> Log cleaner skips Transactional mark and batch record, causing unlimited > growth of __consumer_offsets > - > > Key: KAFKA-8335 > URL: https://issues.apache.org/jira/browse/KAFKA-8335 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Boquan Tang >Priority: Major > Attachments: seg_april_25.zip, segment.zip >
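To cross-check which batches survive cleaning, one could also walk a segment programmatically. The sketch below is a hypothetical helper that relies on Kafka's internal org.apache.kafka.common.record classes (not a supported public API, so their availability is an assumption); it prints one line per batch, flagging control batches and empty data batches like the ones retained above.
{code:java}
import java.io.File;
import java.io.IOException;

import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.RecordBatch;

public class ControlBatchScan {
    public static void main(String[] args) throws IOException {
        // Path to a copy of a __consumer_offsets segment, e.g. the one dumped above.
        File segment = new File(args[0]);
        try (FileRecords records = FileRecords.open(segment)) {
            for (RecordBatch batch : records.batches()) {
                // Control batches carry the COMMIT/ABORT markers; data batches
                // with count 0 are the retained batch metadata mentioned above.
                System.out.printf("baseOffset=%d count=%s control=%s transactional=%s producerId=%d%n",
                        batch.baseOffset(),
                        batch.countOrNull(),
                        batch.isControlBatch(),
                        batch.isTransactional(),
                        batch.producerId());
            }
        }
    }
}
{code}
Run against such a segment, the control=true lines would correspond to the COMMIT markers in the dump, and the count=0 lines to the retained, already-cleaned data batches.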
[jira] [Comment Edited] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836075#comment-16836075 ] Boquan Tang edited comment on KAFKA-8335 at 5/9/19 4:56 AM: Hi [~hachikuji], thanks for replying. As Weichu commented, we have log.cleaner.delete.retention.ms = 8640, which is one day. To better illustrate the suspected issue, I uploaded the full segment from April 25 [^seg_april_25.zip], which is two weeks before the time it was retrieved. The log dump (see the previous comment) shows that not only is the endTxnMarker not deleted, but the record batch metadata is also retained. Is this intended? If so, will all compacted topics grow without limit in the end?
[jira] [Created] (KAFKA-8343) streams application crashed due to rocksdb
gaoshu created KAFKA-8343: - Summary: streams application crashed due to rocksdb Key: KAFKA-8343 URL: https://issues.apache.org/jira/browse/KAFKA-8343 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Environment: centos 7 jdk8 kafka-streams1.0 Reporter: gaoshu Attachments: fullsizeoutput_1.jpeg My Streams application always crashes within a few days. The crash log looks like [https://github.com/facebook/rocksdb/issues/5234], so I think it may be because RocksDBStore.java is closed incorrectly when accessed from multiple threads. Looking through the code below, db.close() should happen after closeOpenIterators(). However, db.close() may be executed before the iterators are closed due to instruction reordering. I hope my guess is correct.
{code:java}
// RocksDBStore.java
@Override
public synchronized void close() {
    if (!open) {
        return;
    }
    open = false;
    closeOpenIterators();
    options.close();
    wOptions.close();
    fOptions.close();
    db.close();
    options = null;
    wOptions = null;
    fOptions = null;
    db = null;
}
{code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
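The worry described above is essentially that an iterator (or read) can still touch the native RocksDB handle while close() runs on another thread. The sketch below is a hypothetical illustration of a defensive pattern, not the actual Kafka Streams RocksDBStore code: read paths and close() synchronize on the same object, the open flag is checked before the handle is used, and all outstanding iterators are closed before the handle is released.
{code:java}
import java.util.HashSet;
import java.util.Set;

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

// Hypothetical wrapper around a RocksDB handle, illustrating the ordering
// concern from the report: no iterator may outlive the handle it points into.
public class GuardedRocksStore {
    private final Set<RocksIterator> openIterators = new HashSet<>();
    private RocksDB db;
    private volatile boolean open = true;

    public GuardedRocksStore(RocksDB db) {
        this.db = db;
    }

    public synchronized byte[] get(byte[] key) throws RocksDBException {
        if (!open) {
            throw new IllegalStateException("Store is already closed");
        }
        return db.get(key);
    }

    public synchronized RocksIterator newIterator() {
        if (!open) {
            throw new IllegalStateException("Store is already closed");
        }
        RocksIterator iter = db.newIterator();
        openIterators.add(iter);
        return iter;
    }

    public synchronized void close() {
        if (!open) {
            return;
        }
        open = false;
        // Close all outstanding iterators before releasing the native handle,
        // so no iterator can dereference a freed RocksDB instance.
        for (RocksIterator iter : openIterators) {
            iter.close();
        }
        openIterators.clear();
        db.close();
        db = null;
    }
}
{code}
Under this scheme db.close() cannot run concurrently with get() or newIterator(); an iterator already handed out would still need to be used under the same lock (or closed first) for full safety, which is the part the reporter suspects is missing in the crash above.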
[jira] [Updated] (KAFKA-8343) streams application crashed due to rocksdb
[ https://issues.apache.org/jira/browse/KAFKA-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] gaoshu updated KAFKA-8343: -- Attachment: (was: fullsizeoutput_1.jpeg) > streams application crashed due to rocksdb > -- > > Key: KAFKA-8343 > URL: https://issues.apache.org/jira/browse/KAFKA-8343 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: centos 7 jdk8 kafka-streams1.0 >Reporter: gaoshu >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8343) streams application crashed due to rocksdb
[ https://issues.apache.org/jira/browse/KAFKA-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] gaoshu updated KAFKA-8343: -- Attachment: fullsizeoutput_6.jpeg > streams application crashed due to rocksdb > -- > > Key: KAFKA-8343 > URL: https://issues.apache.org/jira/browse/KAFKA-8343 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: centos 7 jdk8 kafka-streams1.0 >Reporter: gaoshu >Priority: Major > Attachments: fullsizeoutput_6.jpeg > -- This message was sent by Atlassian JIRA (v7.6.3#76005)