Hi Alex,

If you are sure that the operations in between do not change the partitioning of the data, and that the key stays constant throughout the pipeline, you could use reinterpretAsKeyedStream[1]. I believe this answers your questions 1 & 2.
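For illustration, a minimal sketch of what that could look like (assuming Flink 1.13 on the classpath; the stream, event type, and key field are placeholders, not from your pipeline):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// "assembled" stands for a stream that was keyed by sessionId upstream
// and whose partitioning the intermediate operators did not change.
DataStream<Event> assembled = ...;

// Tell Flink the stream is still keyed by the same key, so the next
// keyed operator does not need a new keyBy (and the network shuffle /
// serialization that comes with it).
KeyedStream<Event, String> keyed =
    DataStreamUtils.reinterpretAsKeyedStream(assembled, event -> event.sessionId);
```

Note that this is an experimental API: if the partitioning assumption is violated, the results are undefined.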
As for the third question: first of all, you should look into enabling object reuse[2]; make sure, though, that you only work with immutable objects. Secondly, all operators that simply forward records should be chained by default. If you need finer-grained control over chaining, have a look at these docs[3].

Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/experimental/#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/execution_configuration/#execution-configuration
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups

On 10/05/2021 08:59, Alex Drobinsky wrote:
> Dear entity that represents the Flink user community,
>
> To formulate the question itself, I need to describe the problem in some detail, so please bear with me.
>
> I have the following execution graph:
>
> KafkaSource -> Message parser -> keyBy -> TCP assembly -> keyBy -> Storage -> keyBy -> Classifier -> KafkaSink
> (This is a slightly simplified version.)
>
> When I noticed less-than-ideal throughput, I ran a profiler, which identified
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput)
> as a major hotspot (83% of time is spent there). 45% of total time is spent in
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(IOReadableWritable).
>
> The serialization is protobuf with Kryo; according to benchmarks it isn't particularly slow and should be similar to, or a bit better than, POJO serialization.
>
> The problem, from my point of view, is that serialization shouldn't happen at all unless data is actually sent over the network to another node (in my case I have one job manager and one task manager).
> However, I suspect that the keyBy operation implicitly enforces serialization/deserialization.
>
> First question: in this particular case the key is exactly the same for every keyBy. Is there any way, other than combining the operations into a single operator, to avoid the performance impact of the keyBy chain?
>
> Second question: could I use a process function after keyBy in such a way that it does not merge the stream back, i.e. it remains a KeyedStream?
>
> Third question: could I somehow specify that a sequence of operators must be executed in the same thread, without serialization/deserialization operations in between?
>
> Best regards,
> Alexander
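To make the object-reuse and chaining suggestions from the reply above concrete, here is a minimal sketch (Flink 1.13 assumed; the operator names Parser and Classifier are placeholders):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// [2] Object reuse: Flink passes the same record instance between
// chained operators instead of deep-copying it. Only safe if operators
// never mutate or hold on to incoming records.
env.getConfig().enableObjectReuse();

// [3] Chaining control on individual operators. Operators with the same
// parallelism that simply forward records are chained by default; these
// calls adjust the default behaviour where needed:
env.addSource(source)
   .map(new Parser())
   .startNewChain()                 // begin a new chain at this operator
   .map(new Classifier())
   .disableChaining();              // keep this operator out of any chain

// Chaining can also be switched off for the whole job:
// env.disableOperatorChaining();
```

Operators that stay in the same chain run in the same task thread, so records are handed over without serialization, which addresses the third question directly.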
