Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi Arvid, Thank you so much for your detailed reply. I think I will go with one schema per topic using GenericRecordAvroTypeInfo for GenericRecords and not do any custom magic. The approach of sending records as a byte array also seems quite interesting. Right now I am deserializing Avro records so
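
A minimal sketch of the one-schema-per-topic setup with GenericRecordAvroTypeInfo. The schema JSON and the inline source below are placeholders standing in for the real topic schema and Kafka source; passing the type info explicitly keeps Flink on its Avro serializer instead of the Kryo fallback.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class GenericRecordTypeInfoSketch {

    // Hypothetical single schema of the topic; in the real job this would be
    // loaded from the schema registry or bundled with the application.
    private static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"string\"}]}";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

        // Stand-in source emitting a few GenericRecords. Supplying the
        // GenericRecordAvroTypeInfo tells Flink to serialize them with the
        // Avro serializer and a fixed schema instead of falling back to Kryo.
        DataStream<GenericRecord> records = env.addSource(
            new SourceFunction<GenericRecord>() {
                @Override
                public void run(SourceContext<GenericRecord> ctx) {
                    Schema s = new Schema.Parser().parse(SCHEMA_JSON);
                    for (String id : new String[] {"a", "b", "c"}) {
                        GenericRecord r = new GenericData.Record(s);
                        r.put("id", id);
                        ctx.collect(r);
                    }
                }

                @Override
                public void cancel() {}
            },
            new GenericRecordAvroTypeInfo(schema));

        records.rescale().print();
        env.execute("generic-record-typeinfo-sketch");
    }
}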

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
You need to differentiate two serialization abstractions (which I guess you already know). One comes from reading the source, where the DeserializationSchema is used; it translates the bytes from Kafka into something that Flink can handle. The second serialization occurs within Flink through
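
To make the two abstractions concrete, here is a rough sketch of a DeserializationSchema that turns Kafka bytes into GenericRecord (first abstraction) and hands Flink a GenericRecordAvroTypeInfo via getProducedType() for the in-flight serialization between operators (second abstraction). The class name is made up and the payload is assumed to be plain Avro binary, not the Confluent wire format.

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

public class PlainAvroDeserializationSchema implements DeserializationSchema<GenericRecord> {

    private final String schemaJson;
    private transient Schema schema;
    private transient GenericDatumReader<GenericRecord> reader;

    public PlainAvroDeserializationSchema(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Override
    public GenericRecord deserialize(byte[] message) throws IOException {
        if (reader == null) {
            schema = new Schema.Parser().parse(schemaJson);
            reader = new GenericDatumReader<>(schema);
        }
        // First abstraction: raw Kafka bytes -> objects Flink can handle.
        return reader.read(null, DecoderFactory.get().binaryDecoder(message, null));
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        // Second abstraction: how records are serialized between operators.
        return new GenericRecordAvroTypeInfo(new Schema.Parser().parse(schemaJson));
    }
}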

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
So in this case, Flink will fall back to the default Kryo serializer, right?
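
If the fallback is a concern, one way to make it visible is to disable generic types so the job fails at construction time instead of silently using Kryo. A minimal sketch, not specific to this pipeline:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableKryoFallbackSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Any type that would need the generic (Kryo) serializer now raises an
        // exception when the job is built, which makes the fallback easy to spot.
        env.getConfig().disableGenericTypes();

        // ... build the pipeline as usual ...
        env.fromElements(1, 2, 3).print();
        env.execute("disable-kryo-fallback-sketch");
    }
}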

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
If you follow best practices, then topics should never have different schemas, as you can't enforce schema compatibility. You also have very limited processing capabilities and clumsy workflows attached to them. If you want to encode different kinds of events, then the common approach is to use so

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi Arvid, Thanks a lot for your reply. And yes, we do use the Confluent schema registry extensively. But the `ConfluentRegistryAvroDeserializationSchema` expects a reader schema to be provided. That means it reads the message using the writer schema and converts it to the reader schema. But this is not what I want
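
For reference, a sketch of how forGeneric() is typically wired up; the reader schema and registry URL below are placeholders. Records written with older or newer writer schemas are resolved against this reader schema during deserialization.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

public class ConfluentReaderSchemaSketch {
    public static DeserializationSchema<GenericRecord> build() {
        // Hypothetical reader schema; forGeneric() requires it up front.
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"string\"}]}");
        // Hypothetical registry endpoint.
        String registryUrl = "http://schema-registry:8081";

        return ConfluentRegistryAvroDeserializationSchema.forGeneric(readerSchema, registryUrl);
    }
}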

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
The common solution is to use a schema registry, like the Confluent schema registry [1]. All records have a small 5-byte prefix that identifies the schema, which is then fetched by the deserializer [2]. Here are some resources on how to properly secure the communication if needed [3]. [1] https://docs.confluen
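
A small sketch of the wire format described above: byte 0 is a magic byte (0x0), bytes 1-4 are the schema id as a big-endian int, and the Avro binary payload follows. The method name is made up, just for illustration.

import java.nio.ByteBuffer;

public class ConfluentWireFormatSketch {
    // Extracts the schema id from a record in the Confluent wire format.
    public static int schemaIdOf(byte[] confluentPayload) {
        ByteBuffer buf = ByteBuffer.wrap(confluentPayload);
        byte magic = buf.get();
        if (magic != 0) {
            throw new IllegalArgumentException("Not in Confluent wire format, magic byte = " + magic);
        }
        int schemaId = buf.getInt(); // used to fetch the writer schema from the registry
        // The remaining buf.remaining() bytes are the Avro-encoded record itself.
        return schemaId;
    }
}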

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi, Thanks a lot for the reply. And you both are right. Serializing GenericRecord without specifying the schema was indeed a HUGE bottleneck in my app. I found it through JFR analysis and then read the blog post you mentioned. Now I am able to pump in a lot more data per second. (In my test setup a

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Robert Metzger
Hi, in my experience serialization contributes a lot to the maximum achievable throughput. I can strongly recommend checking out this blog post, which has a lot of details on the topic: https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

Re: Help needed to increase throughput of simple flink app

2020-11-10 Thread ashwinkonale
Hey, I am reading messages with a schema id and using the Confluent schema registry to deserialize to GenericRecord. After this point, the pipeline will have these objects moving across. Can you give me some examples of the `special handling of avro messages` you mentioned?

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Jaffe, Julian
One thing to check is how much you're serializing to the network. If you're using Avro GenericRecords without special handling, you can wind up serializing the schema with every record, greatly increasing the amount of data you're sending across the wire.
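
A quick way to see that overhead is to compare the size of an Avro-encoded record against the size of its schema JSON; the schema and values below are made up for illustration.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class SchemaOverheadSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical small schema and record.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
            + "[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"value\",\"type\":\"long\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", "abc");
        record.put("value", 42L);

        // Avro binary encoding of the record itself: a handful of bytes.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();

        System.out.println("record payload: " + out.size() + " bytes");
        System.out.println("schema JSON:    "
            + schema.toString().getBytes(StandardCharsets.UTF_8).length + " bytes");
        // If the schema travels with every record, the per-record cost is
        // dominated by the schema text, not the data.
    }
}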

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread ashwinkonale
Hi, Thanks a lot for the reply. I added some more metrics to the pipeline to understand the bottleneck. It seems like Avro deserialization introduces some delay. Using a histogram, I found that processing a single message takes ~300us (p99) and ~180us (p50), which means a single slot can output at most ~3000 messages per second
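
Rough arithmetic behind that estimate, with a hypothetical target rate used purely for illustration:

public class ThroughputEstimateSketch {
    public static void main(String[] args) {
        // Numbers from the thread: ~300us (p99) and ~180us (p50) per message.
        double p99Seconds = 300e-6;
        double p50Seconds = 180e-6;

        System.out.printf("per-slot throughput at p99: ~%.0f msg/s%n", 1 / p99Seconds); // ~3,333
        System.out.printf("per-slot throughput at p50: ~%.0f msg/s%n", 1 / p50Seconds); // ~5,555

        // Slots needed to keep up with an assumed incoming rate (hypothetical).
        double targetRate = 1_000_000; // msg/s
        System.out.printf("slots needed at p99: ~%.0f%n", targetRate * p99Seconds); // ~300
    }
}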

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Kostas Kloudas
Hi Ashwin, Do you have any filtering or aggregation (or any operation that emits less data than it receives) in your logic? If yes, you could for example put it before the rescaling operation so that it gets chained to your source and you reduce the amount of data you ship through the network. Af
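
A structural sketch of that ordering, with a made-up string source and predicate standing in for the real records and filter:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FilterBeforeRescaleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the source.
        DataStream<String> raw = env.fromElements("keep-1", "drop-2", "keep-3");

        raw
            // A cheap filter placed *before* rescale() is chained to the source
            // task, so records it drops never cross the network.
            .filter(s -> s.startsWith("keep"))
            .rescale()
            .print();

        env.execute("filter-before-rescale-sketch");
    }
}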

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread ashwinkonale
Hi Till, Thanks a lot for the reply. The problem I am facing is that as soon as I add a network boundary (remove chaining) to the discarding sink, I have a huge problem with throughput. Do you have any pointers on how I can go about debugging this? - Ashwin

Re: Help needed to increase throughput of simple flink app

2020-11-09 Thread Till Rohrmann
Hi Ashwin, Thanks for reaching out to the Flink community. Since you have tested that a kafka_source -> discarding_sink can process 10 million records/s, you might also want to test the write throughput to data_sink and dlq_sink. Maybe these sinks are limiting your overall throughput by backpressuring
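
One way to isolate this is to swap the real sinks for a DiscardingSink and compare runs; a sketch with a placeholder pipeline:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class SinkIsolationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the real pipeline up to (but not including) the sinks.
        DataStream<String> processed = env.fromElements("a", "b", "c");

        // Run 1: measure with the real sink swapped out. If throughput is high
        // here but collapses with the real data_sink/dlq_sink attached, the
        // sinks are the bottleneck (backpressure from the sink side).
        processed.addSink(new DiscardingSink<>());

        // Run 2 (separate job): re-attach the real sinks one at a time and compare.
        // processed.addSink(realDataSink); // placeholder for the actual Kafka sink

        env.execute("sink-isolation-sketch");
    }
}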

Help needed to increase throughput of simple flink app

2020-11-08 Thread ashwin konale
Hey guys, I am struggling to improve the throughput of my simple Flink application. The target topology is this: read_from_kafka (byte array deserializer) --rescale--> processFunction (Confluent Avro deserialization) -> split -> 1. data_sink, 2. dlq_sink. Kafka traffic is pretty high. Partitions: 128 T
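
For readers following along, a structural sketch of that topology. The source, sinks, and parse step are placeholders: the real job would use the Kafka connector, real sinks, and Confluent Avro deserialization instead of the trivial UTF-8 decode shown here.

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TopologySketch {

    // Side output carrying records that fail deserialization (the DLQ path).
    private static final OutputTag<byte[]> DLQ = new OutputTag<byte[]>("dlq") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder for the Kafka source handing out raw byte[] payloads.
        DataStream<byte[]> raw = env.fromElements(
            "ok-1".getBytes(StandardCharsets.UTF_8),
            new byte[0],
            "ok-2".getBytes(StandardCharsets.UTF_8));

        SingleOutputStreamOperator<String> parsed = raw
            .rescale() // network shuffle between source and processing
            .process(new ProcessFunction<byte[], String>() {
                @Override
                public void processElement(byte[] value, Context ctx, Collector<String> out) {
                    if (value.length == 0) {
                        // Records that fail to deserialize go to the DLQ side output.
                        ctx.output(DLQ, value);
                    } else {
                        out.collect(new String(value, StandardCharsets.UTF_8));
                    }
                }
            });

        parsed.print();                    // stand-in for data_sink
        parsed.getSideOutput(DLQ).print(); // stand-in for dlq_sink

        env.execute("topology-sketch");
    }
}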