Hi Piotr,

Thanks!
I will try it. It is a bit of an ugly solution, but it may work :)


> On 23 May 2018, at 16:11, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> One more remark. Currently there is an unwritten assumption in Flink that the 
> time to process records is proportional to their size in bytes. As you noted, 
> this breaks down for mixed workloads (especially with file paths sent as 
> records). 
> 
> There is an interesting workaround for this problem though. You could use a 
> custom serializer for the file paths to artificially inflate the record size, 
> for example to the segment size (32 KB), or even more. This is easy to do - for 
> example, just pad the string with spaces. It would ensure that there is at 
> most one file path to process per network buffer, which compensates for the 
> broken assumption that processing time is proportional to the number of 
> bytes.
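> 
> A rough, untested sketch of that idea - done here with a plain MapFunction 
> instead of a custom serializer (PadFilePath is just an illustrative name):
> 
>   import org.apache.flink.api.common.functions.MapFunction;
> 
>   // Pad each file path with trailing spaces so that a single record
>   // roughly fills one 32 KB network buffer. Downstream operators have
>   // to trim() the record before using it as a path again.
>   public class PadFilePath implements MapFunction<String, String> {
>       private static final int SEGMENT_SIZE = 32 * 1024;
> 
>       @Override
>       public String map(String path) {
>           StringBuilder sb = new StringBuilder(SEGMENT_SIZE).append(path);
>           while (sb.length() < SEGMENT_SIZE) {
>               sb.append(' ');
>           }
>           return sb.toString();
>       }
>   }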
> 
> Piotrek
> 
>> On 23 May 2018, at 15:40, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> 
>> Hi,
>> 
>> Yes, if you have a mixed workload in your pipeline, it is a matter of finding 
>> the right balance. The situation will be better in Flink 1.5, but the 
>> underlying issue will remain - in 1.5.0 there will also be no way to change 
>> the network buffer configuration between stages of a single job.
>> 
>> Currently such an explosion of records (one small record producing a huge 
>> bunch of new records) is something of an anti-pattern in Flink. Besides the 
>> problem that we were discussing, the other problem is that you cannot 
>> checkpoint in the middle of processing a big record. I hope that this will 
>> change in future Flink releases, but currently those are the limitations.
>> 
>> For your case, with the initial records being file paths, it might be better 
>> to embed this logic within the data source, so that the source itself 
>> produces parsed records. For example, the FlinkKafkaConsumer discovers 
>> topics/partitions on the fly, and the smallest transport unit is still a 
>> “parsed record” and not a “topic” (a “file path” in your case). With a proper 
>> offsets implementation this also solves the problem of checkpointing in the 
>> middle of processing a large file.
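>> 
>> A very rough, untested sketch of such a source (simplified: it does not 
>> checkpoint offsets, and discoverNewFiles/parseFile are placeholders for 
>> your own logic):
>> 
>>   import org.apache.flink.streaming.api.functions.source.SourceFunction;
>> 
>>   // The source itself expands each file path into parsed records, so
>>   // the smallest unit crossing the network is a parsed record.
>>   public abstract class ParsingFileSource<T> implements SourceFunction<T> {
>>       private volatile boolean running = true;
>> 
>>       @Override
>>       public void run(SourceContext<T> ctx) throws Exception {
>>           while (running) {
>>               for (String path : discoverNewFiles()) {
>>                   for (T record : parseFile(path)) {
>>                       // Emit under the checkpoint lock, as other sources do.
>>                       synchronized (ctx.getCheckpointLock()) {
>>                           ctx.collect(record);
>>                       }
>>                   }
>>               }
>>           }
>>       }
>> 
>>       @Override
>>       public void cancel() {
>>           running = false;
>>       }
>> 
>>       // Placeholders: plug in your own discovery and parsing here.
>>       protected abstract Iterable<String> discoverNewFiles() throws Exception;
>> 
>>       protected abstract Iterable<T> parseFile(String path) throws Exception;
>>   }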
>> 
>> Piotrek
>> 
>>> On 23 May 2018, at 15:10, Andrei Shumanski <and...@shumanski.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thank you very much for your response.
>>> I will try the new feature of Flink 1.5 when it is released.
>>> 
>>> But I am not sure minimising buffer sizes will work in all scenarios.
>>> If I understand correctly, these settings affect the whole Flink 
>>> instance.
>>> 
>>> We might have a flow like this:
>>> 
>>> Source: Read file paths --> Unpack and parse files --> Analyse parsed data 
>>> --> ….
>>> 
>>> So there will be a very small amount of data at the first step but quite a 
>>> lot of parsed data later.
>>> Changing buffer sizes globally will probably hurt the throughput of the 
>>> later steps, as you wrote.
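>>> 
>>> In code, the pipeline would look roughly like this (UnpackAndParse and 
>>> Analyse are just placeholders for our real functions):
>>> 
>>>   import org.apache.flink.api.common.functions.FlatMapFunction;
>>>   import org.apache.flink.api.common.functions.MapFunction;
>>>   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>   import org.apache.flink.util.Collector;
>>> 
>>>   public class MixedWorkloadJob {
>>>       public static void main(String[] args) throws Exception {
>>>           StreamExecutionEnvironment env =
>>>                   StreamExecutionEnvironment.getExecutionEnvironment();
>>>           env.fromElements("/data/a.tar.gz", "/data/b.tar.gz") // tiny records
>>>              .flatMap(new UnpackAndParse()) // one path -> many parsed records
>>>              .map(new Analyse())            // CPU-heavy per record
>>>              .print();
>>>           env.execute("mixed workload");
>>>       }
>>> 
>>>       private static class UnpackAndParse
>>>               implements FlatMapFunction<String, String> {
>>>           @Override
>>>           public void flatMap(String path, Collector<String> out) {
>>>               out.collect("parsed contents of " + path); // placeholder
>>>           }
>>>       }
>>> 
>>>       private static class Analyse implements MapFunction<String, String> {
>>>           @Override
>>>           public String map(String parsed) {
>>>               return parsed; // placeholder for the heavy analysis
>>>           }
>>>       }
>>>   }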
>>> 
>>> 
>>>> On 23 May 2018, at 14:48, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Yes, Flink 1.5.0 will come with better tools to handle this problem. 
>>>> Namely, you will be able to limit the “in flight” data by controlling the 
>>>> number of credits assigned per channel/input gate. Even without any 
>>>> configuration, Flink 1.5.0 will buffer less data out of the box, thus 
>>>> mitigating the problem.
>>>> 
>>>> There are some tweaks that you could use to make 1.4.x work better. With 
>>>> small records that require heavy processing, generally speaking you do not 
>>>> need huge buffer sizes to reach maximum throughput. You can try to reduce 
>>>> both the buffer pool and the memory segment size:
>>>> 
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
>>>>  - taskmanager.network.memory.fraction: fraction of JVM memory to use 
>>>>    for network buffers (default: 0.1),
>>>>  - taskmanager.network.memory.min: minimum memory size for network 
>>>>    buffers, in bytes (default: 64 MB),
>>>>  - taskmanager.network.memory.max: maximum memory size for network 
>>>>    buffers, in bytes (default: 1 GB), and
>>>>  - taskmanager.memory.segment-size: size of the memory buffers used by 
>>>>    the memory manager and the network stack, in bytes (default: 32768 
>>>>    = 32 KB).
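>>>> 
>>>> For example, in flink-conf.yaml (illustrative values only - they would 
>>>> need to be tuned for the actual workload):
>>>> 
>>>>   taskmanager.network.memory.fraction: 0.05
>>>>   taskmanager.network.memory.min: 33554432    # 32 MB instead of the 64 MB default
>>>>   taskmanager.network.memory.max: 134217728   # 128 MB instead of the 1 GB default
>>>>   taskmanager.memory.segment-size: 4096       # 4 KB instead of the 32 KB default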
>>>> 
>>>> Reducing those values will reduce the amount of in-flight data that can be 
>>>> caught between checkpoints. Keep in mind that smaller values can also lower 
>>>> throughput, but as I said, with a small number of heavy-processing records 
>>>> this is not an issue. In an extreme example, if your records are, let's 
>>>> say, 8 bytes each and require 1 hour to process, there is almost no need 
>>>> for any buffering - a single 32 KB buffer would already hold about 4096 
>>>> such records, i.e. thousands of hours of queued work. 
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 23 May 2018, at 12:58, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>> 
>>>>> Hi Andrei,
>>>>> 
>>>>> With the current version of Flink, there is no general solution to this 
>>>>> problem.
>>>>> The upcoming version 1.5.0 of Flink adds a feature called credit-based 
>>>>> flow control which might help here.
>>>>> 
>>>>> I'm adding @Piotr to this thread who knows more about the details of this 
>>>>> new feature.
>>>>> 
>>>>> Best, Fabian
>>>>> 
>>>>> 2018-05-18 11:59 GMT+02:00 Andrei Shumanski <and...@shumanski.com>:
>>>>> Hi,
>>>>> 
>>>>> 
>>>>> Right now it is a Kafka source, but I had the same issue when reading 
>>>>> data from the local FS.
>>>>> 
>>>>> It looks like a common problem for many (all?) sources.
>>>>> When the incoming data is very small (paths to large archives) but each 
>>>>> entry requires significant time to process (unpack, parse, etc.), Flink 
>>>>> detects the back pressure with a delay and too much data becomes part of 
>>>>> the first transaction.
>>>>> 
>>>>> 
>>>>> 
>>>>> -- 
>>>>> Best regards,
>>>>> Andrei Shumanski
>>>>> 
>>>>> 
>>>>> 
>>>>> On Fri, May 18, 2018 at 11:44 AM, makeyang <riverbuild...@hotmail.com> wrote:
>>>>> Andrei Shumanski:
>>>>>     Which source are you using?
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
