Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

2020-05-28 Thread Pushkar Deole
Thanks for the help Guozhang!
However, I realized that the exception and the actual problem are completely
different. The problem was that the client was not configured with an SSL
truststore while the server is SSL-enabled.
I also found this open bug on Kafka:
https://issues.apache.org/jira/browse/KAFKA-4493
After setting the SSL properties on the stream, I am able to get it up and
running.
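
For reference, the fix boils down to adding the SSL client settings to the
streams Properties. A minimal sketch (the truststore path and password are
placeholders; DEFAULT_APPLICATION_ID and theKafkaServers are the same
constants as in the snippet quoted below):

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
// the brokers are SSL-enabled, so the consumer/producer/admin clients embedded
// in the streams app must be told to use SSL as well
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststore-password");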

@Kafka developers: I think this failure mode is very misleading; it should be
fixed as soon as possible, or a proper exception should be thrown.

On Thu, May 28, 2020 at 9:46 AM Guozhang Wang  wrote:

> Hello Pushkar,
>
> I think the memory pressure may not come from the topic data consumption,
> but from rocksDB used for materializing the global table. Note rocksDB
> allocates large chunk of memory beforehand in mem-table / page cache /
> reader cache with default configs. You can get some detailed information
> from this KIP:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
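>
> As a rough illustration (the class name and sizes are arbitrary, just to show
> the mechanism), that memory can be capped by plugging in a RocksDBConfigSetter:
>
> public static class BoundedRocksDBConfig implements org.apache.kafka.streams.state.RocksDBConfigSetter {
>     @Override
>     public void setConfig(String storeName, org.rocksdb.Options options, java.util.Map<String, Object> configs) {
>         org.rocksdb.BlockBasedTableConfig tableConfig = new org.rocksdb.BlockBasedTableConfig();
>         tableConfig.setBlockCacheSize(8 * 1024 * 1024L);   // shrink the block / reader cache
>         options.setTableFormatConfig(tableConfig);
>         options.setWriteBufferSize(4 * 1024 * 1024L);      // shrink each mem-table
>         options.setMaxWriteBufferNumber(2);                // and keep fewer of them
>     }
> }
> // registered via:
> // props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedRocksDBConfig.class);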
>
>
> Guozhang
>
>
> On Wed, May 27, 2020 at 8:44 PM Pushkar Deole 
> wrote:
>
> > Hello All,
> >
> > I am using Stream DSL API just to create a GlobalKTable backed by a
> topic.
> > The topology is simple, just create a global table from a topic and
> that's
> > it (pasted below code snippet), when I run this service on K8S cluster
> > (container in a pod), the service gets OutOfMemoryError during
> > kafkaStreams.start() method call (exception trace pasted below). Note
> that
> > the topic is newly created so there is no data in the topic. POD memory
> was
> > set initially to 500MiB which I doubled to 1000MiB but no luck.
> > kafka-streams and kafka-clients jar at 2.3.1 version. Broker might be a
> > version ahead I think 2.4 but that should not be an issue. Any help would
> > be appreciated since I am blocked at this point.
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
> > StreamsBuilder streamsBuilder = new StreamsBuilder();
> > GlobalKTable> groupCacheTable =
> > streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
> > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
> > Materialized.as(GROUP_CACHE_STORE_NAME));
> > Topology groupCacheTopology = streamsBuilder.build();
> > kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> > kafkaStreams.start();
> >
> > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> > LOG.info("Stopping the stream");
> > kafkaStreams.close();
> > }));
> >
> >
> >
> {"@timestamp":"2020-05-28T03:11:39.719+00:00","@version":"1","message":"stream-client
> > [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9] State transition
> from
> > CREATED to
> >
> >
> REBALANCING","logger_name":"org.apache.kafka.streams.KafkaStreams","thread_name":"main","level":"INFO","level_value":2}
> >
> >
> {"@timestamp":"2020-05-28T03:11:43.532+00:00","@version":"1","message":"Uncaught
> > exception in thread 'kafka-admin-client-thread |
> >
> >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread
> > |
> >
> >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin","level":"ERROR","level_value":4,"stack_trace":"java.lang.OutOfMemoryError:
> > Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown
> > Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown
> Source)\n\tat
> >
> >
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat
> >
> >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat
> >
> >
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat
> >
> >
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat
> >
> >
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat
> >
> >
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat
> > org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat
> > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat
> >
> >
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)\n\tat
> > java.base/java.lang.Thread.run(Unknown Source)\n"}
> >
> >
> {"@timestamp":"2020-05-28T03:11:44.641+00:00","@version":"1","message":"Uncaught
> > exception in thread 'kafka-producer-network-thread |
> >
> >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-producer-network-thread
> > |
> >
> >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer","level":"ERROR","level_value":4,"stack_trace":"java.lang.OutOfMemoryError:
> > Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Un

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-28 Thread Pushkar Deole
Matthias,

I realized that the exception and the actual problem are completely different.
The problem was that the client was not configured with an SSL truststore
while the server is SSL-enabled.
I also found this open bug on Kafka:
https://issues.apache.org/jira/browse/KAFKA-4493
After setting the SSL properties on the stream, I am able to get it up and
running.

Because of the above, the issue is very difficult to debug; the bug should be
fixed as soon as possible, or a proper exception should be thrown.

On Wed, May 27, 2020 at 10:59 PM Pushkar Deole  wrote:

> Thanks... i will try increasing the memory in case you don't spot anything
> wrong with the code. Other service also have streams and global k table but
> they use spring-kafka, but i think that should not matter, and it should
> work with normal kafka-streams code unless i am missing some
> configuration/setting here
>
> On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax  wrote:
>
>> There is no hook. Only a restore listener, but this one is only used
>> during startup when the global store is loaded. It's not sure during
>> regular processing.
>>
>> Depending on your usage, maybe you can switch to a global store instead
>> of GlobalKTable? That way, you can implement a custom `Processor` and
>> add a hook manually?
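>>
>> Something along these lines (an untested sketch, reusing the topic, store
>> name, and serdes from your code; the value type GroupCache is just my
>> assumption):
>>
>> StoreBuilder<KeyValueStore<String, GroupCache>> storeBuilder =
>>     Stores.keyValueStoreBuilder(
>>         Stores.inMemoryKeyValueStore(GROUP_CACHE_STORE_NAME),
>>         Serdes.String(),
>>         GroupCacheSerdes.groupCache())
>>     .withLoggingDisabled();   // global stores must not have a changelog
>>
>> streamsBuilder.addGlobalStore(
>>     storeBuilder,
>>     GROUP_CACHE_TOPIC,
>>     Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>>     () -> new Processor<String, GroupCache>() {
>>         private KeyValueStore<String, GroupCache> store;
>>
>>         @SuppressWarnings("unchecked")
>>         @Override
>>         public void init(ProcessorContext context) {
>>             store = (KeyValueStore<String, GroupCache>) context.getStateStore(GROUP_CACHE_STORE_NAME);
>>         }
>>
>>         @Override
>>         public void process(String key, GroupCache value) {
>>             store.put(key, value);                            // keep the store in sync with the topic
>>             LOG.info("Global store updated for key {}", key); // <-- your hook
>>         }
>>
>>         @Override
>>         public void close() { }
>>     });
>> // Processor/ProcessorContext/ProcessorSupplier come from org.apache.kafka.streams.processor,
>> // Stores/StoreBuilder/KeyValueStore from org.apache.kafka.streams.state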
>>
>> I don't see anything wrong with your setup. Unclear if/why the global
>> store would require a lot of memory...
>>
>>
>> -Matthias
>>
>> On 5/27/20 7:41 AM, Pushkar Deole wrote:
>> > Matthias,
>> > I tried with default store as well but getting same error, can you
>> please
>> > check if I am initializing the global store in the right way:
>> >
>> > public void setupGlobalCacheTables(String theKafkaServers) {
>> > Properties props = new Properties();
>> > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> DEFAULT_APPLICATION_ID);
>> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>> > StreamsBuilder streamsBuilder = new StreamsBuilder();
>> > groupCacheTable =
>> > streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
>> > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>> > Materialized.as(GROUP_CACHE_STORE_NAME));
>> > Topology groupCacheTopology = streamsBuilder.build();
>> >  kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>> >   kafkaStreams.start();
>> >
>> > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>> > LOG.info("Stopping the stream");
>> > kafkaStreams.close();
>> > }));
>> > }
>> >
>> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole 
>> wrote:
>> >
>> >> Hi Matthias,
>> >>
>> >> By the way, I used the in-memory global store and the service is giving
>> >> out of memory error during startup. Unfortunately i don't have a stack
>> >> trace now but when i got stack the first time, the error was coming
>> >> somewhere from memorypool.allocate or similar kind of method. If i get
>> the
>> >> stack trace again, I will share that with you.
>> >> However, the topic from where the store is reading from is empty so I
>> am
>> >> not sure why the global k table is trying to occupy a lot of space.
>> The POD
>> >> memory request and limits are 500 MiB and 750 MiB respectively so the
>> state
>> >> store should fit into this memory I believe since topic is empty. Can
>> you
>> >> provide inputs on this.
>> >>
>> >>
>> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole 
>> >> wrote:
>> >>
>> >>> Ok... got it... is there any hook that I can attach to the global k
>> table
>> >>> or global store? What I mean here is I want to know when the global
>> store
>> >>> is updated with data from topic in that case the hook that I specified
>> >>> should be invoked so i can do some activity like logging that, this
>> will
>> >>> allow me to know how long the global store took to sync up with topic
>> after
>> >>> the event has been put on the topic.
>> >>>
>> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax 
>> >>> wrote:
>> >>>
>>  For example it could be some "static" information, like a mapping
>> from
>>  zip code to city name.
>> 
>>  Something that does usually not change over time.
>> 
>> 
>>  -Matthias
>> 
>>  On 5/25/20 9:55 PM, Pushkar Deole wrote:
>> > Matthias,
>> >
>> > I am wondering what you mean by "Global store hold "axially" data
>> that
>>  is
>> > provided from "outside" of the
>> > app"
>> >
>> > will you be able to give some example use case here as to what you
>>  mean by
>> > axially data provided from outside app?
>> >
>> > On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax 
>>  wrote:
>> >
>> >> Both stores sever a different purpose.
>> >>
>> >> Regular stores allow you to store state the application computes.
>> >> Writing into the changelog is a fault-tolerance mechanism.
>> >>
>> >> Global store hold "axially" data that is provided from "outside" of
>>  the
>> >> app. There is no changelog topic, but only the input topic (that i

Re: How to manually start ingesting in kafka source connector ?

2020-05-28 Thread Robin Moffatt
When you create the connector, it will start.


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Thu, 28 May 2020 at 04:12, Yu Watanabe  wrote:

> Dear community .
>
> I would like to ask question related to source connector in kafka
> connect (2.4.0) .
>
> Is there a way to manually start source connector after registering to
> kafka connect ?
>
> Looking at the document , I found PAUSE API ,
>
>
> https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-pause
>
> however, could not find set initial state for individual tasks in
> connector properties ..
>
> https://docs.confluent.io/current/connect/managing/configuring.html
>
> I appreciate if I could get some help.
>
> Best Regards,
> Yu Watanabe
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>


Re: How to manually start ingesting in kafka source connector ?

2020-05-28 Thread Yu Watanabe
Robin

Thank you for the reply.

Is there a way to keep the connector from starting automatically after it is created?

I am trying to find a way to change the connector offset, as described in
the link below, before starting the connector:

https://www.confluent.jp/blog/kafka-connect-deep-dive-jdbc-source-connector/#starting-table-capture

The steps I want to perform are:

1. Create jdbc connector
2. Change connector offset
3. Start connector

Thanks,
Yu

On Thu, May 28, 2020 at 6:01 PM Robin Moffatt  wrote:
>
> When you create the connector, it will start.
>
>
> --
>
> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>
>
> On Thu, 28 May 2020 at 04:12, Yu Watanabe  wrote:
>
> > Dear community .
> >
> > I would like to ask question related to source connector in kafka
> > connect (2.4.0) .
> >
> > Is there a way to manually start source connector after registering to
> > kafka connect ?
> >
> > Looking at the document , I found PAUSE API ,
> >
> >
> > https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-pause
> >
> > however, could not find set initial state for individual tasks in
> > connector properties ..
> >
> > https://docs.confluent.io/current/connect/managing/configuring.html
> >
> > I appreciate if I could get some help.
> >
> > Best Regards,
> > Yu Watanabe
> >
> > --
> > Yu Watanabe
> >
> > linkedin: www.linkedin.com/in/yuwatanabe1/
> > twitter:   twitter.com/yuwtennis
> >



-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Clients may fetch incomplete set of topic partitions during cluster startup

2020-05-28 Thread Debraj Manna
Hi

Is the issue below fixed in the latest Kafka (2.5)?

https://issues.apache.org/jira/browse/KAFKA-8480

I see that the issue is still open, so I am just confirming before upgrading
Kafka to the latest version.

Thanks,


Repeated UnknownProducerIdException

2020-05-28 Thread Schmidt-Dumont Georg (BCI/ESW17)
Good morning,

Since a couple of days ago, our Kafka Streams application has suddenly been
hitting an UnknownProducerIdException. Digging into this, I came across
KIP-360. We are using a centrally managed Kafka, hence we are not able to
simply update it to the latest version, which contains the fix for the
corresponding issues. The Kafka version we are using is 2.2.1.

I also found that a workaround for Kafka versions before 2.4 is to increase
the retention time and transactional.id.expiration.ms. However, the topic for
which the issue occurs already has a retention time of 30 days, so from my
point of view this cannot be the reason the producer metadata was deleted.

At this point I have two questions I hope someone can help me with.

First, might there be a reason the producer metadata was deleted other than
all the data having been removed from the topic because of the retention time
and low traffic on the topic?

Second, the topic in question is used by a local state store. Since we 
currently only have a single instance of the Kafka Streams application running, 
I believe it would be safe to stop the application, flush the topic and then 
start the app again. The state would be loaded from the file system in this 
case and no data would be lost. Is this a valid assumption and would the flush 
of the topic fix the issue? Or would we need to recreate the topic?

Thanks!

Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候

Mr. Georg Schmidt-Dumont
Bosch Connected Industry – BCI/ESW17
Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
www.bosch.com
Phone +49 711 811-49893  | 
georg.schmidt-dum...@bosch.com

Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000;
Aufsichtsratsvorsitzender: Franz Fehrenbach; Geschäftsführung: Dr. Volkmar 
Denner,
Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
Markus Heyn, Dr. Dirk Hoheisel,
Christoph Kübel, Uwe Raschke, Peter Tyroller



Re: How to manually start ingesting in kafka source connector ?

2020-05-28 Thread Robin Moffatt
You could look at
https://rmoff.net/2019/08/15/reset-kafka-connect-source-connector-offsets/ and
experiment with creating the connector elsewhere to see if you can pre-empt
the key value that Kafka Connect will use when writing the offsets, and so
do your list 2 - 1 - 3 instead
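
For illustration only (the offsets topic name, connector name, and JSON
payloads below are made up; inspect what your Connect worker actually writes
before copying the format), seeding an offset ends up being a plain produce to
the offsets topic:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // key = ["<connector name>", {<source partition>}], value = {<source offset>};
    // this example mimics a JDBC source in incrementing mode
    String key = "[\"jdbc-source\",{\"protocol\":\"1\",\"table\":\"my_table\"}]";
    String value = "{\"incrementing\":42}";
    // do this before creating the connector so the task picks the offset up on first start
    producer.send(new ProducerRecord<>("connect-offsets", key, value));
    producer.flush();
}
// classes from org.apache.kafka.clients.producer and org.apache.kafka.common.serialization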


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Thu, 28 May 2020 at 10:12, Yu Watanabe  wrote:

> Robin
>
> Thank you for the reply.
>
> Any way to not automatically start after creating connector ?
>
> I am trying to find a way to change connector offset  as described in
> below link before starting connector ..
>
>
> https://www.confluent.jp/blog/kafka-connect-deep-dive-jdbc-source-connector/#starting-table-capture
>
> Steps I want to do will be
>
> 1. Create jdbc connector
> 2. Change connector offset
> 3. Start connector
>
> Thanks,
> Yu
>
> On Thu, May 28, 2020 at 6:01 PM Robin Moffatt  wrote:
> >
> > When you create the connector, it will start.
> >
> >
> > --
> >
> > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> >
> >
> > On Thu, 28 May 2020 at 04:12, Yu Watanabe  wrote:
> >
> > > Dear community .
> > >
> > > I would like to ask question related to source connector in kafka
> > > connect (2.4.0) .
> > >
> > > Is there a way to manually start source connector after registering to
> > > kafka connect ?
> > >
> > > Looking at the document , I found PAUSE API ,
> > >
> > >
> > >
> https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-pause
> > >
> > > however, could not find set initial state for individual tasks in
> > > connector properties ..
> > >
> > > https://docs.confluent.io/current/connect/managing/configuring.html
> > >
> > > I appreciate if I could get some help.
> > >
> > > Best Regards,
> > > Yu Watanabe
> > >
> > > --
> > > Yu Watanabe
> > >
> > > linkedin: www.linkedin.com/in/yuwatanabe1/
> > > twitter:   twitter.com/yuwtennis
> > >
>
>
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>


Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

2020-05-28 Thread John Roesler
Woah, that's a nasty bug. I've just pinged the Jira ticket. Please feel free to
do the same.

Thanks,
-John

On Thu, May 28, 2020, at 02:55, Pushkar Deole wrote:
> Thanks for the help Guozhang!
> however i realized that the exception and actual problem is totally
> different. The problem was the client was not set with SSL truststore while
> server is SSLenabled.
> I also found this open bug on kafka
> https://issues.apache.org/jira/browse/KAFKA-4493
> After setting the SSL properties on stream, I am able to get it up and
> running.
> 
> @kafka developers, I think the problem is very misleading and should be
> fixed as soon as possible, or a proper exception should be thrown.
> 
> On Thu, May 28, 2020 at 9:46 AM Guozhang Wang  wrote:
> 
> > Hello Pushkar,
> >
> > I think the memory pressure may not come from the topic data consumption,
> > but from rocksDB used for materializing the global table. Note rocksDB
> > allocates large chunk of memory beforehand in mem-table / page cache /
> > reader cache with default configs. You can get some detailed information
> > from this KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
> >
> >
> > Guozhang
> >
> >
> > On Wed, May 27, 2020 at 8:44 PM Pushkar Deole 
> > wrote:
> >
> > > Hello All,
> > >
> > > I am using Stream DSL API just to create a GlobalKTable backed by a
> > topic.
> > > The topology is simple, just create a global table from a topic and
> > that's
> > > it (pasted below code snippet), when I run this service on K8S cluster
> > > (container in a pod), the service gets OutOfMemoryError during
> > > kafkaStreams.start() method call (exception trace pasted below). Note
> > that
> > > the topic is newly created so there is no data in the topic. POD memory
> > was
> > > set initially to 500MiB which I doubled to 1000MiB but no luck.
> > > kafka-streams and kafka-clients jar at 2.3.1 version. Broker might be a
> > > version ahead I think 2.4 but that should not be an issue. Any help would
> > > be appreciated since I am blocked at this point.
> > >
> > > Properties props = new Properties();
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
> > > StreamsBuilder streamsBuilder = new StreamsBuilder();
> > > GlobalKTable> groupCacheTable =
> > > streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
> > > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
> > > Materialized.as(GROUP_CACHE_STORE_NAME));
> > > Topology groupCacheTopology = streamsBuilder.build();
> > > kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> > > kafkaStreams.start();
> > >
> > > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> > > LOG.info("Stopping the stream");
> > > kafkaStreams.close();
> > > }));
> > >
> > >
> > >
> > {"@timestamp":"2020-05-28T03:11:39.719+00:00","@version":"1","message":"stream-client
> > > [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9] State transition
> > from
> > > CREATED to
> > >
> > >
> > REBALANCING","logger_name":"org.apache.kafka.streams.KafkaStreams","thread_name":"main","level":"INFO","level_value":2}
> > >
> > >
> > {"@timestamp":"2020-05-28T03:11:43.532+00:00","@version":"1","message":"Uncaught
> > > exception in thread 'kafka-admin-client-thread |
> > >
> > >
> > DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread
> > > |
> > >
> > >
> > DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin","level":"ERROR","level_value":4,"stack_trace":"java.lang.OutOfMemoryError:
> > > Java heap space\n\tat java.base/java.nio.HeapByteBuffer.<init>(Unknown
> > > Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown
> > Source)\n\tat
> > >
> > >
> > org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat
> > >
> > >
> > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat
> > > org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat
> > > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat
> > >
> > >
> > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)\n\tat
> > > java.base/java.lang.Thread.run(Unknown Source)\n"}
> > >
> > >
> > {"@timestamp":"2020-05-28T03:11:44.641+00:00","@version":"1","message":"Uncaught
> > > exception in thread 'kafka-producer-network-thread |
> > >
> > >
> > DsiApplication-3d

Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

2020-05-28 Thread Guozhang Wang
Thanks for the update, Pushkar! I'd have to say it is indeed a very
misleading error message and we should fix it asap. Will follow up on the
ticket.

Guozhang

On Thu, May 28, 2020 at 9:17 AM John Roesler  wrote:

> Woah, that's a nasty bug. I've just pinged the Jira ticket. Please feel
> free to
> do the same.
>
> Thanks,
> -John
>
> On Thu, May 28, 2020, at 02:55, Pushkar Deole wrote:
> > Thanks for the help Guozhang!
> > however i realized that the exception and actual problem is totally
> > different. The problem was the client was not set with SSL truststore
> while
> > server is SSLenabled.
> > I also found this open bug on kafka
> > https://issues.apache.org/jira/browse/KAFKA-4493
> > After setting the SSL properties on stream, I am able to get it up and
> > running.
> >
> > @kafka developers, I think the problem is very misleading and should be
> > fixed as soon as possible, or a proper exception should be thrown.
> >
> > On Thu, May 28, 2020 at 9:46 AM Guozhang Wang 
> wrote:
> >
> > > Hello Pushkar,
> > >
> > > I think the memory pressure may not come from the topic data
> consumption,
> > > but from rocksDB used for materializing the global table. Note rocksDB
> > > allocates large chunk of memory beforehand in mem-table / page cache /
> > > reader cache with default configs. You can get some detailed
> information
> > > from this KIP:
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, May 27, 2020 at 8:44 PM Pushkar Deole 
> > > wrote:
> > >
> > > > Hello All,
> > > >
> > > > I am using Stream DSL API just to create a GlobalKTable backed by a
> > > topic.
> > > > The topology is simple, just create a global table from a topic and
> > > that's
> > > > it (pasted below code snippet), when I run this service on K8S
> cluster
> > > > (container in a pod), the service gets OutOfMemoryError during
> > > > kafkaStreams.start() method call (exception trace pasted below). Note
> > > that
> > > > the topic is newly created so there is no data in the topic. POD
> memory
> > > was
> > > > set initially to 500MiB which I doubled to 1000MiB but no luck.
> > > > kafka-streams and kafka-clients jar at 2.3.1 version. Broker might
> be a
> > > > version ahead I think 2.4 but that should not be an issue. Any help
> would
> > > > be appreciated since I am blocked at this point.
> > > >
> > > > Properties props = new Properties();
> > > > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> DEFAULT_APPLICATION_ID);
> > > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
> > > > StreamsBuilder streamsBuilder = new StreamsBuilder();
> > > > GlobalKTable> groupCacheTable =
> > > > streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
> > > > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
> > > > Materialized.as(GROUP_CACHE_STORE_NAME));
> > > > Topology groupCacheTopology = streamsBuilder.build();
> > > > kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> > > > kafkaStreams.start();
> > > >
> > > > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> > > > LOG.info("Stopping the stream");
> > > > kafkaStreams.close();
> > > > }));
> > > >
> > > >
> > > >
> > >
> {"@timestamp":"2020-05-28T03:11:39.719+00:00","@version":"1","message":"stream-client
> > > > [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9] State
> transition
> > > from
> > > > CREATED to
> > > >
> > > >
> > >
> REBALANCING","logger_name":"org.apache.kafka.streams.KafkaStreams","thread_name":"main","level":"INFO","level_value":2}
> > > >
> > > >
> > >
> {"@timestamp":"2020-05-28T03:11:43.532+00:00","@version":"1","message":"Uncaught
> > > > exception in thread 'kafka-admin-client-thread |
> > > >
> > > >
> > >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread
> > > > |
> > > >
> > > >
> > >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin","level":"ERROR","level_value":4,"stack_trace":"java.lang.OutOfMemoryError:
> > > > Java heap space\n\tat
> java.base/java.nio.HeapByteBuffer.<init>(Unknown
> > > > Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown
> > > Source)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat
> > > >
> org.apache.kafka.common.netwo

a few URP causes high latencies to producers and consumers

2020-05-28 Thread nitin agarwal
Hi,

We have noticed that URPs (under-replicated partitions) in the cluster cause
an increase in producer and consumer latencies.
The cause of the URPs was either one of the brokers going down or the Kafka
rebalancer running.

Is this the expected behavior?


Thanks,
Nitin


Re: Repeated UnknownProducerIdException

2020-05-28 Thread Matthias J. Sax
The issue with producer metadata often occurs for repartition topics.
Those are purged actively by Kafka Streams. It might help to increase
the segment size and maybe `segment.ms` config for those topics to
preserve a large history (the active segment is not purged).
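
As a sketch (the values are arbitrary), the Streams-side way to pass such
configs is the "topic." prefix; note these are only applied when Streams
creates its internal topics, so topics that already exist would have to be
altered broker-side (e.g. with kafka-configs):

// props is the same Properties object the StreamsConfig / KafkaStreams instance is built from
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), String.valueOf(512L * 1024 * 1024));    // segment.bytes
props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_MS_CONFIG), String.valueOf(7L * 24 * 60 * 60 * 1000)); // segment.ms = 7 days
// TopicConfig lives in org.apache.kafka.common.config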

By default, Kafka Streams creates those topics with smaller segment
sizes to make data purging more efficient.

Not sure if your app has repartition topics though?

-Matthias

On 5/28/20 3:29 AM, Schmidt-Dumont Georg (BCI/ESW17) wrote:
> Good morning,
> 
> Since a couple of days ago we suddenly have the issue in our Kafka Steams 
> application that a UnknownProducerException occurs. Digging into this I came 
> across 
> KIP-360.
>  We are using a centrally managed Kafka, hence we are not able to just update 
> it to the latest version which contains the fix for the corresponding issues. 
> The Kafka version we are using is 2.2.1.
> 
> I also found that a Workaround for Kafka versions pre 2.4 is to increase the 
> retention time and the transactional.id.expiration.ms. However, the retention 
> time for the topic for which the issue occurs already has a retention time of 
> 30 days. So from my point of view this cannot be the reason the meta data for 
> the producer was deleted.
> 
> At this point I have two questions I hope someone can help me with.
> 
> First, might be the reason that the producer meta data was deleted other than 
> all the data was removed from the topic because of the retention time and low 
> traffic on the topic?
> 
> Second, the topic in question is used by a local state store. Since we 
> currently only have a single instance of the Kafka Streams application 
> running, I believe it would be safe to stop the application, flush the topic 
> and then start the app again. The state would be loaded from the file system 
> in this case and no data would be lost. Is this a valid assumption and would 
> the flush of the topic fix the issue? Or would we need to recreate the topic?
> 
> Thanks!
> 
> Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候
> 
> Mr. Georg Schmidt-Dumont
> Bosch Connected Industry – BCI/ESW17
> Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
> www.bosch.com
> Phone +49 711 811-49893  | 
> georg.schmidt-dum...@bosch.com
> 
> Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000;
> Aufsichtsratsvorsitzender: Franz Fehrenbach; Geschäftsführung: Dr. Volkmar 
> Denner,
> Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
> Markus Heyn, Dr. Dirk Hoheisel,
> Christoph Kübel, Uwe Raschke, Peter Tyroller
> 





Kafka Queries

2020-05-28 Thread Csk Raju
Hi Kafka team,

We have successfully implemented Kafka in our environment. We are stuck on the
questions below, so please provide assistance:

1) What are the distinct consumer group names currently consuming messages
from the same topic?
2) What is the total number of messages consumed from a topic by a given
consumer group for a given time interval (from and to date-time)?
3) For a given consumer group and topic, how many new messages arrived in the
topic after the previously committed offset position?
(Example: a consumer application is down and the admin wants to know how many
new messages arrived after that specific consuming app went down.)
4) How can we explicitly move the offset to a different position for a given
topic and consumer group?
5) How can we replay messages from a failure topic to a replay topic?
6) How do we monitor a topic: alert when a new message arrives, or when a
threshold of 5 or 10 new messages is reached?

Hoping your response clarifies all of the above questions :)

Thanks
Sudhir Raju
+919666432888


Re: Clients may fetch incomplete set of topic partitions during cluster startup

2020-05-28 Thread Debraj Manna
Does anyone have an update on my query below?

On Thu, 28 May 2020, 15:45 Debraj Manna,  wrote:

> Hi
>
> Is the below issue fixed in latest Kafka 2.5?
>
> https://issues.apache.org/jira/browse/KAFKA-8480
>
> I am seeing this issue still open. So just confirming before upgrading
> Kafka to the latest.
>
> Thanks,
>
>