Re: NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

2020-05-27 Thread Guozhang Wang
Hello Pushkar, I think the memory pressure may not come from the topic data consumption, but from the RocksDB instance used for materializing the global table. Note that RocksDB allocates large chunks of memory beforehand in mem-table / page cache / reader cache with default configs. You can get some detailed inform

NEED HELP : OutOfMemoryError: Java heap space error while starting KafkaStream with a simple topology

2020-05-27 Thread Pushkar Deole
Hello All, I am using the Streams DSL API just to create a GlobalKTable backed by a topic. The topology is simple: it just creates a global table from a topic and that's it (code snippet pasted below). When I run this service on a K8S cluster (container in a pod), the service gets OutOfMemoryError during kaf

How to manually start ingesting in kafka source connector ?

2020-05-27 Thread Yu Watanabe
Dear community, I would like to ask a question related to source connectors in Kafka Connect (2.4.0). Is there a way to manually start a source connector after registering it with Kafka Connect? Looking at the documentation, I found the PAUSE API, https://docs.confluent.io/current/connect/references/restapi.

Re: Kafka Timeout Exception

2020-05-27 Thread Matthias J. Sax
Correct. The error implies that sending was not successful. You can retry sending by calling `Producer#send()` again. You can also increase the corresponding timeout (`delivery.timeout.ms`). -Matthias On 5/26/20 10:27 AM, JOHN, BIBIN wrote: > > Team, > Could you please help on my below query?
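
The resend-on-failure advice above can be sketched roughly as follows. This is a minimal sketch, not code from the thread: it uses only the JDK so it runs without the Kafka jars, `sendWithRetry` and `producerProps` are hypothetical helper names, and the `Callable` stands in for a real `Producer#send()` call followed by waiting on its result. The timeout values are illustrative assumptions. The one hard constraint shown is that `delivery.timeout.ms` must be at least `linger.ms` + `request.timeout.ms`.

```java
import java.util.Properties;
import java.util.concurrent.Callable;

public class SendRetrySketch {

    // Producer settings written as plain string keys so the sketch needs no Kafka jars.
    // The values are illustrative assumptions, not recommendations.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("request.timeout.ms", "30000");
        props.put("linger.ms", "5");
        // Must be >= linger.ms + request.timeout.ms.
        props.put("delivery.timeout.ms", "180000");
        return props;
    }

    // Hypothetical helper: runs the send action again after each failure,
    // up to maxAttempts times, then rethrows the last failure.
    static <T> T sendWithRetry(Callable<T> send, int maxAttempts) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Stand-in for a send that fails twice and then succeeds.
        String result = sendWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("simulated timeout");
            }
            return "acked";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In a real application the `Callable` body would call `send()` on the producer and block on the returned future, and a resend may produce duplicates or reordering, so whether to retry at the application level depends on the use case.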

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Pushkar Deole
Thanks... I will try increasing the memory in case you don't spot anything wrong with the code. Other services also have streams and a GlobalKTable, but they use spring-kafka. I think that should not matter, and it should work with normal kafka-streams code unless I am missing some configuration/

Re: Request for adding to contributors list

2020-05-27 Thread Guruprasad Tahasildar
Sent previous mail w/o seeing this :| Thanks for looking up my userID and adding to the list! -Guru On 2020/05/27 16:51:41, "Matthias J. Sax" wrote: > Seems Guru created the ticket. > > Added user `tguruprasad` to the list of contributors. You can now > self-assign tickets. > > > -Matthias >

Re: Request for adding to contributors list

2020-05-27 Thread Guruprasad Tahasildar
My bad, here is my Jira ID: tguruprasad Thanks, On 2020/05/27 06:41:02, Luke Chen wrote: > Hi Guruprasad, > What Matthias was asking, is what your JIRA account username is? > So that he or other committer can help you grant you the permission to > assign JIRA tickets. > If you haven't got the J

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Matthias J. Sax
There is no hook. Only a restore listener, but this one is only used during startup when the global store is loaded. It's not used during regular processing. Depending on your usage, maybe you can switch to a global store instead of GlobalKTable? That way, you can implement a custom `Processor` an

Re: Request for adding to contributors list

2020-05-27 Thread Matthias J. Sax
Seems Guru created the ticket. Added user `tguruprasad` to the list of contributors. You can now self-assign tickets. -Matthias On 5/26/20 11:41 PM, Luke Chen wrote: > Hi Guruprasad, > What Matthias was asking, is what your JIRA account username is? > So that he or other committer can help you

Re: Kafka retention policy per topic not working as expected

2020-05-27 Thread Don Ky
Thanks Christopher, Hrm, I checked the logs and retention.ms has been enabled somewhere around October last year. {"log":"[2019-10-20 03:01:41,818] INFO Processing override for entityPath: topics/ETD-TEST with config: Map(retention.ms -\u003e 121000) (kafka.server.DynamicConfigManager)\n","st

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Pushkar Deole
Matthias, I tried with the default store as well but am getting the same error. Can you please check if I am initializing the global store in the right way: public void setupGlobalCacheTables(String theKafkaServers) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG,

Re: Broker side partition round robin

2020-05-27 Thread Vinicius Scheidegger
Does anyone know whether we could really have an "out of the box" solution to do round robin over the partitions when we have multiple producers? By that I mean, a round robin on the broker side (or maybe some way to synchronize all producers). Thank you, On Tue, May 26, 2020 at 1:41 PM Vinicius

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Pushkar Deole
Hi Matthias, By the way, I used the in-memory global store and the service is giving an out of memory error during startup. Unfortunately I don't have a stack trace now, but when I got the stack trace the first time, the error was coming from somewhere in memorypool.allocate or a similar kind of method. If I get the

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-27 Thread Pushkar Deole
Ok... got it... is there any hook that I can attach to the global k table or global store? What I mean is that I want to know when the global store is updated with data from the topic; in that case the hook that I specified should be invoked so I can do some activity like logging that. This will allow m