[jira] [Resolved] (KAFKA-6396) Possibly kafka-connect converter should be able to stop processing chain
[ https://issues.apache.org/jira/browse/KAFKA-6396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexander Koval resolved KAFKA-6396. Resolution: Invalid Thank you for the explanation. > Possibly kafka-connect converter should be able to stop processing chain > > > Key: KAFKA-6396 > URL: https://issues.apache.org/jira/browse/KAFKA-6396 > Project: Kafka > Issue Type: Wish > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Alexander Koval >Priority: Minor > > At present only transformations can discard records by returning null. But I > think sometimes it would be nice to discard the processing chain after converting a > message. For example I have some tags shipped with a message key and I want > to stop processing the message after converting its key (there are a lot of > messages and I don't want to deserialize message values that I don't need). > At the moment, to do that I have to disable converters and move message > deserialization to the transformation chain: > {code} > key.converter=org.apache.kafka.connect.converters.ByteArrayConverter > value.converter=org.apache.kafka.connect.converters.ByteArrayConverter > transforms=proto,catalog > transforms.proto.type=company.evo.kafka.ProtobufTransformation > transforms.proto.key.protobuf.class=company.evo.uaprom.indexator.KeyProto$KeyMessage > transforms.proto.value.protobuf.class=company.evo.uaprom.indexator.catalog.CompanyProto$UniversalCompanyMessage > transforms.proto.tag=catalog > {code} > If > [WorkerSinkTask|https://github.com/apache/kafka/blob/1.0.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L453] > checked converted values for {{null}}, it would solve my problem more > gracefully -- This message was sent by Atlassian JIRA (v7.6.3#76005)
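For context on the workaround described above: a Connect {{Transformation}} may return null from {{apply()}} to discard a record, which is the hook the reporter relies on (converters have no equivalent). A minimal sketch of such a filtering transformation; the class name and the key-based predicate are illustrative, not taken from the ticket:
{code:java}
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

/** Drops records whose key is null by returning null, which tells the Connect runtime to discard them. */
public class DropIfKeyNull<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public R apply(R record) {
        // Returning null here discards the record before it reaches the sink task.
        return record.key() == null ? null : record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }
}
{code}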
[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382069#comment-16382069 ] Filipe Agapito commented on KAFKA-6474: --- I'd like to work on this. I'm assigning the issue to myself. > Rewrite test to use new public TopologyTestDriver > - > > Key: KAFKA-6474 > URL: https://issues.apache.org/jira/browse/KAFKA-6474 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Priority: Major > Labels: beginner, newbie > > With KIP-247 we added the public TopologyTestDriver. We should rewrite our own > tests to use this new test driver and remove the two classes > ProcessorTopologyTestDriver and KStreamTestDriver. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver
[ https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Filipe Agapito reassigned KAFKA-6474: - Assignee: Filipe Agapito > Rewrite test to use new public TopologyTestDriver > - > > Key: KAFKA-6474 > URL: https://issues.apache.org/jira/browse/KAFKA-6474 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Affects Versions: 1.1.0 >Reporter: Matthias J. Sax >Assignee: Filipe Agapito >Priority: Major > Labels: beginner, newbie > > With KIP-247 we added the public TopologyTestDriver. We should rewrite our own > tests to use this new test driver and remove the two classes > ProcessorTopologyTestDriver and KStreamTestDriver. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
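For reference, a minimal sketch of what a test written against the new TopologyTestDriver (KIP-247, Kafka 1.1.0) might look like; the pass-through topology and topic names are illustrative only:
{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class PassThroughTopologyTest {

    public static void main(String[] args) {
        // A trivial topology that copies "input" to "output".
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").to("output");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "topology-test-driver-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
        try {
            ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>("input", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input", "key", "value"));

            // The record should come out of "output" unchanged.
            OutputVerifier.compareKeyValue(
                    driver.readOutput("output", new StringDeserializer(), new StringDeserializer()),
                    "key", "value");
        } finally {
            driver.close();
        }
    }
}
{code}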
[jira] [Resolved] (KAFKA-6560) Use single-point queries rather than range queries for windowed aggregation operators
[ https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6560. -- Resolution: Fixed Fix Version/s: 1.2.0 > Use single-point queries rather than range queries for windowed aggregation operators > -- > > Key: KAFKA-6560 > URL: https://issues.apache.org/jira/browse/KAFKA-6560 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Critical > Labels: needs-kip > Fix For: 1.2.0 > > > Today for windowed aggregations in the Streams DSL, the underlying implementation > is leveraging the fetch(key, from, to) API to get all the related windows for > a single record to update. However, this is a very inefficient operation with > a significant amount of CPU time spent iterating over window stores. On the other > hand, since the operator implementation itself has full knowledge of the > window specs it can actually translate this operation into multiple > single-point queries with the exact window start timestamps, which would > largely reduce the overhead. > The proposed approach is to add a single fetch API to the WindowedStore and > use that in the KStreamWindowedAggregate / KStreamWindowedReduce operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6560) Use single-point queries rather than range queries for windowed aggregation operators
[ https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382347#comment-16382347 ] ASF GitHub Bot commented on KAFKA-6560: --- guozhangwang closed pull request #4578: KAFKA-6560: Replace range query with newly added single point query in Windowed Aggregation URL: https://github.com/apache/kafka/pull/4578 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index ec26866bd22..27f8408320f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.kstream.Window; @@ -39,7 +38,10 @@ private boolean sendOldValues = false; -public KStreamWindowAggregate(Windows windows, String storeName, Initializer initializer, Aggregator aggregator) { +KStreamWindowAggregate(final Windows windows, + final String storeName, + final Initializer initializer, + final Aggregator aggregator) { this.windows = windows; this.storeName = storeName; this.initializer = initializer; @@ -63,7 +65,7 @@ public void enableSendingOldValues() { @SuppressWarnings("unchecked") @Override -public void init(ProcessorContext context) { +public void init(final ProcessorContext context) { super.init(context); windowStore = (WindowStore) context.getStateStore(storeName); @@ -71,54 +73,27 @@ public void init(ProcessorContext context) { } @Override -public void process(K key, V value) { +public void process(final K key, final V value) { // if the key is null, we do not need proceed aggregating the record // the record with the table if (key == null) return; // first get the matching windows -long timestamp = context().timestamp(); -Map matchedWindows = windows.windowsFor(timestamp); +final long timestamp = context().timestamp(); +final Map matchedWindows = windows.windowsFor(timestamp); -long timeFrom = Long.MAX_VALUE; -long timeTo = Long.MIN_VALUE; +// try update the window, and create the new window for the rest of unmatched window that do not exist yet +for (final Map.Entry entry : matchedWindows.entrySet()) { +T oldAgg = windowStore.fetch(key, entry.getKey()); -// use range query on window store for efficient reads -for (long windowStartMs : matchedWindows.keySet()) { -timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom; -timeTo = windowStartMs > timeTo ? 
windowStartMs : timeTo; -} - -try (WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo)) { - -// for each matching window, try to update the corresponding key -while (iter.hasNext()) { -KeyValue entry = iter.next(); -W window = matchedWindows.get(entry.key); - -if (window != null) { - -T oldAgg = entry.value; - -if (oldAgg == null) -oldAgg = initializer.apply(); - -// try to add the new value (there will never be old value) -T newAgg = aggregator.apply(key, value, oldAgg); - -// update the store with the new value -windowStore.put(key, newAgg, window.start()); -tupleForwarder.maybeForward(new Windowed<>(key, window), newAgg, oldAgg); -matchedWindows.remove(entry.key); -} +if (oldAgg == null) { +oldAgg = initializer.apply(); } -} -// create the new window for the rest of unmatched window that do not exist yet -for (Map.Entry entry : matchedWindows.entrySet()) { -T o
[jira] [Assigned] (KAFKA-6602) Support Kafka to save credentials in Java Key Store on Zookeeper node
[ https://issues.apache.org/jira/browse/KAFKA-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chen He reassigned KAFKA-6602: -- Assignee: Chen He > Support Kafka to save credentials in Java Key Store on Zookeeper node > - > > Key: KAFKA-6602 > URL: https://issues.apache.org/jira/browse/KAFKA-6602 > Project: Kafka > Issue Type: New Feature > Components: security >Reporter: Chen He >Assignee: Chen He >Priority: Major > > Kafka connect needs to talk to multifarious distributed systems. However, > each system has its own authentication mechanism. How we manage these > credentials becomes a common problem. > Here are my thoughts: > # We may need to save them in a Java key store; > # We may need to put this key store in a distributed system (a topic or > ZooKeeper); > # The key store password may be configured in the Kafka configuration; > I have implemented a feature that allows storing the Java key store in a ZooKeeper > node. If the Kafka community likes this idea, I am happy to contribute. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6602) Support Kafka to save credentials in Java Key Store on Zookeeper node
Chen He created KAFKA-6602: -- Summary: Support Kafka to save credentials in Java Key Store on Zookeeper node Key: KAFKA-6602 URL: https://issues.apache.org/jira/browse/KAFKA-6602 Project: Kafka Issue Type: New Feature Components: security Reporter: Chen He Kafka connect needs to talk to multifarious distributed systems. However, each system has its own authentication mechanism. How we manage these credentials becomes a common problem. Here are my thoughts: # We may need to save them in a Java key store; # We may need to put this key store in a distributed system (a topic or ZooKeeper); # The key store password may be configured in the Kafka configuration; I have implemented a feature that allows storing the Java key store in a ZooKeeper node. If the Kafka community likes this idea, I am happy to contribute. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
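The proposal above is only sketched in prose; a rough illustration of the idea is fetching key-store bytes from a ZooKeeper node and loading them with a password supplied via configuration. In the sketch below the znode path, session timeout, and class name are hypothetical, and note that ZooKeeper limits znode data to about 1 MB by default:
{code:java}
import java.io.ByteArrayInputStream;
import java.security.KeyStore;

import org.apache.zookeeper.ZooKeeper;

public class ZkKeyStoreLoader {

    // Hypothetical znode holding the raw JKS bytes.
    private static final String KEYSTORE_ZNODE = "/config/credentials/connect.jks";

    public static KeyStore load(String zkConnect, char[] keyStorePassword) throws Exception {
        ZooKeeper zk = new ZooKeeper(zkConnect, 30_000, event -> { });
        try {
            // Read the serialized key store from the znode...
            byte[] keyStoreBytes = zk.getData(KEYSTORE_ZNODE, false, null);
            // ...and load it with the password taken from the Kafka/Connect configuration.
            KeyStore keyStore = KeyStore.getInstance("JKS");
            keyStore.load(new ByteArrayInputStream(keyStoreBytes), keyStorePassword);
            return keyStore;
        } finally {
            zk.close();
        }
    }
}
{code}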
[jira] [Commented] (KAFKA-6593) Coordinator disconnect in heartbeat thread can cause commitSync to block indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-6593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382469#comment-16382469 ] ASF GitHub Bot commented on KAFKA-6593: --- hachikuji closed pull request #4625: KAFKA-6593; Fix livelock with consumer heartbeat thread in commitSync URL: https://github.com/apache/kafka/pull/4625 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 6884ff0dff7..2daadddee63 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -231,7 +231,7 @@ protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long tim } else if (coordinator != null && client.connectionFailed(coordinator)) { // we found the coordinator, but the connection has failed, so mark // it dead and backoff before retrying discovery -coordinatorDead(); +markCoordinatorUnknown(); time.sleep(retryBackoffMs); } @@ -487,7 +487,7 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture fut } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { // re-discover the coordinator and retry with backoff -coordinatorDead(); +markCoordinatorUnknown(); log.debug("Attempt to join group failed due to obsolete coordinator information: {}", error.message()); future.raise(error); } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL @@ -550,7 +550,7 @@ public void handle(SyncGroupResponse syncResponse, if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { -log.debug("SyncGroup failed due to group rebalance"); +log.debug("SyncGroup failed because the group began another rebalance"); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { @@ -559,8 +559,8 @@ public void handle(SyncGroupResponse syncResponse, future.raise(error); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { -log.debug("SyncGroup failed:", error.message()); -coordinatorDead(); +log.debug("SyncGroup failed: {}", error.message()); +markCoordinatorUnknown(); future.raise(error); } else { future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message())); @@ -627,27 +627,34 @@ public void onFailure(RuntimeException e, RequestFuture future) { * @return true if the coordinator is unknown */ public boolean coordinatorUnknown() { -return coordinator() == null; +return checkAndGetCoordinator() == null; } /** - * Get the current coordinator + * Get the coordinator if its connection is still active. Otherwise mark it unknown and + * return null. 
+ * * @return the current coordinator or null if it is unknown */ -protected synchronized Node coordinator() { +protected synchronized Node checkAndGetCoordinator() { if (coordinator != null && client.connectionFailed(coordinator)) { -coordinatorDead(); +markCoordinatorUnknown(true); return null; } return this.coordinator; } -/** - * Mark the current coordinator as dead. - */ -protected synchronized void coordinatorDead() { +private synchronized Node coordinator() { +return this.coordinator; +} + +protected synchronized void markCoordinatorUnknown() { +markCoordinatorUnknown(false); +} + +protected synchronized void markCoordinatorUnknown(boolean isDisconnected) { if (this.coordinator != null) { -log.info("Marking the coordinator {} dead", this.coordinator); +log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator); Nod
[jira] [Resolved] (KAFKA-6593) Coordinator disconnect in heartbeat thread can cause commitSync to block indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-6593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6593. Resolution: Fixed > Coordinator disconnect in heartbeat thread can cause commitSync to block > indefinitely > - > > Key: KAFKA-6593 > URL: https://issues.apache.org/jira/browse/KAFKA-6593 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.0.0, 0.11.0.2 >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Critical > Fix For: 1.1.0 > > Attachments: consumer.log > > > If a coordinator disconnect is observed in the heartbeat thread, it can cause > a pending offset commit to be cancelled just before the foreground thread > begins waiting on its response in poll(). Since the poll timeout is > Long.MAX_VALUE, this will cause the consumer to effectively hang until some > other network event causes the poll() to return. We try to protect this case > with a poll condition on the future, but this isn't bulletproof since the > future can be completed outside of the lock. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration
Igor Calabria created KAFKA-6603: Summary: Kafka streams off heap memory usage does not match expected values from configuration Key: KAFKA-6603 URL: https://issues.apache.org/jira/browse/KAFKA-6603 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Reporter: Igor Calabria Hi, I have a simple aggregation pipeline that's backed by the default state store (RocksDB). The pipeline works fine except that the off-heap memory usage is way higher than expected. Following the [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] has some effect (memory usage is reduced) but the values don't match at all. The Java process is set to run with just `-Xmx300m -Xms300m` and the RocksDB config looks like this {code:java} tableConfig.setCacheIndexAndFilterBlocks(true); tableConfig.setBlockCacheSize(1048576); //1MB tableConfig.setBlockSize(16 * 1024); // 16KB options.setTableFormatConfig(tableConfig); options.setMaxWriteBufferNumber(2); options.setWriteBufferSize(8 * 1024); // 8KB{code} To estimate memory usage, I'm using this formula {noformat} (block_cache_size + write_buffer_size * write_buffer_number) * segments * partitions{noformat} Since my topic has 25 partitions with 3 segments each (it's a windowed store), off-heap memory usage should be about 76MB. What I'm seeing in production is upwards of 300MB; even taking into consideration extra overhead from RocksDB compaction threads, this seems a bit high (especially when the disk usage for all files is just 1GB) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
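Working through the reporter's formula with the numbers given: (1,048,576 bytes of block cache + 2 write buffers x 8,192 bytes) x 3 segments x 25 partitions = 79,872,000 bytes, i.e. roughly 76 MB, which matches the stated expectation. For completeness, options like those shown above are normally applied through a custom RocksDBConfigSetter registered via the rocksdb.config.setter property; a sketch mirroring the reported settings (the class name is illustrative):
{code:java}
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Register with: props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setCacheIndexAndFilterBlocks(true);
        tableConfig.setBlockCacheSize(1048576);   // 1 MB block cache per store instance
        tableConfig.setBlockSize(16 * 1024);      // 16 KB blocks
        options.setTableFormatConfig(tableConfig);
        options.setMaxWriteBufferNumber(2);
        options.setWriteBufferSize(8 * 1024);     // 8 KB memtables
    }
}
{code}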
[jira] [Commented] (KAFKA-5891) Cast transformation fails if record schema contains timestamp field
[ https://issues.apache.org/jira/browse/KAFKA-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382629#comment-16382629 ] ASF GitHub Bot commented on KAFKA-5891: --- maver1ck opened a new pull request #4633: [KAFKA-5891] Proper handling of LogicalTypes in Cast URL: https://github.com/apache/kafka/pull/4633 Currently logical types are dropped during Cast Transformation. This patch fixes this behaviour. ### Committer Checklist (excluded from commit message) - [X] Verify design and implementation - [X] Verify test coverage and CI build status - [X] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Cast transformation fails if record schema contains timestamp field > --- > > Key: KAFKA-5891 > URL: https://issues.apache.org/jira/browse/KAFKA-5891 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Artem Plotnikov >Priority: Major > > I have the following simple type cast transformation: > {code} > name=postgresql-source-simple > connector.class=io.confluent.connect.jdbc.JdbcSourceConnector > tasks.max=1 > connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres&password=mysecretpassword > query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b > transforms=Cast > transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value > transforms.Cast.spec=a:boolean > mode=bulk > topic.prefix=clients > {code} > Which fails with the following exception in runtime: > {code} > [2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an > uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:148) > org.apache.kafka.connect.errors.DataException: Invalid Java object for schema > type INT64: class java.sql.Timestamp for field: "null" > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239) > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209) > at org.apache.kafka.connect.data.Struct.put(Struct.java:214) > at > org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152) > at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > If I remove the transforms.* part of the connector it will work correctly. > Actually, it doesn't really matter which types I use in the transformation > for field 'a', just the existence of a timestamp field brings the exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6604) ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file
Dong Lin created KAFKA-6604: --- Summary: ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file Key: KAFKA-6604 URL: https://issues.apache.org/jira/browse/KAFKA-6604 Project: Kafka Issue Type: Bug Reporter: Dong Lin Assignee: Dong Lin Currently a broker may truncate a partition to log start offset in the following scenario: - Broker A is restarted after shutdown - Controller knows that broker A is started. - Some event (e.g. topic deletion) triggered the controller to send LeaderAndIsrRequest for partition P1. - Broker A receives LeaderAndIsrRequest for partition P1. After the broker receives the first LeaderAndIsrRequest, it will overwrite the HW checkpoint file with all its leader partitions and follower partitions. The checkpoint file will contain only the HW for partition P1. - Controller sends broker A a LeaderAndIsrRequest for all its leader and follower partitions. - Broker creates ReplicaFetcherThread for its follower partitions, truncates the log to HW, which will be zero for all partitions except P1. When this happens, potentially all logs in the broker will be truncated to log start offset and then the cluster will run with reduced availability for a long time. The right solution is to keep the partitions in the high watermark checkpoint file if the partition exists in LogManager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6604) ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382709#comment-16382709 ] ASF GitHub Bot commented on KAFKA-6604: --- lindong28 opened a new pull request #4634: KAFKA-6604; ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file URL: https://github.com/apache/kafka/pull/4634 *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ReplicaManager should not remove partitions on the log dirctory from high > watermark checkpoint file > --- > > Key: KAFKA-6604 > URL: https://issues.apache.org/jira/browse/KAFKA-6604 > Project: Kafka > Issue Type: Bug >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > > Currently a broker may truncate a partition to log start offset in the > following scenario: > - Broker A is restarted after shutdown > - Controller knows that broker A is started. > - Som event (e.g. topic deletion) triggered controller to send > LeaderAndIsrRequest for partition P1. > - Broker A receives LeaderAndIsrRequest for partition P1. After the broker > receives the first LeaderAndIsrRequest, it will overwrite the HW checkpoint > file with all its leader partitions and follower partitions. The checkpoint > file will contain only the HW for partition P1. > - Controller sends broker A a LeaderAndIsrRequest for all its leader and > follower partitions. > - Broker creates ReplicaFetcherThread for its follower partitions, truncates > the log to HW, which will be zero for all partitions except P1. > When this happens, potentially all logs in the broker will be truncated to > log start offset and then the cluster will run with reduced availability for > a long time. > The right solution is to keep the partitions in the high watermark checkpoint > file if the partition exists in LogManager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6605) Flatten SMT does not properly handle fields that are null
Randall Hauch created KAFKA-6605: Summary: Flatten SMT does not properly handle fields that are null Key: KAFKA-6605 URL: https://issues.apache.org/jira/browse/KAFKA-6605 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 1.0.0 Reporter: Randall Hauch When a message has a null field, the `Flatten` SMT does not properly handle this and throws an NPE. Consider this message from Debezium: {code} { "before": null, "after": { "dbserver1.mydb.team.Value": { "id": 1, "name": "kafka", "email": "ka...@apache.org", "last_modified": 1519939449000 } }, "source": { "version": { "string": "0.7.3" }, "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.03", "pos": 154, "row": 0, "snapshot": { "boolean": true }, "thread": null, "db": { "string": "mydb" }, "table": { "string": "team" } }, "op": "c", "ts_ms": { "long": 1519939520285 } } {code} Note how `before` is null; this event represents a row was INSERTED and thus there is no `before` state of the row. This results in an NPE: {noformat} org.apache.avro.SchemaParseException: Illegal character in: source.version at org.apache.avro.Schema.validateName(Schema.java:1151) at org.apache.avro.Schema.access$200(Schema.java:81) at org.apache.avro.Schema$Field.(Schema.java:403) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116) at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034) at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423) at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647) at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324) at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} Here's the connector configuration that was used: {code} { "name": "debezium-connector-flatten", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "223345", "database.server.name": "dbserver-flatten", "database.whitelist": "mydb", "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092", "database.history.kafka.topic": "schema-flatten.mydb", "include.schema.changes": "true", "transforms": "flatten", "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value" } } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
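For illustration of the failure mode only (the real Flatten SMT operates on Connect Struct/Schema objects, not plain maps): the flattening recursion appears to need a null guard before descending into a nested value such as the {{before}} field above. A schemaless sketch of that guard, under that assumption:
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSketch {

    /** Flattens nested maps into "parent_child" style keys, propagating nulls instead of recursing into them. */
    public static Map<String, Object> flatten(Map<String, Object> value, String delimiter) {
        Map<String, Object> flat = new LinkedHashMap<>();
        copy("", value, delimiter, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void copy(String prefix, Map<String, Object> value, String delimiter, Map<String, Object> out) {
        for (Map.Entry<String, Object> entry : value.entrySet()) {
            String name = prefix.isEmpty() ? entry.getKey() : prefix + delimiter + entry.getKey();
            Object fieldValue = entry.getValue();
            if (fieldValue == null) {
                // This is the missing guard: keep the null rather than recursing into it.
                out.put(name, null);
            } else if (fieldValue instanceof Map) {
                copy(name, (Map<String, Object>) fieldValue, delimiter, out);
            } else {
                out.put(name, fieldValue);
            }
        }
    }
}
{code}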
[jira] [Updated] (KAFKA-6605) Flatten SMT does not properly handle fields that are null
[ https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6605: - Description: When a message has a null field, the `Flatten` SMT does not properly handle this and throws an NPE. Consider this message from Debezium: {code} { "before": null, "after": { "dbserver1.mydb.team.Value": { "id": 1, "name": "kafka", "email": "ka...@apache.org", "last_modified": 1519939449000 } }, "source": { "version": { "string": "0.7.3" }, "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.03", "pos": 154, "row": 0, "snapshot": { "boolean": true }, "thread": null, "db": { "string": "mydb" }, "table": { "string": "team" } }, "op": "c", "ts_ms": { "long": 1519939520285 } } {code} Note how `before` is null; this event represents a row was INSERTED and thus there is no `before` state of the row. This results in an NPE: {noformat} org.apache.avro.SchemaParseException: Illegal character in: source.version at org.apache.avro.Schema.validateName(Schema.java:1151) at org.apache.avro.Schema.access$200(Schema.java:81) at org.apache.avro.Schema$Field.(Schema.java:403) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116) at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034) at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423) at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647) at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324) at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} Here's the connector configuration that was used: {code} { "name": "debezium-connector-flatten", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "223345", "database.server.name": "dbserver-flatten", "database.whitelist": "mydb", "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092", "database.history.kafka.topic": "schema-flatten.mydb", "include.schema.changes": "true", "transforms": "flatten", "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value", "transforms.flatten.delimiter": "_" } } {code} was: When a message has a null field, the `Flatten` SMT does not properly handle this and throws an NPE. 
Consider this message from Debezium: {code} { "before": null, "after": { "dbserver1.mydb.team.Value": { "id": 1, "name": "kafka", "email": "ka...@apache.org", "last_modified": 1519939449000 } }, "source": { "version": { "string": "0.7.3" }, "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.03", "pos": 154, "row": 0, "snapshot": { "boolean": true }, "thread": null, "db": { "string": "mydb" }, "table": { "string": "team" } }, "op": "c", "ts_ms": { "long": 1519939520285 } } {code} Note how `before` is null; this event represents a row was INSERTED and thus there is no `before` state of the row. This results in an NPE: {noformat} org.apache.avro.SchemaParseException: Illegal character in: source.version at org.apache.avro.Schema.validateName(Schema.java:1151) at org.apache.avro.Sche
[jira] [Updated] (KAFKA-6605) Flatten SMT does not properly handle fields that are null
[ https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6605: - Description: When a message has a null field, the `Flatten` SMT does not properly handle this and throws an NPE. Consider this message from Debezium: {code} { "before": null, "after": { "dbserver1.mydb.team.Value": { "id": 1, "name": "kafka", "email": "ka...@apache.org", "last_modified": 1519939449000 } }, "source": { "version": { "string": "0.7.3" }, "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.03", "pos": 154, "row": 0, "snapshot": { "boolean": true }, "thread": null, "db": { "string": "mydb" }, "table": { "string": "team" } }, "op": "c", "ts_ms": { "long": 1519939520285 } } {code} Note how `before` is null; this event represents a row was INSERTED and thus there is no `before` state of the row. This results in an NPE: {noformat} org.apache.avro.SchemaParseException: Illegal character in: source.version at org.apache.avro.Schema.validateName(Schema.java:1151) at org.apache.avro.Schema.access$200(Schema.java:81) at org.apache.avro.Schema$Field.(Schema.java:403) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116) at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034) at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423) at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647) at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324) at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} Here's the connector configuration that was used: {code} { "name": "debezium-connector-flatten", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "223345", "database.server.name": "dbserver-flatten", "database.whitelist": "mydb", "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092", "database.history.kafka.topic": "schema-flatten.mydb", "include.schema.changes": "true", "transforms": "flatten", "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value", "transforms.flatten.delimiter": "_" } } {code} Note that the above configuration sets the delimiter to `_`. 
The default delimiter is `.`, which is not a valid character within an Avro field, and doing this results in the following exception: {noformat} org.apache.avro.SchemaParseException: Illegal character in: source.version at org.apache.avro.Schema.validateName(Schema.java:1151) at org.apache.avro.Schema.access$200(Schema.java:81) at org.apache.avro.Schema$Field.(Schema.java:403) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116) at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034) at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423) at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
[jira] [Resolved] (KAFKA-3513) Transient failure of OffsetValidationTest
[ https://issues.apache.org/jira/browse/KAFKA-3513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-3513. Resolution: Fixed We haven't seen this in some time, so I'm going to close this. We can reopen if it reoccurs. > Transient failure of OffsetValidationTest > - > > Key: KAFKA-3513 > URL: https://issues.apache.org/jira/browse/KAFKA-3513 > Project: Kafka > Issue Type: Bug > Components: consumer, system tests >Reporter: Ewen Cheslack-Postava >Assignee: Jason Gustafson >Priority: Major > > http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-04-05--001.1459840046--apache--trunk--31e263e/report.html > The version of the test that fails in this case is: > Module: kafkatest.tests.client.consumer_test > Class: OffsetValidationTest > Method: test_broker_failure > Arguments: > { > "clean_shutdown": true, > "enable_autocommit": false > } > and others passed. It's unclear if the parameters actually have any impact on > the failure. > I did some initial triage and it looks like the test code isn't seeing all > the group members join the group (receive partition assignments), but it > appears from the logs that they all did. This could indicate a simple timing > issue, but I haven't been able to verify that yet. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6605) Flatten SMT does not properly handle fields that are null
[ https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6605: - Description: When a message has a null field, the `Flatten` SMT does not properly handle this and throws an NPE. Consider this message from Debezium: {code} { "before": null, "after": { "dbserver1.mydb.team.Value": { "id": 1, "name": "kafka", "email": "ka...@apache.org", "last_modified": 1519939449000 } }, "source": { "version": { "string": "0.7.3" }, "name": "dbserver1", "server_id": 0, "ts_sec": 0, "gtid": null, "file": "mysql-bin.03", "pos": 154, "row": 0, "snapshot": { "boolean": true }, "thread": null, "db": { "string": "mydb" }, "table": { "string": "team" } }, "op": "c", "ts_ms": { "long": 1519939520285 } } {code} Note how `before` is null; this event represents a row was INSERTED and thus there is no `before` state of the row. This results in an NPE: {noformat} java.lang.NullPointerException at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:219) at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:234) at org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:151) at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:75) at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:211) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {noformat} Here's the connector configuration that was used: {code} { "name": "debezium-connector-flatten", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "223345", "database.server.name": "dbserver-flatten", "database.whitelist": "mydb", "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092", "database.history.kafka.topic": "schema-flatten.mydb", "include.schema.changes": "true", "transforms": "flatten", "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value", "transforms.flatten.delimiter": "_" } } {code} Note that the above configuration sets the delimiter to `_`. 
The default delimiter is `.`, which is not a valid character within an Avro field, and doing this results in the following exception: {noformat} org.apache.avro.SchemaParseException: Illegal character in: source.version at org.apache.avro.Schema.validateName(Schema.java:1151) at org.apache.avro.Schema.access$200(Schema.java:81) at org.apache.avro.Schema$Field.(Schema.java:403) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124) at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116) at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034) at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423) at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652) at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647) at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324) at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.uti
[jira] [Commented] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration
[ https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382871#comment-16382871 ] Guozhang Wang commented on KAFKA-6603: -- Hello Igor, thanks for reporting the issue, without further information I'd suspect it is because of the rocksDB iterator implementation which is frequently created in windowed aggregations, and this has just been replaced in KIP-261. If possible could you build from the latest trunk that contains this fix and check if the memory usage gets improved? > Kafka streams off heap memory usage does not match expected values from > configuration > - > > Key: KAFKA-6603 > URL: https://issues.apache.org/jira/browse/KAFKA-6603 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 >Reporter: Igor Calabria >Priority: Minor > > Hi, I have a simple aggregation pipeline that's backed by the default state > store(rocksdb). The pipeline works fine except that off heap the memory usage > is way higher than expected. Following the > [documention|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config] > has some effect(memory usage is reduced) but the values don't match at all. > The java process is set to run with just `-Xmx300m -Xms300m` and rocksdb > config looks like this > {code:java} > tableConfig.setCacheIndexAndFilterBlocks(true); > tableConfig.setBlockCacheSize(1048576); //1MB > tableConfig.setBlockSize(16 * 1024); // 16KB > options.setTableFormatConfig(tableConfig); > options.setMaxWriteBufferNumber(2); > options.setWriteBufferSize(8 * 1024); // 8KB{code} > To estimate memory usage, I'm using this formula > {noformat} > (block_cache_size + write_buffer_size * write_buffer_number) * segments * > partitions{noformat} > Since my topic has 25 partitions with 3 segments each(it's a windowed store), > off heap memory usage should be about 76MB. What I'm seeing in production is > upwards of 300MB, even taking in consideration extra overhead from rocksdb > compaction threads, this seems a bit high (especially when the disk usage for > all files is just 1GB) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6348) Kafka consumer can't restore from coordinator failure
[ https://issues.apache.org/jira/browse/KAFKA-6348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382966#comment-16382966 ] Jason Gustafson commented on KAFKA-6348: Each groupId is mapped to one of the __consumer_offsets partitions. The group coordinator is whichever broker happens to be leading that partition. So if you see an issue with the coordinator which is resolved by changing the groupId, that likely suggests some problem with whichever __consumer_offsets partitions the old groupId was mapped to. You should check that all partitions are healthy and that there are no errors in the broker logs. > Kafka consumer can't restore from coordinator failure > - > > Key: KAFKA-6348 > URL: https://issues.apache.org/jira/browse/KAFKA-6348 > Project: Kafka > Issue Type: Bug > Components: consumer, core >Affects Versions: 0.10.1.1 >Reporter: Renjie Liu >Priority: Major > > Kafka consumer blocks and keep reporting coordinator is dead. I tried to > restart the process and it still can't work. Then we shutdown the broker and > restart consumer, but it still keep reporting coordinator is dead. This > situation continues until we change our group id and it works. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
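To make the mapping concrete: the broker derives a group's __consumer_offsets partition from the group id, and the coordinator is the current leader of that partition. A small sketch mirroring that calculation (the broker uses Utils.abs(groupId.hashCode) modulo offsets.topic.num.partitions, which defaults to 50), useful for finding which partition to inspect when a coordinator seems stuck:
{code:java}
public class GroupCoordinatorPartition {

    /** Mirrors the broker-side mapping from group id to __consumer_offsets partition. */
    public static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        // Equivalent of Kafka's Utils.abs: non-negative even for Integer.MIN_VALUE hash codes.
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // 50 is the default value of offsets.topic.num.partitions.
        System.out.println(partitionFor("my-group", 50));
    }
}
{code}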
[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records
[ https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382969#comment-16382969 ] Blake Miller commented on KAFKA-4936: - That is a very good point about the potential to create oodles of topics unintentionally. I don't think you're overthinking it... there is significantly more complexity to it than I recognized at first, and I was not considering automatic topic creation. I pondered it a bit more just now, and came to the tentative conclusion that this feature could be implemented in a safe and simple way *only* in the absence of automatic topic creation. I personally do not understand what value automatic topic creation offers, it seems to me to abstract over very important details that the Kafka user should think about (partitioning, at least), and also to lower the bar for mistakes or bad code doing damage, in order to gain what seems like a very tiny bit of convenience. Perhaps somebody who understands the value proposition of automatic topic creation can comment on its relevance to this issue. Regarding the implementation details, I hadn't thought very deeply into it myself, but what you say makes sense, and even without automatic topic creation, the implementation would have to handle the eventuality of the topic not existing. I wonder what happens in the current implementation if I give a nonexistent output topic to a StreamBuilder and then .start() it, specifically would it be a runtime error somewhere in a SinkNode after the Kafka Streams app is started, or would StreamBuilder.build() complain? I will give that a try. > Allow dynamic routing of output records > --- > > Key: KAFKA-4936 > URL: https://issues.apache.org/jira/browse/KAFKA-4936 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > Currently, all used output topics must be know beforehand, and thus, it's not > possible to send output records to topic in a dynamic fashion. > There have been couple of request for this feature and we should consider > adding it. There are many open questions however, with regard to topic > creation and configuration (replication factor, number of partitions) etc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6111) Tests for KafkaZkClient
[ https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382993#comment-16382993 ] ASF GitHub Bot commented on KAFKA-6111: --- junrao closed pull request #4596: KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests URL: https://github.com/apache/kafka/pull/4596 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index afc8202b88a..e15bd2f8503 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -85,7 +85,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = { val brokerIdPath = brokerInfo.path val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion) -retryRequestUntilConnected(setDataRequest) +val response = retryRequestUntilConnected(setDataRequest) +response.maybeThrow() info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints)) } @@ -421,7 +422,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean def deleteLogDirEventNotifications(): Unit = { val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path)) if (getChildrenResponse.resultCode == Code.OK) { - deleteLogDirEventNotifications(getChildrenResponse.children) + deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber)) } else if (getChildrenResponse.resultCode != Code.NONODE) { getChildrenResponse.maybeThrow } diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index d3726c25c58..e44c2c94e52 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -16,10 +16,11 @@ */ package kafka.zk -import java.util.{Properties, UUID} +import java.util.{Collections, Properties, UUID} import java.nio.charset.StandardCharsets.UTF_8 +import java.util.concurrent.{CountDownLatch, TimeUnit} -import kafka.api.ApiVersion +import kafka.api.{ApiVersion, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} import kafka.log.LogConfig import kafka.security.auth._ @@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.token.delegation.TokenInformation -import org.apache.kafka.common.utils.SecurityUtils -import org.apache.zookeeper.KeeperException.NodeExistsException +import org.apache.kafka.common.utils.{SecurityUtils, Time} +import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException} import org.junit.Assert._ -import org.junit.Test - +import org.junit.{After, Before, Test} import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.{Seq, mutable} import scala.util.Random +import kafka.controller.LeaderIsrAndControllerEpoch 
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper._ +import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.data.Stat + class KafkaZkClientTest extends ZooKeeperTestHarness { private val group = "my-group" + private val topic1 = "topic1" + private val topic2 = "topic2" + + val topicPartition10 = new TopicPartition(topic1, 0) + val topicPartition11 = new TopicPartition(topic1, 1) + val topicPartition20 = new TopicPartition(topic2, 0) + val topicPartitions10_11 = Seq(topicPartition10, topicPartition11) + + var otherZkClient: KafkaZkClient = _ + + @Before + override def setUp(): Unit = { +super.setUp() +otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout, + zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM) + } + + @After + override def tearDown(): Unit = { +if (otherZkClient != null) + otherZkClient.close() +super.tearDown() + } + private val topicPartition = new TopicPartition("topic", 0) @Test @@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { @Test def testTopicAssignmentMethods() { -val topic1 = "topic1" -val topic2 = "topic2" +assertTrue(zkClien
[jira] [Commented] (KAFKA-6111) Tests for KafkaZkClient
[ https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383008#comment-16383008 ] ASF GitHub Bot commented on KAFKA-6111: --- junrao closed pull request #4635: Revert "KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests" URL: https://github.com/apache/kafka/pull/4635 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Tests for KafkaZkClient > --- > > Key: KAFKA-6111 > URL: https://issues.apache.org/jira/browse/KAFKA-6111 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Sandor Murakozi >Priority: Major > Fix For: 1.2.0 > > > Some methods in KafkaZkClient have no tests at the moment and we need to fix > that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6111) Tests for KafkaZkClient
[ https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383007#comment-16383007 ] ASF GitHub Bot commented on KAFKA-6111: --- junrao opened a new pull request #4635: Revert "KAFKA-6111: Improve test coverage of KafkaZkClient, fix bugs found by new tests" URL: https://github.com/apache/kafka/pull/4635 Reverts apache/kafka#4596 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Tests for KafkaZkClient > --- > > Key: KAFKA-6111 > URL: https://issues.apache.org/jira/browse/KAFKA-6111 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Sandor Murakozi >Priority: Major > Fix For: 1.2.0 > > > Some methods in KafkaZkClient have no tests at the moment and we need to fix > that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records
[ https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383038#comment-16383038 ] Matthias J. Sax commented on KAFKA-4936: If you provide an unknown topic name for an output topic, the topic will either be created (if auto topic creation is enabled) or a runtime error occurs. StreamsBuilder does not check the existence of topics. I tend to agree that auto topic creation is a "tricky feature". The main question is the following: if we argue that auto topic creation should not be used, this implies that all output topics must be known in advance because users need to create them. And thus, users can also use the {{branch()}} operator (see the sketch below) and don't need "auto routing" in the first place. Thus, it seems that people asking for this feature actually assume that auto topic creation is enabled – otherwise I cannot explain why one would need this feature. I am still not convinced about the overall story / need for this. We would also need a KIP and a discussion on the mailing list. Thus, it might make more sense to have this discussion first, before starting to write the code – we might decide that we don't want this for the mentioned reasons. When we got the feature request, we just created the Jira to track it – but in retrospect, I am not sure about it anymore. > Allow dynamic routing of output records > --- > > Key: KAFKA-4936 > URL: https://issues.apache.org/jira/browse/KAFKA-4936 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Labels: needs-kip > > Currently, all used output topics must be known beforehand, and thus, it's not > possible to send output records to topics in a dynamic fashion. > There have been a couple of requests for this feature and we should consider > adding it. There are many open questions however, with regard to topic > creation and configuration (replication factor, number of partitions) etc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
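For illustration only, a minimal sketch of the {{branch()}}-based routing mentioned in the comment above, routing to a fixed set of pre-created topics. The topic names ({{input-topic}}, {{topic-a}}, {{topic-b}}) and the splitting predicate are made up for this example and are not part of the issue; default String serdes are assumed.
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class BranchRoutingSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // branch() splits the stream by the first matching predicate;
        // every output topic is known (and created) ahead of time.
        @SuppressWarnings("unchecked")
        KStream<String, String>[] branches = input.branch(
            (key, value) -> value != null && value.startsWith("a"),  // goes to topic-a
            (key, value) -> true                                     // everything else goes to topic-b
        );
        branches[0].to("topic-a");
        branches[1].to("topic-b");
        // builder.build() would then be passed to new KafkaStreams(...) as usual.
    }
}
{code}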
[jira] [Commented] (KAFKA-6595) Kafka connect commit offset incorrectly.
[ https://issues.apache.org/jira/browse/KAFKA-6595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383062#comment-16383062 ] huxihx commented on KAFKA-6595: --- Could we simply cancel the flush if we find another thread has already initiated a flushing? > Kafka connect commit offset incorrectly. > > > Key: KAFKA-6595 > URL: https://issues.apache.org/jira/browse/KAFKA-6595 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Hanlin Liu >Priority: Major > > Version: ConfluentPlatform Kafka 3.2.0 > SourceTaskOffsetComitter calls commitOffset() and waits for all incomplete > records to be sent. While the task is stopped, commitOffset() is called again > by the final block in WorkerSourceTask.execute(), it will throw {{Invalid > call to OffsetStorageWriter flush() while already flushing, the framework > should not allow this}} exception. This will trigger closing Producer without > waiting the flush timeout. > After 30 seconds, all incomplete records has been forcefully aborted. If the > {{offset.flush.timeout.ms}} is configured larger than 30 seconds, > WorkerSourceTask will consider those aborted records as sent within flush > timeout, which results in incorrectly flushing the source offset. > > {code:java} > // code placeholder > 2018-02-27 02:59:33,134 INFO [] Stopping connector > dp-sqlserver-connector-dptask_455 [pool-3-thread-6][Worker.java:254] > 2018-02-27 02:59:33,134 INFO [] Stopped connector > dp-sqlserver-connector-dptask_455 [pool-3-thread-6][Worker.java:264] > 2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() > while already flushing, the framework should not allow this > [pool-1-thread-13][OffsetStorageWriter.java:110] > 2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 > threw an uncaught and unrecoverable exception > [pool-1-thread-13][WorkerTask.java:141] > org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is > already flushing > at > org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 2018-02-27 03:00:00,734 ERROR [] Graceful stop of task > dp-sqlserver-connector-dptask_455-0 failed. > [pool-3-thread-1][Worker.java:405] > 2018-02-27 03:00:04,126 INFO [] Proceeding to force close the producer since > pending requests could not be completed within timeout 30 ms. > [pool-1-thread-13][KafkaProducer.java:713] > 2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed > to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {} > [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228] > java.lang.IllegalStateException: Producer is closed forcefully. 
> at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147) > at java.lang.Thread.run(Thread.java:745) > 2018-02-27 03:00:09,920 INFO [] Finished > WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets > successfully in 47088 ms [pool-4-thread-1][WorkerSourceTask.java:371] > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
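To illustrate the "skip the flush if one is already in flight" idea raised in the comment above, here is a minimal sketch using a hypothetical guard class. This is not the Connect framework's OffsetStorageWriter, just one way a second caller could back off instead of triggering the "already flushing" exception.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard, not Connect code: only one thread at a time may flush;
// a concurrent caller returns false and simply skips its commit attempt.
public class FlushGuard {
    private final AtomicBoolean flushing = new AtomicBoolean(false);

    public boolean tryBeginFlush() {
        // compareAndSet lets exactly one thread win the flush.
        return flushing.compareAndSet(false, true);
    }

    public void endFlush() {
        flushing.set(false);
    }
}
{code}
With such a guard, a stopping task and the periodic offset committer could both call tryBeginFlush(); whichever loses would skip rather than throw and force-close the producer.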
[jira] [Resolved] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`
[ https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx resolved KAFKA-6592. --- Resolution: Duplicate Seems it's a duplicate of [KAFKA-4831|https://issues.apache.org/jira/browse/KAFKA-4831] > NullPointerException thrown when executing ConsoleCosumer with deserializer > set to `WindowedDeserializer` > - > > Key: KAFKA-6592 > URL: https://issues.apache.org/jira/browse/KAFKA-6592 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 1.0.0 >Reporter: huxihx >Assignee: huxihx >Priority: Minor > > When reading streams app's output topic with WindowedDeserializer deserilizer > using kafka-console-consumer.sh, NullPointerException was thrown due to the > fact that the inner deserializer was not initialized since there is no place > in ConsoleConsumer to set this class. > Complete stack trace is shown below: > {code:java} > [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer: > (kafka.tools.ConsoleConsumer$) > java.lang.NullPointerException > at > org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89) > at > org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35) > at > kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544) > at scala.Option.map(Option.scala:146) > at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545) > at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560) > at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
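The NullPointerException arises because the windowed deserializer delegates to an inner deserializer that the console consumer never sets. The following simplified wrapper (not the real WindowedDeserializer, and with hypothetical class names) shows why an unset delegate fails in exactly this way:
{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Simplified wrapper: the delegate stays null unless it is injected, so
// deserialize() throws a NullPointerException as in the trace above.
public class WrappingDeserializer implements Deserializer<String> {
    private Deserializer<String> inner;                  // null when loaded via the no-arg constructor

    public WrappingDeserializer() { }                    // what a tool like ConsoleConsumer would use
    public WrappingDeserializer(Deserializer<String> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No config key maps to 'inner' here, mirroring the reported gap.
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        return inner.deserialize(topic, data);           // NPE when inner == null
    }

    @Override
    public void close() { }
}
{code}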
[jira] [Updated] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`
[ https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6592: --- Component/s: streams > NullPointerException thrown when executing ConsoleCosumer with deserializer > set to `WindowedDeserializer` > - > > Key: KAFKA-6592 > URL: https://issues.apache.org/jira/browse/KAFKA-6592 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 1.0.0 >Reporter: huxihx >Assignee: huxihx >Priority: Minor > > When reading streams app's output topic with WindowedDeserializer deserilizer > using kafka-console-consumer.sh, NullPointerException was thrown due to the > fact that the inner deserializer was not initialized since there is no place > in ConsoleConsumer to set this class. > Complete stack trace is shown below: > {code:java} > [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer: > (kafka.tools.ConsoleConsumer$) > java.lang.NullPointerException > at > org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89) > at > org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35) > at > kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544) > at scala.Option.map(Option.scala:146) > at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545) > at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560) > at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5891) Cast transformation fails if record schema contains timestamp field
[ https://issues.apache.org/jira/browse/KAFKA-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383227#comment-16383227 ] Maciej Bryński commented on KAFKA-5891: --- [~broartem] Could you test my patch ? > Cast transformation fails if record schema contains timestamp field > --- > > Key: KAFKA-5891 > URL: https://issues.apache.org/jira/browse/KAFKA-5891 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Artem Plotnikov >Priority: Major > > I have the following simple type cast transformation: > {code} > name=postgresql-source-simple > connector.class=io.confluent.connect.jdbc.JdbcSourceConnector > tasks.max=1 > connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres&password=mysecretpassword > query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b > transforms=Cast > transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value > transforms.Cast.spec=a:boolean > mode=bulk > topic.prefix=clients > {code} > Which fails with the following exception in runtime: > {code} > [2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an > uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:148) > org.apache.kafka.connect.errors.DataException: Invalid Java object for schema > type INT64: class java.sql.Timestamp for field: "null" > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239) > at > org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209) > at org.apache.kafka.connect.data.Struct.put(Struct.java:214) > at > org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152) > at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108) > at > org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > If I remove the transforms.* part of the connector it will work correctly. > Actually, it doesn't really matter which types I use in the transformation > for field 'a', just the existence of a timestamp field brings the exception. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
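For context (this is not the proposed patch), a small sketch of why a java.sql.Timestamp value is rejected by a plain INT64 schema but accepted by Connect's logical Timestamp schema; the field name "b" mirrors the query above, everything else is illustrative.
{code:java}
import java.sql.Timestamp;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

public class TimestampSchemaSketch {
    public static void main(String[] args) {
        Timestamp now = new Timestamp(System.currentTimeMillis());

        // Logical Timestamp schema (INT64 plus a logical name): Date/Timestamp values validate.
        Schema withLogical = SchemaBuilder.struct()
            .field("b", org.apache.kafka.connect.data.Timestamp.SCHEMA)
            .build();
        new Struct(withLogical).put("b", now);           // passes validation

        // Plain INT64 schema: the same value is rejected with a DataException,
        // matching the stack trace in the description.
        Schema plainInt64 = SchemaBuilder.struct()
            .field("b", Schema.INT64_SCHEMA)
            .build();
        try {
            new Struct(plainInt64).put("b", now);
        } catch (DataException e) {
            System.out.println("Rejected as expected: " + e.getMessage());
        }
    }
}
{code}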
[jira] [Updated] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-6054: --- Labels: needs-kip (was: ) > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > - > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: James Cheng >Assignee: Matthias J. Sax >Priority: Major > Labels: needs-kip > > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. > We observed the following stack trace: > {code} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, > because the internal version number of the protocol changed when adding > Interactive Queries. Matthias asked me to file this JIRA> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-6054: -- Assignee: Matthias J. Sax > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > - > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: James Cheng >Assignee: Matthias J. Sax >Priority: Major > Labels: needs-kip > > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. > We observed the following stack trace: > {code} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, > because the internal version number of the protocol changed when adding > Interactive Queries. Matthias asked me to file this JIRA> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383259#comment-16383259 ] Matthias J. Sax commented on KAFKA-6054: We need to change the metadata version again for KAFKA-3522 and plan to piggyback a fix for this into this KIP. > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > - > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: James Cheng >Assignee: Matthias J. Sax >Priority: Major > Labels: needs-kip > > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. > We observed the following stack trace: > {code} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, > because the internal version number of the protocol changed when adding > Interactive Queries. Matthias asked me to file this JIRA> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
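For background only (the actual fix is being worked on via the linked KIP and pull request), a generic sketch of the "read the version prefix first and degrade instead of throwing" pattern that mixed-version rebalances call for. The constant and buffer layout here are illustrative and do not reflect the real Streams subscription wire format.
{code:java}
import java.nio.ByteBuffer;

public class VersionTolerantDecodeSketch {
    static final int LATEST_SUPPORTED_VERSION = 2;   // illustrative only

    // Read the version prefix and fall back instead of throwing when a newer
    // group member encodes metadata this instance does not understand yet.
    static int effectiveVersion(ByteBuffer metadata) {
        int usedVersion = metadata.getInt();
        return Math.min(usedVersion, LATEST_SUPPORTED_VERSION);
    }
}
{code}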
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383292#comment-16383292 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax opened a new pull request #4636: [WIP] KAFKA-6054: Fix Kafka Streams upgrade path for v0.10.0 URL: https://github.com/apache/kafka/pull/4636 Fixes the upgrade path from 0.10.0.x to 0.10.1.x+ Contained in KIP-258 Adds system tests for rolling bounce upgrades. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > - > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: James Cheng >Assignee: Matthias J. Sax >Priority: Major > Labels: needs-kip > > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. > We observed the following stack trace: > {code} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 instances and 0.10.2.1 ins