[jira] [Commented] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290670#comment-16290670
 ] 

ASF GitHub Bot commented on KAFKA-6360:
---

GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/4324

KAFKA-6360: Clear RocksDB Segments when store is closed 

Now that we support re-initializing state stores, we need to clear the 
segments when the store is closed so that they can be re-opened.
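
The shape of the change, as a minimal sketch (hypothetical field and type 
names; the real class is org.apache.kafka.streams.state.internals.Segments):

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch only: on close, the open segments must not just be closed but also
// removed, so a subsequent re-initialization creates fresh segments instead
// of reusing closed ones.
class Segments {
    interface Segment { void close(); }

    private final NavigableMap<Long, Segment> segments = new TreeMap<>();

    void close() {
        for (final Segment segment : segments.values()) {
            segment.close();
        }
        segments.clear(); // the fix: drop the closed segments from the map
    }
}
```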

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-6360

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4324


commit 4c84af7522d847f82b14e5b3b4c589b0223a5bd8
Author: Damian Guy 
Date:   2017-12-14T10:13:44Z

clear segments on close




> RocksDB segments not removed when store is closed causes re-initialization to 
> fail
> --
>
> Key: KAFKA-6360
> URL: https://issues.apache.org/jira/browse/KAFKA-6360
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
> Fix For: 1.1.0
>
>
> When a store is re-initialized, it is first closed before being opened 
> again. When this happens, the segments in the {{Segments}} class are closed, 
> but they are not removed from the list of segments. So when the store is 
> re-initialized, the old closed segments are used. This results in:
> {code}
> [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] 
> task [1_3] Failed to flush state store 
> KSTREAM-AGGREGATE-STATE-STORE-24:  
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5123) Refactor ZkUtils readData* methods

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290704#comment-16290704
 ] 

ASF GitHub Bot commented on KAFKA-5123:
---

Github user baluchicken closed the pull request at:

https://github.com/apache/kafka/pull/3554


> Refactor ZkUtils readData* methods 
> ---
>
> Key: KAFKA-5123
> URL: https://issues.apache.org/jira/browse/KAFKA-5123
> Project: Kafka
>  Issue Type: Bug
>Reporter: Balint Molnar
>Assignee: Balint Molnar
>Priority: Minor
> Fix For: 1.1.0
>
>
> Usually only the data value is required but every readData method in the 
> ZkUtils returns a Tuple with the data and the stat.
> https://github.com/apache/kafka/pull/2888



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5123) Refactor ZkUtils readData* methods

2017-12-14 Thread Balint Molnar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Balint Molnar resolved KAFKA-5123.
--
Resolution: Won't Fix

> Refactor ZkUtils readData* methods 
> ---
>
> Key: KAFKA-5123
> URL: https://issues.apache.org/jira/browse/KAFKA-5123
> Project: Kafka
>  Issue Type: Bug
>Reporter: Balint Molnar
>Assignee: Balint Molnar
>Priority: Minor
> Fix For: 1.1.0
>
>
> Usually only the data value is required but every readData method in the 
> ZkUtils returns a Tuple with the data and the stat.
> https://github.com/apache/kafka/pull/2888



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2017-12-14 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6361:
---
Labels: reliability  (was: )

> Fast leader fail over can lead to log divergence between leader and follower
> 
>
> Key: KAFKA-6361
> URL: https://issues.apache.org/jira/browse/KAFKA-6361
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>  Labels: reliability
>
> We have observed an edge case in the replication failover logic which can 
> cause a replica to permanently fall out of sync with the leader or, in the 
> worst case, actually have localized divergence between logs. This occurs in 
> spite of the improved truncation logic from KIP-101. 
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
> appends two batches: one in the range (0, 10) and the other in the range (11, 
> 20). The first one successfully replicates to B, but the second one does not. 
> In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with epoch 
> 2. It appends a new batch with offsets (11, n) to its local log. So we now 
> have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the 
> follower, but before it is able to do so, broker B has its own zk session 
> expiration and broker A again becomes leader, now with epoch 3. It then 
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in 
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch 
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends at 
> offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin 
> fetching from offset n, which will ultimately cause an out of order offset 
> error because broker A will return the full batch beginning from offset 11 
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in 
> the middle of the batch, it will truncate all the way to offset 10. It can 
> begin fetching from offset 11 and everything is fine.
> The case we have actually seen is the first one. The second would likely go 
> unnoticed in practice, and everything is fine in the third case. To work 
> around the issue, we deleted the active segment on the replica, which 
> allowed it to re-replicate consistently from the leader.
> I'm not sure of the best solution for this scenario. Maybe if the leader 
> isn't aware of an epoch, it should always respond with 
> {{UNDEFINED_EPOCH_OFFSET}} instead of using the offset of the next highest 
> epoch. That would cause the follower to truncate using its high watermark. 
> Or perhaps it could send another OffsetForLeaderEpoch request at the next 
> lower cached epoch and then truncate using that. 
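
To make the three cases concrete, here is a minimal sketch of the 
follower-side truncation decision described above (hypothetical names, not 
the actual replica fetcher code):

{code:java}
// Per KIP-101, the follower truncates to min(the leader's end offset for the
// follower's last epoch, the follower's own log end offset). With the leader
// answering 21 for epoch 2 and the follower's last batch ending at offset n
// (log end offset n + 1):
static long truncationOffset(final long leaderEpochEndOffset,
                             final long followerLogEndOffset) {
    return Math.min(leaderEpochEndOffset, followerLogEndOffset);
}

// n < 20:  min(21, n + 1) = n + 1 -> no truncation; the leader's full batch
//          [11, 20] later triggers an out-of-order offset error on append
// n == 20: min(21, 21)    = 21   -> no truncation; the logs silently diverge
// n > 20:  min(21, n + 1) = 21   -> 21 falls inside batch [11, n], so
//          truncation drops the whole batch back to offset 10, and
//          re-fetching from offset 11 repairs the log
{code}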



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6199) Single broker with fast growing heap usage

2017-12-14 Thread Robin Tweedie (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robin Tweedie updated KAFKA-6199:
-
Attachment: jstack-2017-12-08.scrubbed.out

Apologies for the delay in uploading this; I took a snapshot before restarting 
the broker on the 8th.

I've scrubbed out a few IP addresses and hostnames as {{__SCRUBBED__}}.

> Single broker with fast growing heap usage
> --
>
> Key: KAFKA-6199
> URL: https://issues.apache.org/jira/browse/KAFKA-6199
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
> Environment: Amazon Linux
>Reporter: Robin Tweedie
> Attachments: Screen Shot 2017-11-10 at 1.55.33 PM.png, Screen Shot 
> 2017-11-10 at 11.59.06 AM.png, dominator_tree.png, histo_live.txt, 
> histo_live_20171206.txt, histo_live_80.txt, jstack-2017-12-08.scrubbed.out, 
> merge_shortest_paths.png, path2gc.png
>
>
> We have a single broker in our cluster of 25 with fast-growing heap usage, 
> which necessitates restarting it every 12 hours. If we don't restart the 
> broker, it becomes very slow from long GC pauses and eventually hits 
> {{OutOfMemory}} errors.
> See {{Screen Shot 2017-11-10 at 11.59.06 AM.png}} for a graph of heap usage 
> percentage on the broker. A "normal" broker in the same cluster stays below 
> 50% (averaged) over the same time period.
> We have taken heap dumps when the broker's heap usage is getting dangerously 
> high, and there are a lot of retained {{NetworkSend}} objects referencing 
> byte buffers.
> We also noticed that the single affected broker logs a lot more of this kind 
> of warning than any other broker:
> {noformat}
> WARN Attempting to send response via channel for which there is no open 
> connection, connection id 13 (kafka.network.Processor)
> {noformat}
> See {{Screen Shot 2017-11-10 at 1.55.33 PM.png}} for counts of that WARN log 
> message visualized across all the brokers (to show it happens a bit on other 
> brokers, but not nearly as much as it does on the "bad" broker).
> I can't make the heap dumps public, but would appreciate advice on how to pin 
> down the problem better. We're currently trying to narrow it down to a 
> particular client, but without much success so far.
> Let me know what else I could investigate or share to track down the source 
> of this leak.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2017-12-14 Thread Renkai Ge (JIRA)
Renkai Ge created KAFKA-6362:


 Summary: auto commit not work since coordinatorUnknown() is always 
true.
 Key: KAFKA-6362
 URL: https://issues.apache.org/jira/browse/KAFKA-6362
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.2.1
Reporter: Renkai Ge



{code}
[2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
11.192.73.66:3002]
check.crcs = true
client.id = 
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = tcprtdetail_flink
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

[2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
[2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
{code}

My Kafka Java client cannot auto-commit. After adding some debug logging, I 
found that the coordinatorUnknown() function in 
[ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
 always returns true, and nextAutoCommitDeadline just increases indefinitely. 
Should there be a lookupCoordinator() call after line 604, as in 
[ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
After I added lookupCoordinator() next to line 604, the consumer auto-commits 
offsets properly.
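
For reference, the proposed fix would look roughly like this (paraphrased 
from the report above, not verbatim 0.10.2.1 source):

{code:java}
// Approximate shape of ConsumerCoordinator.maybeAutoCommitOffsetsAsync
// around line 604 in 0.10.2.1, with the suggested addition marked:
private void maybeAutoCommitOffsetsAsync(final long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            lookupCoordinator(); // suggested fix: without this, nothing here
                                 // triggers coordinator discovery, so the
                                 // check keeps failing and no commit is
                                 // ever attempted
            this.nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}
{code}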



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2017-12-14 Thread Renkai Ge (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renkai Ge updated KAFKA-6362:
-
Description: 
{code}
[2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
11.192.73.66:3002]
check.crcs = true
client.id = 
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = tcprtdetail_flink
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

[2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
[2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
{code}

My Kafka Java client cannot auto-commit. After adding some debug logging, I 
found that the coordinatorUnknown() function in 
[ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
 always returns true, and nextAutoCommitDeadline just increases indefinitely. 
Should there be a lookupCoordinator() call after line 604, as in 
[ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]?
After I added lookupCoordinator() next to line 604, the consumer auto-commits 
offsets properly.


[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception

2017-12-14 Thread Rob Gevers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290783#comment-16290783
 ] 

Rob Gevers commented on KAFKA-6260:
---

We ran into this issue as well. In our situation the consumer was fine until 
something slowed down processing and caused us to pause consumption. Our 
config is very different, though: we are using the default request.timeout.ms, 
which is 10 seconds, but our fetch.max.wait.ms is set to 1 second. That seems 
like a pretty significant margin, so I'm not sure that the two configs being 
too close together is the problem for us. Once our consumers were blocked, the 
issue was persistent and created a lot of problems: the NPE appears to have 
dropped that batch of messages while subsequent attempts still succeeded, 
creating out-of-order processing. I still have some more investigation to do, 
but I think it will prevent us from using the 1.0.0 release as-is.
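
For context, the general client-side constraint being discussed (standard 
consumer settings; the values below are illustrative, not this user's actual 
configuration):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// How long the broker may hold a fetch waiting for data to arrive:
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);   // 1 second
// How long the client waits for any response before disconnecting the node;
// it must comfortably exceed the longest legitimate server-side hold:
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000); // 10 seconds
{code}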

> AbstractCoordinator not clearly handles NULL Exception
> --
>
> Key: KAFKA-6260
> URL: https://issues.apache.org/jira/browse/KAFKA-6260
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
> Environment: RedHat Linux
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: Jason Gustafson
> Fix For: 1.1.0, 1.0.1
>
>
> The error reporting is not clear, but it seems that the Kafka heartbeat 
> shuts down the application due to a NULL exception caused by "fake" 
> disconnections.
> One more comment: we are processing messages in the stream, but sometimes we 
> have to block processing for minutes, as the consumers cannot handle too 
> much load. Is it possible that while the stream is waiting, the heartbeat is 
> blocked as well?
> Can you check that?
> {code}
> 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Sending Heartbeat request to coordinator 
> cljp01.eb.lan.at:9093 (id: 2147483646 rack: null)
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Sending HEARTBEAT 
> {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08}
>  with correlation id 24 to node 2147483646
> 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT 
> with correlation id 24, received {throttle_time_ms=0,error_code=0}
> 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Received successful Heartbeat response
> 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout.
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Cancelled request 
> {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]}
>  with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, 
> apiVersion=6, 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  correlationId=21) with correlation id 21 due to node 1 being disconnected
> 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer 
> clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer,
>  groupId=kafka-endpoint] Fetch request 
> {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, 
> maxBytes=10

[jira] [Resolved] (KAFKA-6350) File descriptors leak with persistent KeyValueStore

2017-12-14 Thread Alin Gheorghe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alin Gheorghe resolved KAFKA-6350.
--
Resolution: Not A Bug

> File descriptors leak with persistent KeyValueStore
> ---
>
> Key: KAFKA-6350
> URL: https://issues.apache.org/jira/browse/KAFKA-6350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 1.0.0
>Reporter: Alin Gheorghe
>
> When using the low-level processor API with persistent KV stores, we 
> observed a continuous increase in the number of SSTs on disk. The file 
> descriptors remain open until they reach the configured OS limit (100k in 
> our case), at which point Kafka Streams crashes with a "Too many open files" 
> exception. In our case this happens regularly after about 17 hours of 
> uptime. The commit interval is set to 5 seconds and we never call commit 
> from our code.
> Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 
> sink topics. The retention policy is set to 2 days and the topics have 25 
> partitions.
> Using the punctuation mechanism in Kafka Streams 1.0.0, we perform a cleanup 
> every 30 seconds which checks for keys that have not been updated for at 
> least 20 minutes. The KV stores hold temporary user sessions, which last for 
> 5 minutes and have about 50 updates (user actions) each.
> 2017-12-11 10:57:03 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 54
> {code}
> 2017-12-11 11:45:31 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 6742
> {code}
> We use the following state store APIs: *all*, *get*, *delete*, *put*.
> When switching to in memory state stores this obviously doesn't happen.
> We have also tried to override the RocksDB parameter *max_open_files*, 
> which defaults to -1, but the configured value seems to be ignored and 
> RocksDB surpasses that threshold. 
> Sometimes the application crashes with a different error which may or may 
> not be related; we will file a separate Jira issue if it seems unrelated:
> {code:none}
> RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN  
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444]All stream threads have 
> died. The instance will be in error state and should be closed.
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR 
> com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task 
> [0_257] Failed to flush state store eventQueueStore. Shutting down the entire 
> Kafka Streams process
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed 
> to flush state store eventQueueStore
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: java.lang.IllegalArgumentException: Illegal value provided for 
> SubCode.
>   at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
>   at org.rocksdb.Status.<init>(Status.java:30)
>   at org.rocksdb.RocksDB.flush(Native Method)
>   at org.rocksdb.RocksDB.flush(RocksDB.java:1743)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:435)
>   at 
> org.apache.kafka.streams.

[jira] [Commented] (KAFKA-6350) File descriptors leak with persistent KeyValueStore

2017-12-14 Thread Alin Gheorghe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290818#comment-16290818
 ] 

Alin Gheorghe commented on KAFKA-6350:
--

Hi [~damianguy]! Thank you for your prompt response! Indeed, that was the 
problem. We ran a longevity test and it no longer leaks file descriptors. 
Thank you very much!
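
For readers hitting the same symptom: the suggested fix is not quoted in this 
thread, but a common cause of leaked RocksDB SST file descriptors in Streams 
applications is iterating over a persistent store without closing the 
iterator. A sketch of the safe pattern (the store variable is hypothetical):

{code:java}
// KeyValueIterator extends Closeable; an unclosed iterator pins the
// underlying RocksDB resources, including SST file handles.
try (final KeyValueIterator<String, Long> iter = store.all()) {
    while (iter.hasNext()) {
        final KeyValue<String, Long> entry = iter.next();
        // ... inspect entry, e.g. collect keys idle for 20+ minutes ...
    }
} // closing the iterator releases the file descriptors
{code}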

> File descriptors leak with persistent KeyValueStore
> ---
>
> Key: KAFKA-6350
> URL: https://issues.apache.org/jira/browse/KAFKA-6350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 1.0.0
>Reporter: Alin Gheorghe
>
> When using the low-level processor API with persistent KV stores, we 
> observed a continuous increase in the number of SSTs on disk. The file 
> descriptors remain open until they reach the configured OS limit (100k in 
> our case), at which point Kafka Streams crashes with a "Too many open files" 
> exception. In our case this happens regularly after about 17 hours of 
> uptime. The commit interval is set to 5 seconds and we never call commit 
> from our code.
> Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 
> sink topics. The retention policy is set to 2 days and the topics have 25 
> partitions.
> Using the punctuation mechanism in Kafka Streams 1.0.0, we perform a cleanup 
> every 30 seconds which checks for keys that have not been updated for at 
> least 20 minutes. The KV stores hold temporary user sessions, which last for 
> 5 minutes and have about 50 updates (user actions) each.
> 2017-12-11 10:57:03 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 54
> {code}
> 2017-12-11 11:45:31 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 6742
> {code}
> We use the following state store APIs: *all*, *get*, *delete*, *put*.
> When switching to in memory state stores this obviously doesn't happen.
> We have also tried to override the RocksDB parameter *max_open_files*, 
> which defaults to -1, but the configured value seems to be ignored and 
> RocksDB surpasses that threshold. 
> Sometimes the application crashes with a different error which may or may 
> not be related; we will file a separate Jira issue if it seems unrelated:
> {code:none}
> RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN  
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444]All stream threads have 
> died. The instance will be in error state and should be closed.
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR 
> com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task 
> [0_257] Failed to flush state store eventQueueStore. Shutting down the entire 
> Kafka Streams process
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed 
> to flush state store eventQueueStore
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: java.lang.IllegalArgumentException: Illegal value provided for 
> SubCode.
>   at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
>   at org.rocksdb.Status.<init>(Status.java:30)
>   at org.rocksdb.RocksDB.flush(Nati

[jira] [Commented] (KAFKA-6350) File descriptors leak with persistent KeyValueStore

2017-12-14 Thread Alin Gheorghe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290823#comment-16290823
 ] 

Alin Gheorghe commented on KAFKA-6350:
--

P.S.: We haven’t been able to reproduce the flush-related exception in the 
issue description either.

> File descriptors leak with persistent KeyValueStore
> ---
>
> Key: KAFKA-6350
> URL: https://issues.apache.org/jira/browse/KAFKA-6350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 1.0.0
>Reporter: Alin Gheorghe
>
> When using the low-level processor API with persistent KV stores, we 
> observed a continuous increase in the number of SSTs on disk. The file 
> descriptors remain open until they reach the configured OS limit (100k in 
> our case), at which point Kafka Streams crashes with a "Too many open files" 
> exception. In our case this happens regularly after about 17 hours of 
> uptime. The commit interval is set to 5 seconds and we never call commit 
> from our code.
> Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 
> sink topics. The retention policy is set to 2 days and the topics have 25 
> partitions.
> Using the punctuation mechanism in Kafka Streams 1.0.0, we perform a cleanup 
> every 30 seconds which checks for keys that have not been updated for at 
> least 20 minutes. The KV stores hold temporary user sessions, which last for 
> 5 minutes and have about 50 updates (user actions) each.
> 2017-12-11 10:57:03 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 54
> {code}
> 2017-12-11 11:45:31 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 6742
> {code}
> We use the following state store APIs: *all*, *get*, *delete*, *put*.
> When switching to in memory state stores this obviously doesn't happen.
> We have also tried to override the RocksDB parameter *max_open_files*, 
> which defaults to -1, but the configured value seems to be ignored and 
> RocksDB surpasses that threshold. 
> Sometimes the application crashes with a different error which may or may 
> not be related; we will file a separate Jira issue if it seems unrelated:
> {code:none}
> RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN  
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444]All stream threads have 
> died. The instance will be in error state and should be closed.
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR 
> com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task 
> [0_257] Failed to flush state store eventQueueStore. Shutting down the entire 
> Kafka Streams process
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed 
> to flush state store eventQueueStore
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: java.lang.IllegalArgumentException: Illegal value provided for 
> SubCode.
>   at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
>   at org.rocksdb.Status.<init>(Status.java:30)
>   at org.rocksdb.RocksDB.flush(Native Method)
>   at org.rocksdb.RocksDB.flush(RocksDB.java:1743)
>   at 

[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.

2017-12-14 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291119#comment-16291119
 ] 

Randall Hauch commented on KAFKA-6252:
--

No, I don't think there are any workarounds, other than restarting the worker. 
Of course the best workaround would be for the connectors to correct their 
behavior, but I understand that the framework should be more tolerant of such 
implementations where it can.

> A metric named 'XX' already exists, can't register another one.
> ---
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
> Environment: Linux
>Reporter: Alexis Sellier
>Priority: Critical
>
> When a connector crashes (or is not implemented correctly by not 
> stopping/interrupting {{poll()}}), it cannot be restarted, and an exception 
> like this is thrown: 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>   at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>   at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
>   at 
> org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
>   at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
>   at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>   at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function taskMetricsGroup.close is not called in 
> all cases
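
A sketch of the kind of cleanup being suggested (hypothetical structure, not 
the actual WorkerTask code): metrics registered during task construction need 
a close path that runs even when startup fails, so a restart can re-register 
the same metric names:

{code:java}
// Hypothetical sketch: guarantee the metrics group is closed on every
// failure path; otherwise the metric names stay registered and the next
// start throws "A metric named '...' already exists".
final TaskMetricsGroup metricsGroup = new TaskMetricsGroup(metrics, taskId);
try {
    startTask(metricsGroup);
} catch (final Throwable t) {
    metricsGroup.close(); // unregisters the task's metric names
    throw t;
}
{code}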



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail

2017-12-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6360.
--
Resolution: Fixed

Issue resolved by pull request 4324
[https://github.com/apache/kafka/pull/4324]

> RocksDB segments not removed when store is closed causes re-initialization to 
> fail
> --
>
> Key: KAFKA-6360
> URL: https://issues.apache.org/jira/browse/KAFKA-6360
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
> Fix For: 1.1.0
>
>
> When a store is re-initialized, it is first closed before being opened 
> again. When this happens, the segments in the {{Segments}} class are closed, 
> but they are not removed from the list of segments. So when the store is 
> re-initialized, the old closed segments are used. This results in:
> {code}
> [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] 
> task [1_3] Failed to flush state store 
> KSTREAM-AGGREGATE-STATE-STORE-24:  
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291263#comment-16291263
 ] 

ASF GitHub Bot commented on KAFKA-6360:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4324


> RocksDB segments not removed when store is closed causes re-initialization to 
> fail
> --
>
> Key: KAFKA-6360
> URL: https://issues.apache.org/jira/browse/KAFKA-6360
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
> Fix For: 1.1.0
>
>
> When a store is re-initialized, it is first closed before being opened 
> again. When this happens, the segments in the {{Segments}} class are closed, 
> but they are not removed from the list of segments. So when the store is 
> re-initialized, the old closed segments are used. This results in:
> {code}
> [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] 
> task [1_3] Failed to flush state store 
> KSTREAM-AGGREGATE-STATE-STORE-24:  
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient

2017-12-14 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6363:


 Summary: Use MockAdminClient for any unit tests that depend on 
AdminClient
 Key: KAFKA-6363
 URL: https://issues.apache.org/jira/browse/KAFKA-6363
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Today we have a few unit tests other than KafkaAdminClientTest that rely on 
MockKafkaAdminClientEnv.

My thoughts about these classes:

1. MockKafkaAdminClientEnv actually uses a MockClient for the inner 
KafkaClient; it should only be used for the unit tests of KafkaAdminClient 
itself.

2. For any other unit tests on classes that depend on AdminClient, we should 
use the MockAdminClient that mocks the whole AdminClient.

So I suggest: 1) in TopicAdminTest, use MockAdminClient instead; 2) in 
KafkaAdminClientTest, use MockClient and add a new static constructor that 
takes a KafkaClient; 3) remove MockKafkaAdminClientEnv.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient

2017-12-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-6363:
-
Labels: newbie  (was: )

> Use MockAdminClient for any unit tests that depend on AdminClient
> -
>
> Key: KAFKA-6363
> URL: https://issues.apache.org/jira/browse/KAFKA-6363
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>  Labels: newbie
>
> Today we have a few unit tests other than KafkaAdminClientTest that rely on 
> MockKafkaAdminClientEnv.
> My thoughts about these classes:
> 1. MockKafkaAdminClientEnv actually uses a MockClient for the inner 
> KafkaClient; it should only be used for the unit tests of KafkaAdminClient 
> itself.
> 2. For any other unit tests on classes that depend on AdminClient, we should 
> use the MockAdminClient that mocks the whole AdminClient.
> So I suggest: 1) in TopicAdminTest, use MockAdminClient instead; 2) in 
> KafkaAdminClientTest, use MockClient and add a new static constructor that 
> takes a KafkaClient; 3) remove MockKafkaAdminClientEnv.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient

2017-12-14 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291310#comment-16291310
 ] 

Guozhang Wang commented on KAFKA-6363:
--

From the previous discussion:

{code}
cmccabe 3 days ago  Contributor
Reposting some of the stuff I said offline here:

It's great that we now have a MockAdminClient. This is something we discussed 
having before, but never got around to doing. As @guozhangwang said, 
MockKafkaAdminClientEnv is really just useful for testing KafkaAdminClient.

I don't completely understand the proposal for removing MockKafkaAdminClientEnv 
-- it still seems useful for KafkaAdminClientTest-- but I'm probably missing 
something here. Is the idea to make it a static class in KafkaAdminClientTest? 
That might be good.


guozhangwang a minute ago  Contributor
@cmccabe We could consider using a MockClient and added a new static 
constructor that takes a KafkaClient for KafkaAdminClientTest; this is 
basically what MockKafkaAdminClientEnv does.
{code}

> Use MockAdminClient for any unit tests that depend on AdminClient
> -
>
> Key: KAFKA-6363
> URL: https://issues.apache.org/jira/browse/KAFKA-6363
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>  Labels: newbie
>
> Today we have a few unit tests other than KafkaAdminClientTest that rely on 
> MockKafkaAdminClientEnv.
> My thoughts about these classes:
> 1. MockKafkaAdminClientEnv actually uses a MockClient for the inner 
> KafkaClient; it should only be used for the unit tests of KafkaAdminClient 
> itself.
> 2. For any other unit tests on classes that depend on AdminClient, we should 
> use the MockAdminClient that mocks the whole AdminClient.
> So I suggest: 1) in TopicAdminTest, use MockAdminClient instead; 2) in 
> KafkaAdminClientTest, use MockClient and add a new static constructor that 
> takes a KafkaClient; 3) remove MockKafkaAdminClientEnv.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6364) Add Second Check for End Offset During Restore

2017-12-14 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-6364:
--

 Summary: Add Second Check for End Offset During Restore
 Key: KAFKA-6364
 URL: https://issues.apache.org/jira/browse/KAFKA-6364
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.0.0
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 1.0.1


We need to re-check the end offset when restoring a changelog topic, to guard 
against the race condition where an additional record is appended to the log 
immediately after the restore starts. We also need to add a check for the 
case where the changelog is a KTable source topic and an offset limit is set.
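
A sketch of the re-check described above (hypothetical helper names, not the 
actual StoreChangelogReader code):

{code:java}
// Restore up to the end offset, then re-check it: a record may have been
// appended between the initial end-offset lookup and the start of
// restoration. For KTable source topics, the committed offset limit caps
// how far restoration may go.
long endOffset = consumer.endOffsets(singleton(partition)).get(partition);
while (restoredOffset < endOffset) {
    restoredOffset = restoreBatch(partition, restoredOffset); // hypothetical
    final long latest =
        consumer.endOffsets(singleton(partition)).get(partition);
    endOffset = Math.min(latest, offsetLimit(partition)); // hypothetical limit
}
{code}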



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-12-14 Thread Victor Chicu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291456#comment-16291456
 ] 

Victor Chicu commented on KAFKA-5696:
-

Any news? I have the same issue: on rebalance, if a task is killed before 
offsets are committed, Kafka Connect does not commit the source-record 
offsets to the OffsetStorage on stop. In this case the connector reads the 
same records from external storage once again and hence duplicates the 
records in the topic. :(

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running a SourceConnector that reads files from storage and puts data 
> into Kafka. In case of reconfiguration, I want offsets to be flushed. 
> Say a file is completely processed, but the source records are not yet 
> committed; in case of reconfiguration their offsets might be missing from 
> the store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-12-14 Thread Victor Chicu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291456#comment-16291456
 ] 

Victor Chicu edited comment on KAFKA-5696 at 12/14/17 7:42 PM:
---

Any news? I have the same issue: on rebalance, if a task is killed before 
offsets are committed, Kafka Connect does not commit the source-record 
offsets to the OffsetStorage on stop. In this case any kind of source 
connector reads the same data from external storage once again and hence 
duplicates the records in the topic. :(


was (Author: loopnotzero):
Any news? I have the same issue in case of rebalance if task is killed before 
offsets are committed Kafka Connect does not commit on stop partitions and 
offsets from source record to the OffsetStorage. In this case the connector 
take once again the same records from external storage and hence duplicate the 
records in the topic. :(

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running a SourceConnector that reads files from storage and puts data 
> into Kafka. In case of reconfiguration, I want offsets to be flushed. 
> Say a file is completely processed, but the source records are not yet 
> committed; in case of reconfiguration their offsets might be missing from 
> the store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-12-14 Thread Victor Chicu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291456#comment-16291456
 ] 

Victor Chicu edited comment on KAFKA-5696 at 12/14/17 7:42 PM:
---

Any news? I have the same issue: in case of a rebalance, if the task is killed before 
offsets are committed, Kafka Connect does not commit the partitions and offsets from 
the source records to the OffsetStorage on stop. In this case any kind of source 
connector takes the same data from the external storage once again and hence 
duplicates the records in the topic. :(


was (Author: loopnotzero):
Any news? I have the same issue in case of rebalance if task is killed before 
offsets are committed Kafka Connect does not commit on stop partitions and 
offsets from source record to the OffsetStorage. In this case the any of kind 
of source connector takes once again the same data from external storage and 
hence duplicate the records in the topic. :(

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Issue Comment Deleted] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-12-14 Thread Victor Chicu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victor Chicu updated KAFKA-5696:

Comment: was deleted

(was: Any news? I have the same issue in case of rebalance if task is killed 
before offsets are committed Kafka Connect does not commit on stop partitions 
and offsets from source record to the OffsetStorage. In this case the any kind 
of source connectors takes once again the same data from external storage and 
hence duplicate the records in the topic. :()

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-12-14 Thread Victor Chicu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291466#comment-16291466
 ] 

Victor Chicu commented on KAFKA-5696:
-

I faced the same issue in the following case:

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5722) Refactor ConfigCommand to use the AdminClient

2017-12-14 Thread Viktor Somogyi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291488#comment-16291488
 ] 

Viktor Somogyi commented on KAFKA-5722:
---

[~rsivaram] just wanted to update you: I might publish it only next week, as it's 
taking a bit more work than anticipated :).

> Refactor ConfigCommand to use the AdminClient
> -
>
> Key: KAFKA-5722
> URL: https://issues.apache.org/jira/browse/KAFKA-5722
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>  Labels: kip, needs-kip
> Fix For: 1.1.0
>
>
> The ConfigCommand currently uses a direct connection to zookeeper. The 
> zookeeper dependency should be deprecated and an AdminClient API created to 
> be used instead.
> This change requires a KIP.
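
For a sense of what the refactored command would call, a minimal {{AdminClient}} 
sketch (broker address and topic name are placeholders; this is not the actual patch) 
that describes and alters a topic config without touching ZooKeeper:

{code}
import java.util.*;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

public class ConfigCommandSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Describe current configs (replaces reading them from ZooKeeper).
            Config current = admin.describeConfigs(Collections.singleton(topic))
                    .all().get().get(topic);
            System.out.println(current);
            // Alter a config (replaces writing to ZooKeeper).
            Config update = new Config(Collections.singleton(
                    new ConfigEntry("retention.ms", "86400000")));
            admin.alterConfigs(Collections.singletonMap(topic, update)).all().get();
        }
    }
}
{code}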



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6365) How to add a client to list of available clients?

2017-12-14 Thread Lev Gorodinski (JIRA)
Lev Gorodinski created KAFKA-6365:
-

 Summary: How to add a client to list of available clients?
 Key: KAFKA-6365
 URL: https://issues.apache.org/jira/browse/KAFKA-6365
 Project: Kafka
  Issue Type: Wish
Reporter: Lev Gorodinski
Priority: Trivial


I'd like to add a client to: 
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET

The client is: https://github.com/jet/kafunk

.NET client written in F#; supports 0.8, 0.9, and 0.10.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-12-14 Thread Victor Chicu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291466#comment-16291466
 ] 

Victor Chicu edited comment on KAFKA-5696 at 12/14/17 8:07 PM:
---

Any news? I faced the same issue with the JDBC connector, and I think this problem 
can be reproduced not only with JDBC.
To reproduce the problem, start the connector with a SELECT TOP 1 query, which will 
produce one record to the topic, and right after that restart the connector task 
through the REST API, before the offsets get committed at the offset flush interval.
In this case, before rebalancing, Kafka Connect does not commit the offsets in the 
task stop methods to the Kafka node, or even to the offsets storage.
When the task comes back up, the JDBC connector queries the database again and gets 
the same record, which causes duplicated data in the topic.
This is because the OffsetStorageReader does not contain the last stored offset: it 
was lost on rebalancing.


was (Author: loopnotzero):
I faced the same issue in the following case:

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?

2017-12-14 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291524#comment-16291524
 ] 

Jason Gustafson commented on KAFKA-6365:


[~eulerfx] Do you have a username for https://cwiki.apache.org? If so, I can 
give you edit access and you can add your client.

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET written in F# supports 0.8 0.9 0.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance

2017-12-14 Thread Victor Chicu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291466#comment-16291466
 ] 

Victor Chicu edited comment on KAFKA-5696 at 12/14/17 8:11 PM:
---

Any news? I faced the same issue with the JDBC connector, and I think this problem 
can be reproduced not only with JDBC. To reproduce the problem, start the connector 
with a query such as SELECT TOP 1, which will produce one record to the topic, and 
after that request a restart of the connector task through the REST API, before the 
offsets get committed at the offset flush interval.
In this case, before rebalancing, Kafka Connect does not commit the offsets in the 
task stop methods to the Kafka node, or even to the offsets storage.
When the task comes back up, the JDBC connector queries the database again and gets 
the same record, which causes duplicated data in the topic.
This is because the OffsetStorageReader does not contain the last stored offset: it 
was lost on rebalancing.


was (Author: loopnotzero):
Any news? I faced the same issue with JDBC connector and I think this problem 
can be reproduced not only with JDBC.
To reproduce the problem you can start the connector with SELECT TOP 1 what 
will produce 1 record to the topic and right after trough REST API restart the 
connector task before offsets are going to be committed by hitting the offset 
flush interval.
In this case before rebalancing on the task stop methods Kafka Connect does not 
commit offsets to the Kafka Node and even to the offsets storage.
When task going up the JDBC connector trying again to query data from the 
database and it's gave again the same record that cause duplication data in the 
topic.
This is because OffsetStorageReader does not contains last stored offset cause 
on rebalacing it was lost..

> SourceConnector does not commit offset on rebalance
> ---
>
> Key: KAFKA-5696
> URL: https://issues.apache.org/jira/browse/KAFKA-5696
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleg Kuznetsov
>Priority: Critical
>  Labels: newbie
>
> I'm running SourceConnector, that reads files from storage and put data in 
> kafka. I want, in case of reconfiguration, offsets to be flushed. 
> Say, a file is completely processed, but source records are not yet committed 
> and in case of reconfiguration their offsets might be missing in store.
> Is it possible to force committing offsets on reconfiguration?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-14 Thread Joerg Heinicke (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291582#comment-16291582
 ] 

Joerg Heinicke commented on KAFKA-6366:
---

Ted Yu already suggested a fix on the mailing list: 
http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6%3DpJObC%2Bi36BkoqbOLTKsQ%3DNrDDv6dM8abfwB5PspLA%40mail.gmail.com%3E

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
>
> With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing 
> once a StackOverflowError in the heartbeat thread occurred due to 
> connectivity issues of the consumers to the coordinating broker:
> Immediately before the exception there are hundreds, if not thousands of log 
> entries of following type:
> 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] INFO  - [Consumer clientId=consumer-4, 
> groupId=my-consumer-group] Marking the coordinator : (id: 
> 2147483645 rack: null) dead
> The exceptions always happen somewhere in the DateFormat code, even 
> though at different lines.
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] ERROR - Uncaught exception in thread 
> 'kafka-coordinator-heartbeat-thread | my-consumer-group':
> java.lang.StackOverflowError
>  at 
> java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362)
>  at 
> java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340)
>  at java.util.Calendar.getDisplayName(Calendar.java:2110)
>  at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
>  at java.text.DateFormat.format(DateFormat.java:345)
>  at 
> org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443)
>  at 
> org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65)
>  at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
>  at 
> org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
>  at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
>  at 
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
>  at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
>  at org.apache.log4j.Category.callAppenders(Category.java:206)
>  at org.apache.log4j.Category.forcedLog(Category.java:391)
>  at org.apache.log4j.Category.log(Category.java:856)
>  at 
> org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
> ...
> the following 9 lines are repeated around hundred times.
> ...
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internal

[jira] [Created] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-14 Thread Joerg Heinicke (JIRA)
Joerg Heinicke created KAFKA-6366:
-

 Summary: StackOverflowError in kafka-coordinator-heartbeat-thread
 Key: KAFKA-6366
 URL: https://issues.apache.org/jira/browse/KAFKA-6366
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 1.0.0
Reporter: Joerg Heinicke


With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing once a 
StackOverflowError occurs in the heartbeat thread due to connectivity issues between 
the consumers and the coordinating broker:

Immediately before the exception there are hundreds, if not thousands, of log 
entries of the following type:

2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | 
my-consumer-group] INFO  - [Consumer clientId=consumer-4, 
groupId=my-consumer-group] Marking the coordinator : (id: 
2147483645 rack: null) dead

The exceptions always happen somewhere in the DateFormat code, even 
though at different lines.

2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | 
my-consumer-group] ERROR - Uncaught exception in thread 
'kafka-coordinator-heartbeat-thread | my-consumer-group':
java.lang.StackOverflowError
 at 
java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362)
 at 
java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340)
 at java.util.Calendar.getDisplayName(Calendar.java:2110)
 at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
 at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
 at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
 at java.text.DateFormat.format(DateFormat.java:345)
 at 
org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443)
 at 
org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65)
 at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
 at 
org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
 at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
 at 
org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
 at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
 at org.apache.log4j.Category.callAppenders(Category.java:206)
 at org.apache.log4j.Category.forcedLog(Category.java:391)
 at org.apache.log4j.Category.log(Category.java:856)
 at 
org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
 at 
org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
...
the following 9 lines are repeated around hundred times.
...
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-14 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6366:
--
Attachment: 6366.v1.txt

First attempt at fixing the stack overflow

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
> Attachments: 6366.v1.txt
>
>
> With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing 
> once a StackOverflowError in the heartbeat thread occurred due to 
> connectivity issues of the consumers to the coordinating broker:
> Immediately before the exception there are hundreds, if not thousands of log 
> entries of following type:
> 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] INFO  - [Consumer clientId=consumer-4, 
> groupId=my-consumer-group] Marking the coordinator : (id: 
> 2147483645 rack: null) dead
> The exceptions always happen somewhere in the DateFormat code, even 
> though at different lines.
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] ERROR - Uncaught exception in thread 
> 'kafka-coordinator-heartbeat-thread | my-consumer-group':
> java.lang.StackOverflowError
>  at 
> java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362)
>  at 
> java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340)
>  at java.util.Calendar.getDisplayName(Calendar.java:2110)
>  at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
>  at java.text.DateFormat.format(DateFormat.java:345)
>  at 
> org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443)
>  at 
> org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65)
>  at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
>  at 
> org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
>  at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
>  at 
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
>  at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
>  at org.apache.log4j.Category.callAppenders(Category.java:206)
>  at org.apache.log4j.Category.forcedLog(Category.java:391)
>  at org.apache.log4j.Category.log(Category.java:856)
>  at 
> org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
> ...
> the following 9 lines are repeated around hundred times.
> ...
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)

[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?

2017-12-14 Thread Lev Gorodinski (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291595#comment-16291595
 ] 

Lev Gorodinski commented on KAFKA-6365:
---

[~hachikuji] I do, it is *eulerfx*. Thanks!

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET written in F# supports 0.8 0.9 0.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5722) Refactor ConfigCommand to use the AdminClient

2017-12-14 Thread Rajini Sivaram (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291597#comment-16291597
 ] 

Rajini Sivaram commented on KAFKA-5722:
---

[~viktorsomogyi] Thank you for working on this!

> Refactor ConfigCommand to use the AdminClient
> -
>
> Key: KAFKA-5722
> URL: https://issues.apache.org/jira/browse/KAFKA-5722
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Viktor Somogyi
>Assignee: Viktor Somogyi
>  Labels: kip, needs-kip
> Fix For: 1.1.0
>
>
> The ConfigCommand currently uses a direct connection to zookeeper. The 
> zookeeper dependency should be deprecated and an AdminClient API created to 
> be used instead.
> This change requires a KIP.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?

2017-12-14 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291615#comment-16291615
 ] 

Jason Gustafson commented on KAFKA-6365:


[~eulerfx] Cool, I've given you permission to edit the page. I'll go ahead and 
resolve this issue. Thanks for contributing!

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET written in F# supports 0.8 0.9 0.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6365) How to add a client to list of available clients?

2017-12-14 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6365.

Resolution: Resolved

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET written in F# supports 0.8 0.9 0.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-14 Thread Jason Gustafson (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291699#comment-16291699
 ] 

Jason Gustafson commented on KAFKA-6366:


[~joerg.heinicke] Thanks for reporting the issue. I'm puzzling a bit over the 
trace. It looks like {{ConsumerNetworkClient.disconnect()}} should be reentrant 
(I tested locally to be sure). Maybe we're hitting a situation where the 
foreground thread is in a loop trying to add a request which the heartbeat 
thread is stuck cancelling in an infinite recursion? Is there another 
explanation?
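
To make the suspected cycle concrete, a stripped-down model of the recursion in the 
trace; the names echo the trace, but the bodies are purely illustrative and are not 
Kafka's code:

{code}
import java.util.ArrayList;
import java.util.List;

// Stripped-down model of the recursion in the stack trace: failing an
// unsent request marks the coordinator dead, which disconnects, which
// fails unsent requests again, and so on until StackOverflowError.
public class RecursionModel {
    static class Client {
        final List<Runnable> unsent = new ArrayList<>();

        void disconnect() {
            failUnsentRequests();
        }

        void failUnsentRequests() {
            // If the foreground thread keeps re-adding requests while the
            // coordinator is unreachable, this list never stays empty...
            while (!unsent.isEmpty())
                unsent.remove(0).run(); // onFailure -> coordinatorDead()
        }
    }

    static class Coordinator {
        final Client client = new Client();

        void coordinatorDead() {
            // ...so each failed request re-enters disconnect() recursively.
            client.unsent.add(this::coordinatorDead); // stand-in for another doomed request
            client.disconnect();
        }
    }

    public static void main(String[] args) {
        new Coordinator().coordinatorDead(); // throws StackOverflowError
    }
}
{code}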

> StackOverflowError in kafka-coordinator-heartbeat-thread
> 
>
> Key: KAFKA-6366
> URL: https://issues.apache.org/jira/browse/KAFKA-6366
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Joerg Heinicke
> Attachments: 6366.v1.txt
>
>
> With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing 
> once a StackOverflowError in the heartbeat thread occurred due to 
> connectivity issues of the consumers to the coordinating broker:
> Immediately before the exception there are hundreds, if not thousands of log 
> entries of following type:
> 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] INFO  - [Consumer clientId=consumer-4, 
> groupId=my-consumer-group] Marking the coordinator : (id: 
> 2147483645 rack: null) dead
> The exceptions always happen somewhere in the DateFormat code, even 
> though at different lines.
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | 
> my-consumer-group] ERROR - Uncaught exception in thread 
> 'kafka-coordinator-heartbeat-thread | my-consumer-group':
> java.lang.StackOverflowError
>  at 
> java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362)
>  at 
> java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340)
>  at java.util.Calendar.getDisplayName(Calendar.java:2110)
>  at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
>  at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
>  at java.text.DateFormat.format(DateFormat.java:345)
>  at 
> org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443)
>  at 
> org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65)
>  at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
>  at 
> org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
>  at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
>  at 
> org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
>  at 
> org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
>  at org.apache.log4j.Category.callAppenders(Category.java:206)
>  at org.apache.log4j.Category.forcedLog(Category.java:391)
>  at org.apache.log4j.Category.log(Category.java:856)
>  at 
> org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
>  at 
> org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
>  at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
> ...
> the following 9 lines are repeated around hundred times.
> ...
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(Abs

[jira] [Created] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset

2017-12-14 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-6367:
--

 Summary: Fix StateRestoreListener To Use Correct Ending Offset
 Key: KAFKA-6367
 URL: https://issues.apache.org/jira/browse/KAFKA-6367
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0, 0.11.0.0
Reporter: Bill Bejeck
 Fix For: 1.0.1


{{StateRestoreListener#restoreBatchCompleted}} takes a {{nextPosition}} long, but 
{{nextPosition}} is not the correct value: it should be the offset of the last 
restored record, while nextPosition is the offset of the first record that has not 
been restored.

We can't just automatically use {{nextPosition}} - 1, as this could be a commit 
marker.
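
For reference, assuming the public 1.0 API, the value discussed here reaches user 
code as the {{batchEndOffset}} argument of {{StateRestoreListener#onBatchRestored}}. 
A minimal listener that would log the misleading value (illustrative only):

{code}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// User-side view of the bug: before the fix, the batchEndOffset handed to
// onBatchRestored was the next position to fetch (first non-restored
// offset), not the offset of the last record actually restored, so the
// logged progress could appear ahead (e.g. when the log contains commit
// markers).
public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition tp, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("restore of %s starting at %d, ending at %d%n",
                storeName, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName,
                                long batchEndOffset, long numRestored) {
        System.out.printf("restored %d records of %s, last offset %d%n",
                numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName,
                             long totalRestored) {
        System.out.printf("restore of %s done, %d records%n",
                storeName, totalRestored);
    }
}
{code}

Such a listener can be registered via {{KafkaStreams#setGlobalStateRestoreListener}}.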



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset

2017-12-14 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-6367:
---
Description: 
{{StateRestoreListener#restoreBatchCompleted}} takes a {{nextPosition}} long, but 
{{nextPosition}} is not the correct value: it should be the offset of the last 
restored record, while nextPosition is the offset of the first record that has not 
been restored.

We can't automatically use {{nextPosition}} - 1, as this could be a commit 
marker.

  was:
{{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long  
but the {{nextPosition}} is not correct, it should be the offset of the latest 
restored offset, but nextPosition is the offset of the first not restored 
offset.

We can't just automatically use {{nextPosition}} - 1 as this could be a commit 
marker.


> Fix StateRestoreListener To Use Correct Ending Offset
> -
>
> Key: KAFKA-6367
> URL: https://issues.apache.org/jira/browse/KAFKA-6367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Bill Bejeck
> Fix For: 1.0.1
>
>
> {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} 
> long  but the {{nextPosition}} is not correct, it should be the offset of the 
> latest restored offset, but nextPosition is the offset of the first not 
> restored offset.
> We can't automatically use {{nextPosition}} - 1 as this could be a commit 
> marker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset

2017-12-14 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-6367:
---
Description: 
{{StateRestoreListener#restoreBatchCompleted}} takes a {{nextPosition}} long, but 
{{nextPosition}} is not the correct value: it should be the offset of the last 
restored record, while {{nextPosition}} is the offset of the first record that has 
not been restored.

We can't automatically use {{nextPosition}} - 1, as this could be a commit 
marker.

  was:
{{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long  
but the {{nextPosition}} is not correct, it should be the offset of the latest 
restored offset, but nextPosition is the offset of the first not restored 
offset.

We can't automatically use {{nextPosition}} - 1 as this could be a commit 
marker.


> Fix StateRestoreListener To Use Correct Ending Offset
> -
>
> Key: KAFKA-6367
> URL: https://issues.apache.org/jira/browse/KAFKA-6367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Bill Bejeck
> Fix For: 1.0.1
>
>
> {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} 
> long  but the {{nextPosition}} is not correct, it should be the offset of the 
> latest restored offset, but {{nextPosition}} is the offset of the first not 
> restored offset.
> We can't automatically use {{nextPosition}} - 1 as this could be a commit 
> marker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset

2017-12-14 Thread Bill Bejeck (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-6367:
--

Assignee: Bill Bejeck

> Fix StateRestoreListener To Use Correct Ending Offset
> -
>
> Key: KAFKA-6367
> URL: https://issues.apache.org/jira/browse/KAFKA-6367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 1.0.1
>
>
> {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} 
> long  but the {{nextPosition}} is not correct, it should be the offset of the 
> latest restored offset, but {{nextPosition}} is the offset of the first not 
> restored offset.
> We can't automatically use {{nextPosition}} - 1 as this could be a commit 
> marker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6311) Expose Kafka cluster ID in Connect REST API

2017-12-14 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291735#comment-16291735
 ] 

Randall Hauch commented on KAFKA-6311:
--

And the KIP is available here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API

> Expose Kafka cluster ID in Connect REST API
> ---
>
> Key: KAFKA-6311
> URL: https://issues.apache.org/jira/browse/KAFKA-6311
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Xavier Léauté
>Assignee: Ewen Cheslack-Postava
>  Labels: needs-kip
>
> Connect currently does not expose any information about the Kafka cluster it 
> is connected to.
> In an environment with multiple Kafka clusters it would be useful to know 
> which cluster Connect is talking to. Exposing this information enables 
> programmatic discovery of Kafka cluster metadata for the purpose of 
> configuring connectors.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6102) Consolidate MockTime implementations between connect and clients

2017-12-14 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6102.

   Resolution: Fixed
Fix Version/s: 1.0.1

Issue resolved by pull request 4105
[https://github.com/apache/kafka/pull/4105]

> Consolidate MockTime implementations between connect and clients
> 
>
> Key: KAFKA-6102
> URL: https://issues.apache.org/jira/browse/KAFKA-6102
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Minor
> Fix For: 1.0.1
>
>
> Consolidate MockTime implementations between connect and clients



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6102) Consolidate MockTime implementations between connect and clients

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291749#comment-16291749
 ] 

ASF GitHub Bot commented on KAFKA-6102:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4105


> Consolidate MockTime implementations between connect and clients
> 
>
> Key: KAFKA-6102
> URL: https://issues.apache.org/jira/browse/KAFKA-6102
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Minor
> Fix For: 1.0.1
>
>
> Consolidate MockTime implementations between connect and clients



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2017-12-14 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291750#comment-16291750
 ] 

Randall Hauch commented on KAFKA-3821:
--

[~gunnar.morling], no, there is no KIP for this yet. I agree, the 
{{OffsetRecord}} approach seems to be the cleanest API.

We will have to think about how transformations will be affected, since many of the 
implementations (not just those we provide in AK) are likely not expecting a null 
topic. The {{SourceRecord}} documentation is not explicit about which fields are 
required.

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.
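
A minimal sketch of the snapshot scenario (connector, topic, and offset keys are 
hypothetical): every record produced during the snapshot carries the same source 
offset, and when the snapshot finishes there is no record left to carry the updated 
offset, which is the gap this proposal would fill.

{code}
import java.util.*;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical snapshotting task: every record produced during the
// snapshot carries the same source offset ("snapshot in progress").
public class SnapshotOffsets {
    static final Map<String, String> PARTITION =
            Collections.singletonMap("db", "inventory");

    static SourceRecord snapshotRecord(String row) {
        Map<String, Object> offset = new HashMap<>();
        offset.put("snapshot", true); // unchanged for the whole snapshot
        return new SourceRecord(PARTITION, offset, "inventory-topic",
                Schema.STRING_SCHEMA, row);
    }

    // When the snapshot completes, the task wants to persist
    // {"snapshot": false} -- but today the only way to get an offset into
    // the offset storage is to attach it to yet another SourceRecord that
    // is written to some topic. The proposal is an API to record the
    // offset without a record.
}
{code}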



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6368) AdminClient should contact multiple nodes before timing out a call

2017-12-14 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-6368:
--

 Summary: AdminClient should contact multiple nodes before timing 
out a call
 Key: KAFKA-6368
 URL: https://issues.apache.org/jira/browse/KAFKA-6368
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.10.1.0
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


The AdminClient should contact multiple nodes before timing out a call.  Right 
now, we could use up our entire call timeout just waiting for one very slow 
node to respond.  We probably need to decouple the call timeout from the 
NetworkClient request timeout.
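
The two knobs in question, in a minimal sketch (broker addresses are placeholders, 
and the exact decoupling is still to be designed):

{code}
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterOptions;

public class TimeoutSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        // Per-request timeout used by the underlying NetworkClient.
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        try (AdminClient admin = AdminClient.create(props)) {
            // Per-call deadline. If the two timeouts are not decoupled, a
            // single slow broker can consume the whole call budget before
            // the client ever tries broker2.
            DescribeClusterOptions opts = new DescribeClusterOptions().timeoutMs(10000);
            System.out.println(admin.describeCluster(opts).clusterId().get());
        }
    }
}
{code}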



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6368) AdminClient should contact multiple nodes before timing out a call

2017-12-14 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-6368:
---
Affects Version/s: (was: 0.10.1.0)
   0.11.0.0

> AdminClient should contact multiple nodes before timing out a call
> --
>
> Key: KAFKA-6368
> URL: https://issues.apache.org/jira/browse/KAFKA-6368
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>
> The AdminClient should contact multiple nodes before timing out a call.  
> Right now, we could use up our entire call timeout just waiting for one very 
> slow node to respond.  We probably need to decouple the call timeout from the 
> NetworkClient request timeout.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6308) Connect: Struct equals/hashCode method should use Arrays#deep* methods

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291794#comment-16291794
 ] 

ASF GitHub Bot commented on KAFKA-6308:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4293


> Connect: Struct equals/hashCode method should use Arrays#deep* methods
> --
>
> Key: KAFKA-6308
> URL: https://issues.apache.org/jira/browse/KAFKA-6308
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Tobias Gies
>  Labels: easyfix, newbie
> Fix For: 1.0.1
>
>
> At the moment, {{org.apache.kafka.connect.data.Struct#equals}} checks two 
> things, after ensuring the incoming {{Object o}} is indeed of the correct 
> type:
> * Whether the schemas of {{this}} and {{o}} are equal, via {{Objects#equals}}
> * Whether the values of {{this}} and {{o}} are equal, via {{Arrays#equals}}.
> The latter check is problematic. {{Arrays#equals}} is meant for 
> one-dimensional arrays of any kind, and thus simply checks the {{equals}} 
> methods of all corresponding elements of its parameters {{a1}} and {{a2}}. 
> However, elements of the {{Struct#values}} array may themselves be arrays in 
> a specific case, namely if a field has a {{BYTES}} Schema Type and the user's 
> input for this field is of type {{byte[]}}.
> Given that, I would suggest to use {{Arrays#deepEquals}} to compare the 
> {{values}} arrays of two {{Struct}} instances. With similar reasoning, I 
> would also suggest to use {{Arrays#deepHashCode}} in the Struct's 
> {{hashCode}} method.
> This would allow to properly compare and hash structs that get byte arrays 
> passed in as field values instead of the recommended ByteBuffers. An 
> alternative might be to automatically wrap byte arrays passed into any 
> {{put}} method in a ByteBuffer.
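
The difference is easy to demonstrate in isolation. A standalone snippet (not from 
the Kafka code base) mirroring a {{values}} array that holds a {{byte[]}} field:

{code}
import java.util.Arrays;

public class DeepEqualsDemo {
    public static void main(String[] args) {
        Object[] values1 = { "id-1", new byte[] { 1, 2, 3 } };
        Object[] values2 = { "id-1", new byte[] { 1, 2, 3 } };

        // Arrays.equals falls back to byte[].equals, which is reference
        // equality, so two structurally identical Structs compare unequal.
        System.out.println(Arrays.equals(values1, values2));     // false

        // Arrays.deepEquals descends into nested arrays element by element.
        System.out.println(Arrays.deepEquals(values1, values2)); // true

        // The same asymmetry applies to hashCode vs. deepHashCode (the plain
        // hashCodes use the byte[] identity hash, so they almost surely differ).
        System.out.println(Arrays.hashCode(values1) == Arrays.hashCode(values2));
        System.out.println(Arrays.deepHashCode(values1) == Arrays.deepHashCode(values2)); // true
    }
}
{code}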



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6308) Connect: Struct equals/hashCode method should use Arrays#deep* methods

2017-12-14 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6308.

   Resolution: Fixed
Fix Version/s: 1.0.1

Issue resolved by pull request 4293
[https://github.com/apache/kafka/pull/4293]

> Connect: Struct equals/hashCode method should use Arrays#deep* methods
> --
>
> Key: KAFKA-6308
> URL: https://issues.apache.org/jira/browse/KAFKA-6308
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Tobias Gies
>  Labels: easyfix, newbie
> Fix For: 1.0.1
>
>
> At the moment, {{org.apache.kafka.connect.data.Struct#equals}} checks two 
> things, after ensuring the incoming {{Object o}} is indeed of the correct 
> type:
> * Whether the schemas of {{this}} and {{o}} are equal, via {{Objects#equals}}
> * Whether the values of {{this}} and {{o}} are equal, via {{Arrays#equals}}.
> The latter check is problematic. {{Arrays#equals}} is meant for 
> one-dimensional arrays of any kind, and thus simply checks the {{equals}} 
> methods of all corresponding elements of its parameters {{a1}} and {{a2}}. 
> However, elements of the {{Struct#values}} array may themselves be arrays in 
> a specific case, namely if a field has a {{BYTES}} Schema Type and the user's 
> input for this field is of type {{byte[]}}.
> Given that, I would suggest to use {{Arrays#deepEquals}} to compare the 
> {{values}} arrays of two {{Struct}} instances. With similar reasoning, I 
> would also suggest to use {{Arrays#deepHashCode}} in the Struct's 
> {{hashCode}} method.
> This would allow to properly compare and hash structs that get byte arrays 
> passed in as field values instead of the recommended ByteBuffers. An 
> alternative might be to automatically wrap byte arrays passed into any 
> {{put}} method in a ByteBuffer.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower

2017-12-14 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291812#comment-16291812
 ] 

Jun Rao commented on KAFKA-6361:


[~hachikuji], thanks for the info. The problem in the description can indeed 
happen if the first leader change is due to preferred leader election, in which 
case the ISR won't change.

To address this issue, we could send another OffsetForLeaderEpoch with the 
previous leader epoch, as you suggested. This may require multiple rounds of 
OffsetForLeaderEpoch requests. Another way is to change the OffsetForLeaderEpoch 
request to send a sequence of (leader epoch, start offset) pairs for the epochs 
between the follower's HW and LEO. On the leader side, we find the longest 
consecutive sequence of leader epochs whose start offsets match the leader's. 
We then return the end offset of the last matching leader epoch.

The above approach doesn't fully fix the issue for a compacted topic. When all 
messages for a leader epoch are removed, we may lose that leader epoch. Thus, 
the leader epochs on the follower and the leader may not perfectly match. 
One way to address this is to preserve the offset of the first message in 
a leader epoch during log cleaning. That can probably be done separately, since 
it rarely causes problems.
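
One possible reading of the sequence-matching idea, as a standalone sketch (the 
types, the prefix-matching rule, and the fallback are illustrative, not a committed 
design):

{code}
import java.util.List;

// Sketch of the matching rule proposed above: the follower sends its
// (leader epoch, start offset) pairs between HW and LEO; the leader walks
// its own epoch cache and returns the end offset of the last epoch in the
// longest matching prefix.
public class EpochMatch {
    static class EpochEntry {
        final int epoch;
        final long startOffset;
        EpochEntry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    /** Returns the truncation offset for the follower, or -1 if nothing matches. */
    static long truncationOffset(List<EpochEntry> follower, List<EpochEntry> leader,
                                 long leaderEndOffset) {
        int i = 0;
        while (i < Math.min(follower.size(), leader.size())
                && follower.get(i).epoch == leader.get(i).epoch
                && follower.get(i).startOffset == leader.get(i).startOffset) {
            i++;
        }
        if (i == 0)
            return -1; // no common epoch: fall back (e.g. truncate to HW)
        // End offset of the last matching epoch = start offset of the next
        // leader epoch, or the leader's log end offset for the newest epoch.
        return i < leader.size() ? leader.get(i).startOffset : leaderEndOffset;
    }
}
{code}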

> Fast leader fail over can lead to log divergence between leader and follower
> 
>
> Key: KAFKA-6361
> URL: https://issues.apache.org/jira/browse/KAFKA-6361
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>  Labels: reliability
>
> We have observed an edge case in the replication failover logic which can 
> cause a replica to permanently fall out of sync with the leader or, in the 
> worst case, actually have localized divergence between logs. This occurs in 
> spite of the improved truncation logic from KIP-101. 
> Suppose we have brokers A and B. Initially A is the leader in epoch 1. It 
> appends two batches: one in the range (0, 10) and the other in the range (11, 
> 20). The first one successfully replicates to B, but the second one does not. 
> In other words, the logs on the brokers look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> {code}
> Broker A then has a zk session expiration and broker B is elected with epoch 
> 2. It appends a new batch with offsets (11, n) to its local log. So we now 
> have this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Normally we expect broker A to truncate to offset 11 on becoming the 
> follower, but before it is able to do so, broker B has its own zk session 
> expiration and broker A again becomes leader, now with epoch 3. It then 
> appends a new entry in the range (21, 30). The updated logs look like this:
> {code}
> Broker A:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets [11, 20], leader epoch: 1
> 2: offsets: [21, 30], leader epoch: 3
> Broker B:
> 0: offsets [0, 10], leader epoch: 1
> 1: offsets: [11, n], leader epoch: 2
> {code}
> Now what happens next depends on the last offset of the batch appended in 
> epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch 
> request to broker A with epoch 2. Broker A will respond that epoch 2 ends at 
> offset 21. There are three cases:
> 1) n < 20: In this case, broker B will not do any truncation. It will begin 
> fetching from offset n, which will ultimately cause an out of order offset 
> error because broker A will return the full batch beginning from offset 11 
> which broker B will be unable to append.
> 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 
> and everything will appear fine though the logs have actually diverged.
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in 
> the middle of the batch, it will truncate all the way to offset 10. It can 
> begin fetching from offset 11 and everything is fine.
> The case we have actually seen is the first one. The second one would likely 
> go unnoticed in practice and everything is fine in the third case. To 
> workaround the issue, we deleted the active segment on the replica which 
> allowed it to re-replicate consistently from the leader.
> I'm not sure the best solution for this scenario. Maybe if the leader isn't 
> aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} 
> instead of using the offset of the next highest epoch. That would cause the 
> follower to truncate using its high watermark. Or perhaps instead of doing 
> so, it could

[jira] [Created] (KAFKA-6369) General wildcard support for ACL's in kafka

2017-12-14 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-6369:


 Summary: General wildcard support for ACL's in kafka
 Key: KAFKA-6369
 URL: https://issues.apache.org/jira/browse/KAFKA-6369
 Project: Kafka
  Issue Type: New Feature
Reporter: Antony Stubbs


Especially for streams apps where all intermediate topics are prefixed with the 
application id.

For example, add read and write access to mystreamsapp.* so any new topics 
created by the app don't need to have specific permissions applied to them.
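
A sketch of the requested matching rule (purely hypothetical; today's authorizer has 
no such wildcard support, which is the point of this ticket):

{code}
// Hypothetical matching rule for the requested feature: an ACL resource
// name ending in ".*" grants access to every topic sharing the prefix.
public class WildcardAclSketch {
    static boolean matches(String aclResource, String topic) {
        if (aclResource.endsWith(".*"))
            return topic.startsWith(aclResource.substring(0, aclResource.length() - 1));
        return aclResource.equals(topic);
    }

    public static void main(String[] args) {
        System.out.println(matches("mystreamsapp.*", "mystreamsapp.repartition-1")); // true
        System.out.println(matches("mystreamsapp.*", "othertopic"));                 // false
    }
}
{code}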



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient

2017-12-14 Thread Colin P. McCabe (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291888#comment-16291888
 ] 

Colin P. McCabe commented on KAFKA-6363:


Thanks for working on refactoring this.

I think maybe it's just the name that is confusing?  {{MockAdminClientEnv}} 
suggests that it is an environment for {{MockAdminClient}}, but of course it is 
not.  Perhaps we could just rename it to {{AdminClientUnitTestEnv}}?  That 
would make it clear, I think.

> Use MockAdminClient for any unit tests that depend on AdminClient
> -
>
> Key: KAFKA-6363
> URL: https://issues.apache.org/jira/browse/KAFKA-6363
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>  Labels: newbie
>
> Today we have a few unit tests other than KafkaAdminClientTest that relies on 
> MockKafkaAdminClientEnv.
> About this class and MockKafkaAdminClientEnv, my thoughts:
> 1. MockKafkaAdminClientEnv is actually using a MockClient for the inner 
> KafkaClient; it should be only used for the unit test of KafkaAdminClient 
> itself.
> 2. For any other unit tests on classes that depend on AdminClient, we should 
> be using the MockAdminClient that mocks the whole AdminClient.
> So I suggest 1) in TopicAdminTest use MockAdminClient instead; 2) in 
> KafkaAdminClientTest use MockClient and added a new static constructor that 
> takes a KafkaClient; 3) remove the MockKafkaAdminClientEnv.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2017-12-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291952#comment-16291952
 ] 

ASF GitHub Bot commented on KAFKA-6362:
---

GitHub user huxihx opened a pull request:

https://github.com/apache/kafka/pull/4326

KAFKA-6362: maybeAutoCommitOffsetsAsync should try to discover coordinator

Currently, `maybeAutoCommitOffsetsAsync` may not retry coordinator discovery 
even after the coordinator comes back into service. As a result, all 
asynchronous offset commits will fail.

This patch refines `maybeAutoCommitOffsetsAsync` to periodically retry 
coordinator discovery, as sketched below.
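
A minimal, self-contained sketch of the intended behavior (all field and 
helper names here are assumptions for illustration, not the actual 
ConsumerCoordinator code):

{code}
// Sketch only: the shape of the fix, not the actual Kafka patch.
public class AutoCommitRetrySketch {
    private boolean coordinatorKnown = false;
    private long nextAutoCommitDeadline = 0L;
    private final long retryBackoffMs = 100L;
    private final long autoCommitIntervalMs = 5000L;

    void maybeAutoCommitOffsetsAsync(long now) {
        if (now < nextAutoCommitDeadline) {
            return;
        }
        if (!coordinatorKnown) {
            // Previously the commit was simply skipped here, so commits never
            // resumed; the fix also kicks off coordinator discovery and retries
            // after the backoff, so commits recover once it is found.
            lookupCoordinator();
            nextAutoCommitDeadline = now + retryBackoffMs;
        } else {
            System.out.println("async commit at " + now);
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
        }
    }

    private void lookupCoordinator() {
        // Stand-in for a FindCoordinator request; pretend it succeeds at once.
        coordinatorKnown = true;
    }

    public static void main(String[] args) {
        AutoCommitRetrySketch c = new AutoCommitRetrySketch();
        c.maybeAutoCommitOffsetsAsync(0);    // coordinator unknown: discover, back off
        c.maybeAutoCommitOffsetsAsync(100);  // deadline passed: commit succeeds
    }
}
{code}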


### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huxihx/kafka KAFKA-6362

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4326


commit a2e2da2b2215da2053ffc9e6dace4a36e8777e12
Author: huxihx 
Date:   2017-12-15T02:39:31Z

KAFKA-6362: ConsumerCoordinator.maybeAutoCommitOffsetsAsync should try to 
discover the coordinator.

Currently, `maybeAutoCommitOffsetsAsync` may not retry coordinator discovery 
even after the coordinator comes back into service. As a result, all 
asynchronous offset commits will fail.

This patch refines `maybeAutoCommitOffsetsAsync` to periodically retry 
coordinator discovery.




> auto commit does not work since coordinatorUnknown() is always true.
> ---
>
> Key: KAFKA-6362
> URL: https://issues.apache.org/jira/browse/KAFKA-6362
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.2.1
>Reporter: Renkai Ge
>
> {code}
> [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
> source_bj-docker-large (14/40)] INFO  
> org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
>   auto.commit.interval.ms = 5000
>   auto.offset.reset = latest
>   bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
> 11.192.73.66:3002]
>   check.crcs = true
>   client.id = 
>   connections.max.idle.ms = 540000
>   enable.auto.commit = true
>   exclude.internal.topics = true
>   fetch.max.bytes = 52428800
>   fetch.max.wait.ms = 500
>   fetch.min.bytes = 1
>   group.id = tcprtdetail_flink
>   heartbeat.interval.ms = 3000
>   interceptor.classes = null
>   key.deserializer = class 
> org.apache.kafka.common.serialization.ByteArrayDeserializer
>   max.partition.fetch.bytes = 1048576
>   max.poll.interval.ms = 300000
>   max.poll.records = 500
>   metadata.max.age.ms = 300000
>   metric.reporters = []
>   metrics.num.samples = 2
>   metrics.recording.level = INFO
>   metrics.sample.window.ms = 30000
>   partition.assignment.strategy = [class 
> org.apache.kafka.clients.consumer.RangeAssignor]
>   receive.buffer.bytes = 65536
>   reconnect.backoff.ms = 50
>   request.timeout.ms = 305000
>   retry.backoff.ms = 100
>   sasl.jaas.config = null
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.min.time.before.relogin = 60000
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   sasl.mechanism = GSSAPI
>   security.protocol = PLAINTEXT
>   send.buffer.bytes = 131072
>   session.timeout.ms = 10000
>   ssl.cipher.suites = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.endpoint.identification.algorithm = null
>   ssl.key.password = null
>   ssl.keymanager.algorithm = SunX509
>   ssl.keystore.location = null
>   ssl.keystore.password = null
>   ssl.keystore.type = JKS
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.secure.random.implementation = null
>   ssl.trustmanager.algorithm = PKIX
>   ssl.truststore.location = null
>   ssl.truststore.password = null
> 

[jira] [Commented] (KAFKA-5285) optimize upper / lower byte range for key range scan on windowed stores

2017-12-14 Thread Peter Davis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291962#comment-16291962
 ] 

Peter Davis commented on KAFKA-5285:


In debugging a recent performance blocker in an app of mine, I suspect that 
when calling `ReadOnlySessionStore.fetch(from,to)`, which internally uses a 
timestamp of Long.MAX_VALUE, `upperRange` is called with a `maxSuffix` filled 
mostly with 0xFF bytes.  The resulting upperRange is therefore also mostly 
0xFF, and the resulting RocksDB iterator effectively iterates over 
(binaryKeyFrom...infinity).  With a large number of keys this is much worse 
than a mere performance issue (though the result appears "correct", since 
SessionKeySchema.hasNextCondition filters out the bogus results): it iterates 
over thousands of unnecessary records and is slow as molasses.

It looks like the issue dates to KIP-155.

In 
[`SessionKeySchema#upperRange`](https://github.com/apache/kafka/commit/e28752357705568219315375c666f8e500db9c12#diff-52e7d2701ecab21b32621d9b13b7f33bR57),
 why is `putLong(to)` (the timestamp) written twice, and why does it not use 
`put(key)` to build the `maxRange`?

When using a timestamp less than `Long.MAX_VALUE` the issue is avoided, because 
`OrderedBytes.upperRange` copies more of the real key.  But 
`ReadOnlySessionStore.fetch` does not let one specify a different timestamp.

> optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depend on the 
> lower key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-5285) optimize upper / lower byte range for key range scan on windowed stores

2017-12-14 Thread Peter Davis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291962#comment-16291962
 ] 

Peter Davis edited comment on KAFKA-5285 at 12/15/17 3:08 AM:
--

In debugging a recent performance blocker in an app of mine, I suspect that 
when calling {{ReadOnlySessionStore.fetch(from,to)}}, which internally uses a 
timestamp of Long.MAX_VALUE, {{upperRange}} is called with a maxSuffix filled 
mostly with 0xFF bytes.  The resulting upperRange is therefore also mostly 
0xFF, and the resulting RocksDB iterator effectively iterates over 
(binaryKeyFrom...infinity)!  With a large number of keys this is much worse 
than a mere performance issue (though the result appears "correct", since 
SessionKeySchema.hasNextCondition filters out the bogus results): it iterates 
over thousands of unnecessary records and is slow as molasses.

It looks like the issue dates to KIP-155.

In 
[{{SessionKeySchema#upperRange}}|https://github.com/apache/kafka/commit/e28752357705568219315375c666f8e500db9c12#diff-52e7d2701ecab21b32621d9b13b7f33bR57],
 why is {{putLong(to)}} (the timestamp) written twice, and why does it not use 
the actual {{key}} to build the {{maxRange}}?

When using a timestamp less than {{Long.MAX_VALUE}} the issue is avoided, 
because the first timestamp put into {{maxRange}} begins mostly with 0 bytes, 
so {{OrderedBytes.upperRange}} copies more of the real key in its mask loop.  
But {{ReadOnlySessionStore.fetch}} does not let one specify a different 
timestamp.
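
To make the failure mode concrete, here is a self-contained sketch (a 
simplified stand-in for {{OrderedBytes.upperRange}}, not Kafka's actual code) 
showing why a suffix built from {{Long.MAX_VALUE}} retains none of the key 
bytes:

{code}
import java.nio.ByteBuffer;

public class UpperRangeSketch {
    // Simplified mask loop: keep leading key bytes only while they are >=
    // the first byte of the max suffix, then append the suffix itself.
    static byte[] upperRange(byte[] key, byte[] maxSuffix) {
        int i = 0;
        while (i < key.length && (key[i] & 0xFF) >= (maxSuffix[0] & 0xFF)) {
            i++;
        }
        byte[] range = new byte[i + maxSuffix.length];
        System.arraycopy(key, 0, range, 0, i);
        System.arraycopy(maxSuffix, 0, range, i, maxSuffix.length);
        return range;
    }

    public static void main(String[] args) {
        byte[] key = "my-key".getBytes();
        // fetch() uses Long.MAX_VALUE, so the suffix starts with 0x7F
        // followed by 0xFF bytes.
        byte[] maxSuffix = ByteBuffer.allocate(16)
                .putLong(Long.MAX_VALUE)
                .putLong(Long.MAX_VALUE)
                .array();
        byte[] range = upperRange(key, maxSuffix);
        // ASCII key bytes are all < 0x7F, so zero key bytes are retained and
        // the upper bound degenerates to "nearly infinity" for the scan.
        System.out.println("retained key bytes: " + (range.length - maxSuffix.length));
    }
}
{code}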



> optimize upper / lower byte range for key range scan on windowed stores
> ---
>
> Key: KAFKA-5285
> URL: https://issues.apache.org/jira/browse/KAFKA-5285
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>  Labels: performance
>
> The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} 
> {{upperRange}} and {{lowerRange}} does not make any assumptions with respect 
> to the other key bound (e.g. the upper byte bound does not depend on the 
> lower key bound).
> It should be possible to optimize the byte range somewhat further using the 
> information provided by the lower bound.
> More specifically, by incorporating that information, we should be able to 
> eliminate the corresponding {{upperRangeFixedSize}} and 
> {{lowerRangeFixedSize}}, since the result should be the same if we implement 
> that optimization.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6370) MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to NullPointerException

2017-12-14 Thread Ted Yu (JIRA)
Ted Yu created KAFKA-6370:
-

 Summary: MirrorMakerIntegrationTest#testCommaSeparatedRegex may 
fail due to NullPointerException
 Key: KAFKA-6370
 URL: https://issues.apache.org/jira/browse/KAFKA-6370
 Project: Kafka
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


From 
https://builds.apache.org/job/kafka-trunk-jdk8/2277/testReport/junit/kafka.tools/MirrorMakerIntegrationTest/testCommaSeparatedRegex/ :
{code}
java.lang.NullPointerException
at 
scala.collection.immutable.StringLike.$anonfun$format$1(StringLike.scala:351)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.map(TraversableLike.scala:234)
at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at scala.collection.immutable.StringLike.format(StringLike.scala:351)
at scala.collection.immutable.StringLike.format$(StringLike.scala:350)
at scala.collection.immutable.StringOps.format(StringOps.scala:29)
at 
kafka.metrics.KafkaMetricsGroup$.$anonfun$toScope$3(KafkaMetricsGroup.scala:170)
at scala.collection.immutable.List.map(List.scala:283)
at 
kafka.metrics.KafkaMetricsGroup$.kafka$metrics$KafkaMetricsGroup$$toScope(KafkaMetricsGroup.scala:170)
at 
kafka.metrics.KafkaMetricsGroup.explicitMetricName(KafkaMetricsGroup.scala:67)
at 
kafka.metrics.KafkaMetricsGroup.explicitMetricName$(KafkaMetricsGroup.scala:51)
at 
kafka.network.RequestMetrics.explicitMetricName(RequestChannel.scala:352)
at 
kafka.metrics.KafkaMetricsGroup.metricName(KafkaMetricsGroup.scala:47)
at 
kafka.metrics.KafkaMetricsGroup.metricName$(KafkaMetricsGroup.scala:42)
at kafka.network.RequestMetrics.metricName(RequestChannel.scala:352)
at 
kafka.metrics.KafkaMetricsGroup.newHistogram(KafkaMetricsGroup.scala:81)
at 
kafka.metrics.KafkaMetricsGroup.newHistogram$(KafkaMetricsGroup.scala:80)
at kafka.network.RequestMetrics.newHistogram(RequestChannel.scala:352)
at kafka.network.RequestMetrics.<init>(RequestChannel.scala:364)
at 
kafka.network.RequestChannel$Metrics.$anonfun$new$2(RequestChannel.scala:57)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.RequestChannel$Metrics.<init>(RequestChannel.scala:56)
at kafka.network.RequestChannel.<init>(RequestChannel.scala:243)
at kafka.network.SocketServer.<init>(SocketServer.scala:71)
at kafka.server.KafkaServer.startup(KafkaServer.scala:238)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
at 
kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:93)
{code}
Here is the relevant code from KafkaMetricsGroup.scala:
{code}
.map { case (key, value) => "%s.%s".format(key, value.replaceAll("\\.", 
"_"))}
{code}
It seems that {{value}} was null for some entry.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset

2017-12-14 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292054#comment-16292054
 ] 

Matthias J. Sax commented on KAFKA-6367:


{quote}
We can't automatically use {{nextPosition - 1}} as this could be a commit 
marker.
{quote}

It's a valid point, but I am not sure whether returning the offset of a commit 
marker would actually be an issue. The provided offset is called 
{{batchEndOffset}} -- that does not necessarily imply "offset of the last 
restored record"; or does it?

If we don't return commit-marker offsets, we get "gaps" -- thus, if someone 
tracks the offsets in the callback and assumes "completeness", that assumption 
would be violated. On the other hand, if there are aborted messages, we might 
have gaps anyway...

Not sure what others think. To me it seems we don't gain much by not 
"returning" commit markers, but we do take on a lot of additional complexity. 
Thus, I tend to think that using {{nextPosition - 1}} might be ok.
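
For reference, a minimal listener consuming the offset in question might look 
like the sketch below (written against the public Streams 1.0 
{{StateRestoreListener}} interface, registered via 
{{KafkaStreams#setGlobalStateRestoreListener}}; whether this is the exact 
callback the ticket refers to is an assumption):

{code}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Minimal sketch of a listener that tracks batchEndOffset. Whether that
// offset may land on a commit marker (i.e. nextPosition - 1) or must be
// the last restored record is exactly the question discussed above.
public class ProgressRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition tp, String storeName,
                               long startingOffset, long endingOffset) {
        System.out.printf("restore of %s starting at offset %d%n",
                storeName, startingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String storeName,
                                long batchEndOffset, long numRestored) {
        // If commit-marker offsets are reported, consecutive callbacks see
        // contiguous offsets; if they are skipped, a tracker observes gaps.
        System.out.printf("%s restored through offset %d (%d records)%n",
                storeName, batchEndOffset, numRestored);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String storeName,
                             long totalRestored) {
        System.out.printf("restore of %s complete: %d records%n",
                storeName, totalRestored);
    }
}
{code}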

> Fix StateRestoreListener To Use Correct Ending Offset
> -
>
> Key: KAFKA-6367
> URL: https://issues.apache.org/jira/browse/KAFKA-6367
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 1.0.1
>
>
> {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} 
> long, but {{nextPosition}} is not correct: it should be the offset of the 
> latest restored record, whereas {{nextPosition}} is the offset of the first 
> record that has not yet been restored.
> We can't automatically use {{nextPosition - 1}}, as this could be a commit 
> marker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)