[jira] [Created] (KAFKA-6632) Very slow hashCode methods in Kafka Connect types
Maciej Bryński created KAFKA-6632: - Summary: Very slow hashCode methods in Kafka Connect types Key: KAFKA-6632 URL: https://issues.apache.org/jira/browse/KAFKA-6632 Project: Kafka Issue Type: Bug Affects Versions: 1.0.0 Reporter: Maciej Bryński The hashCode method of ConnectSchema (and Field) is used a lot in SMTs. Example: [https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164] Unfortunately it uses Objects.hash, which is very slow. I rewrote this with my own implementation and gained a 6x speedup. Microbenchmarks give: * Original ConnectSchema hashCode: 2995ms * My implementation: 517ms (1 iterations of calculating hashCode on new ConnectSchema(Schema.Type.STRING)) {code:java} @Override public int hashCode() { int result = 5; result = 31 * result + type.hashCode(); result = 31 * result + (optional ? 1 : 0); result = 31 * result + (defaultValue == null ? 0 : defaultValue.hashCode()); if (fields != null) { for (Field f : fields) { result = 31 * result + f.hashCode(); } } result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode()); result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode()); result = 31 * result + (name == null ? 0 : name.hashCode()); result = 31 * result + (version == null ? 0 : version); result = 31 * result + (doc == null ? 0 : doc.hashCode()); if (parameters != null) { for (String s : parameters.keySet()) { result = 31 * result + s.hashCode() + parameters.get(s).hashCode(); } } return result; }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
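For context on why {{Objects.hash}} is comparatively expensive here: every call allocates a varargs {{Object[]}} and boxes primitive fields such as {{optional}}, which a hand-rolled hashCode avoids. The following standalone timing sketch is illustrative only (it is not the reporter's original benchmark harness, and absolute numbers will vary by machine and JVM):
{code:java}
import java.util.Objects;

// Crude timing loop (not JMH) contrasting Objects.hash with a manual hashCode
// over the same fields; the gap comes from the varargs array allocation and the
// boxing of the boolean on every call.
public class HashCodeBench {
    static final class Sample {
        final String type = "STRING";
        final boolean optional = false;
        final String name = null;

        int slowHash() {
            return Objects.hash(type, optional, name); // allocates Object[3], boxes 'optional'
        }

        int fastHash() {
            int result = 5;
            result = 31 * result + type.hashCode();
            result = 31 * result + (optional ? 1 : 0);
            result = 31 * result + (name == null ? 0 : name.hashCode());
            return result;
        }
    }

    public static void main(String[] args) {
        Sample s = new Sample();
        int sink = 0;
        long t0 = System.nanoTime();
        for (int i = 0; i < 10_000_000; i++) sink += s.slowHash();
        long t1 = System.nanoTime();
        for (int i = 0; i < 10_000_000; i++) sink += s.fastHash();
        long t2 = System.nanoTime();
        System.out.println("Objects.hash: " + (t1 - t0) / 1_000_000 + " ms, manual: "
                + (t2 - t1) / 1_000_000 + " ms (sink=" + sink + ")");
    }
}
{code}
A proper JMH benchmark would give more trustworthy numbers, but the relative ordering is typically the same.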
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392692#comment-16392692 ] Khaireddine Rezgui commented on KAFKA-6535: --- Infinity == Integer.MAX_VALUE ? [~mjsax] > Set default retention ms for Streams repartition topics to infinity > --- > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set its default retention to infinity to allow any records be > pushed to it with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keeping its storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1
Herbert Koelman created KAFKA-6633: -- Summary: Is KafkaProducer still thread safe in version 1.0.1 Key: KAFKA-6633 URL: https://issues.apache.org/jira/browse/KAFKA-6633 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 1.0.1 Reporter: Herbert Koelman The javadoc of version [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] states that producers are thread safe: {quote}{{The producer is _thread safe_ and should generally be shared among all threads for best performance.}} {quote} Is it still the case in version 1.0.1 ? I failed to find this information in the javadoc of version 1.0.1. Can I share one producer with many threads ? (I posted this question as a bug, because I didn't know where else I could post questions. Sorry) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1
[ https://issues.apache.org/jira/browse/KAFKA-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392763#comment-16392763 ] Mickael Maison commented on KAFKA-6633: --- Yes it is still the case. And it's also still in the javadoc ! See [http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html] or [https://github.com/apache/kafka/blob/1.0.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L85-L86|https://github.com/apache/kafka/blob/1.0.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L85-L86] It's right at the top: {code:java} The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances. {code} > Is KafkaProducer still thread safe in version 1.0.1 > --- > > Key: KAFKA-6633 > URL: https://issues.apache.org/jira/browse/KAFKA-6633 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Herbert Koelman >Priority: Minor > > The javadoc of version > [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] > states that producers are thread safe: > {quote}{{The producer is _thread safe_ and should generally be shared among > all threads for best performance.}} > {quote} > Is it still the case in version 1.0.1 ? I failed to find this information in > the javadoc of version 1.0.1. > Can I share one producer with many threads ? > (I posted this question as a bug, because I didn't know where else I could > post questions. Sorry) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
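For readers who land here with the same question, a minimal sketch of the documented pattern, one {{KafkaProducer}} shared by several threads (the bootstrap address, topic name and serializers below are placeholders):
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// One producer instance, many sending threads; the producer is closed once,
// after all threads are done with it.
public class SharedProducerExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            final int id = t;
            pool.submit(() -> producer.send(new ProducerRecord<>("my-topic", "thread-" + id, "hello")));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        producer.close();
    }
}
{code}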
[jira] [Resolved] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1
[ https://issues.apache.org/jira/browse/KAFKA-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-6633. --- Resolution: Not A Problem > Is KafkaProducer still thread safe in version 1.0.1 > --- > > Key: KAFKA-6633 > URL: https://issues.apache.org/jira/browse/KAFKA-6633 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Herbert Koelman >Priority: Minor > > The javadoc of version > [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] > states that producers are thread safe: > {quote}{{The producer is _thread safe_ and should generally be shared among > all threads for best performance.}} > {quote} > Is it still the case in version 1.0.1 ? I failed to find this information in > the javadoc of version 1.0.1. > Can I share one producer with many threads ? > (I posted this question as a bug, because I didn't know where else I could > post questions. Sorry) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6632) Very slow hashCode methods in Kafka Connect types
[ https://issues.apache.org/jira/browse/KAFKA-6632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maciej Bryński updated KAFKA-6632: -- Description: The hashCode method of ConnectSchema (and Field) is used a lot in SMTs and fromConnect. Example: [https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164] Unfortunately it uses Objects.hash, which is very slow. I rewrote this with my own implementation and gained a 6x speedup. Microbenchmarks give: * Original ConnectSchema hashCode: 2995ms * My implementation: 517ms (1 iterations of calculating hashCode on new ConnectSchema(Schema.Type.STRING)) {code:java} @Override public int hashCode() { int result = 5; result = 31 * result + type.hashCode(); result = 31 * result + (optional ? 1 : 0); result = 31 * result + (defaultValue == null ? 0 : defaultValue.hashCode()); if (fields != null) { for (Field f : fields) { result = 31 * result + f.hashCode(); } } result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode()); result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode()); result = 31 * result + (name == null ? 0 : name.hashCode()); result = 31 * result + (version == null ? 0 : version); result = 31 * result + (doc == null ? 0 : doc.hashCode()); if (parameters != null) { for (Map.Entry e : parameters.entrySet()) { result = 31 * result + e.getKey().hashCode() + e.getValue().hashCode(); } } return result; }{code} was: The hashCode method of ConnectSchema (and Field) is used a lot in SMTs. Example: [https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164] Unfortunately it uses Objects.hash, which is very slow. I rewrote this with my own implementation and gained a 6x speedup. Microbenchmarks give: * Original ConnectSchema hashCode: 2995ms * My implementation: 517ms (1 iterations of calculating hashCode on new ConnectSchema(Schema.Type.STRING)) {code:java} @Override public int hashCode() { int result = 5; result = 31 * result + type.hashCode(); result = 31 * result + (optional ? 1 : 0); result = 31 * result + (defaultValue == null ? 0 : defaultValue.hashCode()); if (fields != null) { for (Field f : fields) { result = 31 * result + f.hashCode(); } } result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode()); result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode()); result = 31 * result + (name == null ? 0 : name.hashCode()); result = 31 * result + (version == null ? 0 : version); result = 31 * result + (doc == null ? 0 : doc.hashCode()); if (parameters != null) { for (String s : parameters.keySet()) { result = 31 * result + s.hashCode() + parameters.get(s).hashCode(); } } return result; }{code} > Very slow hashCode methods in Kafka Connect types > - > > Key: KAFKA-6632 > URL: https://issues.apache.org/jira/browse/KAFKA-6632 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Maciej Bryński >Priority: Major > > The hashCode method of ConnectSchema (and Field) is used a lot in SMTs and > fromConnect. > Example: > [https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164] > Unfortunately it uses Objects.hash, which is very slow. > I rewrote this with my own implementation and gained a 6x speedup. 
> Microbenchmarks give: > * Original ConnectSchema hashCode: 2995ms > * My implementation: 517ms > (1 iterations of calculating hashCode on new > ConnectSchema(Schema.Type.STRING)) > {code:java} > @Override > public int hashCode() { > int result = 5; > result = 31 * result + type.hashCode(); > result = 31 * result + (optional ? 1 : 0); > result = 31 * result + (defaultValue == null ? 0 : > defaultValue.hashCode()); > if (fields != null) { > for (Field f : fields) { > result = 31 * result + f.hashCode(); > } > } > result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode()); > result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode()); > result = 31 * result + (name == null ? 0 : name.hashCode()); > result = 31 * result + (version == null ? 0 : version); > result = 31 * result + (doc == null ? 0 : doc.hashCode()); > if (parameters != null) { > for (Map.Entry e : parameters.entrySet()) { > result = 31 * result + e.getKe
[jira] [Commented] (KAFKA-6632) Very slow hashCode methods in Kafka Connect types
[ https://issues.apache.org/jira/browse/KAFKA-6632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392790#comment-16392790 ] Maciej Bryński commented on KAFKA-6632: --- I also checked Struct schemas. My implementation: 6026; Connect: 18880. > Very slow hashCode methods in Kafka Connect types > - > > Key: KAFKA-6632 > URL: https://issues.apache.org/jira/browse/KAFKA-6632 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Maciej Bryński >Priority: Major > > The hashCode method of ConnectSchema (and Field) is used a lot in SMTs and > fromConnect. > Example: > [https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164] > Unfortunately it uses Objects.hash, which is very slow. > I rewrote this with my own implementation and gained a 6x speedup. > Microbenchmarks give: > * Original ConnectSchema hashCode: 2995ms > * My implementation: 517ms > (1 iterations of calculating hashCode on new > ConnectSchema(Schema.Type.STRING)) > {code:java} > @Override > public int hashCode() { > int result = 5; > result = 31 * result + type.hashCode(); > result = 31 * result + (optional ? 1 : 0); > result = 31 * result + (defaultValue == null ? 0 : > defaultValue.hashCode()); > if (fields != null) { > for (Field f : fields) { > result = 31 * result + f.hashCode(); > } > } > result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode()); > result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode()); > result = 31 * result + (name == null ? 0 : name.hashCode()); > result = 31 * result + (version == null ? 0 : version); > result = 31 * result + (doc == null ? 0 : doc.hashCode()); > if (parameters != null) { > for (Map.Entry e : parameters.entrySet()) { > result = 31 * result + e.getKey().hashCode() + > e.getValue().hashCode(); > } > } > return result; > }{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1
[ https://issues.apache.org/jira/browse/KAFKA-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392845#comment-16392845 ] Herbert Koelman commented on KAFKA-6633: Thanks for this quick answer. > Is KafkaProducer still thread safe in version 1.0.1 > --- > > Key: KAFKA-6633 > URL: https://issues.apache.org/jira/browse/KAFKA-6633 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Herbert Koelman >Priority: Minor > > The javadoc of version > [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] > states that producers are thread safe: > {quote}{{The producer is _thread safe_ and should generally be shared among > all threads for best performance.}} > {quote} > Is it still the case in version 1.0.1 ? I failed to find this information in > the javadoc of version 1.0.1. > Can I share one producer with many threads ? > (I posted this question as a bug, because I didn't know where else I could > post questions. Sorry) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1
[ https://issues.apache.org/jira/browse/KAFKA-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Herbert Koelman closed KAFKA-6633. -- > Is KafkaProducer still thread safe in version 1.0.1 > --- > > Key: KAFKA-6633 > URL: https://issues.apache.org/jira/browse/KAFKA-6633 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.1 >Reporter: Herbert Koelman >Priority: Minor > > The javadoc of version > [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html] > states that producers are thread safe: > {quote}{{The producer is _thread safe_ and should generally be shared among > all threads for best performance.}} > {quote} > Is it still the case in version 1.0.1 ? I failed to find this information in > the javadoc of version 1.0.1. > Can I share one producer with many threads ? > (I posted this question as a bug, because I didn't know where else I could > post questions. Sorry) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()
[ https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5863: -- Description: Here is the call chain: {code} RestServer.httpRequest(reconfigUrl, "POST", taskProps, null); {code} In httpRequest(): {code} } else if (responseCode >= 200 && responseCode < 300) { InputStream is = connection.getInputStream(); T result = JSON_SERDE.readValue(is, responseFormat); {code} For readValue(): {code} public T readValue(InputStream src, TypeReference valueTypeRef) throws IOException, JsonParseException, JsonMappingException { return (T) _readMapAndClose(_jsonFactory.createParser(src), _typeFactory.constructType(valueTypeRef)); {code} Then there would be NPE in constructType(): {code} public JavaType constructType(TypeReference typeRef) { // 19-Oct-2015, tatu: Simpler variant like so should work return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); {code} was: Here is the call chain: {code} RestServer.httpRequest(reconfigUrl, "POST", taskProps, null); {code} In httpRequest(): {code} } else if (responseCode >= 200 && responseCode < 300) { InputStream is = connection.getInputStream(); T result = JSON_SERDE.readValue(is, responseFormat); {code} For readValue(): {code} public T readValue(InputStream src, TypeReference valueTypeRef) throws IOException, JsonParseException, JsonMappingException { return (T) _readMapAndClose(_jsonFactory.createParser(src), _typeFactory.constructType(valueTypeRef)); {code} Then there would be NPE in constructType(): {code} public JavaType constructType(TypeReference typeRef) { // 19-Oct-2015, tatu: Simpler variant like so should work return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); {code} > Potential null dereference in DistributedHerder#reconfigureConnector() > -- > > Key: KAFKA-5863 > URL: https://issues.apache.org/jira/browse/KAFKA-5863 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Ted Yu >Priority: Minor > > Here is the call chain: > {code} > RestServer.httpRequest(reconfigUrl, "POST", > taskProps, null); > {code} > In httpRequest(): > {code} > } else if (responseCode >= 200 && responseCode < 300) { > InputStream is = connection.getInputStream(); > T result = JSON_SERDE.readValue(is, responseFormat); > {code} > For readValue(): > {code} > public T readValue(InputStream src, TypeReference valueTypeRef) > throws IOException, JsonParseException, JsonMappingException > { > return (T) _readMapAndClose(_jsonFactory.createParser(src), > _typeFactory.constructType(valueTypeRef)); > {code} > Then there would be NPE in constructType(): > {code} > public JavaType constructType(TypeReference typeRef) > { > // 19-Oct-2015, tatu: Simpler variant like so should work > return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
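One possible defensive shape for this code path, shown as a standalone sketch (illustrative only, not the actual Connect patch; it assumes Jackson on the classpath): skip deserialization when no response type was requested, so a 2xx response combined with a null {{TypeReference}} never reaches {{constructType(null)}}.
{code:java}
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// Passing a null TypeReference into readValue ends in an NPE inside constructType,
// so a caller that may legitimately pass null needs an explicit guard.
public class NullTypeReferenceDemo {
    private static final ObjectMapper JSON_SERDE = new ObjectMapper();

    static <T> T read(InputStream is, TypeReference<T> responseFormat) throws Exception {
        if (responseFormat == null) {
            return null; // no response body expected: skip deserialization entirely
        }
        return JSON_SERDE.readValue(is, responseFormat);
    }

    public static void main(String[] args) throws Exception {
        InputStream is = new ByteArrayInputStream("{\"k\":\"v\"}".getBytes("UTF-8"));
        Map<String, String> parsed = read(is, new TypeReference<Map<String, String>>() { });
        System.out.println(parsed);              // {k=v}
        System.out.println(read(is, null));      // null, instead of an NPE
    }
}
{code}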
[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393169#comment-16393169 ] Bill Bejeck commented on KAFKA-6399: I'm +1 as well for reducing the {{max.poll.interval.ms}} config from {{Integer.MAX_VALUE}}, but I'm not sure about 30 seconds. My reasoning is that we should still set the value to something more conservative (5 minutes is okay; I don't have another suggestion) as a hedge, because we can't accurately predict user patterns concerning processing data. But we can add to the documentation the reason why we had it set to {{Integer.MAX_VALUE}} in the first place and what we've done to improve the rebalance process, and suggest that users reduce it further if need be. > Consider reducing "max.poll.interval.ms" default for Kafka Streams > -- > > Key: KAFKA-6399 > URL: https://issues.apache.org/jira/browse/KAFKA-6399 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Priority: Minor > > In Kafka {{0.10.2.1}} we change the default value of > {{max.poll.intervall.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The > reason was that long state restore phases during rebalance could yield > "rebalance storms" as consumers drop out of a consumer group even if they are > healthy as they didn't call {{poll()}} during state restore phase. > In version {{0.11}} and {{1.0}} the state restore logic was improved a lot > and thus, now Kafka Streams does call {{poll()}} even during restore phase. > Therefore, we might consider setting a smaller timeout for > {{max.poll.intervall.ms}} to detect bad behaving Kafka Streams applications > (ie, targeting user code) that don't make progress any more during regular > operations. > The open question would be, what a good default might be. Maybe the actual > consumer default of 30 seconds might be sufficient. During one {{poll()}} > roundtrip, we would only call {{restoreConsumer.poll()}} once and restore a > single batch of records. This should take way less time than 30 seconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords
[ https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393217#comment-16393217 ] Ewen Cheslack-Postava commented on KAFKA-6626: -- It cannot be a regular map, see the comment right above that field: https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L79-L80 > Performance bottleneck in Kafka Connect sendRecords > --- > > Key: KAFKA-6626 > URL: https://issues.apache.org/jira/browse/KAFKA-6626 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 >Reporter: Maciej Bryński >Priority: Major > Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png > > > Kafka Connect is using IdentityHashMap for storing records. > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239] > Unfortunately this solution is very slow (2-4 times slower than normal > HashMap / HashSet). > Benchmark result (code in attachment). > {code:java} > Identity 4220 > Set 2115 > Map 1941 > Fast Set 2121 > {code} > Things are even worse when using default GC configuration > (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 > -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true) > {code:java} > Identity 7885 > Set 2364 > Map 1548 > Fast Set 1520 > {code} > Java version > {code:java} > java version "1.8.0_152" > Java(TM) SE Runtime Environment (build 1.8.0_152-b16) > Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode) > {code} > This problem is greatly slowing Kafka Connect. > !image-2018-03-08-08-35-19-247.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
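The constraint behind that comment is identity semantics: the worker has to track each in-flight record object individually, even when two records happen to be equals() to each other. A small standalone illustration of why an equals/hashCode-based map would change behavior (plain Strings stand in for the record objects here):
{code:java}
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Two distinct objects that compare equal: a HashMap collapses them into one entry,
// an IdentityHashMap keeps them separate, which is what in-flight tracking needs.
public class IdentityVsEqualsMap {
    public static void main(String[] args) {
        String a = new String("record");
        String b = new String("record");

        Map<String, Boolean> byEquality = new HashMap<>();
        byEquality.put(a, false);
        byEquality.put(b, false);
        System.out.println("HashMap entries: " + byEquality.size());         // 1

        Map<String, Boolean> byIdentity = new IdentityHashMap<>();
        byIdentity.put(a, false);
        byIdentity.put(b, false);
        System.out.println("IdentityHashMap entries: " + byIdentity.size()); // 2
    }
}
{code}
So a faster replacement would have to preserve reference-identity semantics rather than simply switching to HashMap.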
[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393415#comment-16393415 ] John Roesler commented on KAFKA-6474: - Hi Filipe, I have been working on a similar task (KAFKA-6473) and discovered something that will probably be a problem for you. This task requires the streams project to have a test dependency on the streams:test-utils project, but streams:test-utils already has a compile dependency on the streams project. I'm no Gradle expert, but as far as I can tell, there's no way to break this circular dependency, at least without doing something exotic in the Gradle config. We had a discussion in this mailing list thread: [[DISCUSS] KIP-267|http://mail-archives.apache.org/mod_mbox/kafka-dev/201803.mbox/%3CCAAyirGsovAzRMLa91nd6rzceQgEcNcLMt7ZrXVN7M1Psj4jCmQ%40mail.gmail.com%3E]. Here's what we settled on: {quote}I would propose we restructure the streams directory thusly: streams/ (artifact name := "streams", the actual streams code lives here) - test-utils/ (this is the current test-utils artifact, depends on "streams") - tests/ (new module, depends on "streams" and "test-utils", *NO published artifact*) This gets us out of the circular dependency without having to engage in any Gradle shenanigans while preserving "test-utils" as a separate artifact. This is good because: 1) the test-utils don't need to be in production code, so it's nice to have a separate artifact, 2) test-utils is already public in 1.1, and it's a bummer to introduce users' code when we can so easily avoid it.{quote} Another result of the discussion is that I'm actually going to side-step this issue for KAFKA-6473, so I won't be doing any restructuring in the course of my work. I'm just sharing these ideas with you for your context. Hope you're well! -John > Rewrite test to use new public TopologyTestDriver > - > > Key: KAFKA-6474 > URL: https://issues.apache.org/jira/browse/KAFKA-6474 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Filipe Agapito >Priority: Major > Labels: beginner, newbie > > With KIP-247 we added public TopologyTestDriver. We should rewrite out own > test to use this new test driver and remove the two classes > ProcessorTopoogyTestDriver and KStreamTestDriver. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393444#comment-16393444 ] John Roesler commented on KAFKA-6474: - Also worth mentioning is [~guozhang]'s reply: {quote}I think I agree with your proposed changes, in fact in order to not scatter the test classes in two places maybe it's better to move all of them to the new module. One caveat is that it will make streams' project hierarchy inconsistent with other projects where the unit test classes are maintained inside the main artifact package, but I think it is a good cost to pay, plus once we start publishing test-util artifacts for other projects like client and connect, we may face the same issue and need to do this refactoring as well. {quote} > Rewrite test to use new public TopologyTestDriver > - > > Key: KAFKA-6474 > URL: https://issues.apache.org/jira/browse/KAFKA-6474 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Filipe Agapito >Priority: Major > Labels: beginner, newbie > > With KIP-247 we added public TopologyTestDriver. We should rewrite out own > test to use this new test driver and remove the two classes > ProcessorTopoogyTestDriver and KStreamTestDriver. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393457#comment-16393457 ] Matthias J. Sax commented on KAFKA-6535: Yes. It's a long though (not an integer). We should pass this config when creating repartition topics (cf. {{InternalTopicManager}}). > Set default retention ms for Streams repartition topics to infinity > --- > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set its default retention to infinity to allow any records be > pushed to it with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keeping its storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
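A rough sketch of what passing that retention override at creation time could look like, using the {{AdminClient}} directly rather than the actual {{InternalTopicManager}} code (the topic name, partition/replica counts and the {{Long.MAX_VALUE}} choice are placeholders for illustration):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

// Create a repartition-style topic with an effectively unlimited time-based retention,
// relying on the purge-data API (cf. KIP-204) to keep its size bounded.
public class RepartitionTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic repartition = new NewTopic("my-app-repartition", 4, (short) 3);
            repartition.configs(Collections.singletonMap(
                    TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE)));
            admin.createTopics(Collections.singleton(repartition)).all().get();
        }
    }
}
{code}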
[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393486#comment-16393486 ] Matthias J. Sax commented on KAFKA-6399: Thanks for the feedback. It's a tricky question and I am personally not sure what I prefer. My thinking is as follows: initially, we used 30 seconds, which was too short because of store restore time. Since we set it to MAX_VALUE, I cannot remember any user issues related to the config. Thus, it might even be ok to keep the default at MAX_VALUE. Whether we still need MAX_VALUE is questionable though, as we moved the restore code into the main loop and got rid of the root cause that forced us to set it to MAX_VALUE. However, because I can't remember any issues with MAX_VALUE, even if we don't need this high value, it seems to work in practice. We know from some user reports that processing time can vary largely; thus, even if we set it to 5 minutes, it would bite some users if they don't increase the setting. Keeping MAX_VALUE would be a safe bet for this case. However, I am a little concerned about a badly behaving app that never times out if the default is MAX_VALUE: user code could loop infinitely, for example. Long story short: I think it boils down to the question of whether we want to make sure the default settings are robust with regard to "making progress" or whether the default setting should be more "error sensitive". I guess, for most cases, users want to (and should) adjust this value anyway, independent of what default we choose (either some users need to increase it, or other users should decrease it to enable error detection in the first place). > Consider reducing "max.poll.interval.ms" default for Kafka Streams > -- > > Key: KAFKA-6399 > URL: https://issues.apache.org/jira/browse/KAFKA-6399 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.0 >Reporter: Matthias J. Sax >Priority: Minor > > In Kafka {{0.10.2.1}} we change the default value of > {{max.poll.intervall.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The > reason was that long state restore phases during rebalance could yield > "rebalance storms" as consumers drop out of a consumer group even if they are > healthy as they didn't call {{poll()}} during state restore phase. > In version {{0.11}} and {{1.0}} the state restore logic was improved a lot > and thus, now Kafka Streams does call {{poll()}} even during restore phase. > Therefore, we might consider setting a smaller timeout for > {{max.poll.intervall.ms}} to detect bad behaving Kafka Streams applications > (ie, targeting user code) that don't make progress any more during regular > operations. > The open question would be, what a good default might be. Maybe the actual > consumer default of 30 seconds might be sufficient. During one {{poll()}} > roundtrip, we would only call {{restoreConsumer.poll()}} once and restore a > single batch of records. This should take way less time than 30 seconds. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
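Whatever default ends up being chosen, applications that know their worst-case processing time can pin the value explicitly. A minimal sketch (application id and bootstrap servers are placeholders) of overriding {{max.poll.interval.ms}} for the consumers embedded in a Kafka Streams app:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Forward an explicit max.poll.interval.ms to the consumers created by Kafka Streams,
// so the application does not depend on whichever default the library ships with.
public class StreamsPollIntervalConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Pick a bound that comfortably covers the worst-case time between poll() calls.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                Integer.toString(5 * 60 * 1000));
        return props;
    }
}
{code}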
[jira] [Assigned] (KAFKA-6627) Console producer default config values override explicitly provided properties
[ https://issues.apache.org/jira/browse/KAFKA-6627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sandor Murakozi reassigned KAFKA-6627: -- Assignee: Sandor Murakozi > Console producer default config values override explicitly provided properties > -- > > Key: KAFKA-6627 > URL: https://issues.apache.org/jira/browse/KAFKA-6627 > Project: Kafka > Issue Type: Bug > Components: tools >Reporter: Jason Gustafson >Assignee: Sandor Murakozi >Priority: Minor > Labels: newbie > > Some producer properties can be provided through custom parameters (e.g. > {{\-\-request-required-acks}}) and explicitly through > {{\-\-producer-property}}. At the moment, some of the custom parameters have > default values which actually override explicitly provided properties. For > example, if you set {{\-\-producer-property acks=all}} when starting the > console producer, the argument will be ignored since > {{\-\-request-required-acks}} has a default value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity
[ https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393587#comment-16393587 ] John Roesler commented on KAFKA-6535: - I don't think it's that important in this context, but it's worth generally bearing in mind that a MAX_INT number of milliseconds is about 25 days. So this proposal is more like "set default retention to max allowed value" > Set default retention ms for Streams repartition topics to infinity > --- > > Key: KAFKA-6535 > URL: https://issues.apache.org/jira/browse/KAFKA-6535 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: needs-kip, newbie > > After KIP-220 / KIP-204, repartition topics in Streams are transient, so it > is better to set its default retention to infinity to allow any records be > pushed to it with old timestamps (think: bootstrapping, re-processing) and > just rely on the purging API to keeping its storage small. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0
[ https://issues.apache.org/jira/browse/KAFKA-6631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393680#comment-16393680 ] Guozhang Wang commented on KAFKA-6631: -- Hello Alexander, Thanks for reporting this issue; looking through the logs I think the root cause is that the assignment record being appended to the offsets topic is larger than the maximum allowed message size, by default 1MB. The reason you would not see it with one client is that with more clients, more bytes are needed to encode the assignment metadata in the SyncGroup request, and hence eventually it will exceed the limit. Normally 1MB should be an appropriate limit, and since you mention seeing this issue only with three clients, I'm wondering if you have many partitions for your input topics? Note that the encoded bytes grow linearly with the total number of partitions as well. To work around this issue, try to set a larger value for the "__consumer_offsets" topic's "max.message.bytes" config (note this is a per-topic config). > Kafka Streams - Rebalancing exception in Kafka 1.0.0 > > > Key: KAFKA-6631 > URL: https://issues.apache.org/jira/browse/KAFKA-6631 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: Container Linux by CoreOS 1576.5.0 >Reporter: Alexander Ivanichev >Priority: Critical > > > In Kafka Streams 1.0.0, we saw a strange rebalance error, our stream app > performs window based aggregations, sometimes on start when all stream > workers join the app just crash, however if we enable only one worker than > it works fine, sometime 2 workers work just fine, but when third join the app > crashes again, some critical issue with rebalance. > {code:java} > 018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: > Unexpected error from SyncGroup: The server experienced an unexpected error > when processing the request > 2018-03-08T18:51:01.226557000Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566) > 2018-03-08T18:51:01.22686Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539) > 2018-03-08T18:51:01.227328000Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) > 2018-03-08T18:51:01.22763Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) > 2018-03-08T18:51:01.228152000Z at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) > 2018-03-08T18:51:01.228449000Z at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > 2018-03-08T18:51:01.228897000Z at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > 2018-03-08T18:51:01.229196000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506) > 2018-03-08T18:51:01.229673000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > 2018-03-08T18:51:01.229971000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268) > 2018-03-08T18:51:01.230436000Z at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) > 2018-03-08T18:51:01.230749000Z at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174) > 2018-03-08T18:51:01.231065000Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) > 2018-03-08T18:51:01.231584000Z at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > 2018-03-08T18:51:01.231911000Z at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) > 2018-03-08T18:51:01.23219Z at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138) > 2018-03-08T18:51:01.232643000Z at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) > 2018-03-08T18:51:01.233121000Z at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851) > 2018-03-08T18:51:01.233409000Z at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808) > 2018-03-08T18:51:01.23372Z at > org.apache.kafka.streams
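A sketch of that workaround using the {{AdminClient}} (the same per-topic override can be applied with the {{kafka-configs.sh}} tool instead). Caveat: {{alterConfigs}} in this version replaces the topic's whole override set, so any other existing per-topic overrides need to be included as well; the 2 MB value is only an example.
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Raise max.message.bytes on the __consumer_offsets topic so that a large group
// assignment record can still be appended by the coordinator.
public class RaiseOffsetsTopicMessageSize {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource offsetsTopic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "__consumer_offsets");
            Config override = new Config(
                    Collections.singleton(new ConfigEntry("max.message.bytes", "2097152")));
            admin.alterConfigs(Collections.singletonMap(offsetsTopic, override)).all().get();
        }
    }
}
{code}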
[jira] [Commented] (KAFKA-6622) GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly
[ https://issues.apache.org/jira/browse/KAFKA-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393695#comment-16393695 ] ASF GitHub Bot commented on KAFKA-6622: --- hachikuji closed pull request #4661: KAFKA-6622 - fix performance issue in parsing consumer offsets URL: https://github.com/apache/kafka/pull/4661 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 3b79544a502..63af1cb0ce9 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -555,8 +555,11 @@ class GroupMetadataManager(brokerId: Int, } pendingOffsets.remove(batch.producerId) } else { + var batchBaseOffset: Option[Long] = None for (record <- batch.asScala) { require(record.hasKey, "Group metadata/offset entry key should not be null") +if (batchBaseOffset.isEmpty) + batchBaseOffset = Some(record.offset) GroupMetadataManager.readMessageKey(record.key) match { case offsetKey: OffsetKey => @@ -573,9 +576,9 @@ class GroupMetadataManager(brokerId: Int, } else { val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value) if (isTxnOffsetCommit) - pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata)) + pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata)) else -loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata)) +loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata)) } case groupMetadataKey: GroupMetadataKey => This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly > -- > > Key: KAFKA-6622 > URL: https://issues.apache.org/jira/browse/KAFKA-6622 > Project: Kafka > Issue Type: Bug >Reporter: radai rosenblatt >Assignee: radai rosenblatt >Priority: Major > Attachments: kafka batch iteration funtime.png > > > when reading records from a consumer offsets batch, the entire batch is > decompressed multiple times (per record) as part of calling > `batch.baseOffset`. this is a very expensive operation being called in a loop > for no reason: > !kafka batch iteration funtime.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6622) GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly
[ https://issues.apache.org/jira/browse/KAFKA-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6622. Resolution: Fixed Fix Version/s: 1.1.0 > GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly > -- > > Key: KAFKA-6622 > URL: https://issues.apache.org/jira/browse/KAFKA-6622 > Project: Kafka > Issue Type: Bug >Reporter: radai rosenblatt >Assignee: radai rosenblatt >Priority: Major > Fix For: 1.1.0 > > Attachments: kafka batch iteration funtime.png > > > when reading records from a consumer offsets batch, the entire batch is > decompressed multiple times (per record) as part of calling > `batch.baseOffset`. this is a very expensive operation being called in a loop > for no reason: > !kafka batch iteration funtime.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6264) Log cleaner thread may die on legacy segment containing messages whose offsets are too large
[ https://issues.apache.org/jira/browse/KAFKA-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah reassigned KAFKA-6264: --- Assignee: Dhruvil Shah (was: Jiangjie Qin) > Log cleaner thread may die on legacy segment containing messages whose > offsets are too large > > > Key: KAFKA-6264 > URL: https://issues.apache.org/jira/browse/KAFKA-6264 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.2.1, 1.0.0, 0.11.0.2 >Reporter: Jiangjie Qin >Assignee: Dhruvil Shah >Priority: Critical > Fix For: 1.2.0 > > > We encountered a problem that some of the legacy log segments contains > messages whose offsets are larger than {{SegmentBaseOffset + Int.MaxValue}}. > Prior to 0.10.2.0, we do not assert the offset of the messages when appending > them to the log segments. Due to KAFKA-5413, the log cleaner may append > messages whose offset is greater than {{base_offset + Int.MaxValue}} into the > segment during the log compaction. > After the brokers are upgraded, those log segments cannot be compacted > anymore because the compaction will fail immediately due to the offset range > assertion we added to the LogSegment. > We have seen this issue in the __consumer_offsets topic so it could be a > general problem. There is no easy solution for the users to recover from this > case. > One solution is to split such log segments in the log cleaner once it sees a > message with problematic offset and append those messages to a separate log > segment with a larger base_offset. > Due to the impact of the issue. We may want to consider backporting the fix > to previous affected versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6634) Delay initiating the txn on producers until initializeTopology with EOS turned on
Guozhang Wang created KAFKA-6634: Summary: Delay initiating the txn on producers until initializeTopology with EOS turned on Key: KAFKA-6634 URL: https://issues.apache.org/jira/browse/KAFKA-6634 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Assignee: Guozhang Wang In the Streams EOS implementation, the producers created for tasks initiate a txn immediately after being created in the constructor of `StreamTask`. However, the task may not process any data, and hence the producer may not send any records for that started txn for a long time, because of the restoration process. With the default txn session timeout of 60 seconds, this means that if the restoration takes more than that amount of time, the producer will immediately get the error that its producer epoch is already old once it starts. To fix this, we should consider initiating the txn only after the restoration phase is done. This has the caveat that if the producer is already fenced, it will not be notified until then, in initializeTopology. But I think this should not be a correctness issue, since during the restoration process we do not make any changes to the processing state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
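A minimal sketch of the proposed ordering (names are illustrative, not the actual {{StreamTask}} code, and a {{transactional.id}} plus serializers are assumed to be present in the producer config): construct the producer eagerly, but defer {{initTransactions()}} until {{initializeTopology}}, i.e. after restoration has completed.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

// The transaction is only initialized once the task is about to process records,
// so the transaction timeout does not start ticking during a long state restore.
public class EosTaskSketch {
    private final KafkaProducer<byte[], byte[]> producer;
    private boolean transactionInitialized = false;

    EosTaskSketch(Properties producerConfig) {
        this.producer = new KafkaProducer<>(producerConfig); // no initTransactions() here
    }

    void initializeTopology() {
        if (!transactionInitialized) {
            producer.initTransactions();  // deferred until after restoration
            producer.beginTransaction();
            transactionInitialized = true;
        }
        // ... initialize processors, punctuators, etc.
    }
}
{code}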
[jira] [Comment Edited] (KAFKA-5846) Use singleton NoOpConsumerRebalanceListener in subscribe() call where listener is not specified
[ https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191405#comment-16191405 ] Ted Yu edited comment on KAFKA-5846 at 3/10/18 12:42 AM: - Patch looks good to me. was (Author: yuzhih...@gmail.com): Patch looks good to me . > Use singleton NoOpConsumerRebalanceListener in subscribe() call where > listener is not specified > --- > > Key: KAFKA-5846 > URL: https://issues.apache.org/jira/browse/KAFKA-5846 > Project: Kafka > Issue Type: Task > Components: clients >Reporter: Ted Yu >Assignee: Kamal Chandraprakash >Priority: Minor > > Currently KafkaConsumer creates instance of NoOpConsumerRebalanceListener for > each subscribe() call where ConsumerRebalanceListener is not specified: > {code} > public void subscribe(Pattern pattern) { > subscribe(pattern, new NoOpConsumerRebalanceListener()); > {code} > We can create a singleton NoOpConsumerRebalanceListener to be used in such > scenarios. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
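A minimal sketch of the idea (illustrative, not the actual {{KafkaConsumer}} patch): since {{NoOpConsumerRebalanceListener}} holds no state, one shared instance can back every {{subscribe()}} call that does not supply its own listener.
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;

// A single stateless listener instance, reused instead of allocating a new
// NoOpConsumerRebalanceListener on every subscribe(Pattern) / subscribe(Collection) call.
public class NoOpListenerHolder {
    public static final ConsumerRebalanceListener NO_OP_LISTENER = new NoOpConsumerRebalanceListener();

    private NoOpListenerHolder() { }
}
{code}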