[jira] [Resolved] (KAFKA-6396) Possibly kafka-connect converter should be able to stop processing chain

2018-03-01 Thread Alexander Koval (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Koval resolved KAFKA-6396.

Resolution: Invalid

Thank you for the explanation.

> Possibly kafka-connect converter should be able to stop processing chain
> 
>
> Key: KAFKA-6396
> URL: https://issues.apache.org/jira/browse/KAFKA-6396
> Project: Kafka
>  Issue Type: Wish
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Alexander Koval
>Priority: Minor
>
> At present only transformations can discard records by returning null. But I 
> think it would sometimes be nice to stop the processing chain after converting 
> a message. For example, I have some tags shipped with the message key and I want 
> to stop processing the message after converting its key (there are a lot of 
> messages and I don't want to deserialize message values that I don't need).
> At the moment, to do that I have to disable the converters and move message 
> deserialization into the transformation chain:
> {code}
> key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
> value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
> transforms=proto,catalog
> transforms.proto.type=company.evo.kafka.ProtobufTransformation
> transforms.proto.key.protobuf.class=company.evo.uaprom.indexator.KeyProto$KeyMessage
> transforms.proto.value.protobuf.class=company.evo.uaprom.indexator.catalog.CompanyProto$UniversalCompanyMessage
> transforms.proto.tag=catalog
> {code}
> If 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/1.0.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L453]
>  checked the converted values for {{null}}, it would solve my problem more 
> gracefully.
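
For illustration, a minimal sketch of the kind of {{null}} check being asked for. This is not the actual WorkerSinkTask code; the class, method and field names are hypothetical, and it only shows the idea of dropping a record when a converter returns {{null}}:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.storage.Converter;

class NullDroppingConversion {
    private final Converter keyConverter;
    private final Converter valueConverter;

    NullDroppingConversion(Converter keyConverter, Converter valueConverter) {
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
    }

    /** Returns null (i.e. "discard this record") when either converter returns null. */
    SinkRecord convertOrDrop(ConsumerRecord<byte[], byte[]> msg) {
        SchemaAndValue key = keyConverter.toConnectData(msg.topic(), msg.key());
        if (key == null) {
            return null;   // stop here: the value is never deserialized
        }
        SchemaAndValue value = valueConverter.toConnectData(msg.topic(), msg.value());
        if (value == null) {
            return null;   // same for the value converter
        }
        return new SinkRecord(msg.topic(), msg.partition(),
                key.schema(), key.value(),
                value.schema(), value.value(),
                msg.offset());
    }
}
{code}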



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-01 Thread Filipe Agapito (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382069#comment-16382069
 ] 

Filipe Agapito commented on KAFKA-6474:
---

I'd like to work on this. I'm assigning the issue to myself.

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our own 
> tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.
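
For anyone picking this up, a minimal sketch of how a rewritten test could use the new driver (hypothetical topology and topic names; requires the kafka-streams-test-utils artifact from 1.1.0):

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;

public class UppercaseTopologyTest {

    public static void main(String[] args) {
        // A trivial topology that upper-cases values.
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
               .mapValues(v -> v.toUpperCase())
               .to("output-topic");
        Topology topology = builder.build();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Drive the topology without any broker: pipe a record in, read the result out.
        TopologyTestDriver driver = new TopologyTestDriver(topology, props);
        try {
            ConsumerRecordFactory<String, String> factory =
                new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new StringSerializer());
            driver.pipeInput(factory.create("input-topic", "key", "kafka"));
            OutputVerifier.compareKeyValue(
                driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer()),
                "key", "KAFKA");
        } finally {
            driver.close();
        }
    }
}
{code}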



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-01 Thread Filipe Agapito (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Filipe Agapito reassigned KAFKA-6474:
-

Assignee: Filipe Agapito

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our own 
> tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators

2018-03-01 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6560.
--
   Resolution: Fixed
Fix Version/s: 1.2.0

> Use single-point queries than range queries for windowed aggregation operators
> --
>
> Key: KAFKA-6560
> URL: https://issues.apache.org/jira/browse/KAFKA-6560
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: needs-kip
> Fix For: 1.2.0
>
>
> Today, for windowed aggregations in the Streams DSL, the underlying implementation 
> leverages the fetch(key, from, to) API to get all the related windows for 
> a single record to update. However, this is a very inefficient operation that spends a 
> significant amount of CPU time iterating over window stores. On the other 
> hand, since the operator implementation itself has full knowledge of the 
> window specs, it can translate this operation into multiple 
> single-point queries with the exact window start timestamps, which would 
> largely reduce the overhead.
> The proposed approach is to add a single-point fetch API to the WindowedStore and 
> use it in the KStreamWindowedAggregate / KStreamWindowedReduce operators.
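
For illustration, a simplified before/after fragment of the two access patterns (variable names follow the patch in the linked PR; this is a sketch of the idea, not the full operator code):

{code:java}
// Before: one range query per record, then iterate and match windows by start timestamp.
try (WindowStoreIterator<T> iter = windowStore.fetch(key, minStartMs, maxStartMs)) {
    while (iter.hasNext()) {
        KeyValue<Long, T> entry = iter.next();
        W window = matchedWindows.get(entry.key);
        if (window != null) {
            // aggregate entry.value and write it back for this window
        }
    }
}

// After: one single-point query per matched window, using the known window start time.
for (Map.Entry<Long, W> entry : matchedWindows.entrySet()) {
    T oldAgg = windowStore.fetch(key, entry.getKey());   // single-point fetch(key, windowStartMs)
    T newAgg = aggregator.apply(key, value, oldAgg == null ? initializer.apply() : oldAgg);
    windowStore.put(key, newAgg, entry.getKey());
}
{code}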



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6560) Use single-point queries than range queries for windowed aggregation operators

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382347#comment-16382347
 ] 

ASF GitHub Bot commented on KAFKA-6560:
---

guozhangwang closed pull request #4578: KAFKA-6560: Replace range query with 
newly added single point query in Windowed Aggregation
URL: https://github.com/apache/kafka/pull/4578
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index ec26866bd22..27f8408320f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.kstream.Window;
@@ -39,7 +38,10 @@
 
 private boolean sendOldValues = false;
 
-public KStreamWindowAggregate(Windows windows, String storeName, 
Initializer initializer, Aggregator aggregator) {
+KStreamWindowAggregate(final Windows windows,
+   final String storeName,
+   final Initializer initializer,
+   final Aggregator 
aggregator) {
 this.windows = windows;
 this.storeName = storeName;
 this.initializer = initializer;
@@ -63,7 +65,7 @@ public void enableSendingOldValues() {
 
 @SuppressWarnings("unchecked")
 @Override
-public void init(ProcessorContext context) {
+public void init(final ProcessorContext context) {
 super.init(context);
 
 windowStore = (WindowStore) context.getStateStore(storeName);
@@ -71,54 +73,27 @@ public void init(ProcessorContext context) {
 }
 
 @Override
-public void process(K key, V value) {
+public void process(final K key, final V value) {
 // if the key is null, we do not need proceed aggregating the 
record
 // the record with the table
 if (key == null)
 return;
 
 // first get the matching windows
-long timestamp = context().timestamp();
-Map matchedWindows = windows.windowsFor(timestamp);
+final long timestamp = context().timestamp();
+final Map matchedWindows = windows.windowsFor(timestamp);
 
-long timeFrom = Long.MAX_VALUE;
-long timeTo = Long.MIN_VALUE;
+// try update the window, and create the new window for the rest 
of unmatched window that do not exist yet
+for (final Map.Entry entry : matchedWindows.entrySet()) {
+T oldAgg = windowStore.fetch(key, entry.getKey());
 
-// use range query on window store for efficient reads
-for (long windowStartMs : matchedWindows.keySet()) {
-timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
-timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
-}
-
-try (WindowStoreIterator iter = windowStore.fetch(key, 
timeFrom, timeTo)) {
-
-// for each matching window, try to update the corresponding 
key
-while (iter.hasNext()) {
-KeyValue entry = iter.next();
-W window = matchedWindows.get(entry.key);
-
-if (window != null) {
-
-T oldAgg = entry.value;
-
-if (oldAgg == null)
-oldAgg = initializer.apply();
-
-// try to add the new value (there will never be old 
value)
-T newAgg = aggregator.apply(key, value, oldAgg);
-
-// update the store with the new value
-windowStore.put(key, newAgg, window.start());
-tupleForwarder.maybeForward(new Windowed<>(key, 
window), newAgg, oldAgg);
-matchedWindows.remove(entry.key);
-}
+if (oldAgg == null) {
+oldAgg = initializer.apply();
 }
-}
 
-// create the new window for the rest of unmatched window that do 
not exist yet
-for (Map.Entry entry : matchedWindows.entrySet()) {
-T o

[jira] [Assigned] (KAFKA-6602) Support Kafka to save credentials in Java Key Store on Zookeeper node

2018-03-01 Thread Chen He (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chen He reassigned KAFKA-6602:
--

Assignee: Chen He

> Support Kafka to save credentials in Java Key Store on Zookeeper node
> -
>
> Key: KAFKA-6602
> URL: https://issues.apache.org/jira/browse/KAFKA-6602
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Chen He
>Assignee: Chen He
>Priority: Major
>
> Kafka Connect needs to talk to multifarious distributed systems. However, 
> each system has its own authentication mechanism. How we manage these 
> credentials becomes a common problem. 
> Here are my thoughts:
>  # We may need to save it in java key store;
>  # We may need to put this key store in a distributed system (topic or 
> zookeeper);
>  # Key store password may be configured in Kafka configuration;
> I have implemented a feature that allows storing a Java key store in a ZooKeeper 
> node. If the Kafka community likes this idea, I am happy to contribute.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6602) Support Kafka to save credentials in Java Key Store on Zookeeper node

2018-03-01 Thread Chen He (JIRA)
Chen He created KAFKA-6602:
--

 Summary: Support Kafka to save credentials in Java Key Store on 
Zookeeper node
 Key: KAFKA-6602
 URL: https://issues.apache.org/jira/browse/KAFKA-6602
 Project: Kafka
  Issue Type: New Feature
  Components: security
Reporter: Chen He


Kafka Connect needs to talk to multifarious distributed systems. However, each 
system has its own authentication mechanism. How we manage these credentials 
becomes a common problem. 

Here are my thoughts:
 # We may need to save it in java key store;
 # We may need to put this key store in a distributed system (topic or 
zookeeper);
 # Key store password may be configured in Kafka configuration;

I have implemented a feature that allows storing a Java key store in a ZooKeeper 
node. If the Kafka community likes this idea, I am happy to contribute.
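
For discussion, a minimal sketch of what storing and reading back a key store blob on a ZooKeeper node could look like (the connection string, file paths and znode names are illustrative assumptions, not an existing Kafka API; note the default 1 MB znode size limit):

{code:java}
import java.io.ByteArrayInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class KeyStoreOnZkSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });

        // Create the parent path once, then publish the serialized JKS file.
        zk.create("/connect-credentials", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        byte[] keyStoreBytes = Files.readAllBytes(Paths.get("/etc/kafka/connect-credentials.jks"));
        zk.create("/connect-credentials/keystore.jks", keyStoreBytes,
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // A worker/connector can later read the blob and open it with the key store
        // password taken from the Kafka configuration.
        byte[] loaded = zk.getData("/connect-credentials/keystore.jks", false, null);
        KeyStore ks = KeyStore.getInstance("JKS");
        ks.load(new ByteArrayInputStream(loaded), "keystore-password".toCharArray());

        zk.close();
    }
}
{code}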



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6593) Coordinator disconnect in heartbeat thread can cause commitSync to block indefinitely

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382469#comment-16382469
 ] 

ASF GitHub Bot commented on KAFKA-6593:
---

hachikuji closed pull request #4625: KAFKA-6593; Fix livelock with consumer 
heartbeat thread in commitSync
URL: https://github.com/apache/kafka/pull/4625
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6884ff0dff7..2daadddee63 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -231,7 +231,7 @@ protected synchronized boolean ensureCoordinatorReady(long 
startTimeMs, long tim
 } else if (coordinator != null && 
client.connectionFailed(coordinator)) {
 // we found the coordinator, but the connection has failed, so 
mark
 // it dead and backoff before retrying discovery
-coordinatorDead();
+markCoordinatorUnknown();
 time.sleep(retryBackoffMs);
 }
 
@@ -487,7 +487,7 @@ public void handle(JoinGroupResponse joinResponse, 
RequestFuture fut
 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
 || error == Errors.NOT_COORDINATOR) {
 // re-discover the coordinator and retry with backoff
-coordinatorDead();
+markCoordinatorUnknown();
 log.debug("Attempt to join group failed due to obsolete 
coordinator information: {}", error.message());
 future.raise(error);
 } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
@@ -550,7 +550,7 @@ public void handle(SyncGroupResponse syncResponse,
 if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
 future.raise(new GroupAuthorizationException(groupId));
 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-log.debug("SyncGroup failed due to group rebalance");
+log.debug("SyncGroup failed because the group began 
another rebalance");
 future.raise(error);
 } else if (error == Errors.UNKNOWN_MEMBER_ID
 || error == Errors.ILLEGAL_GENERATION) {
@@ -559,8 +559,8 @@ public void handle(SyncGroupResponse syncResponse,
 future.raise(error);
 } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
 || error == Errors.NOT_COORDINATOR) {
-log.debug("SyncGroup failed:", error.message());
-coordinatorDead();
+log.debug("SyncGroup failed: {}", error.message());
+markCoordinatorUnknown();
 future.raise(error);
 } else {
 future.raise(new KafkaException("Unexpected error from 
SyncGroup: " + error.message()));
@@ -627,27 +627,34 @@ public void onFailure(RuntimeException e, 
RequestFuture future) {
  * @return true if the coordinator is unknown
  */
 public boolean coordinatorUnknown() {
-return coordinator() == null;
+return checkAndGetCoordinator() == null;
 }
 
 /**
- * Get the current coordinator
+ * Get the coordinator if its connection is still active. Otherwise mark 
it unknown and
+ * return null.
+ *
  * @return the current coordinator or null if it is unknown
  */
-protected synchronized Node coordinator() {
+protected synchronized Node checkAndGetCoordinator() {
 if (coordinator != null && client.connectionFailed(coordinator)) {
-coordinatorDead();
+markCoordinatorUnknown(true);
 return null;
 }
 return this.coordinator;
 }
 
-/**
- * Mark the current coordinator as dead.
- */
-protected synchronized void coordinatorDead() {
+private synchronized Node coordinator() {
+return this.coordinator;
+}
+
+protected synchronized void markCoordinatorUnknown() {
+markCoordinatorUnknown(false);
+}
+
+protected synchronized void markCoordinatorUnknown(boolean isDisconnected) 
{
 if (this.coordinator != null) {
-log.info("Marking the coordinator {} dead", this.coordinator);
+log.info("Group coordinator {} is unavailable or invalid, will 
attempt rediscovery", this.coordinator);
 Nod

[jira] [Resolved] (KAFKA-6593) Coordinator disconnect in heartbeat thread can cause commitSync to block indefinitely

2018-03-01 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6593.

Resolution: Fixed

> Coordinator disconnect in heartbeat thread can cause commitSync to block 
> indefinitely
> -
>
> Key: KAFKA-6593
> URL: https://issues.apache.org/jira/browse/KAFKA-6593
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 1.1.0
>
> Attachments: consumer.log
>
>
> If a coordinator disconnect is observed in the heartbeat thread, it can cause 
> a pending offset commit to be cancelled just before the foreground thread 
> begins waiting on its response in poll(). Since the poll timeout is 
> Long.MAX_VALUE, this will cause the consumer to effectively hang until some 
> other network event causes the poll() to return. We try to protect this case 
> with a poll condition on the future, but this isn't bulletproof since the 
> future can be completed outside of the lock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration

2018-03-01 Thread Igor Calabria (JIRA)
Igor Calabria created KAFKA-6603:


 Summary: Kafka streams off heap memory usage does not match 
expected values from configuration
 Key: KAFKA-6603
 URL: https://issues.apache.org/jira/browse/KAFKA-6603
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
Reporter: Igor Calabria


Hi, I have a simple aggregation pipeline that's backed by the default state 
store (RocksDB). The pipeline works fine except that the off-heap memory usage 
is way higher than expected. Following the 
[documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
 has some effect (memory usage is reduced), but the values don't match at all. 

The Java process is set to run with just `-Xmx300m -Xms300m` and the RocksDB 
config looks like this:
{code:java}
tableConfig.setCacheIndexAndFilterBlocks(true);
tableConfig.setBlockCacheSize(1048576); //1MB
tableConfig.setBlockSize(16 * 1024); // 16KB
options.setTableFormatConfig(tableConfig);
options.setMaxWriteBufferNumber(2);
options.setWriteBufferSize(8 * 1024); // 8KB{code}
To estimate memory usage, I'm using this formula  
{noformat}
(block_cache_size + write_buffer_size * write_buffer_number) * segments * 
partitions{noformat}
Since my topic has 25 partitions with 3 segments each (it's a windowed store), 
off-heap memory usage should be about 76 MB. What I'm seeing in production is 
upwards of 300 MB; even taking into consideration the extra overhead from RocksDB 
compaction threads, this seems a bit high (especially since the disk usage for 
all files is just 1 GB).
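
For reference, plugging the numbers above into that formula (assuming write_buffer_number = 2 and the block cache / write buffer sizes from the config snippet) gives roughly:
{noformat}
(1,048,576 + 8,192 * 2) * 3 segments * 25 partitions
  = 1,064,960 * 75
  = 79,872,000 bytes  (~76 MB expected, vs ~300 MB observed)
{noformat}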



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5891) Cast transformation fails if record schema contains timestamp field

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382629#comment-16382629
 ] 

ASF GitHub Bot commented on KAFKA-5891:
---

maver1ck opened a new pull request #4633: [KAFKA-5891] Proper handling of 
LogicalTypes in Cast
URL: https://github.com/apache/kafka/pull/4633
 
 
   Currently logical types are dropped during Cast Transformation.
   This patch fixes this behaviour.
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [X] Verify test coverage and CI build status
   - [X] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cast transformation fails if record schema contains timestamp field
> ---
>
> Key: KAFKA-5891
> URL: https://issues.apache.org/jira/browse/KAFKA-5891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Artem Plotnikov
>Priority: Major
>
> I have the following simple type cast transformation:
> {code}
> name=postgresql-source-simple
> connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
> tasks.max=1
> connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres&password=mysecretpassword
> query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b
> transforms=Cast
> transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
> transforms.Cast.spec=a:boolean
> mode=bulk
> topic.prefix=clients
> {code}
> Which fails with the following exception in runtime:
> {code}
> [2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:148)
> org.apache.kafka.connect.errors.DataException: Invalid Java object for schema 
> type INT64: class java.sql.Timestamp for field: "null"
>   at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
>   at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
>   at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
>   at 
> org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
>   at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
>   at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
>   at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190)
>   at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> If I remove the transforms.* part of the connector configuration, it works correctly. 
> Actually, it doesn't really matter which type I use in the transformation 
> for field 'a'; the mere existence of a timestamp field triggers the exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6604) ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file

2018-03-01 Thread Dong Lin (JIRA)
Dong Lin created KAFKA-6604:
---

 Summary: ReplicaManager should not remove partitions on the log 
directory from high watermark checkpoint file
 Key: KAFKA-6604
 URL: https://issues.apache.org/jira/browse/KAFKA-6604
 Project: Kafka
  Issue Type: Bug
Reporter: Dong Lin
Assignee: Dong Lin


Currently a broker may truncate a partition to its log start offset in the 
following scenario:

- Broker A is restarted after shutdown.
- The controller knows that broker A has started.
- Some event (e.g. topic deletion) triggers the controller to send a 
LeaderAndIsrRequest for partition P1.
- Broker A receives the LeaderAndIsrRequest for partition P1. After the broker 
receives the first LeaderAndIsrRequest, it will overwrite the HW checkpoint 
file with all of its leader and follower partitions. The checkpoint 
file will then contain only the HW for partition P1.
- The controller sends broker A a LeaderAndIsrRequest for all of its leader and 
follower partitions.
- Broker A creates ReplicaFetcherThreads for its follower partitions and truncates 
their logs to the HW, which will be zero for all partitions except P1.

When this happens, potentially all logs on the broker will be truncated to the log 
start offset, and the cluster will then run with reduced availability for a long 
time.

The right solution is to keep a partition in the high watermark checkpoint 
file if the partition exists in LogManager.
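
To make the failure mode concrete, here is an illustrative sketch of the replication-offset-checkpoint file (the format is a version line, an entry count, then one "topic partition offset" line per partition; the topic names and offsets below are made up):
{noformat}
Before the first LeaderAndIsrRequest (all partitions present):
0
3
topic-p1 0 42
topic-a 0 1000
topic-b 3 250

After the broker rewrites the file based only on the first LeaderAndIsrRequest for P1:
0
1
topic-p1 0 42
{noformat}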



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6604) ReplicaManager should not remove partitions on the log directory from high watermark checkpoint file

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382709#comment-16382709
 ] 

ASF GitHub Bot commented on KAFKA-6604:
---

lindong28 opened a new pull request #4634: KAFKA-6604; ReplicaManager should 
not remove partitions on the log directory from high watermark checkpoint file
URL: https://github.com/apache/kafka/pull/4634
 
 
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplicaManager should not remove partitions on the log directory from high 
> watermark checkpoint file
> ---
>
> Key: KAFKA-6604
> URL: https://issues.apache.org/jira/browse/KAFKA-6604
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
>
> Currently a broker may truncate a partition to its log start offset in the 
> following scenario:
> - Broker A is restarted after shutdown.
> - The controller knows that broker A has started.
> - Some event (e.g. topic deletion) triggers the controller to send a 
> LeaderAndIsrRequest for partition P1.
> - Broker A receives the LeaderAndIsrRequest for partition P1. After the broker 
> receives the first LeaderAndIsrRequest, it will overwrite the HW checkpoint 
> file with all of its leader and follower partitions. The checkpoint 
> file will then contain only the HW for partition P1.
> - The controller sends broker A a LeaderAndIsrRequest for all of its leader and 
> follower partitions.
> - Broker A creates ReplicaFetcherThreads for its follower partitions and truncates 
> their logs to the HW, which will be zero for all partitions except P1.
> When this happens, potentially all logs on the broker will be truncated to the 
> log start offset, and the cluster will then run with reduced availability for 
> a long time.
> The right solution is to keep a partition in the high watermark checkpoint 
> file if the partition exists in LogManager.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6605) Flatten SMT does not properly handle fields that are null

2018-03-01 Thread Randall Hauch (JIRA)
Randall Hauch created KAFKA-6605:


 Summary: Flatten SMT does not properly handle fields that are null
 Key: KAFKA-6605
 URL: https://issues.apache.org/jira/browse/KAFKA-6605
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.0.0
Reporter: Randall Hauch


When a message has a null field, the `Flatten` SMT does not properly handle 
this and throws an NPE. Consider this message from Debezium:

{code}
{
  "before": null,
  "after": {
"dbserver1.mydb.team.Value": {
  "id": 1,
  "name": "kafka",
  "email": "ka...@apache.org",
  "last_modified": 1519939449000
}
  },
  "source": {
"version": {
  "string": "0.7.3"
},
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.03",
"pos": 154,
"row": 0,
"snapshot": {
  "boolean": true
},
"thread": null,
"db": {
  "string": "mydb"
},
"table": {
  "string": "team"
}
  },
  "op": "c",
  "ts_ms": {
"long": 1519939520285
  }
}
{code}

Note how `before` is null; this event represents a row that was INSERTED, and thus 
there is no `before` state of the row. This results in an NPE:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Field.(Schema.java:403)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
at 
org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
at 
io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
at 
io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

Here's the connector configuration that was used:

{code}
{
"name": "debezium-connector-flatten",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "223345",
"database.server.name": "dbserver-flatten",
"database.whitelist": "mydb",
"database.history.kafka.bootstrap.servers": 
"kafka-1:9092,kafka-2:9092,kafka-3:9092",
"database.history.kafka.topic": "schema-flatten.mydb",
"include.schema.changes": "true",
"transforms": "flatten",
"transforms.flatten.type": 
"org.apache.kafka.connect.transforms.Flatten$Value"
  }
}
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6605) Flatten SMT does not properly handle fields that are null

2018-03-01 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6605:
-
Description: 
When a message has a null field, the `Flatten` SMT does not properly handle 
this and throws an NPE. Consider this message from Debezium:

{code}
{
  "before": null,
  "after": {
"dbserver1.mydb.team.Value": {
  "id": 1,
  "name": "kafka",
  "email": "ka...@apache.org",
  "last_modified": 1519939449000
}
  },
  "source": {
"version": {
  "string": "0.7.3"
},
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.03",
"pos": 154,
"row": 0,
"snapshot": {
  "boolean": true
},
"thread": null,
"db": {
  "string": "mydb"
},
"table": {
  "string": "team"
}
  },
  "op": "c",
  "ts_ms": {
"long": 1519939520285
  }
}
{code}

Note how `before` is null; this event represents a row that was INSERTED, and thus 
there is no `before` state of the row. This results in an NPE:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Field.(Schema.java:403)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
at 
org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
at 
io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
at 
io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

Here's the connector configuration that was used:

{code}
{
"name": "debezium-connector-flatten",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "223345",
"database.server.name": "dbserver-flatten",
"database.whitelist": "mydb",
"database.history.kafka.bootstrap.servers": 
"kafka-1:9092,kafka-2:9092,kafka-3:9092",
"database.history.kafka.topic": "schema-flatten.mydb",
"include.schema.changes": "true",
"transforms": "flatten",
"transforms.flatten.type": 
"org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
  }
}
{code}

  was:
When a message has a null field, the `Flatten` SMT does not properly handle 
this and throws an NPE. Consider this message from Debezium:

{code}
{
  "before": null,
  "after": {
"dbserver1.mydb.team.Value": {
  "id": 1,
  "name": "kafka",
  "email": "ka...@apache.org",
  "last_modified": 1519939449000
}
  },
  "source": {
"version": {
  "string": "0.7.3"
},
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.03",
"pos": 154,
"row": 0,
"snapshot": {
  "boolean": true
},
"thread": null,
"db": {
  "string": "mydb"
},
"table": {
  "string": "team"
}
  },
  "op": "c",
  "ts_ms": {
"long": 1519939520285
  }
}
{code}

Note how `before` is null; this event represents a row that was INSERTED, and thus 
there is no `before` state of the row. This results in an NPE:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Sche

[jira] [Updated] (KAFKA-6605) Flatten SMT does not properly handle fields that are null

2018-03-01 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6605:
-
Description: 
When a message has a null field, the `Flatten` SMT does not properly handle 
this and throws an NPE. Consider this message from Debezium:

{code}
{
  "before": null,
  "after": {
"dbserver1.mydb.team.Value": {
  "id": 1,
  "name": "kafka",
  "email": "ka...@apache.org",
  "last_modified": 1519939449000
}
  },
  "source": {
"version": {
  "string": "0.7.3"
},
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.03",
"pos": 154,
"row": 0,
"snapshot": {
  "boolean": true
},
"thread": null,
"db": {
  "string": "mydb"
},
"table": {
  "string": "team"
}
  },
  "op": "c",
  "ts_ms": {
"long": 1519939520285
  }
}
{code}

Note how `before` is null; this event represents a row that was INSERTED, and thus 
there is no `before` state of the row. This results in an NPE:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Field.(Schema.java:403)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
at 
org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
at 
io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
at 
io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

Here's the connector configuration that was used:

{code}
{
"name": "debezium-connector-flatten",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "223345",
"database.server.name": "dbserver-flatten",
"database.whitelist": "mydb",
"database.history.kafka.bootstrap.servers": 
"kafka-1:9092,kafka-2:9092,kafka-3:9092",
"database.history.kafka.topic": "schema-flatten.mydb",
"include.schema.changes": "true",
"transforms": "flatten",
"transforms.flatten.type": 
"org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
  }
}
{code}

Note that the above configuration sets the delimiter to `_`. The default 
delimiter is `.`, which is not a valid character within an Avro field, and 
doing this results in the following exception:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Field.(Schema.java:403)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
at 
org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
at 
io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)

[jira] [Resolved] (KAFKA-3513) Transient failure of OffsetValidationTest

2018-03-01 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-3513.

Resolution: Fixed

We haven't seen this in some time, so I'm going to close this. We can reopen if 
it reoccurs.

> Transient failure of OffsetValidationTest
> -
>
> Key: KAFKA-3513
> URL: https://issues.apache.org/jira/browse/KAFKA-3513
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, system tests
>Reporter: Ewen Cheslack-Postava
>Assignee: Jason Gustafson
>Priority: Major
>
> http://confluent-kafka-system-test-results.s3-us-west-2.amazonaws.com/2016-04-05--001.1459840046--apache--trunk--31e263e/report.html
> The version of the test that fails in this case is:
> Module: kafkatest.tests.client.consumer_test
> Class:  OffsetValidationTest
> Method: test_broker_failure
> Arguments:
> {
>   "clean_shutdown": true,
>   "enable_autocommit": false
> }
> and others passed. It's unclear if the parameters actually have any impact on 
> the failure.
> I did some initial triage and it looks like the test code isn't seeing all 
> the group members join the group (receive partition assignments), but it 
> appears from the logs that they all did. This could indicate a simple timing 
> issue, but I haven't been able to verify that yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6605) Flatten SMT does not properly handle fields that are null

2018-03-01 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6605:
-
Description: 
When a message has a null field, the `Flatten` SMT does not properly handle 
this and throws an NPE. Consider this message from Debezium:

{code}
{
  "before": null,
  "after": {
"dbserver1.mydb.team.Value": {
  "id": 1,
  "name": "kafka",
  "email": "ka...@apache.org",
  "last_modified": 1519939449000
}
  },
  "source": {
"version": {
  "string": "0.7.3"
},
"name": "dbserver1",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.03",
"pos": 154,
"row": 0,
"snapshot": {
  "boolean": true
},
"thread": null,
"db": {
  "string": "mydb"
},
"table": {
  "string": "team"
}
  },
  "op": "c",
  "ts_ms": {
"long": 1519939520285
  }
}
{code}

Note how `before` is null; this event represents a row that was INSERTED, and thus 
there is no `before` state of the row. This results in an NPE:

{noformat}
java.lang.NullPointerException
at 
org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:219)
at 
org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:234)
at 
org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:151)
at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:75)
at 
org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:211)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

Here's the connector configuration that was used:

{code}
{
"name": "debezium-connector-flatten",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "223345",
"database.server.name": "dbserver-flatten",
"database.whitelist": "mydb",
"database.history.kafka.bootstrap.servers": 
"kafka-1:9092,kafka-2:9092,kafka-3:9092",
"database.history.kafka.topic": "schema-flatten.mydb",
"include.schema.changes": "true",
"transforms": "flatten",
"transforms.flatten.type": 
"org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
  }
}
{code}

Note that the above configuration sets the delimiter to `_`. The default 
delimiter is `.`, which is not a valid character within an Avro field, and 
doing this results in the following exception:

{noformat}
org.apache.avro.SchemaParseException: Illegal character in: source.version
at org.apache.avro.Schema.validateName(Schema.java:1151)
at org.apache.avro.Schema.access$200(Schema.java:81)
at org.apache.avro.Schema$Field.(Schema.java:403)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
at 
org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
at 
org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
at 
io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
at 
io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
at 
io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.uti

[jira] [Commented] (KAFKA-6603) Kafka streams off heap memory usage does not match expected values from configuration

2018-03-01 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382871#comment-16382871
 ] 

Guozhang Wang commented on KAFKA-6603:
--

Hello Igor, thanks for reporting the issue. Without further information, I'd 
suspect it is caused by the RocksDB iterator implementation that is 
frequently created in windowed aggregations; this has just been replaced in 
KIP-261. If possible, could you build from the latest trunk that contains this 
fix and check whether the memory usage improves?

> Kafka streams off heap memory usage does not match expected values from 
> configuration
> -
>
> Key: KAFKA-6603
> URL: https://issues.apache.org/jira/browse/KAFKA-6603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Igor Calabria
>Priority: Minor
>
> Hi, I have a simple aggregation pipeline that's backed by the default state 
> store (RocksDB). The pipeline works fine except that the off-heap memory usage 
> is way higher than expected. Following the 
> [documentation|https://docs.confluent.io/current/streams/developer-guide/config-streams.html#streams-developer-guide-rocksdb-config]
>  has some effect (memory usage is reduced), but the values don't match at all. 
> The Java process is set to run with just `-Xmx300m -Xms300m` and the RocksDB 
> config looks like this:
> {code:java}
> tableConfig.setCacheIndexAndFilterBlocks(true);
> tableConfig.setBlockCacheSize(1048576); //1MB
> tableConfig.setBlockSize(16 * 1024); // 16KB
> options.setTableFormatConfig(tableConfig);
> options.setMaxWriteBufferNumber(2);
> options.setWriteBufferSize(8 * 1024); // 8KB{code}
> To estimate memory usage, I'm using this formula  
> {noformat}
> (block_cache_size + write_buffer_size * write_buffer_number) * segments * 
> partitions{noformat}
> Since my topic has 25 partitions with 3 segments each (it's a windowed store), 
> off-heap memory usage should be about 76 MB. What I'm seeing in production is 
> upwards of 300 MB; even taking into consideration the extra overhead from RocksDB 
> compaction threads, this seems a bit high (especially since the disk usage for 
> all files is just 1 GB).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6348) Kafka consumer can't restore from coordinator failure

2018-03-01 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382966#comment-16382966
 ] 

Jason Gustafson commented on KAFKA-6348:


Each groupId is mapped to one of the __consumer_offsets partitions. The group 
coordinator is whichever broker happens to be leading that partition. So if you 
see an issue with the coordinator that is resolved by changing the groupId, 
that likely suggests a problem with whichever __consumer_offsets partition 
the old groupId was mapped to. You should check that all partitions are healthy 
and that there are no errors in the broker logs.
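
As a quick way to find which partition (and therefore which broker) to look at, the coordinator partition for a group can be computed like this (a sketch; 50 is the default offsets.topic.num.partitions, adjust if your cluster overrides it):

{code:java}
// Mirrors the broker-side mapping: abs(hash of the group id) modulo the number of
// __consumer_offsets partitions. Masking the sign bit avoids the Integer.MIN_VALUE
// hashCode edge case, like Kafka's own Utils.abs.
public static int coordinatorPartition(String groupId, int offsetsTopicPartitions) {
    return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitions;
}

// Example: coordinatorPartition("my-group", 50) gives the __consumer_offsets
// partition whose leader broker acts as the group coordinator for "my-group".
{code}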

> Kafka consumer can't restore from coordinator failure
> -
>
> Key: KAFKA-6348
> URL: https://issues.apache.org/jira/browse/KAFKA-6348
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, core
>Affects Versions: 0.10.1.1
>Reporter: Renjie Liu
>Priority: Major
>
> The Kafka consumer blocks and keeps reporting that the coordinator is dead. I tried to 
> restart the process and it still doesn't work. Then we shut down the broker and 
> restarted the consumer, but it still keeps reporting that the coordinator is dead. This 
> situation continued until we changed our group id, after which it works.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records

2018-03-01 Thread Blake Miller (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382969#comment-16382969
 ] 

Blake Miller commented on KAFKA-4936:
-

That is a very good point about the potential to create oodles of topics 
unintentionally. I don't think you're overthinking it... there is significantly 
more complexity to it than I recognized at first, and I was not considering 
automatic topic creation. I pondered it a bit more just now and came to the 
tentative conclusion that this feature could be implemented in a safe and 
simple way *only* in the absence of automatic topic creation.

I personally do not understand what value automatic topic creation offers; it 
seems to me to abstract over very important details that the Kafka user should 
think about (partitioning, at least), and also to lower the bar for mistakes or 
bad code doing damage, in order to gain what seems like a very tiny bit of 
convenience. Perhaps somebody who understands the value proposition of 
automatic topic creation can comment on its relevance to this issue.

Regarding the implementation details, I hadn't thought very deeply into it 
myself, but what you say makes sense, and even without automatic topic 
creation, the implementation would have to handle the eventuality of the topic 
not existing.

I wonder what happens in the current implementation if I give a nonexistent 
output topic to a StreamsBuilder and then .start() it: specifically, would it be 
a runtime error somewhere in a SinkNode after the Kafka Streams app is started, 
or would StreamsBuilder.build() complain? I will give that a try.

 

 

> Allow dynamic routing of output records
> ---
>
> Key: KAFKA-4936
> URL: https://issues.apache.org/jira/browse/KAFKA-4936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, all used output topics must be known beforehand, and thus it's not 
> possible to send output records to topics in a dynamic fashion.
> There have been a couple of requests for this feature and we should consider 
> adding it. There are many open questions, however, with regard to topic 
> creation and configuration (replication factor, number of partitions), etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6111) Tests for KafkaZkClient

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16382993#comment-16382993
 ] 

ASF GitHub Bot commented on KAFKA-6111:
---

junrao closed pull request #4596: KAFKA-6111: Improve test coverage of 
KafkaZkClient, fix bugs found by new tests
URL: https://github.com/apache/kafka/pull/4596
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index afc8202b88a..e15bd2f8503 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -85,7 +85,8 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def updateBrokerInfoInZk(brokerInfo: BrokerInfo): Unit = {
     val brokerIdPath = brokerInfo.path
     val setDataRequest = SetDataRequest(brokerIdPath, brokerInfo.toJsonBytes, ZkVersion.NoVersion)
-    retryRequestUntilConnected(setDataRequest)
+    val response = retryRequestUntilConnected(setDataRequest)
+    response.maybeThrow()
     info("Updated broker %d at path %s with addresses: %s".format(brokerInfo.broker.id, brokerIdPath, brokerInfo.broker.endPoints))
   }
 
@@ -421,7 +422,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
   def deleteLogDirEventNotifications(): Unit = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(LogDirEventNotificationZNode.path))
     if (getChildrenResponse.resultCode == Code.OK) {
-      deleteLogDirEventNotifications(getChildrenResponse.children)
+      deleteLogDirEventNotifications(getChildrenResponse.children.map(LogDirEventNotificationSequenceZNode.sequenceNumber))
     } else if (getChildrenResponse.resultCode != Code.NONODE) {
       getChildrenResponse.maybeThrow
     }
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index d3726c25c58..e44c2c94e52 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,10 +16,11 @@
  */
 package kafka.zk
 
-import java.util.{Properties, UUID}
+import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.log.LogConfig
 import kafka.security.auth._
@@ -29,17 +30,48 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
-import org.apache.kafka.common.utils.SecurityUtils
-import org.apache.zookeeper.KeeperException.NodeExistsException
+import org.apache.kafka.common.utils.{SecurityUtils, Time}
+import org.apache.zookeeper.KeeperException.{Code, NoNodeException, NodeExistsException}
 import org.junit.Assert._
-import org.junit.Test
-
+import org.junit.{After, Before, Test}
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Seq, mutable}
 import scala.util.Random
 
+import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.zookeeper.data.Stat
+
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
+  private val topic1 = "topic1"
+  private val topic2 = "topic2"
+
+  val topicPartition10 = new TopicPartition(topic1, 0)
+  val topicPartition11 = new TopicPartition(topic1, 1)
+  val topicPartition20 = new TopicPartition(topic2, 0)
+  val topicPartitions10_11 = Seq(topicPartition10, topicPartition11)
+
+  var otherZkClient: KafkaZkClient = _
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    otherZkClient = KafkaZkClient(zkConnect, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled), zkSessionTimeout,
+      zkConnectionTimeout, zkMaxInFlightRequests, Time.SYSTEM)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    if (otherZkClient != null)
+      otherZkClient.close()
+    super.tearDown()
+  }
+
   private val topicPartition = new TopicPartition("topic", 0)
 
   @Test
@@ -90,10 +122,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   @Test
   def testTopicAssignmentMethods() {
-    val topic1 = "topic1"
-    val topic2 = "topic2"
+    assertTrue(zkClien

[jira] [Commented] (KAFKA-6111) Tests for KafkaZkClient

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383008#comment-16383008
 ] 

ASF GitHub Bot commented on KAFKA-6111:
---

junrao closed pull request #4635: Revert "KAFKA-6111: Improve test coverage of 
KafkaZkClient, fix bugs found by new tests"
URL: https://github.com/apache/kafka/pull/4635
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tests for KafkaZkClient
> ---
>
> Key: KAFKA-6111
> URL: https://issues.apache.org/jira/browse/KAFKA-6111
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Sandor Murakozi
>Priority: Major
> Fix For: 1.2.0
>
>
> Some methods in KafkaZkClient have no tests at the moment and we need to fix 
> that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6111) Tests for KafkaZkClient

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383007#comment-16383007
 ] 

ASF GitHub Bot commented on KAFKA-6111:
---

junrao opened a new pull request #4635: Revert "KAFKA-6111: Improve test 
coverage of KafkaZkClient, fix bugs found by new tests"
URL: https://github.com/apache/kafka/pull/4635
 
 
   Reverts apache/kafka#4596


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tests for KafkaZkClient
> ---
>
> Key: KAFKA-6111
> URL: https://issues.apache.org/jira/browse/KAFKA-6111
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Sandor Murakozi
>Priority: Major
> Fix For: 1.2.0
>
>
> Some methods in KafkaZkClient have no tests at the moment and we need to fix 
> that.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4936) Allow dynamic routing of output records

2018-03-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383038#comment-16383038
 ] 

Matthias J. Sax commented on KAFKA-4936:


If you provide an unknown topic name for an output topic, the topic will either 
be created (if auto topic creation is enabled) or a runtime error occurs. 
StreamsBuilder does not check whether topics exist.

I tend to agree that auto topic creation is a "tricky feature". The main 
question is the following: if we argue that auto topic creation should not be 
used, this implies that all output topics must be known in advance, because 
users need to create them. In that case, users can simply use the {{branch()}} 
operator (see the sketch below) and don't need "auto routing" in the first 
place. Thus, it seems that people asking for this feature actually assume that 
auto topic creation is enabled – otherwise I cannot explain why one would need it.
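
For reference, a minimal sketch of the {{branch()}}-based routing mentioned 
above, assuming all output topics are known and pre-created; the topic names and 
predicates are made up for illustration and are not part of this ticket:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class BranchRoutingSketch {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");

        // branch() splits the stream by predicate; each branch is then written
        // to a pre-created topic via to().
        KStream<String, String>[] branches = input.branch(
            (key, value) -> key != null && key.startsWith("a"), // branch 0
            (key, value) -> true                                // branch 1: everything else
        );
        branches[0].to("output-a");
        branches[1].to("output-rest");

        // builder.build() would then be passed to new KafkaStreams(...) as usual.
    }
}
{code}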

I am still not convinced about the overall story / need for this. We would also 
need a KIP and a discussion on the mailing list, so it might make more sense to 
have that discussion first, before starting to write any code – we might decide 
that we don't want this for the reasons mentioned above. When we got the 
feature request, we just created this Jira to track it – but in retrospect, I 
am not sure about it anymore.

> Allow dynamic routing of output records
> ---
>
> Key: KAFKA-4936
> URL: https://issues.apache.org/jira/browse/KAFKA-4936
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> Currently, all used output topics must be known beforehand, and thus, it's not 
> possible to send output records to topics in a dynamic fashion.
> There have been a couple of requests for this feature and we should consider 
> adding it. There are many open questions, however, with regard to topic 
> creation and configuration (replication factor, number of partitions), etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6595) Kafka connect commit offset incorrectly.

2018-03-01 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383062#comment-16383062
 ] 

huxihx commented on KAFKA-6595:
---

Could we simply cancel the flush if we find that another thread has already 
initiated a flush?
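
Something like the following hypothetical sketch of that idea (this is not the 
actual OffsetStorageWriter/WorkerSourceTask code, only an illustration of 
skipping a commit attempt when another flush is already in flight):

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

public class FlushGuardSketch {

    private final AtomicBoolean flushing = new AtomicBoolean(false);

    // Returns false instead of throwing when a flush is already in progress.
    public boolean tryCommitOffsets() {
        if (!flushing.compareAndSet(false, true)) {
            return false; // another thread owns the flush; cancel this attempt
        }
        try {
            // ... snapshot the offsets and write them to the offsets topic ...
            return true;
        } finally {
            flushing.set(false);
        }
    }

    public static void main(String[] args) {
        FlushGuardSketch guard = new FlushGuardSketch();
        System.out.println("first attempt committed: " + guard.tryCommitOffsets());
    }
}
{code}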

> Kafka connect commit offset incorrectly.
> 
>
> Key: KAFKA-6595
> URL: https://issues.apache.org/jira/browse/KAFKA-6595
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Hanlin Liu
>Priority: Major
>
> Version: ConfluentPlatform Kafka 3.2.0
> SourceTaskOffsetCommitter calls commitOffset() and waits for all incomplete 
> records to be sent. When the task is stopped, commitOffset() is called again 
> by the finally block in WorkerSourceTask.execute(), and it throws an {{Invalid 
> call to OffsetStorageWriter flush() while already flushing, the framework 
> should not allow this}} exception. This triggers closing the Producer without 
> waiting for the flush timeout.
> After 30 seconds, all incomplete records have been forcefully aborted. If 
> {{offset.flush.timeout.ms}} is configured to be larger than 30 seconds, 
> WorkerSourceTask will consider those aborted records as sent within the flush 
> timeout, which results in incorrectly flushing the source offsets.
>  
> {code:java}
> // code placeholder
> 2018-02-27 02:59:33,134 INFO  [] Stopping connector 
> dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:254]
> 2018-02-27 02:59:33,134 INFO  [] Stopped connector 
> dp-sqlserver-connector-dptask_455   [pool-3-thread-6][Worker.java:264]
> 2018-02-27 02:59:34,121 ERROR [] Invalid call to OffsetStorageWriter flush() 
> while already flushing, the framework should not allow this   
> [pool-1-thread-13][OffsetStorageWriter.java:110]
> 2018-02-27 02:59:34,121 ERROR [] Task dp-sqlserver-connector-dptask_455-0 
> threw an uncaught and unrecoverable exception   
> [pool-1-thread-13][WorkerTask.java:141]
> org.apache.kafka.connect.errors.ConnectException: OffsetStorageWriter is 
> already flushing
>         at 
> org.apache.kafka.connect.storage.OffsetStorageWriter.beginFlush(OffsetStorageWriter.java:112)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:294)
>         at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:177)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>         at 
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> 2018-02-27 03:00:00,734 ERROR [] Graceful stop of task 
> dp-sqlserver-connector-dptask_455-0 failed.   
> [pool-3-thread-1][Worker.java:405]
> 2018-02-27 03:00:04,126 INFO  [] Proceeding to force close the producer since 
> pending requests could not be completed within timeout 30 ms.   
> [pool-1-thread-13][KafkaProducer.java:713]
> 2018-02-27 03:00:04,127 ERROR [] dp-sqlserver-connector-dptask_455-0 failed 
> to send record to dptask_455.JF_TEST_11.jf_test_tab_8: {}   
> [kafka-producer-network-thread | producer-31][WorkerSourceTask.java:228]
> java.lang.IllegalStateException: Producer is closed forcefully.
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
>         at java.lang.Thread.run(Thread.java:745)
> 2018-02-27 03:00:09,920 INFO  [] Finished 
> WorkerSourceTask{id=dp-sqlserver-connector-dptask_455-0} commitOffsets 
> successfully in 47088 ms   [pool-4-thread-1][WorkerSourceTask.java:371]
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`

2018-03-01 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx resolved KAFKA-6592.
---
Resolution: Duplicate

Seems it's a duplicate of 
[KAFKA-4831|https://issues.apache.org/jira/browse/KAFKA-4831]
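
For comparison, a minimal sketch of what initializing the inner deserializer 
looks like in code, assuming the {{WindowedDeserializer(Deserializer)}} 
constructor of this internal class; the topic name and key bytes below are 
placeholders, since kafka-console-consumer.sh offers no equivalent hook:

{code:java}
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer;

public class WindowedKeySketch {

    public static void main(String[] args) {
        // Passing the inner deserializer explicitly avoids the null inner field
        // that the console consumer trips over.
        WindowedDeserializer<String> deserializer =
            new WindowedDeserializer<>(new StringDeserializer());

        // Placeholder bytes; real windowed keys come from the streams output
        // topic (inner key bytes followed by an 8-byte window start timestamp).
        byte[] rawKey = new byte[16];
        Windowed<String> key = deserializer.deserialize("some-topic", rawKey);
        System.out.println(key);
    }
}
{code}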

> NullPointerException thrown when executing ConsoleCosumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a streams app's output topic with the WindowedDeserializer 
> using kafka-console-consumer.sh, a NullPointerException is thrown because the 
> inner deserializer is never initialized: there is no place in ConsoleConsumer 
> to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`

2018-03-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6592:
---
Component/s: streams

> NullPointerException thrown when executing ConsoleCosumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a streams app's output topic with the WindowedDeserializer 
> using kafka-console-consumer.sh, a NullPointerException is thrown because the 
> inner deserializer is never initialized: there is no place in ConsoleConsumer 
> to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5891) Cast transformation fails if record schema contains timestamp field

2018-03-01 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-5891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383227#comment-16383227
 ] 

Maciej Bryński commented on KAFKA-5891:
---

[~broartem]
Could you test my patch?

> Cast transformation fails if record schema contains timestamp field
> ---
>
> Key: KAFKA-5891
> URL: https://issues.apache.org/jira/browse/KAFKA-5891
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Artem Plotnikov
>Priority: Major
>
> I have the following simple type cast transformation:
> {code}
> name=postgresql-source-simple
> connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
> tasks.max=1
> connection.url=jdbc:postgresql://localhost:5432/testdb?user=postgres&password=mysecretpassword
> query=SELECT 1::INT as a, '2017-09-14 10:23:54'::TIMESTAMP as b
> transforms=Cast
> transforms.Cast.type=org.apache.kafka.connect.transforms.Cast$Value
> transforms.Cast.spec=a:boolean
> mode=bulk
> topic.prefix=clients
> {code}
> Which fails with the following exception in runtime:
> {code}
> [2017-09-14 16:51:01,885] ERROR Task postgresql-source-simple-0 threw an 
> uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:148)
> org.apache.kafka.connect.errors.DataException: Invalid Java object for schema 
> type INT64: class java.sql.Timestamp for field: "null"
>   at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:239)
>   at 
> org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:209)
>   at org.apache.kafka.connect.data.Struct.put(Struct.java:214)
>   at 
> org.apache.kafka.connect.transforms.Cast.applyWithSchema(Cast.java:152)
>   at org.apache.kafka.connect.transforms.Cast.apply(Cast.java:108)
>   at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
>   at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:190)
>   at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
>   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> If I remove the  transforms.* part of the connector it will work correctly. 
> Actually, it doesn't really matter which types I use in the transformation 
> for field 'a', just the existence of a timestamp field brings the exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6054:
---
Labels: needs-kip  (was: )

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running. 
> We observed the following stack trace:
> {code}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-01 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-6054:
--

Assignee: Matthias J. Sax

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running. 
> We observed the following stack trace:
> {code}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-01 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383259#comment-16383259
 ] 

Matthias J. Sax commented on KAFKA-6054:


We need to change the metadata version again for KAFKA-3522 and plan to 
piggyback a fix for this issue onto that KIP.

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running. 
> We observed the following stack trace:
> {code}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1

2018-03-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383292#comment-16383292
 ] 

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4636: [WIP] KAFKA-6054: Fix Kafka Streams 
upgrade path for v0.10.0
URL: https://github.com/apache/kafka/pull/4636
 
 
   Fixes the upgrade path from 0.10.0.x to 0.10.1.x+
   Contained in KIP-258
   Adds system tests for rolling bounce upgrades.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1
>Reporter: James Cheng
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so at one point there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running. 
> We observed the following stack trace:
> {code}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
> at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> 
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 ins