Hi Theo,

there are a lot of performance improvements that Flink could make, but they would further complicate the interfaces and API. They would require users to have deep knowledge of the runtime to know when it is safe to reuse objects and when it is not.
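
To make the risk concrete, here is a minimal plain-Java sketch with no Flink dependency. The names `Event`, `ReuseHazard`, `bufferWithReuse` and `bufferWithCopies` are hypothetical and invented for this illustration. It mimics a runtime that recycles one mutable instance for every record while user code buffers the references it receives, which is the kind of pattern that silently breaks once objects are reused.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable record type; not part of Flink.
final class Event {
    long id;
}

final class ReuseHazard {
    // Simulates a runtime that reuses ONE instance for every record,
    // while the "user function" buffers the reference it receives.
    static List<Long> bufferWithReuse(int n) {
        Event reused = new Event();
        List<Event> buffer = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            reused.id = i;       // runtime overwrites the same object
            buffer.add(reused);  // user code keeps a reference: unsafe
        }
        return ids(buffer);
    }

    // Simulates the defensive behaviour: every record is a fresh instance.
    static List<Long> bufferWithCopies(int n) {
        List<Event> buffer = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Event copy = new Event();
            copy.id = i;
            buffer.add(copy);
        }
        return ids(buffer);
    }

    private static List<Long> ids(List<Event> events) {
        List<Long> out = new ArrayList<>();
        for (Event e : events) {
            out.add(e.id);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(bufferWithReuse(3));   // [2, 2, 2]
        System.out.println(bufferWithCopies(3));  // [0, 1, 2]
    }
}
```

With reuse, every buffered entry ends up showing the last record, because all entries point to the same object. Code that processes each record immediately and never stores the reference is not affected.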

The Table/SQL API of Flink uses a lot of these optimizations under the hood and works on binary data to reduce garbage collection.

For the DataStream API, the community decided to put safety/correctness before performance in this case. But enabling object reuse and further low-level optimizations should give a good result if needed.

Regards,
Timo


On 19.02.20 10:43, Theo Diefenthal wrote:
I have the same experience as Eleanore,

When enabling object reuse, I saw a significant performance improvement, and in my profiling session I saw that a lot of serialization/deserialization was no longer performed.

That’s why my question was originally aimed a bit further: It’s good that Flink reuses objects, but I still need to create a new instance of my objects per event when parsing, and that instance is ultimately dropped at some later processing step in the Flink pipeline (map, shuffle or sink). Wouldn’t it be possible for the “deserialize” method to have an optional “oldPOJO” input, where Flink provides me with an unused old instance of my POJO if it has one left? If it is null, I would instantiate a new instance in my code. With billions of small events ingested per day, I can imagine this being another small performance improvement, especially in terms of garbage collection…
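
The idea could look roughly like the following plain-Java sketch. `ReusingDeserializer`, `Reading` and `ReadingDeserializer` are hypothetical names invented here, and the 12-byte wire format is an assumption made only for the demo. This is not an existing Flink interface; it only shows the shape of a deserialize method that accepts an optional old instance.

```java
import java.nio.ByteBuffer;

// Hypothetical POJO with mutable fields so an instance can be refilled.
final class Reading {
    long timestamp;
    int value;
}

// Hypothetical interface: NOT an existing Flink API.
interface ReusingDeserializer<T> {
    // 'reuse' may be null, in which case a new instance is created.
    T deserialize(byte[] message, T reuse);
}

final class ReadingDeserializer implements ReusingDeserializer<Reading> {
    @Override
    public Reading deserialize(byte[] message, Reading reuse) {
        Reading target = (reuse != null) ? reuse : new Reading();
        ByteBuffer buf = ByteBuffer.wrap(message);
        // Overwrite every field so no state leaks from the old event.
        target.timestamp = buf.getLong();
        target.value = buf.getInt();
        return target;
    }

    // Demo helper: encodes a reading as 8 + 4 bytes (big-endian).
    static byte[] encode(long timestamp, int value) {
        return ByteBuffer.allocate(12).putLong(timestamp).putInt(value).array();
    }

    public static void main(String[] args) {
        ReadingDeserializer d = new ReadingDeserializer();
        Reading first = d.deserialize(encode(1000L, 7), null);
        Reading second = d.deserialize(encode(2000L, 9), first);
        System.out.println(first == second);   // true: no new allocation
        System.out.println(second.timestamp);  // 2000
    }
}
```

The caller would have to guarantee that the old instance is no longer referenced anywhere downstream before handing it back, which is the hard part in a real pipeline.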

Best regards

Theo

*From:* Till Rohrmann <trohrm...@apache.org>
*Sent:* Wednesday, 19 February 2020 07:34
*To:* Jin Yi <eleanore....@gmail.com>
*Cc:* user <user@flink.apache.org>
*Subject:* Re: Parallelize Kafka Deserialization of a single partition?

Then my statement must be wrong. Let me double check this. Yesterday when checking the usage of the objectReuse field, I could only see it in the batch operators. I'll get back to you.

Cheers,

Till

On Wed, Feb 19, 2020, 07:05 Jin Yi <eleanore....@gmail.com> wrote:

    Hi Till,

    I just read your comment:

    Currently, enabling object reuse via
    ExecutionConfig.enableObjectReuse() only affects the DataSet API.
    DataStream programs will always do defensive copies. There is a FLIP
    to improve this behaviour [1].

    I have an application written in Apache Beam with Flink as the
    runner. The pipeline is configured in streaming mode, and I see a
    performance difference between enabling and disabling ObjectReuse.
    When running in debug mode, I also noticed that with objectReuse set
    to true, no serialization/deserialization happens between operators,
    whereas with it set to false, serialization and deserialization
    happen between each pair of operators. Do you have any idea why this
    is happening?

    MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
    options.setStreaming(true);
    options.setObjectReuse(true);

    Thanks a lot!

    Eleanore

    On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann <trohrm...@apache.org>
    wrote:

        Hi Theo,

        the KafkaDeserializationSchema does not allow returning
        asynchronous results. Hence, Flink will always wait until
        KafkaDeserializationSchema.deserialize returns the parsed value.
        Consequently, the only way I can think of to offload the complex
        parsing logic would be to do it in a downstream operator where
        you could use Async I/O to run the parsing logic in a thread
        pool, for example.
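
As a rough plain-Java illustration of that idea (no Flink dependency; `OrderedAsyncParsing`, `parse` and `parseOrdered` are hypothetical names, and `parse` is only a stand-in for the expensive parsing logic): records are parsed on a thread pool, but results are collected in submission order, so the per-partition ordering is preserved. In a Flink job, the corresponding building block would be an async function wired in via `AsyncDataStream.orderedWait`; the sketch below only shows the underlying pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OrderedAsyncParsing {
    // Stand-in for the expensive parsing logic (hypothetical).
    static int parse(String raw) {
        return Integer.parseInt(raw.trim());
    }

    // Parses all inputs on a thread pool but returns results in input order.
    static List<Integer> parseOrdered(List<String> rawEvents, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit everything first so parsing can run in parallel.
            List<CompletableFuture<Integer>> pending = new ArrayList<>();
            for (String raw : rawEvents) {
                pending.add(CompletableFuture.supplyAsync(() -> parse(raw), pool));
            }
            // Collect in submission order: the output order matches the
            // input order regardless of which task finishes first.
            List<Integer> parsed = new ArrayList<>();
            for (CompletableFuture<Integer> f : pending) {
                parsed.add(f.join());
            }
            return parsed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrdered(Arrays.asList("3", " 1", "2 "), 2)); // [3, 1, 2]
    }
}
```

Waiting on the oldest pending result first is what keeps the order; the price is that one slow record delays everything submitted after it.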

        Alternatively, you could think about a simple program which
        transforms your input events into another format from which the
        timestamp is easier to extract. This comes, however, at the
        cost of another Kafka topic.

        Currently, enabling object reuse via
        ExecutionConfig.enableObjectReuse() only affects the DataSet
        API. DataStream programs will always do defensive copies. There
        is a FLIP to improve this behaviour [1].

        [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982

        Cheers,

        Till

        On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal
        <theo.diefent...@scoop-software.de> wrote:

            Hi,

            As for most pipelines, our Flink pipeline starts with parsing
            source Kafka events into POJOs. We perform this step within
            a KafkaDeserializationSchema so that we properly extract the
            event time timestamp for the downstream Timestamp-Assigner.

            Now it turned out that parsing is currently the most
            CPU-intensive task in our pipeline, so the CPU bounds the
            number of elements we can ingest per second. Further
            splitting up the partitions will be hard, as we need to
            maintain the exact order of events per partition, and it
            would also require quite some architectural changes for
            producers and the Flink job.

            Now I had the idea to put the parsing task into ordered
            Async I/O. But Async I/O can only be plugged into an
            existing stream, not into the deserialization schema, as far
            as I can see. So the best idea I currently have is to keep
            parsing in the DeserializationSchema as minimal as possible,
            extracting only the event timestamp, and to do the full
            parsing downstream in Async I/O. This, however, seems to be a
            bit tedious, especially as we have to deal with multiple
            input formats and would need to develop two parsers for the
            heavy-load ones: a timestamp-only parser and a full parser.

            Do you know if it is somehow possible to parallelize / async
            IO the parsing within the KafkaDeserializationSchema? I
            don't have state access in there and I don't have a
            "collector" object in there so that one element as input
            needs to produce exactly one output element.

            Another question: My parsing produces Java POJO objects via
            "new", which are sent downstream (reusePOJO setting set) and
            will finally be garbage collected once they have reached the
            sink. Is there some mechanism in Flink so that I could reuse
            "old" sinked POJOs in the source? All tasks are chained, so
            theoretically that could be possible?

            Best regards

            Theo

