Hi Piotr,

Thanks! I will try it. It is a bit of an ugly solution, but it may work :)
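
Just to make sure I understood the idea correctly, something like this is what I am
going to try (only a rough sketch, not tested; I am using a plain MapFunction that
pads with spaces instead of a real custom serializer, and the constant simply
mirrors the default segment size):

import org.apache.flink.api.common.functions.MapFunction;

/**
 * Pads each file path with trailing spaces so that a single record roughly
 * fills one network buffer / memory segment (32768 bytes by default).
 * The consuming operator has to call trim() before using the path.
 */
public class PadPathToSegmentSize implements MapFunction<String, String> {

    private static final int TARGET_LENGTH = 32 * 1024;

    @Override
    public String map(String path) {
        StringBuilder padded = new StringBuilder(TARGET_LENGTH);
        padded.append(path);
        while (padded.length() < TARGET_LENGTH) {
            padded.append(' ');
        }
        return padded.toString();
    }
}

and then on the consuming side something like: String realPath = paddedPath.trim();
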
> On 23 May 2018, at 16:11, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
> One more remark. Currently there is an unwritten assumption in Flink that the
> time to process records is proportional to their size in bytes. As you noted,
> this breaks in the case of mixed workloads (especially with file paths sent as
> records).
>
> There is an interesting workaround for this problem though. You could use a
> custom serializer for the file paths to artificially blow up the record size,
> for example to "segment-size" (32 KB), or even more. This is easy to do - for
> example, just pad the string with spaces. It would ensure that there is at
> most one file path to process per network buffer and would even out the
> imbalance caused by the assumption that record size is proportional to the
> number of bytes.
>
> Piotrek
>
>> On 23 May 2018, at 15:40, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> Yes, if you have a mixed workload in your pipeline, it is a matter of finding
>> the right balance. The situation will be better in Flink 1.5, but the
>> underlying issue will remain as well - in 1.5.0 there will also be no way to
>> change the network buffer configuration between stages of a single job.
>>
>> Currently such an explosion of records (one small record producing a huge
>> bunch of new records) is kind of an anti-pattern in Flink. Besides the
>> problem that we were discussing, the other problem is that you cannot
>> checkpoint in the middle of processing a big record. I hope that this will
>> change in future Flink releases, but currently those are the limitations.
>>
>> For your case, with the initial records being file paths, it might be better
>> to embed this logic within a data source, so that your data source is already
>> producing parsed records. For example, FlinkKafkaConsumer discovers
>> topics/partitions on the fly, and the smallest transport unit is still a
>> "parsed record" and not a "topic" (a "file path" in your case). With a proper
>> offsets implementation this also handles the problem of checkpointing in the
>> middle of processing a large file.
>>
>> Piotrek
>>
>>> On 23 May 2018, at 15:10, Andrei Shumanski <and...@shumanski.com> wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thank you very much for your response.
>>> I will try the new feature of Flink 1.5 when it is released.
>>>
>>> But I am not sure minimising buffer sizes will work in all scenarios.
>>> If I understand correctly, these settings affect the whole Flink instance.
>>>
>>> We might have a flow like this:
>>>
>>> Source: Read file paths --> Unpack and parse files --> Analyse parsed data -> ....
>>>
>>> So it will be a very small amount of data in the first step but quite a lot
>>> of parsed data later.
>>> Changing buffer sizes globally will probably affect the throughput of later
>>> steps, as you wrote.
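
(In code, the flow quoted above looks roughly like this - heavily simplified; the
socket source and the line-based "parsing" are only placeholders, the real job
reads the paths from Kafka and unpacks archives:)

import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class MixedWorkloadSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Step 1: tiny records - file paths (placeholder source, Kafka in the real job)
        DataStream<String> filePaths = env.socketTextStream("localhost", 9999);

        // Step 2: unpack and parse - one small path record fans out into many parsed records
        DataStream<String> parsedRecords = filePaths
                .flatMap((String path, Collector<String> out) -> {
                    for (String line : Files.readAllLines(Paths.get(path))) {
                        out.collect(line);
                    }
                })
                .returns(Types.STRING);

        // Step 3: analyse parsed data (placeholder for the heavy processing step)
        parsedRecords
                .map(line -> line.toUpperCase())
                .returns(Types.STRING)
                .print();

        env.execute("mixed workload sketch");
    }
}
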
>>>> On 23 May 2018, at 14:48, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Yes, Flink 1.5.0 will come with better tools to handle this problem.
>>>> Namely, you will be able to limit the "in flight" data by controlling the
>>>> number of assigned credits per channel/input gate. Even without any
>>>> configuration, Flink 1.5.0 will buffer less data out of the box, thus
>>>> mitigating the problem.
>>>>
>>>> There are some tweaks that you could use to make 1.4.x work better. With
>>>> small records that require heavy processing, generally speaking you do not
>>>> need huge buffer sizes to keep max throughput. You can try to both reduce
>>>> the buffer pool and reduce the memory segment sizes:
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#configuring-the-network-buffers
>>>>
>>>> • taskmanager.network.memory.fraction: Fraction of JVM memory to use
>>>>   for network buffers (DEFAULT: 0.1),
>>>> • taskmanager.network.memory.min: Minimum memory size for network
>>>>   buffers in bytes (DEFAULT: 64 MB),
>>>> • taskmanager.network.memory.max: Maximum memory size for network
>>>>   buffers in bytes (DEFAULT: 1 GB), and
>>>> • taskmanager.memory.segment-size: Size of memory buffers used by the
>>>>   memory manager and the network stack in bytes (DEFAULT: 32768 = 32 KiB).
>>>>
>>>> Reducing those values will reduce the amount of in-flight data that can be
>>>> caught between checkpoints. Keep in mind that smaller values can lead to
>>>> lower throughput, but as I said, with a small number of heavy-processing
>>>> records this is not an issue. In an extreme example, if your records are,
>>>> let's say, 8 bytes each and require 1 hour to process, there is almost no
>>>> need for any buffering.
>>>>
>>>> Piotrek
>>>>
>>>>> On 23 May 2018, at 12:58, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>> Hi Andrei,
>>>>>
>>>>> With the current version of Flink, there is no general solution to this
>>>>> problem.
>>>>> The upcoming version 1.5.0 of Flink adds a feature called credit-based
>>>>> flow control which might help here.
>>>>>
>>>>> I'm adding @Piotr to this thread who knows more about the details of this
>>>>> new feature.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2018-05-18 11:59 GMT+02:00 Andrei Shumanski <and...@shumanski.com>:
>>>>>
>>>>> Hi,
>>>>>
>>>>> Right now it is a Kafka source, but I had the same issue when reading
>>>>> data from the local FS.
>>>>>
>>>>> It looks like a common problem for many (all?) sources.
>>>>> When the incoming data is very small (paths to large archives) but each
>>>>> entry requires a significant time to process (unpack, parse, etc.), Flink
>>>>> detects the back pressure with a delay and too much data becomes part of
>>>>> the first transaction.
>>>>>
>>>>> --
>>>>> Best regards,
>>>>> Andrei Shumanski
>>>>>
>>>>> On Fri, May 18, 2018 at 11:44 AM, makeyang <riverbuild...@hotmail.com> wrote:
>>>>>
>>>>> Andrei Shumanski:
>>>>> which source are u using?
>>>>>
>>>>> --
>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
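
P.S. In case somebody finds this thread later: the settings Piotr listed above go
into flink-conf.yaml, roughly like this (example values only, not benchmarked -
smaller buffers reduce the in-flight data between checkpoints but can also reduce
throughput):

# flink-conf.yaml (Flink 1.4.x, values are only a starting point)
taskmanager.network.memory.fraction: 0.05    # default: 0.1
taskmanager.network.memory.min: 16777216     # 16 MB, default: 64 MB
taskmanager.network.memory.max: 268435456    # 256 MB, default: 1 GB
taskmanager.memory.segment-size: 8192        # 8 KiB, default: 32768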