Hi Alex,

If you are sure that the operations in between do not change the
partitioning of the data and that the key stays constant for the whole
pipeline, you could use reinterpretAsKeyedStream[1]. I think this
answers your questions 1 & 2.
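For illustration, a rough sketch of what that could look like for your
pipeline (Message, sessionId and TcpAssemblyFunction are placeholder
names, not from your job; the guarantee that key and partitioning are
unchanged is entirely on you, otherwise results will be incorrect):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// First keyBy performs the actual (one and only) shuffle.
KeyedStream<Message, String> keyed = parsed.keyBy(m -> m.sessionId());

// An operator that forwards records without changing key or partitioning.
DataStream<Message> assembled = keyed.process(new TcpAssemblyFunction());

// Instead of a second keyBy (and its network shuffle), reinterpret the
// already-partitioned stream as keyed again. Experimental API.
KeyedStream<Message, String> rekeyed =
    DataStreamUtils.reinterpretAsKeyedStream(assembled, m -> m.sessionId());
```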

As for the third question, first of all you should look into enabling
object reuse[2]. Make sure, though, that you work with immutable
objects. Secondly, all operators that simply forward records are
chained by default. If you need finer-grained control over chaining,
you can look into these docs[3].
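A minimal sketch of both knobs (ParserFunction is just a placeholder;
enableObjectReuse is only safe if your functions neither mutate nor
hold on to the records they receive):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// With object reuse, chained operators pass the same object instance
// along instead of copying it between them.
env.getConfig().enableObjectReuse();

// Chaining is on by default, but can be adjusted per operator:
stream.map(new ParserFunction()).startNewChain();   // break the chain before this operator
stream.map(new ParserFunction()).disableChaining(); // never chain this operator
```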

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/experimental/#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/execution_configuration/#execution-configuration

[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups

On 10/05/2021 08:59, Alex Drobinsky wrote:
> Dear entity that represents Flink user community,
>
> In order to formulate the question itself, I need to describe the
> problem in some detail, so please bear with me for a while.
>
> I have following execution graph:
>
> KafkaSource -> Message parser -> keyBy -> TCP assembly -> keyBy ->
> Storage -> keyBy -> Classifier -> KafkaSink (This is slightly
> simplified version )
>
> When I noticed less than ideal throughput, I ran a profiler, which
> identified
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput)
> as a major function (83% of time spent here). 45% of total time is
> spent in
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(IOReadableWritable).
>
> The serialization is protobuf with Kryo; according to benchmarks it
> isn't particularly slow, and should be similar to or a bit better than POJO.
>
> The problem from my point of view is that serialization shouldn't
> happen at all unless data is actually sent over the network to another
> node (in my case I have one job manager and one task manager).
>
> However, I suspect that the keyBy operation implicitly enforces the
> use of serialization / deserialization.
>
> First question: In this particular case, the key is exactly the same
> for every keyBy. Is there any way, other than combining operations into
> a single operator, to avoid the performance impact of the keyBy chain?
>
> Second question: Could I use a process function after keyBy in such
> a way that it does not merge the stream back, i.e. it remains a
> KeyedStream?
>
> Third question: Could I somehow specify that a sequence of operators
> must be executed in the same thread, without
> serialization/deserialization operations in between?
>
>
> Best regards,
> Alexander 
