[jira] [Commented] (KAFKA-6755) MaskField SMT should optionally take a literal value to use instead of using null

2019-02-12 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765847#comment-16765847
 ] 

Valeria Vasylieva commented on KAFKA-6755:
--

Hi [~rhauch], if this still needs to be implemented, I will work on it, OK?

> MaskField SMT should optionally take a literal value to use instead of using 
> null
> -
>
> Key: KAFKA-6755
> URL: https://issues.apache.org/jira/browse/KAFKA-6755
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Valeria Vasylieva
>Priority: Major
>  Labels: needs-kip, newbie
>   Original Estimate: 8h
>  Remaining Estimate: 8h
>
> The existing {{org.apache.kafka.connect.transforms.MaskField}} SMT always 
> uses the null value for the type of field. It'd be nice to *optionally* be 
> able to specify a literal value for the type, where the SMT would convert the 
> literal string value in the configuration to the desired type (using the new 
> {{Values}} methods).
> Use cases: mask out the IP address, or SSN, or other personally identifiable 
> information (PII).
> Since this changes the API, it will require a KIP.





[jira] [Assigned] (KAFKA-6755) MaskField SMT should optionally take a literal value to use instead of using null

2019-02-12 Thread Valeria Vasylieva (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valeria Vasylieva reassigned KAFKA-6755:


Assignee: Valeria Vasylieva

> MaskField SMT should optionally take a literal value to use instead of using 
> null
> -
>
> Key: KAFKA-6755
> URL: https://issues.apache.org/jira/browse/KAFKA-6755
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Valeria Vasylieva
>Priority: Major
>  Labels: needs-kip, newbie
>   Original Estimate: 8h
>  Remaining Estimate: 8h
>
> The existing {{org.apache.kafka.connect.transforms.MaskField}} SMT always 
> uses the null value for the type of field. It'd be nice to *optionally* be 
> able to specify a literal value for the type, where the SMT would convert the 
> literal string value in the configuration to the desired type (using the new 
> {{Values}} methods).
> Use cases: mask out the IP address, or SSN, or other personally identifiable 
> information (PII).
> Since this changes the API, it will require a KIP.





[jira] [Commented] (KAFKA-5792) Transient failure in KafkaAdminClientTest.testHandleTimeout

2019-02-12 Thread Gert van Dijk (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765898#comment-16765898
 ] 

Gert van Dijk commented on KAFKA-5792:
--

I'm still seeing this issue with 2.1.0 with my own AdminClient. It took me a 
huge amount of time to figure out what was happening, only to discover that a 
unit test masking this erroneous behaviour is disabled in Kafka. :(

FWIW, what I'm seeing with debug logging enabled is: 1) a successful connection 
to the bootstrap server, and 2) a huge number of lines like this:
{noformat}
[kafka-admin-client-thread | adminclient-1] DEBUG
   o.a.k.c.a.i.AdminMetadataManager - [AdminClient clientId=adminclient-1]
   Updating cluster metadata to Cluster(id = q7XgghZqQUW_o5W2-Nn5Qw,
   nodes = [], partitions = [], controller = null){noformat}
(Note {{nodes = []}}.)

and 3) then, after a while (could be 1 second, could be 40+ seconds in my 
case), it finally responds with a node and connects just fine:
{noformat}
[kafka-admin-client-thread | adminclient-1] DEBUG 
   o.a.k.c.a.i.AdminMetadataManager - [AdminClient clientId=adminclient-1]
   Updating cluster metadata to Cluster(id = q7XgghZqQUW_o5W2-Nn5Qw,
   nodes = [kafka:9092 (id: 0 rack: null)], partitions = [],
   controller = kafka:9092 (id: 0 rack: null))  
   
[kafka-admin-client-thread | adminclient-1]
   DEBUG o.apache.kafka.clients.NetworkClient -
   [AdminClient clientId=adminclient-1] Initiating connection to node
   kafka:9092 (id: 0 rack: null) using address kafka/x{noformat}

This all happens in a completely clean, freshly started, default-configuration, 
non-SSL, no-authentication, single-node ZooKeeper/Kafka deployment in Docker, 
without any data or other connections. (See also [my Q&A at 
ServerFault|https://serverfault.com/q/953393/135437].)

It would be really great if this could be fixed some day.
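
For reference, a minimal probe that surfaces the delay (a sketch only; 
{{kafka:9092}} is the bootstrap address from the logs above):
{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DescribeClusterProbe {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            long start = System.nanoTime();
            // Blocks until a metadata update finally carries a node; this is
            // where the 1 to 40+ second delay described above shows up.
            System.out.println("Nodes: "
                + admin.describeCluster().nodes().get(60, TimeUnit.SECONDS));
            System.out.printf("Took %d ms%n",
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
        }
    }
}
{code}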

> Transient failure in KafkaAdminClientTest.testHandleTimeout
> ---
>
> Key: KAFKA-5792
> URL: https://issues.apache.org/jira/browse/KAFKA-5792
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Colin P. McCabe
>Priority: Major
>  Labels: transient-unit-test-failure
> Fix For: 2.2.0
>
>
> The {{KafkaAdminClientTest.testHandleTimeout}} test occasionally fails with 
> the following:
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClientTest.testHandleTimeout(KafkaAdminClientTest.java:356)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment.
> {noformat}





[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2019-02-12 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765924#comment-16765924
 ] 

Rajini Sivaram commented on KAFKA-7304:
---

[~mjsax] We have fixed all the known memory leak issues. [~yuyang08] Can we 
reduce the priority of this JIRA since the only remaining issue is high CPU 
with SSL?

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Critical
> Fix For: 1.1.2, 2.2.0, 2.0.2
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, 
> Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 
> PM.png
>
>
> We are testing secured writing to Kafka through SSL. Testing at small scale, 
> SSL writing to Kafka was fine. However, when we enabled SSL writing at a 
> larger scale (>40k clients writing concurrently), the Kafka brokers soon hit 
> an OutOfMemory issue with a 4G heap setting. We tried increasing the heap 
> size to 10 GB, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects. There 
> are two channel map fields in Selector. It seems that somehow the objects are 
> not deleted from the maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If there is a broker restart etc. in the cluster that causes 
> a partition leadership change, the brokers may hit the OOM issue faster. 
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102 and have applied a TLS patch reducing the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}





[jira] [Assigned] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1

2019-02-12 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7920:
--

Assignee: Lee Dongjin

> Do not permit zstd use until inter.broker.protocol.version is updated to 2.1
> 
>
> Key: KAFKA-7920
> URL: https://issues.apache.org/jira/browse/KAFKA-7920
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Jason Gustafson
>Assignee: Lee Dongjin
>Priority: Major
>
> After brokers have been upgraded to 2.1, users can begin using zstd 
> compression. Regardless of the inter.broker.protocol.version, the broker will 
> happily accept zstd-compressed data as long as the right produce request 
> version is used. However, if the inter.broker.protocol.version is set to 2.0 
> or below, then followers will not be able to use the minimum required fetch 
> version, which will result in the following error:
> {code}
> [2019-02-11 17:42:47,116] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition foo-0 at offset 0 
> (kafka.server.ReplicaFetcherThread)   
>   
>  
> org.apache.kafka.common.errors.UnsupportedCompressionTypeException: The 
> requesting client does not support the compression type of given partition.
> {code}
> We should make produce request validation consistent. Until the 
> inter.broker.protocol.version is at 2.1 or later, we should reject produce 
> requests with UNSUPPORTED_COMPRESSION_TYPE.





[jira] [Commented] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1

2019-02-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765962#comment-16765962
 ] 

ASF GitHub Bot commented on KAFKA-7920:
---

dongjinleekr commented on pull request #6256: KAFKA-7920: Do not permit zstd 
use until inter.broker.protocol.version is updated to 2.1
URL: https://github.com/apache/kafka/pull/6256
 
 
   Here is the fix. Since `inter.broker.protocol.version` is defined in 
`KafkaConfig` (not `LogConfig`), I concluded that the `ProduceRequest` validation 
routine is the best place for the check. (I first thought that `LogValidator` 
would be the right place, but it required passing too many parameters, so I 
rejected that approach.)
   
   Here are the details:
   
   1. Add `Optional interBrokerProtocolVersion` to 
`ProduceRequest#validateRecords`.
   2. Disallow zstd-compressed `MemoryRecords` if 
`inter.broker.protocol.version` < 2.1 (i.e., `ApiVersion.id` = 17); see the 
sketch below.
   3. Add `ProduceRequest#validateRecords(short, MemoryRecords)` for backward 
compatibility.
   4. Fix a typo: "note allowed to" → "not allowed to"
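   
   For illustration, a rough sketch of the guard described in (2); the class, 
method, and parameter names here are assumptions for illustration, not the 
actual patch:
{code:java}
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.record.CompressionType;

public final class ZstdGuard {
    // Reject zstd-compressed batches while the inter-broker protocol is below 2.1.
    static void validateCompression(CompressionType type, boolean ibpAtLeast21) {
        if (type == CompressionType.ZSTD && !ibpAtLeast21) {
            throw new UnsupportedCompressionTypeException(
                "Produce requests with zstd compression are not allowed until "
                    + "inter.broker.protocol.version is upgraded to 2.1 or later");
        }
    }
}
{code}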
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Do not permit zstd use until inter.broker.protocol.version is updated to 2.1
> 
>
> Key: KAFKA-7920
> URL: https://issues.apache.org/jira/browse/KAFKA-7920
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Jason Gustafson
>Priority: Major
>
> After brokers have been upgraded to 2.1, users can begin using zstd 
> compression. Regardless of the inter.broker.protocol.version, the broker will 
> happily accept zstd-compressed data as long as the right produce request 
> version is used. However, if the inter.broker.protocol.version is set to 2.0 
> or below, then followers will not be able to use the minimum required fetch 
> version, which will result in the following error:
> {code}
> [2019-02-11 17:42:47,116] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition foo-0 at offset 0 
> (kafka.server.ReplicaFetcherThread)   
>   
>  
> org.apache.kafka.common.errors.UnsupportedCompressionTypeException: The 
> requesting client does not support the compression type of given partition.
> {code}
> We should make produce request validation consistent. Until the 
> inter.broker.protocol.version is at 2.1 or later, we should reject produce 
> requests with UNSUPPORTED_COMPRESSION_TYPE.





[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-12 Thread Mateusz Owczarek (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765967#comment-16765967
 ] 

Mateusz Owczarek commented on KAFKA-7882:
-

[~vvcephei]

This is pretty much how I call the suppress method:

{code:java}
  .suppress(new FinalResultsSuppressionBuilder(
    stateStoreName,
    new StrictBufferConfigImpl(Long.MaxValue, Long.MaxValue,
        BufferFullStrategy.SPILL_TO_DISK)
  ))
{code}

As far as I can see, the static untilWindowCloses method also requires a 
StrictBufferConfig, and StrictBufferConfigImpl is a public class. Maybe it 
should not be? I presume I should have used the withMaxBytes or unbounded 
static methods from the BufferConfig interface. Do I understand correctly that, 
the way I used the suppress method, a full buffer will eventually throw an 
unexpected exception and stop my application?


bq. Offhand, if you were previously using wall-clock time, you don't need 
semantically airtight suppression behavior, so, emitting early when the buffer 
fills up should be fine.

As far as I understand, emitEarlyWhenFull does not guarantee one event per 
window, so it's not sufficient for me. Could you elaborate on why a wall-clock 
time implementation does not ensure it either?

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches the aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on 
> KTable in the future, but my implementation is quite simple and IMO should 
> work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1)
>     store.put(key.key(), (value, partition), key.window().start())
>   else
>     logger.warn(s"-1 partition")
>   // Ensuring no 1:1 forwarding; context.forward and commit logic is in
>   // the punctuator callback
>   null
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1, even if I'm just saving to the store and 
> have turned off punctuation.
> If punctuation is present, however, I sometimes get -1 as the partition value 
> in the transform method. I'm familiar with the basic docs; however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them a retention 
> time as long as possible for the debugging stage. I tried to hotfix this with 
> a retry in the transform method; the state stores reopen in the end and the 
> `put` method works, but this approach is questionable and I am not sure it 
> actually works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a 
> low value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?





[jira] [Commented] (KAFKA-6755) MaskField SMT should optionally take a literal value to use instead of using null

2019-02-12 Thread Valeria Vasylieva (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765976#comment-16765976
 ] 

Valeria Vasylieva commented on KAFKA-6755:
--

[~rhauch] I am new to Kafka; could you please tell me which should go first: 
the KIP or the pull request?

> MaskField SMT should optionally take a literal value to use instead of using 
> null
> -
>
> Key: KAFKA-6755
> URL: https://issues.apache.org/jira/browse/KAFKA-6755
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Valeria Vasylieva
>Priority: Major
>  Labels: needs-kip, newbie
>   Original Estimate: 8h
>  Remaining Estimate: 8h
>
> The existing {{org.apache.kafka.connect.transforms.MaskField}} SMT always 
> uses the null value for the type of field. It'd be nice to *optionally* be 
> able to specify a literal value for the type, where the SMT would convert the 
> literal string value in the configuration to the desired type (using the new 
> {{Values}} methods).
> Use cases: mask out the IP address, or SSN, or other personally identifiable 
> information (PII).
> Since this changes the API, it will require a KIP.





[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Kartik (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766100#comment-16766100
 ] 

Kartik commented on KAFKA-7794:
---

[~ijuma] Can you help me here? Should we return the latest offset when the 
requested timestamp is greater than the latest committed record's timestamp, or 
just throw an error message? Then I can work on this issue accordingly.
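
For comparison, a minimal sketch of how the modern consumer API resolves a 
timestamp to an offset ({{KafkaConsumer#offsetsForTimes}}); the broker address, 
topic, partition, and timestamp below are placeholders taken from this thread:
{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetsForTimesProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test", 0);
            long timestamp = 1549983817353L;
            Map<TopicPartition, OffsetAndTimestamp> result =
                consumer.offsetsForTimes(Collections.singletonMap(tp, timestamp));
            // A null entry means no record with timestamp >= the target exists
            // in that partition.
            System.out.println(tp + " -> " + result.get(tp));
        }
    }
}
{code}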

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png
>
>
> For some timestamp inputs (other than -1 or -2), GetOffsetShell is not able 
> to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  





[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Daniele Ascione (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniele Ascione updated KAFKA-7794:
---
Attachment: image-2019-02-12-16-19-25-170.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png
>
>
> For some timestamp inputs (other than -1 or -2), GetOffsetShell is not able 
> to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  





[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Daniele Ascione (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniele Ascione updated KAFKA-7794:
---
Attachment: image-2019-02-12-16-21-13-126.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png
>
>
> For some timestamp inputs (other than -1 or -2), GetOffsetShell is not able 
> to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  





[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Daniele Ascione (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniele Ascione updated KAFKA-7794:
---
Attachment: image-2019-02-12-16-23-38-399.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png
>
>
> For some timestamp inputs (other than -1 or -2), GetOffsetShell is not able 
> to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  





[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Daniele Ascione (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766139#comment-16766139
 ] 

Daniele Ascione commented on KAFKA-7794:


[~kartikvk1996], I am able to reproduce the behaviour on my machine. I am using 
Kafka on Red Hat Enterprise Linux Server release 7.3 (Maipo).
You could produce more messages using:
{code:java}
bin/kafka-producer-perf-test.sh  --topic demo --throughput 10 --num-records 
5 --record-size 5 --producer-props bootstrap.servers=127.0.0.1:9092
{code}
If you then take a message using the procedure I described, you should see the 
same behaviour. I executed this on my system just now, then typed:
{code:java}
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 
--topic test --time 1549983817353
{code}
and the output is:

!image-2019-02-12-16-19-25-170.png!

I have some messages after and before that timestamp:
!image-2019-02-12-16-23-38-399.png!

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png
>
>
> For some timestamp inputs (other than -1 or -2), GetOffsetShell is not able 
> to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  





[jira] [Comment Edited] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Daniele Ascione (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766139#comment-16766139
 ] 

Daniele Ascione edited comment on KAFKA-7794 at 2/12/19 3:24 PM:
-

[~kartikvk1996], I am able to reproduce the behaviour on my machine. I am using 
Kafka on Red Hat Enterprise Linux Server release 7.3 (Maipo).
 You could produce more messages using:
{code:java}
bin/kafka-producer-perf-test.sh  --topic demo --throughput 10 --num-records 
5 --record-size 5 --producer-props bootstrap.servers=127.0.0.1:9092
{code}

 I executed it on my system just now, then typed:
{code:java}
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 
--topic test --time 1549983817353
{code}
and the output is:

!image-2019-02-12-16-19-25-170.png!

I have some messages after and before that timestamp:
 !image-2019-02-12-16-23-38-399.png!


was (Author: dascione):
[~kartikvk1996], I am able to reproduce the behaviour on my machine. I am using 
Kafka on Red Hat Enterprise Linux Server release 7.3 (Maipo).
You could produce more messages using:
{code:java}
bin/kafka-producer-perf-test.sh  --topic demo --throughput 10 --num-records 
5 --record-size 5 --producer-props bootstrap.servers=127.0.0.1:9092
{code}
If you then take a message using the procedure I described, you should see the 
same behaviour. I executed this on my system just now, then typed:
{code:java}
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 
--topic test --time 1549983817353
{code}
and the output is:

!image-2019-02-12-16-19-25-170.png!

I have some messages after and before that timestamp:
!image-2019-02-12-16-23-38-399.png!

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png
>
>
> For some timestamp inputs (other than -1 or -2), GetOffsetShell is not able 
> to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  





[jira] [Commented] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly

2019-02-12 Thread Bruno Lenski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766167#comment-16766167
 ] 

Bruno Lenski commented on KAFKA-6572:
-

Hi

We ran into the very same issue.

I have two topics: CEF and event.

While resetting offsets with --to-datetime, the new offset is set correctly for 
the topic CEF, whereas it always points to the last entry for the topic event.

Did anyone find a solution to this issue?

Thanks

Bruno

> kafka-consumer-groups does not reset offsets to specified datetime correctly
> 
>
> Key: KAFKA-6572
> URL: https://issues.apache.org/jira/browse/KAFKA-6572
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Sharon Lucas
>Priority: Major
>
> With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh 
> --reset-offsets option to reset offsets to a specific date/time in our 
> production environment.
> We first tried to use the kafka-consumer-groups.sh command with the 
> --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 
> in our staging environment and it worked correctly.  Running the following 
> command changed it to start processing logs from February 12, 2018 (4 days 
> ago) for a topic that had a large lag.  We did a dry run to verify before 
> running with the --execute option.
> {code:java}
> root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh 
> --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest 
> --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic 
> staging-mon01-rg-elasticsearch --execute{code}
> We stopped the kafka mirrors that process this topic before resetting the 
> offsets and started the kafka mirrors after resetting the offsets. We 
> verified that it correctly started processing logs from February 12, 2018.
> Then we tried resetting offsets in a production environment for a topic that 
> had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and 
> it did not work as expected. We stopped the kafka mirrors that process this 
> topic before resetting the offsets and did a dry run to see what the new 
> offsets would be:
> {code:java}
> root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 
> NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime 
> 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access
> Note: This will not show information about old Zookeeper-based consumers.
> TOPIC  PARTITION  NEW-OFFSET
> prod_in-ams03-geo-ca_access    52 52084147
> prod_in-ams03-geo-ca_access    106    52154199
> prod_in-ams03-geo-ca_access    75 52148673
> prod_in-ams03-geo-ca_access    61 52130753
> prod_in-ams03-geo-ca_access    49 52151667
> prod_in-ams03-geo-ca_access    48 52145233
> prod_in-ams03-geo-ca_access    27 52092805
> prod_in-ams03-geo-ca_access    26 52139644
> prod_in-ams03-geo-ca_access    65 52157504
> prod_in-ams03-geo-ca_access    105    52166289
> prod_in-ams03-geo-ca_access    38 52160464
> prod_in-ams03-geo-ca_access    22 52093451
> prod_in-ams03-geo-ca_access    4  52151660
> prod_in-ams03-geo-ca_access    90 52160296
> prod_in-ams03-geo-ca_access    25 52161691
> prod_in-ams03-geo-ca_access    13 52145828
> prod_in-ams03-geo-ca_access    56 52162867
> prod_in-ams03-geo-ca_access    42 52072094
> prod_in-ams03-geo-ca_access    7  52069496
> prod_in-ams03-geo-ca_access    117    52087078
> prod_in-ams03-geo-ca_access    32 52073732
> prod_in-ams03-geo-ca_access    102    52082022
> prod_in-ams03-geo-ca_access    76 52141018
> prod_in-ams03-geo-ca_access    83 52154542
> prod_in-ams03-geo-ca_access    72 52095051
> prod_in-ams03-geo-ca_access    85 52149907
> prod_in-ams03-geo-ca_access    119    52134435
> prod_in-ams03-geo-ca_access    113    52159340
> prod_in-ams03-geo-ca_access    55 52146597
> prod_in-ams03-geo-ca_access    18 52149079
> prod_in-ams03-geo-ca_access    35 52149058
> prod_in-ams03-geo-ca_access    99 52143277
> prod_in-ams03-geo-ca_access    41 52158872
> prod_in-ams03-geo-ca_access    112    52083901
> prod_in-ams03-geo-ca_access    34 52137932
> prod_in-ams03-geo-ca_access    89 52158135
> prod_in-ams03-geo-ca_access    40 5212
> prod_in-ams03-geo-ca_access    53 52138400
> prod_in-ams03-geo-ca_access    19 52144966
> prod_in-ams03-geo-ca_access    44 52166404
> prod_in-ams03-geo-ca_access    31 

[jira] [Comment Edited] (KAFKA-6572) kafka-consumer-groups does not reset offsets to specified datetime correctly

2019-02-12 Thread Bruno Lenski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6572?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766167#comment-16766167
 ] 

Bruno Lenski edited comment on KAFKA-6572 at 2/12/19 3:51 PM:
--

Hi

We ran into the very same issue.

I have two topics: CEF and event.

While resetting offsets with --to-datetime, the new offset is set correctly for 
the topic CEF, whereas it always points to the last entry for the topic event.

Note that this happened when we started having a big data inflow into the event 
topic. It worked for several weeks with a low level of activity before we ran 
into this issue.

Did anyone find a solution to this issue?

Thanks

Bruno


was (Author: blenski):
Hi

We ran into the very same issue.

I have two topics: CEF and event.

While resetting offsets with --to-datetime, the new offset is set correctly for 
the topic CEF, whereas it always points to the last entry for the topic event.

Did anyone find a solution to this issue?

Thanks

Bruno

> kafka-consumer-groups does not reset offsets to specified datetime correctly
> 
>
> Key: KAFKA-6572
> URL: https://issues.apache.org/jira/browse/KAFKA-6572
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 1.0.0
>Reporter: Sharon Lucas
>Priority: Major
>
> With Kafka 1.0.0 we are seeing a problem using the kafka-consumer-groups.sh 
> --reset-offsets option to reset offsets to a specific date/time in our 
> production environment.
> We first tried to use the kafka-consumer-groups.sh command with the 
> --reset-offsets option and with option --to-datetime 2018-02-10T00:00:00.000 
> in our staging environment and it worked correctly.  Running the following 
> command changed it to start processing logs from February 12, 2018 (4 days 
> ago) for a topic that had a large lag.  We did a dry run to verify before 
> running with the --execute option.
> {code:java}
> root@mlpstagemon0101a:/# /opt/kafka/bin/kafka-consumer-groups.sh 
> --bootstrap-server NN.NNN.NN.NN:9092 --group logstash-elasticsearch-latest 
> --to-datetime 2018-02-12T00:00:00.000-06:00 --reset-offsets --topic 
> staging-mon01-rg-elasticsearch --execute{code}
> We stopped the kafka mirrors that process this topic before resetting the 
> offsets and started the kafka mirrors after resetting the offsets. We 
> verified that it correctly started processing logs from February 12, 2018.
> Then we tried resetting offsets in a production environment for a topic that 
> had a very large lag using option --to-datetime 2018-02-10T00:00:00.000 and 
> it did not work as expected. We stopped the kafka mirrors that process this 
> topic before resetting the offsets and did a dry run to see what the new 
> offsets would be:
> {code:java}
> root@mlplon0401e:# /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 
> NN.N.NNN.NNN:9092 --group mirror-consumer-ams03-geo-earliest --to-datetime 
> 2018-02-10T00:00:00.000 --reset-offsets --topic prod_in-ams03-geo-ca_access
> Note: This will not show information about old Zookeeper-based consumers.
> TOPIC  PARTITION  NEW-OFFSET
> prod_in-ams03-geo-ca_access    52 52084147
> prod_in-ams03-geo-ca_access    106    52154199
> prod_in-ams03-geo-ca_access    75 52148673
> prod_in-ams03-geo-ca_access    61 52130753
> prod_in-ams03-geo-ca_access    49 52151667
> prod_in-ams03-geo-ca_access    48 52145233
> prod_in-ams03-geo-ca_access    27 52092805
> prod_in-ams03-geo-ca_access    26 52139644
> prod_in-ams03-geo-ca_access    65 52157504
> prod_in-ams03-geo-ca_access    105    52166289
> prod_in-ams03-geo-ca_access    38 52160464
> prod_in-ams03-geo-ca_access    22 52093451
> prod_in-ams03-geo-ca_access    4  52151660
> prod_in-ams03-geo-ca_access    90 52160296
> prod_in-ams03-geo-ca_access    25 52161691
> prod_in-ams03-geo-ca_access    13 52145828
> prod_in-ams03-geo-ca_access    56 52162867
> prod_in-ams03-geo-ca_access    42 52072094
> prod_in-ams03-geo-ca_access    7  52069496
> prod_in-ams03-geo-ca_access    117    52087078
> prod_in-ams03-geo-ca_access    32 52073732
> prod_in-ams03-geo-ca_access    102    52082022
> prod_in-ams03-geo-ca_access    76 52141018
> prod_in-ams03-geo-ca_access    83 52154542
> prod_in-ams03-geo-ca_access    72 52095051
> prod_in-ams03-geo-ca_access    85 52149907
> prod_in-ams03-geo-ca_access    119    52134435
> prod_in-ams03-geo-ca_access    113    52159340
> prod_in-ams03-geo-ca_access    55 52146597
> prod_in-ams03-geo-ca_access    18 52149079
> prod_in-ams03-geo-ca_acce

[jira] [Commented] (KAFKA-6755) MaskField SMT should optionally take a literal value to use instead of using null

2019-02-12 Thread Randall Hauch (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766183#comment-16766183
 ] 

Randall Hauch commented on KAFKA-6755:
--

[~nimfadora], thanks for volunteering. Perhaps the best place to start for this 
issue is a PR, and when that gets close we'll need to do a simple KIP before 
the PR is merged. If you create a PR, please title it something like 
"KAFKA-6755: Allow literal value for MaskField SMT [WIP]" and then begin the 
description with "Requires KIP -- do not merge".

It is important that the SMT continues to accept configurations as used today, 
so the config needs to be backward compatible.
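
As a starting point, the {{Values}} conversion the description mentions can 
infer a schema type from a literal configuration string. A minimal sketch of 
that mechanism only, not a design decision for the KIP:
{code:java}
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;

public class LiteralMaskSketch {
    public static void main(String[] args) {
        // Values.parseString infers a schema from the literal, which is how a
        // configured replacement string could be converted to the field's type.
        SchemaAndValue parsed = Values.parseString("42");
        System.out.println(parsed.schema().type() + " -> " + parsed.value());
    }
}
{code}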

> MaskField SMT should optionally take a literal value to use instead of using 
> null
> -
>
> Key: KAFKA-6755
> URL: https://issues.apache.org/jira/browse/KAFKA-6755
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Valeria Vasylieva
>Priority: Major
>  Labels: needs-kip, newbie
>   Original Estimate: 8h
>  Remaining Estimate: 8h
>
> The existing {{org.apache.kafka.connect.transforms.MaskField}} SMT always 
> uses the null value for the type of field. It'd be nice to *optionally* be 
> able to specify a literal value for the type, where the SMT would convert the 
> literal string value in the configuration to the desired type (using the new 
> {{Values}} methods).
> Use cases: mask out the IP address, or SSN, or other personally identifiable 
> information (PII).
> Since this changes the API, it will require a KIP.





[jira] [Commented] (KAFKA-7897) Invalid use of epoch cache with old message format versions

2019-02-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766215#comment-16766215
 ] 

ASF GitHub Bot commented on KAFKA-7897:
---

hachikuji commented on pull request #6253: KAFKA-7897; Do not write epoch start 
offset for older message format versions
URL: https://github.com/apache/kafka/pull/6253
 
 
   
 



> Invalid use of epoch cache with old message format versions
> ---
>
> Key: KAFKA-7897
> URL: https://issues.apache.org/jira/browse/KAFKA-7897
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.2.0
>
>
> Message format downgrades are not supported, but they generally work as long 
> as broker/clients at least can continue to parse both message formats. After 
> a downgrade, the truncation logic should revert to using the high watermark, 
> but currently we use the existence of any cached epoch as the sole 
> prerequisite in order to leverage OffsetsForLeaderEpoch. This has the effect 
> of causing a massive truncation after startup which causes re-replication.
> I think our options to fix this are to either 1) clear the cache when we 
> notice a downgrade, or 2) forbid downgrades and raise an error.





[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-12 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766239#comment-16766239
 ] 

Matthias J. Sax commented on KAFKA-7882:


[~nijo] you should not use `FinalResultsSuppressionBuilder` directly – note, 
it's in an internal package 
(`org.apache.kafka.streams.kstream.internals.suppress`) and not part of the 
public API.

Instead, you should use `suppress(Suppressed.untilWindowCloses(...))` with the 
`Suppressed` configuration class that is part of the public API. (There are 
other config methods available on `Suppressed`, too.) As you will notice, the 
public API does not allow you to specify a spill-to-disk strategy, because it 
is not implemented yet.
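
For reference, a minimal sketch of the public-API variant (topic names, window 
size, and grace period are placeholders):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class FinalResultsExample {
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("input-topic")
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
            .count()
            // Emits exactly one final result per key and window, once the window closes.
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to("output-topic");
        return builder.build();
    }
}
{code}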

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches the aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on 
> KTable in the future, but my implementation is quite simple and IMO should 
> work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1)
>     store.put(key.key(), (value, partition), key.window().start())
>   else
>     logger.warn(s"-1 partition")
>   // Ensuring no 1:1 forwarding; context.forward and commit logic is in
>   // the punctuator callback
>   null
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of streaming threads (or input 
> topic partitions) is greater than 1, even if I'm just saving to the store and 
> have turned off punctuation.
> If punctuation is present, however, I sometimes get -1 as the partition value 
> in the transform method. I'm familiar with the basic docs; however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
> stateStoreName,
> timeWindows.maintainMs() + timeWindows.sizeMs + 
> TimeUnit.DAYS.toMillis(1),
> timeWindows.segments,
> timeWindows.sizeMs,
> false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them a retention 
> time as long as possible for the debugging stage. I tried to hotfix this with 
> a retry in the transform method; the state stores reopen in the end and the 
> `put` method works, but this approach is questionable and I am not sure it 
> actually works.
> Edit:
> Could this be because 
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a 
> low value? If I understand correctly, spilling to disk then happens more 
> frequently; could that cause the store to be closed?





[jira] [Commented] (KAFKA-7882) StateStores are frequently closed during the 'transform' method

2019-02-12 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766241#comment-16766241
 ] 

Matthias J. Sax commented on KAFKA-7882:


[~nijo] For the original error reported on this ticket: I agree with 
[~bbejeck]'s observation that your code snippet does not make sense. Can you 
confirm that you create a new `Transformer` object each time 
`TransformerSupplier.get()` is called? It would be incorrect to create only one 
object and reuse it.
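
To illustrate (a minimal sketch; `MyTransformer` stands in for your own class, and the `String` type parameters are assumptions):
{code:java}
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;

// Each task/thread must get its own Transformer instance, so get() has to
// return a fresh object on every call. MyTransformer is a placeholder for
// the reporter's class.
public class MyTransformerSupplier implements TransformerSupplier<String, String, String> {
    @Override
    public Transformer<String, String, String> get() {
        return new MyTransformer(); // correct: a new instance per call
        // Returning one shared instance here would be incorrect and can
        // surface as "Store ... is currently closed" errors.
    }
}
{code}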

> StateStores are frequently closed during the 'transform' method
> ---
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Mateusz Owczarek
>Priority: Major
>
> Hello, I have a problem with the state store being closed frequently while 
> transforming incoming records. To ensure that only one record per key and 
> window reaches the aggregate, I have created a custom Transformer (I know 
> something similar is going to be introduced with the suppress method on 
> KTable in the future, but my implementation is quite simple and imo should 
> work correctly) with the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1)
>     store.put(key.key(), (value, partition), key.window().start())
>   else
>     logger.warn(s"-1 partition")
>   null // ensure no 1:1 forwarding; context.forward and commit logic live in the punctuator callback
> }
> {code}
>  
> What I do get is the following error:
> {code:java}
> Store MyStore is currently closed{code}
> This problem appears only when the number of stream threads (or input topic 
> partitions) is greater than 1, even if I'm just writing to the store and have 
> turned punctuation off.
> If punctuation is present, however, I sometimes get -1 as the partition value 
> in the transform method. I'm familiar with the basic docs; however, I haven't 
> found anything that could help me here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
>     stateStoreName,
>     timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
>     timeWindows.segments,
>     timeWindows.sizeMs,
>     false
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
> and include it in a DSL API like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them a retention 
> time as long as possible for the debugging stage. I tried to hotfix the 
> problem by retrying in the transform method: the state stores do reopen 
> eventually and the `put` method works, but this approach is questionable and 
> I am not sure whether it actually works correctly.
> Edit:
> Could this be because {{StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG}} is 
> set to a low value? If I understand correctly, spilling to disk then happens 
> more frequently; could that cause the store to be closed?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss

2019-02-12 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7672:
---
Priority: Critical  (was: Major)

> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Critical
>  Labels: bug
> Fix For: 2.2.0
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file was 
> found under its task folder, Kafka Streams needs to restore the local state 
> from the remote changelog topic completely and then resume running. However, 
> in some scenarios, we found that Kafka Streams does *NOT* restore this state 
> even though no checkpoint was found; it just cleans the state folder and 
> transitions to the running state directly, resulting in loss of the historic 
> data. 
> To be specific, here are the detailed Kafka Streams logs from our project 
> that show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can reconstruct the procedure for thread 
> AuditTrailBatch-StreamThread-1:
>  # the previous running task assigned to thread 1 is task 0_5 (the 
> corresponding partition is AuditTrailBatch-0-5)
>  # the group begins to rebalance, and the new task 1_1 is assigned to thread 1.
>  # no checkpoint is found under the 1_1 state folder, so the offset is reset 
> to 0 and the local state folder is cleaned.
>  # thread 1 transitions to the RUNNING state directly without restoring task 
> 1_1, so the historic data for state 1_1 is lost for thread 1. 
> *Troubleshooting*
> To investigate the cause of this issue, we analyzed the source code in Kafka 
> Streams and found that the key is the variable named "completedRestorers".
> This is the definition of the variable:
> {code:java}
> private final Set<TopicPartition> completedRestorers = new HashSet<>();{code}
> Each thread object has its own completedRestorers, which is created during 
> thread initialization and is not accessed by other threads. The 
> completedRestorers is used to record the 

[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2019-02-12 Thread Yu Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766265#comment-16766265
 ] 

Yu Yang commented on KAFKA-7304:


[~rsivaram] sure. I have reduced the priority of this ticket to "major". 

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Major
> Fix For: 1.1.2, 2.2.0, 2.0.2
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, 
> Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 
> PM.png
>
>
> We are testing secured writing to Kafka through SSL. Testing at small scale, 
> SSL writing to Kafka was fine. However, when we enabled SSL writing at a 
> larger scale (>40k clients writing concurrently), the Kafka brokers soon hit 
> an OutOfMemory issue with a 4G heap setting. We tried increasing the heap 
> size to 10 GB, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is referenced 
> through org.apache.kafka.common.network.Selector objects. There are two 
> channel map fields in Selector. It seems that somehow the objects are not 
> deleted from the maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart etc. in the cluster causes partition 
> leadership changes, the brokers may hit the OOM issue faster. 
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102 and have applied a TLS patch reducing the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2019-02-12 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7304:
---
Priority: Major  (was: Critical)

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Major
> Fix For: 1.1.2, 2.2.0, 2.0.2
>
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, 
> Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 
> PM.png
>
>
> We are testing secured writing to Kafka through SSL. Testing at small scale, 
> SSL writing to Kafka was fine. However, when we enabled SSL writing at a 
> larger scale (>40k clients writing concurrently), the Kafka brokers soon hit 
> an OutOfMemory issue with a 4G heap setting. We tried increasing the heap 
> size to 10 GB, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is referenced 
> through org.apache.kafka.common.network.Selector objects. There are two 
> channel map fields in Selector. It seems that somehow the objects are not 
> deleted from the maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart etc. in the cluster causes partition 
> leadership changes, the brokers may hit the OOM issue faster. 
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102 and have applied a TLS patch reducing the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7921) Instable KafkaStreamsTest

2019-02-12 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-7921:
--

 Summary: Instable KafkaStreamsTest
 Key: KAFKA-7921
 URL: https://issues.apache.org/jira/browse/KAFKA-7921
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Matthias J. Sax


{{KafkaStreamsTest}} failed multiple times, e.g.,
{quote}java.lang.AssertionError: Condition not met within timeout 15000. 
Streams never started.
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
at 
org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556){quote}
or
{quote}java.lang.AssertionError: Condition not met within timeout 15000. 
Streams never started.
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
at 
org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255){quote}
 
The preserved logs are as follows:

{quote}[2019-02-12 07:02:17,198] INFO Kafka version: 2.3.0-SNAPSHOT 
(org.apache.kafka.common.utils.AppInfoParser:109)
[2019-02-12 07:02:17,198] INFO Kafka commitId: 08036fa4b1e5b822 
(org.apache.kafka.common.utils.AppInfoParser:110)
[2019-02-12 07:02:17,199] INFO stream-client [clientId] State transition from 
CREATED to REBALANCING (org.apache.kafka.streams.KafkaStreams:263)
[2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
[2019-02-12 07:02:17,200] INFO stream-client [clientId] State transition from 
REBALANCING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:263)
[2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
[2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] State 
transition from CREATED to STARTING 
(org.apache.kafka.streams.processor.internals.StreamThread:214)
[2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] State 
transition from CREATED to STARTING 
(org.apache.kafka.streams.processor.internals.StreamThread:214)
[2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
Informed to shut down 
(org.apache.kafka.streams.processor.internals.StreamThread:1192)
[2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-238] State 
transition from STARTING to PENDING_SHUTDOWN 
(org.apache.kafka.streams.processor.internals.StreamThread:214)
[2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
Informed to shut down 
(org.apache.kafka.streams.processor.internals.StreamThread:1192)
[2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] State 
transition from STARTING to PENDING_SHUTDOWN 
(org.apache.kafka.streams.processor.internals.StreamThread:214)
[2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
(org.apache.kafka.clients.Metadata:365)
[2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
(org.apache.kafka.clients.Metadata:365)
[2019-02-12 07:02:17,205] INFO [Consumer 
clientId=clientId-StreamThread-238-consumer, groupId=appId] Discovered group 
coordinator localhost:36122 (id: 2147483647 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-02-12 07:02:17,205] INFO [Consumer 
clientId=clientId-StreamThread-239-consumer, groupId=appId] Discovered group 
coordinator localhost:36122 (id: 2147483647 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
[2019-02-12 07:02:17,206] INFO [Consumer 
clientId=clientId-StreamThread-238-consumer, groupId=appId] Revoking previously 
assigned partitions [] 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
[2019-02-12 07:02:17,206] INFO [Consumer 
clientId=clientId-StreamThread-239-consumer, groupId=appId] Revoking previously 
assigned partitions [] 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
[2019-02-12 07:02:17,206] INFO [Consumer 
clientId=clientId-StreamThread-238-consumer, groupId=appId] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-02-12 07:02:17,206] INFO [Consumer 
clientId=clientId-StreamThread-239-consumer, groupId=appId] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-02-12 07:02:17,208] INFO [Consumer 
clientId=clientId-StreamThread-239-consumer, groupId=appId] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-02-12 07:02:17,208] INFO [Consumer 
clientId=clientId-StreamThread-238-consumer, groupId=appId] (Re-)joining group 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
[2019-02-12 07:02:17,278] INFO Cluster ID: J8

[jira] [Created] (KAFKA-7922) Returned authorized operations in describe responses (KIP-430)

2019-02-12 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7922:
-

 Summary: Returned authorized operations in describe responses 
(KIP-430)
 Key: KAFKA-7922
 URL: https://issues.apache.org/jira/browse/KAFKA-7922
 Project: Kafka
  Issue Type: New Feature
  Components: core
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram


Add an option to request the authorized operations on resources when describing 
resources (topics, consumer groups, and the cluster).

See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses
 for details.
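
For context, a sketch of how this option could surface in the {{AdminClient}} API for the cluster resource (method names follow the KIP and may differ in the final implementation):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;

public class AuthorizedOperationsExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Ask the broker to also return the operations the caller is
            // authorized to perform on the cluster resource.
            final DescribeClusterResult result = admin.describeCluster(
                new DescribeClusterOptions().includeAuthorizedOperations(true));
            System.out.println(result.authorizedOperations().get());
        }
    }
}
{code}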



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7897) Invalid use of epoch cache with old message format versions

2019-02-12 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-7897:
---
Affects Version/s: 1.1.1
   2.0.1
   2.1.0

> Invalid use of epoch cache with old message format versions
> ---
>
> Key: KAFKA-7897
> URL: https://issues.apache.org/jira/browse/KAFKA-7897
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1, 2.0.1, 2.1.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.2.0
>
>
> Message format downgrades are not supported, but they generally work as long 
> as brokers/clients can at least continue to parse both message formats. After 
> a downgrade, the truncation logic should revert to using the high watermark, 
> but currently we use the existence of any cached epoch as the sole 
> prerequisite for leveraging OffsetsForLeaderEpoch. This has the effect of 
> causing a massive truncation after startup, which causes re-replication.
> I think our options to fix this are to either 1) clear the cache when we 
> notice a downgrade, or 2) forbid downgrades and raise an error.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7401) Broker fails to start when recovering a segment from before the log start offset

2019-02-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766359#comment-16766359
 ] 

ASF GitHub Bot commented on KAFKA-7401:
---

hachikuji commented on pull request #6220: KAFKA-7401: Fix inconsistent range 
exception on segment recovery
URL: https://github.com/apache/kafka/pull/6220
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Broker fails to start when recovering a segment from before the log start 
> offset
> 
>
> Key: KAFKA-7401
> URL: https://issues.apache.org/jira/browse/KAFKA-7401
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bob Barrett
>Assignee: Anna Povzner
>Priority: Major
>
> If a segment needs to be recovered (for example, because of a missing index 
> file or uncompleted swap operation) and its base offset is less than the log 
> start offset, the broker will crash with the following error:
> Fatal error during KafkaServer startup. Prepare to shutdown 
> (kafka.server.KafkaServer)
>  java.lang.IllegalArgumentException: inconsistent range
>  at java.util.concurrent.ConcurrentSkipListMap$SubMap.<init>(Unknown Source)
>  at java.util.concurrent.ConcurrentSkipListMap.subMap(Unknown Source)
>  at java.util.concurrent.ConcurrentSkipListMap.subMap(Unknown Source)
>  at kafka.log.Log$$anonfun$12.apply(Log.scala:1579)
>  at kafka.log.Log$$anonfun$12.apply(Log.scala:1578)
>  at scala.Option.map(Option.scala:146)
>  at kafka.log.Log.logSegments(Log.scala:1578)
>  at kafka.log.Log.kafka$log$Log$$recoverSegment(Log.scala:358)
>  at kafka.log.Log$$anonfun$completeSwapOperations$1.apply(Log.scala:389)
>  at kafka.log.Log$$anonfun$completeSwapOperations$1.apply(Log.scala:380)
>  at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>  at kafka.log.Log.completeSwapOperations(Log.scala:380)
>  at kafka.log.Log.loadSegments(Log.scala:408)
>  at kafka.log.Log.<init>(Log.scala:216)
>  at kafka.log.Log$.apply(Log.scala:1765)
>  at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:260)
>  at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:340)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>  at java.util.concurrent.FutureTask.run(Unknown Source)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>  at java.lang.Thread.run(Unknown Source)
> Since these segments are outside the log range, we should delete them, or at 
> least not block broker startup because of them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7401) Broker fails to start when recovering a segment from before the log start offset

2019-02-12 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7401.

   Resolution: Fixed
Fix Version/s: 1.1.2

> Broker fails to start when recovering a segment from before the log start 
> offset
> 
>
> Key: KAFKA-7401
> URL: https://issues.apache.org/jira/browse/KAFKA-7401
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Bob Barrett
>Assignee: Anna Povzner
>Priority: Major
> Fix For: 1.1.2
>
>
> If a segment needs to be recovered (for example, because of a missing index 
> file or uncompleted swap operation) and its base offset is less than the log 
> start offset, the broker will crash with the following error:
> Fatal error during KafkaServer startup. Prepare to shutdown 
> (kafka.server.KafkaServer)
>  java.lang.IllegalArgumentException: inconsistent range
>  at java.util.concurrent.ConcurrentSkipListMap$SubMap.<init>(Unknown Source)
>  at java.util.concurrent.ConcurrentSkipListMap.subMap(Unknown Source)
>  at java.util.concurrent.ConcurrentSkipListMap.subMap(Unknown Source)
>  at kafka.log.Log$$anonfun$12.apply(Log.scala:1579)
>  at kafka.log.Log$$anonfun$12.apply(Log.scala:1578)
>  at scala.Option.map(Option.scala:146)
>  at kafka.log.Log.logSegments(Log.scala:1578)
>  at kafka.log.Log.kafka$log$Log$$recoverSegment(Log.scala:358)
>  at kafka.log.Log$$anonfun$completeSwapOperations$1.apply(Log.scala:389)
>  at kafka.log.Log$$anonfun$completeSwapOperations$1.apply(Log.scala:380)
>  at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
>  at kafka.log.Log.completeSwapOperations(Log.scala:380)
>  at kafka.log.Log.loadSegments(Log.scala:408)
>  at kafka.log.Log.<init>(Log.scala:216)
>  at kafka.log.Log$.apply(Log.scala:1765)
>  at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:260)
>  at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:340)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>  at java.util.concurrent.FutureTask.run(Unknown Source)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>  at java.lang.Thread.run(Unknown Source)
> Since these segments are outside the log range, we should delete them, or at 
> least not block broker startup because of them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7921) Instable KafkaStreamsTest

2019-02-12 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766360#comment-16766360
 ] 

Guozhang Wang commented on KAFKA-7921:
--

Note that in a recent commit we've updated {{InternalTopicManager}} so that 
fatal errors from topic creation / list-topics calls are logged. So if we did 
not find such an entry in the logs, the failure is not related to the admin 
client creating topics.

The "informed shutdown" can only be triggered from two places: 1) a 
KafkaStreams#close() call, which should not be the case here (there is no 
caller at that time), or 2):

{code}
if (streamThread.assignmentErrorCode.get() ==
        StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
    log.debug("Received error code {} - shutdown",
        streamThread.assignmentErrorCode.get());
    streamThread.shutdown();
    streamThread.setStateListener(null);
    return;
}
{code}

This indicates that the source topics are not available yet:

{code}
for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
    for (final String topic : topicsInfo.sourceTopics) {
        if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
                !metadata.topics().contains(topic)) {
            return errorAssignment(clientsMetadata, topic,
                Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code);
        }
    }
    for (final InternalTopicConfig topic : topicsInfo.repartitionSourceTopics.values()) {
        repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic));
    }
}
{code}

I'd suggest we upgrade the DEBUG logs to ERROR in the above places, both when 
setting the error and when receiving the error code, to confirm this analysis. 
And in that case, inside {{KafkaStreamsTest}} we should add a waitForCondition 
to wait for the source topics to be successfully created, as sketched below. Cc 
[~vvcephei]
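
For example (a rough sketch; {{CLUSTER}} and the topic name are placeholders for whatever helper {{KafkaStreamsTest}} uses):
{code:java}
// Wait until the source topic actually exists before starting the Streams
// client, so the assignor cannot hit INCOMPLETE_SOURCE_TOPIC_METADATA.
TestUtils.waitForCondition(
    () -> CLUSTER.getAllTopicsInCluster().contains("source-topic"),
    15000,
    "Source topic was never created");
{code}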

> Instable KafkaStreamsTest
> -
>
> Key: KAFKA-7921
> URL: https://issues.apache.org/jira/browse/KAFKA-7921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {{KafkaStreamsTest}} failed multiple times, e.g.,
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556){quote}
> or
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255){quote}
>  
> The preserved logs are as follows:
> {quote}[2019-02-12 07:02:17,198] INFO Kafka version: 2.3.0-SNAPSHOT 
> (org.apache.kafka.common.utils.AppInfoParser:109)
> [2019-02-12 07:02:17,198] INFO Kafka commitId: 08036fa4b1e5b822 
> (org.apache.kafka.common.utils.AppInfoParser:110)
> [2019-02-12 07:02:17,199] INFO stream-client [clientId] State transition from 
> CREATED to REBALANCING (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-client [clientId] State transition from 
> REBALANCING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-238] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO

[jira] [Created] (KAFKA-7923) Add unit test to verify Kafka-7401 in AK versions >= 2.0

2019-02-12 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-7923:
---

 Summary: Add unit test to verify Kafka-7401 in AK versions >= 2.0
 Key: KAFKA-7923
 URL: https://issues.apache.org/jira/browse/KAFKA-7923
 Project: Kafka
  Issue Type: Test
Affects Versions: 2.1.0, 2.0.1
Reporter: Anna Povzner
Assignee: Anna Povzner


KAFKA-7401 affected versions 1.0 and 1.1; it was fixed and a unit test was 
added. Versions >= 2.0 did not have that bug, because it was fixed there as 
part of another change. To make sure we don't regress, we need to add a unit 
test similar to the one added as part of KAFKA-7401.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7921) Instable KafkaStreamsTest

2019-02-12 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766393#comment-16766393
 ] 

Guozhang Wang commented on KAFKA-7921:
--

Yup that's what I proposed as well :)

> Instable KafkaStreamsTest
> -
>
> Key: KAFKA-7921
> URL: https://issues.apache.org/jira/browse/KAFKA-7921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {{KafkaStreamsTest}} failed multiple times, e.g.,
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556){quote}
> or
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255){quote}
>  
> The preserved logs are as follows:
> {quote}[2019-02-12 07:02:17,198] INFO Kafka version: 2.3.0-SNAPSHOT 
> (org.apache.kafka.common.utils.AppInfoParser:109)
> [2019-02-12 07:02:17,198] INFO Kafka commitId: 08036fa4b1e5b822 
> (org.apache.kafka.common.utils.AppInfoParser:110)
> [2019-02-12 07:02:17,199] INFO stream-client [clientId] State transition from 
> CREATED to REBALANCING (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-client [clientId] State transition from 
> REBALANCING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-238] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Revoking 
> previously assigned partitions [] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Revoking 
> previously assigned partitions [] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] (Re-)joining 
> group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] (Re-)joining 
> group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
> [2019-02-12 07:02:17,

[jira] [Commented] (KAFKA-7656) ReplicaManager fetch fails on leader due to long/integer overflow

2019-02-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766400#comment-16766400
 ] 

ASF GitHub Bot commented on KAFKA-7656:
---

jsancio commented on pull request #6261: KAFKA-7656: Disallow negative max 
bytes in fetch request
URL: https://github.com/apache/kafka/pull/6261
 
 
   Implement a new Validator that
   
   1. Returns an INVALID_REQUEST for all of the fetch partitions if the
   fetch request has a negative max bytes.
   2. Returns an INVALID_REQUEST for any fetch partition that has a negative
   max bytes.
   
   Extended the unit test FetchRequestTest to cover this scenario.
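
Roughly, the check amounts to the following (a sketch only, not the actual patch; the method name is made up):
{code:java}
import org.apache.kafka.common.protocol.Errors;

// Reject the whole fetch if the request-level max bytes is negative, and any
// individual partition whose max bytes is negative.
static Errors validateMaxBytes(final int requestMaxBytes, final int partitionMaxBytes) {
    if (requestMaxBytes < 0 || partitionMaxBytes < 0) {
        return Errors.INVALID_REQUEST;
    }
    return Errors.NONE;
}
{code}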
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ReplicaManager fetch fails on leader due to long/integer overflow
> -
>
> Key: KAFKA-7656
> URL: https://issues.apache.org/jira/browse/KAFKA-7656
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.1
> Environment: Linux 3.10.0-693.el7.x86_64 #1 SMP Thu Jul 6 19:56:57 
> EDT 2017 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Patrick Haas
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> (Note: From 2.0.1-cp1 from confluent distribution)
> {{[2018-11-19 21:13:13,687] ERROR [ReplicaManager broker=103] Error 
> processing fetch operation on partition __consumer_offsets-20, offset 0 
> (kafka.server.ReplicaManager)}}
> {{java.lang.IllegalArgumentException: Invalid max size -2147483648 for log 
> read from segment FileRecords(file= 
> /prod/kafka/data/kafka-logs/__consumer_offsets-20/.log, 
> start=0, end=2147483647)}}
> {{ at kafka.log.LogSegment.read(LogSegment.scala:274)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1159)}}
> {{ at kafka.log.Log$$anonfun$read$2.apply(Log.scala:1114)}}
> {{ at kafka.log.Log.maybeHandleIOException(Log.scala:1842)}}
> {{ at kafka.log.Log.read(Log.scala:1114)}}
> {{ at 
> kafka.server.ReplicaManager.kafka$server$ReplicaManager$$read$1(ReplicaManager.scala:912)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:974)}}
> {{ at 
> kafka.server.ReplicaManager$$anonfun$readFromLocalLog$1.apply(ReplicaManager.scala:973)}}
> {{ at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)}}
> {{ at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)}}
> {{ at kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:973)}}
> {{ at kafka.server.ReplicaManager.readFromLog$1(ReplicaManager.scala:802)}}
> {{ at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:815)}}
> {{ at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:685)}}
> {{ at kafka.server.KafkaApis.handle(KafkaApis.scala:114)}}
> {{ at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7921) Instable KafkaStreamsTest

2019-02-12 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766391#comment-16766391
 ] 

Matthias J. Sax commented on KAFKA-7921:


Can we do a "two phase" approach here? First only change the log level and wait 
until the test fails again to see if it's really a correct root cause analysis? 
And afterwards put the fix into the test?

> Instable KafkaStreamsTest
> -
>
> Key: KAFKA-7921
> URL: https://issues.apache.org/jira/browse/KAFKA-7921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Major
>
> {{KafkaStreamsTest}} failed multiple times, e.g.,
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556){quote}
> or
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255){quote}
>  
> The preserved logs are as follows:
> {quote}[2019-02-12 07:02:17,198] INFO Kafka version: 2.3.0-SNAPSHOT 
> (org.apache.kafka.common.utils.AppInfoParser:109)
> [2019-02-12 07:02:17,198] INFO Kafka commitId: 08036fa4b1e5b822 
> (org.apache.kafka.common.utils.AppInfoParser:110)
> [2019-02-12 07:02:17,199] INFO stream-client [clientId] State transition from 
> CREATED to REBALANCING (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-client [clientId] State transition from 
> REBALANCING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-238] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Revoking 
> previously assigned partitions [] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Revoking 
> previously assigned partitions [] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] (Re-)joining 
> group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clien

[jira] [Commented] (KAFKA-7799) Fix flaky test RestServerTest.testCORSEnabled

2019-02-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766403#comment-16766403
 ] 

ASF GitHub Bot commented on KAFKA-7799:
---

gwenshap commented on pull request #6236: KAFKA-7799: Use httpcomponents-client 
in RestServerTest.
URL: https://github.com/apache/kafka/pull/6236
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix flaky test RestServerTest.testCORSEnabled
> -
>
> Key: KAFKA-7799
> URL: https://issues.apache.org/jira/browse/KAFKA-7799
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Starting to see this failure quite a lot, locally and on jenkins:
> {code}
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled
> Failing for the past 7 builds (Since Failed#18600 )
> Took 0.7 sec.
> Error Message
> java.lang.AssertionError: expected: but was:
> Stacktrace
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.checkCORSRequest(RestServerTest.java:221)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled(RestServerTest.java:84)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:326)
>   at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
>   at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
> {code}
> If it helps, I see an uncaught exception in the stdout:
> {code}
> [2019-01-08 19:35:23,664] ERROR Uncaught exception in REST call to 
> /connector-plugins/FileStreamSource/validate 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> javax.ws.rs.NotFoundException: HTTP 404 Not Found
>   at 
> org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:274)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
>   at 
> org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
>   at 
> org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
>   at 
> org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7924) Kafka broker does not list nodes first ~ 30s after startup

2019-02-12 Thread Gert van Dijk (JIRA)
Gert van Dijk created KAFKA-7924:


 Summary: Kafka broker does not list nodes first ~ 30s after startup
 Key: KAFKA-7924
 URL: https://issues.apache.org/jira/browse/KAFKA-7924
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.1.0
 Environment: Clean Docker environment; very simple, everything single-node.
Reporter: Gert van Dijk


Steps to reproduce:
# Start single Zookeeper instance.
# Start single Kafka instance.
# Validate that the line {noformat}INFO [KafkaServer id=0] started 
(kafka.server.KafkaServer){noformat} is printed.
# Connect using a Kafka client +with debug logging enabled+, right after seeing 
that line, _do not wait_.
# Observe that it is connected to the Kafka broker just fine, but that the 
broker lists an empty list of nodes: {noformat}Updating cluster metadata to 
Cluster(id = xxx, nodes = [], [...]{noformat}
# Keep watching for about 30 seconds, seeing that log entry pop up hundreds 
of times. Nothing happens in the Kafka/Zookeeper logs in the meantime, but then 
suddenly the Kafka server starts reporting nodes to the connected client: 
{noformat}[...] nodes = [kafka:9092 (id: 0 rack: null)], [...]
o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] 
Initiating connection to node [...]{noformat} and it connects all fine.

My expectation is that it should list itself right away after listening on the 
network as a broker. A Kafka client should not be left blocked for ~30 seconds, 
timing out first.

Use case: I'm trying to create topics in a CI/CD pipeline, spinning up a 
clean Kafka for that. Right after it has started, it should be possible to 
create topics using an AdminClient, but I currently experience 
{{TimeoutException: Timed out waiting for a node assignment}} errors unless I 
put a {{sleep 30}} between observing that Kafka reports ready and starting the 
topic creation process. Not very ideal.
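
For the record, a workaround sketch that polls instead of sleeping (bootstrap address, timeouts, and class name are illustrative):
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class WaitForBroker {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            final long deadline = System.currentTimeMillis() + 60_000;
            while (true) {
                try {
                    // During the window described above the broker may report
                    // an empty node list, so check the list itself.
                    if (!admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).isEmpty()) {
                        break; // at least one node visible; safe to create topics
                    }
                } catch (final ExecutionException | TimeoutException e) {
                    // broker not ready yet; retry below
                }
                if (System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException("Broker never reported any nodes");
                }
                Thread.sleep(1_000);
            }
        }
    }
}
{code}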



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7924) Kafka broker does not list nodes first ~ 30s after startup

2019-02-12 Thread Gert van Dijk (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gert van Dijk updated KAFKA-7924:
-
Description: 
Steps to reproduce:
# Start single Zookeeper instance.
# Start single Kafka instance.
# Validate that the line {noformat}INFO [KafkaServer id=0] started 
(kafka.server.KafkaServer){noformat} is printed.
# Connect using a Kafka client +with debug logging enabled+, right after seeing 
that line, _do not wait_.
# Observe that it is connected to the Kafka broker just fine, but that the 
broker lists an +empty list of nodes+: {noformat}Updating cluster metadata to 
Cluster(id = xxx, nodes = [], [...]{noformat}
# Keep watching for about 30 seconds, seeing that log entry pop up hundreds 
of times on the client. Nothing happens in the Kafka/Zookeeper logs in the 
meantime, but then suddenly the Kafka server starts reporting nodes to the 
connected client: {noformat}[...] nodes = [kafka:9092 (id: 0 rack: null)], [...]
o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] 
Initiating connection to node [...]{noformat} and it connects all fine.

My expectation is that it should list itself right away after listening on the 
network as a broker. A Kafka client should not be left blocked for ~30 seconds, 
timing out first.

Use case: I'm trying to create topics in a CI/CD pipeline, spinning up a 
clean Kafka for that. Right after it has started, it should be possible to 
create topics using an AdminClient, but I currently experience 
{{TimeoutException: Timed out waiting for a node assignment}} errors unless I 
put a {{sleep 30}} between observing that Kafka reports ready and starting the 
topic creation process. Not very ideal.

  was:
Steps to reproduce:
# Start single Zookeeper instance.
# Start single Kafka instance.
# Validate that the line {noformat}INFO [KafkaServer id=0] started 
(kafka.server.KafkaServer){noformat} is printed.
# Connect using a Kafka client +with debug logging enabled+, right after seeing 
that line, _do not wait_.
# Observe that it is connected to the Kafka broker just fine, but that the 
broker lists an empty list of nodes: {noformat}Updating cluster metadata to 
Cluster(id = xxx, nodes = [], [...]{noformat}
# Keep watching for about 30 seconds, seeing that log entry pop up hundreds 
of times. Nothing happens in the Kafka/Zookeeper logs in the meantime, but then 
suddenly the Kafka server starts reporting nodes to the connected client: 
{noformat}[...] nodes = [kafka:9092 (id: 0 rack: null)], [...]
o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] 
Initiating connection to node [...]{noformat} and it connects all fine.

My expectation is that it should list itself right away after listening on the 
network as a broker. A Kafka client should not be left blocked for ~30 seconds, 
timing out first.

Use case: I'm trying to create topics in a CI/CD pipeline, spinning up a 
clean Kafka for that. Right after it has started, it should be possible to 
create topics using an AdminClient, but I currently experience 
{{TimeoutException: Timed out waiting for a node assignment}} errors unless I 
put a {{sleep 30}} between observing that Kafka reports ready and starting the 
topic creation process. Not very ideal.


> Kafka broker does not list nodes first ~ 30s after startup
> --
>
> Key: KAFKA-7924
> URL: https://issues.apache.org/jira/browse/KAFKA-7924
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
> Environment: Clean Docker environment; very simple, everything single-node.
>Reporter: Gert van Dijk
>Priority: Major
>
> Steps to reproduce:
> # Start single Zookeeper instance.
> # Start single Kafka instance.
> # Validate that the line {noformat}INFO [KafkaServer id=0] started 
> (kafka.server.KafkaServer){noformat} is printed.
> # Connect using a Kafka client +with debug logging enabled+, right after 
> seeing that line, _do not wait_.
> # Observe that it is connected to the Kafka broker just fine, but that the 
> broker lists an +empty list of nodes+: {noformat}Updating cluster metadata 
> to Cluster(id = xxx, nodes = [], [...]{noformat}
> # Keep watching for about 30 seconds, seeing that log entry pop up 
> hundreds of times on the client. Nothing happens in the Kafka/Zookeeper logs in 
> the meantime, but then suddenly the Kafka server starts reporting nodes to the 
> connected client: {noformat}[...] nodes = [kafka:9092 (id: 0 rack: null)], 
> [...]
> o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-1] 
> Initiating connection to node [...]{noformat} and it connects all fine.
> My expectation is that the broker should list itself right away once it is 
> listening on the network. It should not take ~ 30 seconds for a Kafka client 
> to be blocked waiting and timing out first.

[jira] [Comment Edited] (KAFKA-5792) Transient failure in KafkaAdminClientTest.testHandleTimeout

2019-02-12 Thread Gert van Dijk (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16765898#comment-16765898
 ] 

Gert van Dijk edited comment on KAFKA-5792 at 2/12/19 8:28 PM:
---

I think I'm also seeing this issue still with 2.1.0 with my own AdminClient. 
Reported via KAFKA-7924.


was (Author: gertvdijk):
I'm also seeing this issue still with 2.1.0 with my own AdminClient. It took me 
a huge amount of time to figure out what was happening, only to discover that 
the unit test covering this is disabled in Kafka, masking the erroneous 
behaviour. :(

FWIW, what I'm seeing with debug logging enabled is: 1) a successful connection 
to the bootstrap server, and 2) huge numbers of these lines:
{noformat}
[kafka-admin-client-thread | adminclient-1] DEBUG
   o.a.k.c.a.i.AdminMetadataManager - [AdminClient clientId=adminclient-1]
   Updating cluster metadata to Cluster(id = q7XgghZqQUW_o5W2-Nn5Qw,
   nodes = [], partitions = [], controller = null){noformat}
(note {{nodes = []}}.)

and 3) after a while (could be 1 second, could be 40+ seconds in my case), the 
broker finally responds with a node and the client connects just fine:
{noformat}
[kafka-admin-client-thread | adminclient-1] DEBUG 
   o.a.k.c.a.i.AdminMetadataManager - [AdminClient clientId=adminclient-1]
   Updating cluster metadata to Cluster(id = q7XgghZqQUW_o5W2-Nn5Qw,
   nodes = [kafka:9092 (id: 0 rack: null)], partitions = [],
   controller = kafka:9092 (id: 0 rack: null))  
   
[kafka-admin-client-thread | adminclient-1]
   DEBUG o.apache.kafka.clients.NetworkClient -
   [AdminClient clientId=adminclient-1] Initiating connection to node
   kafka:9092 (id: 0 rack: null) using address kafka/x{noformat}

This all happens in a completely clean, freshly started, default configuration, 
non-SSL, no-authentication, single-node Zookeeper-Kafka deployment in Docker 
without any data or other connections. (See also [my Q&A at 
ServerFault|https://serverfault.com/q/953393/135437].)

It would be really great if this could be fixed some day.
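
A hedged readiness-probe sketch along the same lines (mine, not from the 
ticket): poll {{describeCluster()}} until the broker advertises at least one 
node. The bootstrap address and deadline are placeholder assumptions.
{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class WaitForNodes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(60);
            while (System.nanoTime() < deadline) {
                try {
                    // A non-empty node list corresponds to the moment the
                    // debug log above switches from "nodes = []" to a broker.
                    if (!admin.describeCluster().nodes()
                            .get(5, TimeUnit.SECONDS).isEmpty()) {
                        System.out.println("Broker advertised; safe to proceed.");
                        return;
                    }
                } catch (ExecutionException
                        | java.util.concurrent.TimeoutException e) {
                    // Metadata not ready yet; fall through and retry.
                }
                Thread.sleep(1000);
            }
            throw new IllegalStateException("No nodes advertised within 60 s");
        }
    }
}
{code}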

> Transient failure in KafkaAdminClientTest.testHandleTimeout
> ---
>
> Key: KAFKA-5792
> URL: https://issues.apache.org/jira/browse/KAFKA-5792
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Colin P. McCabe
>Priority: Major
>  Labels: transient-unit-test-failure
> Fix For: 2.2.0
>
>
> The {{KafkaAdminClientTest.testHandleTimeout}} test occasionally fails with 
> the following:
> {noformat}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node 
> assignment.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClientTest.testHandleTimeout(KafkaAdminClientTest.java:356)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
> for a node assignment.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7921) Instable KafkaStreamsTest

2019-02-12 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-7921:
---

Assignee: John Roesler

> Instable KafkaStreamsTest
> -
>
> Key: KAFKA-7921
> URL: https://issues.apache.org/jira/browse/KAFKA-7921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>
> {{KafkaStreamsTest}} failed multiple times, e.g., 
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556){quote}
> or
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255){quote}
>  
> The preserved logs are as follows:
> {quote}[2019-02-12 07:02:17,198] INFO Kafka version: 2.3.0-SNAPSHOT 
> (org.apache.kafka.common.utils.AppInfoParser:109)
> [2019-02-12 07:02:17,198] INFO Kafka commitId: 08036fa4b1e5b822 
> (org.apache.kafka.common.utils.AppInfoParser:110)
> [2019-02-12 07:02:17,199] INFO stream-client [clientId] State transition from 
> CREATED to REBALANCING (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-client [clientId] State transition from 
> REBALANCING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-238] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Revoking 
> previously assigned partitions [] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Revoking 
> previously assigned partitions [] 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:458)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] (Re-)joining 
> group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] (Re-)joining 
> group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)
> [2019-02-12 07:02:17,208] INFO [Consumer 
> clien

[jira] [Commented] (KAFKA-7921) Instable KafkaStreamsTest

2019-02-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766445#comment-16766445
 ] 

ASF GitHub Bot commented on KAFKA-7921:
---

vvcephei commented on pull request #6262: KAFKA-7921: log at error level for 
missing source topic
URL: https://github.com/apache/kafka/pull/6262
 
 
   This condition is a fatal error, so error level is warranted, to provide 
more context on why Streams shuts down.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Instable KafkaStreamsTest
> -
>
> Key: KAFKA-7921
> URL: https://issues.apache.org/jira/browse/KAFKA-7921
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>
> {{KafkaStreamsTest}} failed multiple times, e.g., 
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556){quote}
> or
> {quote}java.lang.AssertionError: Condition not met within timeout 15000. 
> Streams never started.
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
> at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
> at 
> org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255){quote}
>  
> The preserved logs are as follows:
> {quote}[2019-02-12 07:02:17,198] INFO Kafka version: 2.3.0-SNAPSHOT 
> (org.apache.kafka.common.utils.AppInfoParser:109)
> [2019-02-12 07:02:17,198] INFO Kafka commitId: 08036fa4b1e5b822 
> (org.apache.kafka.common.utils.AppInfoParser:110)
> [2019-02-12 07:02:17,199] INFO stream-client [clientId] State transition from 
> CREATED to REBALANCING (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-client [clientId] State transition from 
> REBALANCING to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:263)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> Starting (org.apache.kafka.streams.processor.internals.StreamThread:767)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-239] 
> State transition from CREATED to STARTING 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,200] INFO stream-thread [clientId-StreamThread-238] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-238] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> Informed to shut down 
> (org.apache.kafka.streams.processor.internals.StreamThread:1192)
> [2019-02-12 07:02:17,201] INFO stream-thread [clientId-StreamThread-239] 
> State transition from STARTING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread:214)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO Cluster ID: J8uJhiTKQx-Y_i9LzT0iLg 
> (org.apache.kafka.clients.Metadata:365)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-238-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,205] INFO [Consumer 
> clientId=clientId-StreamThread-239-consumer, groupId=appId] Discovered group 
> coordinator localhost:36122 (id: 2147483647 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)
> [2019-02-12 07:02:17,206] INFO [Consumer 
> clientId=clientId-Strea

[jira] [Resolved] (KAFKA-7799) Fix flaky test RestServerTest.testCORSEnabled

2019-02-12 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7799.
--
   Resolution: Fixed
Fix Version/s: 2.3.0

> Fix flaky test RestServerTest.testCORSEnabled
> -
>
> Key: KAFKA-7799
> URL: https://issues.apache.org/jira/browse/KAFKA-7799
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.3.0
>
>
> Starting to see this failure quite a lot, locally and on jenkins:
> {code}
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled
> Failing for the past 7 builds (Since Failed#18600 )
> Took 0.7 sec.
> Error Message
> java.lang.AssertionError: expected: but was:
> Stacktrace
> java.lang.AssertionError: expected: but was:
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:118)
>   at org.junit.Assert.assertEquals(Assert.java:144)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.checkCORSRequest(RestServerTest.java:221)
>   at 
> org.apache.kafka.connect.runtime.rest.RestServerTest.testCORSEnabled(RestServerTest.java:84)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.junit.internal.runners.TestMethod.invoke(TestMethod.java:68)
>   at 
> org.powermock.modules.junit4.internal.impl.PowerMockJUnit44RunnerDelegateImpl$PowerMockJUnit44MethodRunner.runTestMethod(PowerMockJUnit44RunnerDelegateImpl.java:326)
>   at org.junit.internal.runners.MethodRoadie$2.run(MethodRoadie.java:89)
>   at 
> org.junit.internal.runners.MethodRoadie.runBeforesThenTestThenAfters(MethodRoadie.java:97)
> {code}
> If it helps, I see an uncaught exception in stdout:
> {code}
> [2019-01-08 19:35:23,664] ERROR Uncaught exception in REST call to 
> /connector-plugins/FileStreamSource/validate 
> (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
> javax.ws.rs.NotFoundException: HTTP 404 Not Found
>   at 
> org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:274)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
>   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
>   at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
>   at 
> org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
>   at 
> org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
>   at 
> org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
> {code}
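
As a side note (not from the ticket): the header this test asserts can be 
checked manually against a running Connect worker with a CORS preflight 
request. A rough Java sketch; the host, port (8083 is Connect's default REST 
port), path, and origin are assumptions.
{code:java}
import java.net.HttpURLConnection;
import java.net.URL;

public class CorsPreflightCheck {
    public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:8083/connectors");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("OPTIONS");                      // CORS preflight
        conn.setRequestProperty("Origin", "http://example.com");
        conn.setRequestProperty("Access-Control-Request-Method", "GET");
        System.out.println("Status: " + conn.getResponseCode());
        System.out.println("Access-Control-Allow-Origin: "
                + conn.getHeaderField("Access-Control-Allow-Origin"));
        conn.disconnect();
    }
}
{code}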



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7672) The local state not fully restored after KafkaStream rebalanced, resulting in data loss

2019-02-12 Thread linyue li (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766723#comment-16766723
 ] 

linyue li commented on KAFKA-7672:
--

[~guozhang] Thanks for the fix, and I'll validate it once it's merged.

> The local state not fully restored after KafkaStream rebalanced, resulting in 
> data loss
> ---
>
> Key: KAFKA-7672
> URL: https://issues.apache.org/jira/browse/KAFKA-7672
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.1.0
>Reporter: linyue li
>Assignee: linyue li
>Priority: Critical
>  Labels: bug
> Fix For: 2.2.0
>
>
> Normally, when a task is migrated to a new thread and no checkpoint file was 
> found under its task folder, Kafka Streams needs to restore the local state 
> from the remote changelog topic completely and then resume running. However, in 
> some scenarios, we found that Kafka Streams does *NOT* restore this state even 
> when no checkpoint was found, but just cleans the state folder and transitions 
> to the running state directly, resulting in historic data loss. 
> To be specific, I will give the detailed logs from Kafka Streams in our project 
> to show this scenario: 
> {quote}2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Revoking previously assigned partitions [AuditTrailBatch-0-5]
> 2018-10-23 08:27:07,684 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_ASSIGNED to 
> PARTITIONS_REVOKED
> 2018-10-23 08:27:10,856 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> (Re-)joining group
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Successfully joined group with generation 323
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-consumer, groupId=AuditTrailBatch] 
> Setting newly assigned partitions [AuditTrailBatch-store1-repartition-1]
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] State transition from PARTITIONS_REVOKED to 
> PARTITIONS_ASSIGNED
> 2018-10-23 08:27:53,153 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] *Creating producer client for task 1_1*
> 2018-10-23 08:27:53,622 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1] partition assignment took 469 ms.
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.streams.processor.internals.StoreChangelogReader - 
> stream-thread [AuditTrailBatch-StreamThread-1]*No checkpoint found for task 
> 1_1 state store AuditTrailBatch-store1-changelog-1 with EOS turned on.* 
> *Reinitializing the task and restore its state from the beginning.*
> 2018-10-23 08:27:54,357 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher  - [Consumer 
> clientId=AuditTrailBatch-StreamThread-1-restore-consumer, groupId=]*Resetting 
> offset for partition AuditTrailBatch-store1-changelog-1 to offset 0.*
> 2018-10-23 08:27:54,653 INFO  
> org.apache.kafka.streams.processor.internals.StreamThread    - stream-thread 
> [AuditTrailBatch-StreamThread-1]*State transition from PARTITIONS_ASSIGNED to 
> RUNNING*
> {quote}
> From the logs above, we can get the procedure for thread 
> AuditTrailBatch-StreamThread-1:
>  # the previous running task assigned to thread 1 is task 0_5 (the 
> corresponding partition is AuditTrailBatch-0-5)
>  # the group begins to rebalance, and the new task 1_1 is assigned to thread 1.
>  # no checkpoint is found under the 1_1 state folder, so the offset is reset 
> to 0 and the local state folder is cleaned.
>  # thread 1 transitions to the RUNNING state directly without restoring 
> task 1_1, so the historic data for state 1_1 is lost for thread 1. 
> *Troubleshooting*
> To investigate the cause of this issue, we analyzed the source code of 
> Kafka Streams and found that the key is the variable named "completedRestorers".
> This is the definition of the variable:
> {code:java}
> private final Set<TopicPartition> completedRestorers = new HashSet<>();{code}
> Each thread object has its own completedRestorers, which is created in the 
> thread initialization, and not acc

[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema

2019-02-12 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766769#comment-16766769
 ] 

Ismael Juma commented on KAFKA-7481:


Personally, I would remove the target version and reduce the priority.

> Consider options for safer upgrade of offset commit value schema
> 
>
> Key: KAFKA-7481
> URL: https://issues.apache.org/jira/browse/KAFKA-7481
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Blocker
> Fix For: 2.2.0
>
>
> KIP-211 and KIP-320 add new versions of the offset commit value schema. The 
> use of the new schema version is controlled by the 
> `inter.broker.protocol.version` configuration.  Once the new inter-broker 
> version is in use, it is not possible to downgrade since the older brokers 
> will not be able to parse the new schema. 
> The options at the moment are the following:
> 1. Do nothing. Users can try the new version and keep 
> `inter.broker.protocol.version` locked to the old release. Downgrade will 
> still be possible, but users will not be able to test new capabilities which 
> depend on inter-broker protocol changes.
> 2. Instead of using `inter.broker.protocol.version`, we could use 
> `message.format.version`. This would basically extend the use of this config 
> to apply to all persistent formats. The advantage is that it allows users to 
> upgrade the broker and begin using the new inter-broker protocol while still 
> allowing downgrade. But features which depend on the persistent format could 
> not be tested.
> Any other options?
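
For reference (my note, not from the ticket), option 1 amounts to upgrading the 
broker binaries while pinning the protocol in {{server.properties}}; the 
version numbers below are placeholders:
{noformat}
# Keep the inter-broker protocol (and on-disk message format) pinned to the
# old release so that a binary downgrade remains possible.
inter.broker.protocol.version=2.0
log.message.format.version=2.0
{noformat}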



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7565) NPE in KafkaConsumer

2019-02-12 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766772#comment-16766772
 ] 

Ismael Juma commented on KAFKA-7565:


Any thoughts, [~rsivaram]? Maybe related: 
https://github.com/apache/kafka/pull/6221

> NPE in KafkaConsumer
> 
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.1
>Reporter: Alexey Vakhrenev
>Priority: Critical
> Fix For: 2.2.0
>
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> Couldn't find a minimal reproducer, but it happens quite often in our system. 
> We use the {{pause()}} and {{wakeup()}} methods quite extensively; maybe it is 
> somehow related.
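
For context (my illustration, not the reporter's code), a minimal sketch of the 
{{pause()}}/{{wakeup()}} usage pattern mentioned above, against a newer client 
API; the broker address, group id, and topic are placeholders.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PauseWakeupPattern {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("demo-topic"));

        // wakeup() is the one thread-safe consumer method: another thread
        // calls it to abort a blocking poll(), which then throws
        // WakeupException on the polling thread.
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

        try {
            while (true) {
                consumer.poll(Duration.ofSeconds(1)).forEach(r ->
                        System.out.println(r.offset() + ": " + r.value()));
                // pause() stops fetching for the given partitions without
                // leaving the group; resume() re-enables fetching. Called
                // back to back here purely to illustrate the API.
                consumer.pause(consumer.assignment());
                consumer.resume(consumer.assignment());
            }
        } catch (WakeupException e) {
            // Expected on shutdown.
        } finally {
            consumer.close();
        }
    }
}
{code}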



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2019-02-12 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7304:
---
Fix Version/s: (was: 2.0.2)
   (was: 2.2.0)
   (was: 1.1.2)

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Major
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, 
> Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 
> PM.png
>
>
> We are testing secured writes to Kafka through SSL. Testing at small scale, 
> SSL writing to Kafka was fine. However, when we enabled SSL writing at a 
> larger scale (>40k clients writing concurrently), the Kafka brokers soon hit 
> an OutOfMemory issue with a 4G heap setting. We have tried increasing the 
> heap size to 10Gb, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects. There 
> are two channel map fields in Selector. It seems that somehow the objects are 
> not deleted from the maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart etc. in the cluster causes partition 
> leadership changes, the brokers may hit the OOM issue faster. 
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis: 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> The command line for running Kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102, and have applied a TLS patch reducing the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets

2019-02-12 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766773#comment-16766773
 ] 

Ismael Juma commented on KAFKA-7556:


[~mgharat] are you still planning to work on this? Removing 2.2.

> KafkaConsumer.beginningOffsets does not return actual first offsets
> ---
>
> Key: KAFKA-7556
> URL: https://issues.apache.org/jira/browse/KAFKA-7556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Robert V
>Priority: Critical
>  Labels: documentation, usability
> Fix For: 2.2.0
>
>
> h2. Description of the problem
> The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` 
> claims in its Javadoc documentation that it would 'Get the first offset for 
> the given partitions.'.
> I used it with a compacted topic, and it always returned offset 0 for all 
> partitions.
>  Not sure if using a compacted topic actually matters, but I'm enclosing this 
> information anyway.
> Given a Kafka topic with retention set, and old log files being removed as a 
> result of that, the effective start offset of those partitions moves further 
> along; it simply will be greater than offset 0.
> However, calling the `beginningOffsets` method always returns offset 0 as the 
> first offset.
> In contrast, when the method 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called 
> with a timestamp of 0L (UNIX epoch, 1st Jan 1970), it correctly returns the 
> effective start offset for each partition.
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: 
> {code:java}
> {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, 
> test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, 
> test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, 
> test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, 
> test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, 
> test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, 
> test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, 
> test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, 
> test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, 
> test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, 
> test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, 
> test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, 
> test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, 
> test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, 
> test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, 
> test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, 
> test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, 
> test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, 
> test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, 
> test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, 
> test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, 
> test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, 
> test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, 
> test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, 
> test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0}
> {code}
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`:
> {code:java}
> {test.topic-87=(timestamp=1511264434285, offset=289), 
> test.topic-54=(timestamp=1511265134993, offset=45420), 
> test.topic-21=(timestamp=1511265534207, offset=63643), 
> test.topic-79=(timestamp=1511270338275, offset=380750), 
> test.topic-46=(timestamp=1511266883588, offset=266379), 
> test.topic-13=(timestamp=1511265900538, offset=98512), 
> test.topic-70=(timestamp=1511266972452, offset=118522), 
> test.topic-37=(timestamp=1511264396370, offset=763), 
> test.topic-12=(timestamp=1511265504886, offset=61108), 
> test.topic-95=(timestamp=1511289492800, offset=847647), 
> test.topic-62=(timestamp=1511265831298, offset=68299), 
> test.topic-29=(timestamp=1511278767417, offset=548361), 
> test.topic-4=(timestamp=1511269316679, offset=144855), 
> test.topic-88=(timestamp=1511265608468, offset=107831), 
> test.topic-55=(timestamp=1511267449288, offset=129241), 
> test.topic-22=(timestamp=1511283134114, offset=563095), 
> test.topic-80=(timestamp=1511277334877, offset=534859), 
> test.topic-47=(timestamp=1511265530689, offset=71608), 
> test.topic-14=(timestamp=1511266308829, offset=80962), 
> test.topic-71=(timestamp=1511265474740, offset=83607), 
> test.topic-38=(timestamp=1511268268259, offset=166460), 
> test.t
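
To make the comparison concrete (my sketch, not the reporter's code), the two 
lookups side by side; the broker address and topic name are placeholders.
{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CompareStartOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> parts = consumer.partitionsFor("test.topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());

            // The call the reporter says always returns 0:
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(parts);

            // The workaround described above: earliest offset by timestamp 0L.
            Map<TopicPartition, Long> query = new HashMap<>();
            parts.forEach(tp -> query.put(tp, 0L));
            Map<TopicPartition, OffsetAndTimestamp> byTime =
                    consumer.offsetsForTimes(query);

            parts.forEach(tp -> System.out.printf("%s beginning=%d byTime=%s%n",
                    tp, begin.get(tp),
                    byTime.get(tp) == null ? "null" : byTime.get(tp).offset()));
        }
    }
}
{code}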

[jira] [Updated] (KAFKA-7556) KafkaConsumer.beginningOffsets does not return actual first offsets

2019-02-12 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7556:
---
Fix Version/s: (was: 2.2.0)

> KafkaConsumer.beginningOffsets does not return actual first offsets
> ---
>
> Key: KAFKA-7556
> URL: https://issues.apache.org/jira/browse/KAFKA-7556
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0
>Reporter: Robert V
>Priority: Critical
>  Labels: documentation, usability
>
> h2. Description of the problem
> The method `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets` 
> claims in its Javadoc documentation that it would 'Get the first offset for 
> the given partitions.'.
> I used it with a compacted topic, and it always returned offset 0 for all 
> partitions.
>  Not sure if using a compacted topic actually matters, but I'm enclosing this 
> information anyway.
> Given a Kafka topic with retention set, and old log files being removed as a 
> result of that, the effective start offset of those partitions moves further 
> along; it simply will be greater than offset 0.
> However, calling the `beginningOffsets` method always returns offset 0 as the 
> first offset.
> In contrast, when the method 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes` is called 
> with a timestamp of 0L (UNIX epoch, 1st Jan 1970), it correctly returns the 
> effective start offset for each partition.
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets`: 
> {code:java}
> {test.topic-87=0, test.topic-54=0, test.topic-21=0, test.topic-79=0, 
> test.topic-46=0, test.topic-13=0, test.topic-70=0, test.topic-37=0, 
> test.topic-12=0, test.topic-95=0, test.topic-62=0, test.topic-29=0, 
> test.topic-4=0, test.topic-88=0, test.topic-55=0, test.topic-22=0, 
> test.topic-80=0, test.topic-47=0, test.topic-14=0, test.topic-71=0, 
> test.topic-38=0, test.topic-5=0, test.topic-96=0, test.topic-63=0, 
> test.topic-30=0, test.topic-56=0, test.topic-23=0, test.topic-89=0, 
> test.topic-48=0, test.topic-15=0, test.topic-81=0, test.topic-72=0, 
> test.topic-39=0, test.topic-6=0, test.topic-64=0, test.topic-31=0, 
> test.topic-97=0, test.topic-24=0, test.topic-90=0, test.topic-57=0, 
> test.topic-16=0, test.topic-82=0, test.topic-49=0, test.topic-40=0, 
> test.topic-7=0, test.topic-73=0, test.topic-32=0, test.topic-98=0, 
> test.topic-65=0, test.topic-91=0, test.topic-58=0, test.topic-25=0, 
> test.topic-83=0, test.topic-50=0, test.topic-17=0, test.topic-8=0, 
> test.topic-74=0, test.topic-41=0, test.topic-0=0, test.topic-99=0, 
> test.topic-66=0, test.topic-33=0, test.topic-92=0, test.topic-59=0, 
> test.topic-26=0, test.topic-84=0, test.topic-51=0, test.topic-18=0, 
> test.topic-75=0, test.topic-42=0, test.topic-9=0, test.topic-67=0, 
> test.topic-34=0, test.topic-1=0, test.topic-85=0, test.topic-60=0, 
> test.topic-27=0, test.topic-77=0, test.topic-52=0, test.topic-19=0, 
> test.topic-76=0, test.topic-43=0, test.topic-10=0, test.topic-93=0, 
> test.topic-68=0, test.topic-35=0, test.topic-2=0, test.topic-86=0, 
> test.topic-53=0, test.topic-28=0, test.topic-78=0, test.topic-45=0, 
> test.topic-20=0, test.topic-69=0, test.topic-44=0, test.topic-11=0, 
> test.topic-94=0, test.topic-61=0, test.topic-36=0, test.topic-3=0}
> {code}
> Output of using 
> `org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes`:
> {code:java}
> {test.topic-87=(timestamp=1511264434285, offset=289), 
> test.topic-54=(timestamp=1511265134993, offset=45420), 
> test.topic-21=(timestamp=1511265534207, offset=63643), 
> test.topic-79=(timestamp=1511270338275, offset=380750), 
> test.topic-46=(timestamp=1511266883588, offset=266379), 
> test.topic-13=(timestamp=1511265900538, offset=98512), 
> test.topic-70=(timestamp=1511266972452, offset=118522), 
> test.topic-37=(timestamp=1511264396370, offset=763), 
> test.topic-12=(timestamp=1511265504886, offset=61108), 
> test.topic-95=(timestamp=1511289492800, offset=847647), 
> test.topic-62=(timestamp=1511265831298, offset=68299), 
> test.topic-29=(timestamp=1511278767417, offset=548361), 
> test.topic-4=(timestamp=1511269316679, offset=144855), 
> test.topic-88=(timestamp=1511265608468, offset=107831), 
> test.topic-55=(timestamp=1511267449288, offset=129241), 
> test.topic-22=(timestamp=1511283134114, offset=563095), 
> test.topic-80=(timestamp=1511277334877, offset=534859), 
> test.topic-47=(timestamp=1511265530689, offset=71608), 
> test.topic-14=(timestamp=1511266308829, offset=80962), 
> test.topic-71=(timestamp=1511265474740, offset=83607), 
> test.topic-38=(timestamp=1511268268259, offset=166460), 
> test.topic-5=(timestamp=1511276243850, offset=294307), 
> test.topic-96=(timestamp=1511276749138, offset=483237), 
> test.to

[jira] [Updated] (KAFKA-3955) Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to failed broker boot

2019-02-12 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-3955:
---
Fix Version/s: (was: 2.2.0)

> Kafka log recovery doesn't truncate logs on non-monotonic offsets, leading to 
> failed broker boot
> 
>
> Key: KAFKA-3955
> URL: https://issues.apache.org/jira/browse/KAFKA-3955
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0, 0.10.1.1, 0.11.0.0, 1.0.0
>Reporter: Tom Crayford
>Assignee: Dhruvil Shah
>Priority: Critical
>  Labels: reliability
>
> Hi,
> I've found a bug impacting kafka brokers on startup after an unclean 
> shutdown. If a log segment is corrupt and has non-monotonic offsets (see the 
> appendix of this bug for a sample output from {{DumpLogSegments}}), then 
> {{LogSegment.recover}} throws an {{InvalidOffsetException}} error: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/OffsetIndex.scala#L218
> That code is called by {{LogSegment.recover}}: 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogSegment.scala#L191
> Which is called in several places in {{Log.scala}}. Notably it's called four 
> times during recovery:
> Thrice in Log.loadSegments
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L199
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L204
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L226
> and once in Log.recoverLog
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L268
> Of these, only the very last one has a {{catch}} for 
> {{InvalidOffsetException}}. When that catches the issue, it truncates the 
> whole log (not just this segment): 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L274
>  to the start segment of the bad log segment.
> However, this code can't be hit on recovery, because of the code paths in 
> {{loadSegments}} - they mean we'll never hit truncation here, as we always 
> throw this exception and that goes all the way to the top-level exception 
> handler and crashes the JVM.
> As {{Log.recoverLog}} is always called during recovery, I *think* a fix for 
> this is to move this crash recovery/truncate code inside a new method in 
> {{Log.scala}}, and call that instead of {{LogSegment.recover}} in each place. 
> That code should return the number of {{truncatedBytes}} like we do in 
> {{Log.recoverLog}} and then truncate the log. The callers will have to be 
> notified "stop iterating over files in the directory", likely via a return 
> value of {{truncatedBytes}} like {{Log.recoverLog` does right now.
> I'm happy working on a patch for this. I'm aware this recovery code is tricky 
> and important to get right.
> I'm also curious (and currently don't have good theories yet) as to how 
> this log segment got into this state with non-monotonic offsets. This segment 
> is using gzip compression, and is under 0.9.0.1. The same bug with respect to 
> recovery exists in trunk, but I'm unsure if the new handling around 
> compressed messages (KIP-31) means the bug where non-monotonic offsets get 
> appended is still present in trunk.
> As a production workaround, one can manually truncate that log folder 
> yourself (delete all .index/.log files including and after the one with the 
> bad offset). However, kafka should (and can) handle this case well - with 
> replication we can truncate in broker startup.
> stacktrace and error message:
> {code}
> pri=WARN  t=pool-3-thread-4 at=Log Found a corrupted index file, 
> /$DIRECTORY/$TOPIC-22/14306536.index, deleting and rebuilding 
> index...
> pri=ERROR t=main at=LogManager There was an error in one of the threads 
> during logs loading: kafka.common.InvalidOffsetException: Attempt to append 
> an offset (15000337) to position 111719 no larger than the last offset 
> appended (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> pri=FATAL t=main at=KafkaServer Fatal error during KafkaServer startup. 
> Prepare to shutdown kafka.common.InvalidOffsetException: Attempt to append an 
> offset (15000337) to position 111719 no larger than the last offset appended 
> (15000337) to /$DIRECTORY/$TOPIC-8/14008931.index.
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
> at kafk

[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik updated KAFKA-7794:
--
Attachment: image-2019-02-13-11-43-28-873.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, 
> image-2019-02-13-11-43-28-873.png
>
>
> For some timestamp inputs (different from -1 or -2), GetOffsetShell is 
> not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Kartik (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766828#comment-16766828
 ] 

Kartik commented on KAFKA-7794:
---

[~audhumla]

I tried your steps and for me, it's working as expected.
 # Created a new topic with 10 partitions and replication factor = 1
 # Inserted 5000 rows !image-2019-02-13-11-43-28-873.png!
 # Consumed all the messages !image-2019-02-13-11-44-18-736.png!
 # Now if I give the 10th timestamp value, I get the offset properly. 
!image-2019-02-13-11-45-21-459.png!

 

Can you tell me which version you are testing with? It looks like the issue is 
fixed in a newer version.

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, 
> image-2019-02-13-11-43-28-873.png, image-2019-02-13-11-44-18-736.png, 
> image-2019-02-13-11-45-21-459.png
>
>
> For some timestamp inputs (different from -1 or -2), GetOffsetShell is 
> not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik updated KAFKA-7794:
--
Attachment: image-2019-02-13-11-45-21-459.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, 
> image-2019-02-13-11-43-28-873.png, image-2019-02-13-11-44-18-736.png, 
> image-2019-02-13-11-45-21-459.png
>
>
> For some timestamp inputs (different from -1 or -2), GetOffsetShell is 
> not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik updated KAFKA-7794:
--
Attachment: image-2019-02-13-11-43-24-128.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, 
> image-2019-02-13-11-43-28-873.png
>
>
> For some timestamp inputs (different from -1 or -2), GetOffsetShell is 
> not able to retrieve the offset.
> For example, if _x_ is the timestamp in that "not working range", and you 
> execute:
> {code:java}
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time x
> {code}
> The output is:
> {code:java}
> MY_TOPIC:8:
> MY_TOPIC:2:
> MY_TOPIC:5:
> MY_TOPIC:4:
> MY_TOPIC:7:
> MY_TOPIC:1:
> MY_TOPIC:9:{code}
> while after the last ":" an integer representing the offset is expected.
> 
> Steps to reproduce it:
>  # Consume all the messages from the beginning and print the timestamp:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true  > 
> messages{code}
>  # Sort the messages by timestamp and get some of the oldest messages:
> {code:java}
>  awk -F "CreateTime:" '{ print $2}' messages | sort -n > msg_sorted{code}
>  # Take (for example) the timestamp of the 10th oldest message, and see if 
> GetOffsetShell is not able to print the offset:
> {code:java}
> timestamp="$(sed '10q;d' msg_sorted | cut -f1)"
> bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --time $timestamp
> # The output should be something like:
> # MY_TOPIC:1:
> # MY_TOPIC:2:
> (repeated for every partition){code}
>  # Verify that the message with that timestamp is still in Kafka:
> {code:java}
> bin/kafka-simple-consumer-shell.sh --no-wait-at-logend --broker-list 
> $KAFKA_ADDRESS --topic $MY_TOPIC --property print.timestamp=true | grep 
> "CreateTime:$timestamp" {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7794) kafka.tools.GetOffsetShell does not return the offset in some cases

2019-02-12 Thread Kartik (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik updated KAFKA-7794:
--
Attachment: image-2019-02-13-11-44-18-736.png

> kafka.tools.GetOffsetShell does not return the offset in some cases
> ---
>
> Key: KAFKA-7794
> URL: https://issues.apache.org/jira/browse/KAFKA-7794
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: Daniele Ascione
>Assignee: Kartik
>Priority: Critical
>  Labels: Kafka, ShellCommands, kafka-0.10, offset, shell, 
> shell-script, shellscript, tools, usability
> Attachments: image-2019-02-11-20-51-07-805.png, 
> image-2019-02-11-20-56-13-362.png, image-2019-02-11-20-57-03-579.png, 
> image-2019-02-12-16-19-25-170.png, image-2019-02-12-16-21-13-126.png, 
> image-2019-02-12-16-23-38-399.png, image-2019-02-13-11-43-24-128.png, 
> image-2019-02-13-11-43-28-873.png, image-2019-02-13-11-44-18-736.png



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-02-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16766843#comment-16766843
 ] 

ASF GitHub Bot commented on KAFKA-7652:
---

guozhangwang commented on pull request #6191: KAFKA-7652: Part III; Put to 
underlying before Flush
URL: https://github.com/apache/kafka/pull/6191

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
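
The PR title refers to the flush order in the Streams caching layer. As a conceptual toy sketch only (an assumed shape, not the actual patch): a write-back cache whose flush() writes dirty entries to the underlying store before evicting them, so a flush of the underlying store never misses buffered data.

{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of a write-back cache in front of a persistent store.
class CachingStore {
    private final Map<String, String> dirty = new LinkedHashMap<>();
    private final Map<String, String> underlying; // stands in for e.g. a RocksDB store

    CachingStore(Map<String, String> underlying) {
        this.underlying = underlying;
    }

    void put(String key, String value) {
        dirty.put(key, value); // buffered; not yet visible in the underlying store
    }

    String get(String key) {
        String cached = dirty.get(key);
        return cached != null ? cached : underlying.get(key);
    }

    void flush() {
        dirty.forEach(underlying::put); // "put to underlying before flush"
        dirty.clear();
        // a real store would flush the underlying store here
    }
}
{code}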


> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Priority: Major
> Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1, but 
> we see a severe performance degradation: most of the CPU time appears to be 
> spent retrieving from the local cache. Here's an example thread profile with 
> 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly we're gated by retrieving from the state 
> store, with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation reveals that we're performing about three orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows, and the app is configured with 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760.
> I'm happy to share more details if they would be helpful, and happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  
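
To make the reported setup concrete, here is a minimal sketch of a session-windowed topology with those two settings. The application id, bootstrap server, topic name, serdes, and five-minute inactivity gap are illustrative assumptions, not taken from the report:

{code:java}
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.SessionWindows;

public class SessionWindowExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "session-window-example"); // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // The two settings called out in the report:
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10485760);

        // Each incoming record probes the session store (through its cache)
        // to find and merge adjacent sessions.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic")
               .groupByKey()
               .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(5)))
               .count();

        new KafkaStreams(builder.build(), props).start();
    }
}
{code}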



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2019-02-12 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7652.
--
   Resolution: Fixed
 Assignee: Guozhang Wang
Fix Version/s: 2.2.0

I think this issue should have been fixed by the three PRs listed above. 
[~jonathanpdx], I would appreciate it if you could try it out and validate 
that this is the case.
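
One possible way to validate, sketched under the assumption that the application runs with metrics.recording.level = DEBUG (the record-cache metrics such as hitRatio-avg are only recorded at that level): sample the record-cache metrics on 2.2.0 and compare against the attached flush logs from the earlier versions.

{code:java}
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class CacheMetricsProbe {
    // Print the record-cache metrics (hitRatio-avg etc.) of a running
    // KafkaStreams instance started with metrics.recording.level = DEBUG.
    static void printRecordCacheMetrics(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
            MetricName name = e.getKey();
            if (name.group().contains("record-cache")) {
                System.out.println(name.name() + " " + name.tags() + " = "
                        + e.getValue().metricValue());
            }
        }
    }
}
{code}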

> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.2.0
>
> Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)