Hi Arnaud, just to add: the overhead of this additional map is negligible if you enable object reuse [1].
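
For what it's worth, here is a rough sketch of how the workaround quoted below could look with object reuse switched on. It assumes the Flink 1.x DataStream API (where open() takes a Configuration). The class names PartitionTagSketch and TagWithPartition, the fixed count of 4 partitions, and the hash-based partitionOf() are placeholders I made up for illustration; the real lookup would go through the Kudu client, which is not shown here.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionTagSketch {

    // Assumption: fixed partition count, only for this sketch.
    static final int NUM_PARTITIONS = 4;

    /** Hypothetical stand-in for the Kudu partition lookup (plain hash, not Kudu's scheme). */
    public static int partitionOf(String row, int numPartitions) {
        return Math.floorMod(row.hashCode(), numPartitions);
    }

    /** Tags each record with a partition id; open()/close() would own the connection. */
    public static class TagWithPartition
            extends RichMapFunction<String, Tuple2<Integer, String>> {

        @Override
        public void open(Configuration parameters) {
            // Open the connection to the partitioning service here, once per subtask.
        }

        @Override
        public Tuple2<Integer, String> map(String row) {
            return Tuple2.of(partitionOf(row, NUM_PARTITIONS), row);
        }

        @Override
        public void close() {
            // Release the connection here.
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Skip defensive copies between chained operators (see [1]).
        env.getConfig().enableObjectReuse();

        env.fromElements("a", "b", "c", "d")
           // 1) tag every record with its partition id
           .map(new TagWithPartition())
           // 2) shuffle by that id
           .keyBy(new KeySelector<Tuple2<Integer, String>, Integer>() {
               @Override
               public Integer getKey(Tuple2<Integer, String> t) {
                   return t.f0;
               }
           })
           // 3) strip the id again so the sink sees the original payload
           .map(new MapFunction<Tuple2<Integer, String>, String>() {
               @Override
               public String map(Tuple2<Integer, String> t) {
                   return t.f1;
               }
           })
           // print() stands in for the Kudu sink
           .print();

        env.execute("partition-tag-sketch");
    }
}
```

Two caveats: keyBy routes by a hash of the key, so records with the same id end up on the same subtask, but several ids may share one subtask. And with object reuse enabled, user functions should not keep references to input records or modify them after emitting.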
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html

On Tue, Jun 2, 2020 at 10:34 AM Robert Metzger <rmetz...@apache.org> wrote:

> I'm not 100% sure about this answer, that's why I'm CCing Aljoscha to
> correct me if needed:
>
> Partitioners are not regular operators (like a map or window), thus they
> are not included in the regular Task lifecycle methods (of open() / map()
> etc. / close(), with the proper error handling, task cancellation
> mechanisms etc.). The custom partition function is called somewhere close
> to the network stack.
> It would be quite a lot of effort (and added complexity to the codebase)
> to allow for rich partitioners. Given that custom partitioners are a rarely
> used feature, it would not be justified to spend a lot of time on this
> (there's also a good workaround available).
>
> On Fri, May 29, 2020 at 2:46 PM LINZ, Arnaud <al...@bouyguestelecom.fr>
> wrote:
>
>> Hello,
>>
>> Yes, that would definitely do the trick, with an extra mapper after keyBy
>> to remove the tuple so that it stays seamless. It’s less hacky than what I
>> was thinking of, thanks!
>>
>> However, is there any plan in a future release to have rich partitioners?
>> That would avoid adding overhead and “intermediate” technical info in
>> the stream payload.
>>
>> Best,
>>
>> Arnaud
>>
>> *From:* Robert Metzger <rmetz...@apache.org>
>> *Sent:* Friday, May 29, 2020 13:10
>> *To:* LINZ, Arnaud <al...@bouyguestelecom.fr>
>> *Cc:* user <user@flink.apache.org>
>> *Subject:* Re: Best way to "emulate" a rich Partitioner with open() and
>> close() methods ?
>>
>> Hi Arnaud,
>>
>> Maybe I don't fully understand the constraints, but what about
>>
>> stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());
>>
>> The map(new GetKuduPartitionMapper()) will be a regular RichMapFunction
>> with open() and close() where you can handle the connection with Kudu's
>> partitioning service.
>>
>> The map will output a Tuple2<PartitionId, Data> (or something nicer :) ),
>> then Flink shuffles your data correctly, and the sinks will process the
>> data correctly partitioned.
>>
>> I hope that this is what you were looking for!
>>
>> Best,
>>
>> Robert
>>
>> On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud <al...@bouyguestelecom.fr>
>> wrote:
>>
>> Hello,
>>
>> I would like to improve the performance of my Apache Kudu Sink by using
>> the new “KuduPartitioner” of the Kudu API to match Flink stream partitions
>> with Kudu partitions and so reduce network shuffling.
>>
>> For that, I would like to implement something like
>>
>> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new
>> KuduSink(…));
>>
>> with KuduFlinkPartitioner an implementation of
>> org.apache.flink.api.common.functions.Partitioner
>> that internally makes use of the KuduPartitioner client tool of Kudu’s API.
>>
>> However, for that KuduPartitioner to work, it needs to open (and close at
>> the end) a connection to the Kudu table, obviously something that can’t
>> be done for each line. But there is no “AbstractRichPartitioner” with
>> open() and close() methods that I can use for that (the way I use it in the
>> sink for instance).
>>
>> What is the best way to implement this ?
>>
>> I thought of ThreadLocals that would be initialized during the first call
>> to int partition(K key, int numPartitions); but I won’t be able to
>> close() things nicely as I won’t be notified on job termination.
>>
>> I thought of putting those static ThreadLocals inside an “Identity Mapper”
>> that would be called just prior to the partitioning, with something like:
>>
>> stream.map(richIdentiyConnectionManagerMapper).partitionCustom(new
>> KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>>
>> with Kudu connections initialized in the mapper open(), closed in the
>> mapper close(), and used in the partitioner partition().
>>
>> However, it looks like an ugly hack breaking every coding principle, but
>> as long as the threads are reused between the mapper and the partitioner I
>> think that it should work.
>>
>> Is there a better way to do this ?
>>
>> Best regards,
>>
>> Arnaud

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>