Hi,

1) By default, Flink's Kafka connector polls Kafka every 100 ms. You can
change the frequency with the configuration key "flink.poll-timeout". I
don't have experience with these internal log messages from Kafka, but
since they are at INFO level (and as long as you don't see any unexpected
data), I would ignore them for now.
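
A minimal sketch of raising the poll timeout, assuming the key is set through
the consumer properties and that its value is in milliseconds. The class name,
broker address, and group id are hypothetical placeholders. Only
"flink.poll-timeout" comes from the point above.

```java
import java.util.Properties;

public class KafkaPollTimeoutExample {

    // Builds Kafka consumer properties with a custom poll timeout.
    // Assumption: the value is interpreted as milliseconds.
    static Properties consumerProperties(long pollTimeoutMillis) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.setProperty("group.id", "example-group");           // hypothetical group id
        props.setProperty("flink.poll-timeout", Long.toString(pollTimeoutMillis));
        return props;
    }

    public static void main(String[] args) {
        // Poll once per second instead of every 100 ms when the topic is idle.
        Properties props = consumerProperties(1000L);
        System.out.println(props.getProperty("flink.poll-timeout"));
    }
}
```

The resulting Properties object would then be handed to the Kafka consumer
when the source is created. A larger timeout should mean fewer wake-ups on an
idle topic, at the cost of some extra latency.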

2) Slots do not reserve memory. A slot is basically a thread running on
the TaskManager, and you can't enforce the amount of memory available to a
thread, so all slots share the TaskManager's pool of available memory.
If you want to run multiple low-throughput pipelines on Flink, it is not a
problem to oversubscribe your TaskManagers. For a machine with, say, 8
cores and 16 GB of memory, you could configure 100 or even 500 slots, as
long as the pipelines are not very resource intensive.

With StateFun, you can have millions of actors on a TaskManager. If they
are not receiving any data, they won't allocate resources.

Best,
Robert


On Tue, Dec 17, 2019 at 11:37 AM Andrés Garagiola <andresgaragi...@gmail.com>
wrote:

> Thanks Robert,
>
>
> About your questions: I don't yet have a real estimate of the number of
> records received by the pipeline, but I guess that the pipeline could be
> idle for several minutes (I don't think for hours).
>
>
> My concern comes from two aspects:
>
>
> 1) I saw multiple lines in the Flink task manager logs like the ones
> listed below. It looks as if the pipeline is polling the Kafka source
> topic. I don't know whether I can control this behavior in some way to
> reduce CPU consumption, given that I can tolerate some latency.
>
>
> *2019-12-17 05:25:56,720 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Resetting offset for partition test-topic-0 to offset 561.*
>
> *2019-12-17 05:25:56,720 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Seeking to LATEST offset of partition test-topic-0*
>
> *2019-12-17 05:25:56,721 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Resetting offset for partition test-topic-0 to offset 561.*
>
> *2019-12-17 05:25:56,721 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Seeking to LATEST offset of partition test-topic-0*
>
> *2019-12-17 05:25:56,722 INFO
> org.apache.kafka.clients.consumer.internals.SubscriptionState - [Consumer
> clientId=consumer-2, groupId=Reader-0_offset_consumer_2088979656_example]
> Resetting offset for partition test-topic-0 to offset 561.*
>
>
> 2) I read that every slot reserves a portion of the task manager's memory,
> so I would like to reuse that memory between multiple pipelines (again in
> the context where some latency is allowed). I understand that this is not
> possible in the current state of Flink, but it would be possible with
> StateFun, since it avoids the direct slot-to-pipeline mapping, wouldn't it?
>
>
> Thanks again for your reply.
>
> Regards
>
> On Tue, Dec 17, 2019 at 11:10 AM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Andrés,
>>
>> sorry for the late reply.
>> 1. The slots are released when the streaming pipeline ends. In
>> principle, it is not a problem for a slot to stay allocated even when it
>> is not processing any incoming messages, so you are not doing anything
>> wrong. How many records do you receive per pipeline? (Are they idle for
>> multiple hours?)
>> There's a way to utilize the slots more efficiently: https://statefun.io/
>> StateFun will be contributed to Flink soon, and it doesn't have a direct
>> slot-to-pipeline mapping.
>>
>> 2. The memory consumption per slot greatly depends on what kind of
>> operator you are running in it. A heap state backend might need a few
>> gigabytes, while a stateless mapper needs almost no memory. Some time ago, I
>> wrote a blog post on sizing a Flink cluster:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>>
>> Best,
>> Robert
>>
>>
>> On Fri, Dec 13, 2019 at 5:06 PM Andrés Garagiola <
>> andresgaragi...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I'm testing Flink for stream processing. In my use case there are
>>> multiple pipelines processing messages from multiple Kafka sources. I have
>>> some questions regarding jobs and slots.
>>>
>>> 1) When I deploy a new job, it takes a job slot in the TM. The job never
>>> ends (I think because it is a streaming pipeline), so the slot is never
>>> released. This means that the slot is busy even when no new messages are
>>> coming from the Kafka topic. Is that OK, or am I doing something wrong?
>>> Is there a way to utilize the job slots more efficiently?
>>>
>>> 2) In my use case, I need good job scalability. Potentially I could have
>>> many pipelines running in the Flink environment, but on the other hand,
>>> increased latency would not be a serious problem for me. Are there any
>>> recommendations regarding memory per slot? I saw that the CPU
>>> recommendation is one core per slot. Given that increased latency would
>>> not be a big problem, do you see another good reason to follow this
>>> recommendation?
>>>
>>> Thank you
>>> Regards
>>>
>>
