Hi,

1) By default, Flink's Kafka connector polls Kafka with a timeout of 100 ms, so an idle consumer wakes up roughly every 100 ms. There's a configuration key "flink.poll-timeout" to change that interval. I don't have experience with these internal log messages from Kafka, but since they are on INFO level (and as long as you don't see any unexpected data), I would ignore them for now.
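As a rough sketch of how the timeout could be raised: the key is just another entry in the Properties object handed to the Kafka consumer. The broker address, group id, the 1000 ms value and the class and method names below are placeholders for illustration, not something taken from your setup.

```java
import java.util.Properties;

public class KafkaPollTimeoutConfig {

    // Key mentioned above; the value is the poll timeout in milliseconds.
    static final String POLL_TIMEOUT_KEY = "flink.poll-timeout";

    // Builds consumer properties with a custom poll timeout.
    // All argument values are supplied by the caller (hypothetical helper).
    static Properties buildConsumerProperties(String bootstrapServers,
                                              String groupId,
                                              long pollTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty(POLL_TIMEOUT_KEY, Long.toString(pollTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: a local broker and a 1 second poll timeout.
        Properties props =
                buildConsumerProperties("localhost:9092", "example-group", 1000L);

        // With the Flink Kafka connector on the classpath, these properties
        // would be passed to the consumer's constructor, for example:
        //   new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props)
        System.out.println(POLL_TIMEOUT_KEY + " = " + props.getProperty(POLL_TIMEOUT_KEY));
    }
}
```

A larger timeout should mean fewer wake-ups while the topic is idle, so it is best tried on a test job first to see how it behaves for you.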
2) The slots do not reserve memory. A slot is basically a thread running on the TaskManager. You can't enforce the amount of memory available to a thread, so all slots share the TaskManager's pool of available memory. If you want to run multiple low-throughput pipelines on Flink, it is not a problem to oversubscribe your TaskManagers. For a machine with, say, 8 cores and 16 GB of memory, you could configure 100 or even 500 slots, provided the pipelines are not very resource intensive. With StateFun, you can have millions of actors on a TaskManager. If they are not receiving any data, they won't allocate resources.

Best,
Robert

On Tue, Dec 17, 2019 at 11:37 AM Andrés Garagiola <andresgaragi...@gmail.com> wrote:

> Thanks Roberts,
>
> About your questions, I don't have yet a real estimation regarding the
> number of records received by the pipeline but I guess that the pipeline
> could be idle for several minutes (I don't think that for hours).
>
> My concern comes to me from two aspects:
>
> 1) I saw multiple lines in the Flink task manager logs like the ones
> listed below. Sounds like if the pipeline is doing polling over the Kafka
> topic source, I don't know if I can control this behavior in some way to
> reduce the CPU consumption when I can tolerate some latency.
>
> *2019-12-17 05:25:56,720 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.*
>
> *2019-12-17 05:25:56,720 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Seeking to LATEST offset of partition test-topic-0*
>
> *2019-12-17 05:25:56,721 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.*
>
> *2019-12-17 05:25:56,721 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Seeking to LATEST offset of partition test-topic-0*
>
> *2019-12-17 05:25:56,722 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example] Resetting offset for partition test-topic-0 to offset 561.*
>
> 2) I read that every slot reserves a portion of the task manager's memory,
> so I would like to reuse that memory between multiple pipelines (again in
> the context where some latency is allowed). I understand that this is not
> possible in the current state of Flink but would be possible by avoiding
> the direct map with Statefun, isn't it?
>
> Thanks again for your reply.
>
> Regards
>
> On Tue, Dec 17, 2019 at 11:10 AM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Andrés,
>>
>> sorry for the late reply.
>> 1. The slots are released, when the streaming pipeline ends. In
>> principle, it is not a problem when a slot is allocated, even when not
>> processing any incoming messages. So you are not doing something wrong.
>> How many records do you receive per pipeline? (are they idle for multiple
>> hours?)
>> There's a way to utilize the slots more efficiently: https://statefun.io/
>> Statefun will be contributed to Flink soon.
>> StateFun doesn't have a direct slots to pipeline mapping.
>>
>> 2. The memory consumption per slot greatly depends on what kind of
>> operator you are running in it. A heap statebackend might need a few
>> gigabytes, a stateless mapper needs almost no memory. Some time ago, I
>> wrote a blog post on sizing a Flink cluster:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>>
>> Best,
>> Robert
>>
>> On Fri, Dec 13, 2019 at 5:06 PM Andrés Garagiola <andresgaragi...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I'm testing Flink to do stream processing, in my use case there are
>>> multiples pipelines processing messages from multiple Kafka sources. I have
>>> some questions regarding the jobs and slots.
>>>
>>> 1) When I deploy a new job, it takes a job slot in the TM, the job never
>>> ends (I think it doesn't end because is a stream pipeline), and the slot is
>>> never released, this means that the slot is busy even when no new messages
>>> are coming from the Kafka topic. Is that OK or I'm doing something wrong?
>>> Is there a way to do a more efficient utilization of the job slots?
>>>
>>> 2) In my use case, I need good job scalability. Potentially I could have
>>> many pipelines running in the Flink environment, but on the other hand,
>>> increase latency would not be a serious problem for me. There are some
>>> recommendations regarding memory for slot? I saw that the CPU
>>> recommendation is a core per slot, taking into account that increase the
>>> latency would not be a big problem, do you see another good reason to
>>> follow this recommendation?
>>>
>>> Thank you
>>> Regards