I'm not 100% sure about this answer, so I'm CCing Aljoscha to correct me if needed:
Partitioners are not regular operators (like a map or window), so they are not part of the regular task lifecycle (open(), then map() etc., then close(), with proper error handling, task cancellation mechanisms, and so on). The custom partition function is called somewhere close to the network stack. Supporting rich partitioners would take quite a lot of effort and add complexity to the codebase. Given that custom partitioners are a rarely used feature, spending a lot of time on this would not be justified (there's also a good workaround available).

On Fri, May 29, 2020 at 2:46 PM LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> Hello,
>
> Yes, that would definitely do the trick, with an extra mapper after keyBy
> to remove the tuple so that it stays seamless. It’s less hacky than what I
> was thinking of, thanks!
>
> However, is there any plan in a future release to have rich partitioners?
> That would avoid adding overhead and “intermediate” technical info to the
> stream payload.
>
> Best,
>
> Arnaud
>
> From: Robert Metzger <rmetz...@apache.org>
> Sent: Friday, May 29, 2020 13:10
> To: LINZ, Arnaud <al...@bouyguestelecom.fr>
> Cc: user <user@flink.apache.org>
> Subject: Re: Best way to "emulate" a rich Partitioner with open() and close() methods?
>
> Hi Arnaud,
>
> Maybe I don't fully understand the constraints, but what about
>
> stream.map(new GetKuduPartitionMapper()).keyBy(0).addSink(new KuduSink());
>
> The map(new GetKuduPartitionMapper()) will be a regular RichMapFunction with
> open() and close() where you can handle the connection with Kudu's
> partitioning service.
>
> The map will output a Tuple2<PartitionId, Data> (or something nicer :) ),
> then Flink shuffles your data correctly, and the sinks will process the
> data correctly partitioned.
>
> I hope that this is what you were looking for!
>
> Best,
>
> Robert
>
> On Thu, May 28, 2020 at 6:21 PM LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
>
> Hello,
>
> I would like to improve the performance of my Apache Kudu sink by using
> the new “KuduPartitioner” of the Kudu API to match Flink stream partitions
> with Kudu partitions and reduce network shuffling.
>
> For that, I would like to implement something like
>
> stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>
> with KuduFlinkPartitioner an implementation of
> org.apache.flink.api.common.functions.Partitioner
> that internally makes use of the KuduPartitioner client tool of Kudu’s API.
>
> However, for that KuduPartitioner to work, it needs to open (and close at
> the end) a connection to the Kudu table, which obviously can’t be done
> for each line. But there is no “AbstractRichPartitioner” with open() and
> close() methods that I can use for that (the way I do in the sink, for
> instance).
>
> What is the best way to implement this?
>
> I thought of ThreadLocals that would be initialized during the first call
> to int partition(K key, int numPartitions), but I won’t be able to
> close() things nicely, as I won’t be notified on job termination.
>
> I thought of putting those static ThreadLocals inside an “identity mapper”
> that would be called just prior to the partitioning, with something like:
>
> stream.map(richIdentityConnectionManagerMapper).partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…));
>
> with Kudu connections initialized in the mapper open(), closed in the
> mapper close(), and used in the partitioner partition().
>
> It looks like an ugly hack that breaks every coding principle, but as
> long as the threads are reused between the mapper and the partitioner, I
> think it should work.
>
> Is there a better way to do this?
>
> Best regards,
>
> Arnaud
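A minimal, framework-free sketch of the workaround discussed in this thread, assuming plain Java with no Flink or Kudu dependency so it runs standalone. PartitionTaggingMapper plays the role of the RichMapFunction: it opens the expensive client once in open(), tags each record with its partition id in map(), and releases the client in close(). ModuloPartitioner mirrors a stateless Partitioner whose int partition(K key, int numPartitions) only reads that tag, so it needs no connection. All class names here (PartitionLookup, FakePartitionLookup, PartitionTaggingMapper, ModuloPartitioner, Pipeline) are hypothetical, and the fake lookup hashes the key instead of asking Kudu.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an expensive client, such as a connection to
// Kudu's partitioning service. This is NOT a real Kudu or Flink API.
interface PartitionLookup extends AutoCloseable {
    int partitionFor(String key);

    @Override
    void close(); // narrowed: no checked exception
}

// Hypothetical fake: hashes the key instead of asking Kudu, and counts
// how often it is opened and closed so the lifecycle can be checked.
final class FakePartitionLookup implements PartitionLookup {
    static int opened = 0;
    static int closed = 0;
    private final int numRemotePartitions;

    FakePartitionLookup(int numRemotePartitions) {
        this.numRemotePartitions = numRemotePartitions;
        opened++;
    }

    @Override
    public int partitionFor(String key) {
        return Math.floorMod(key.hashCode(), numRemotePartitions);
    }

    @Override
    public void close() {
        closed++;
    }
}

// Plays the role of a RichMapFunction: open() once per parallel instance,
// map() once per record, close() once at the end.
final class PartitionTaggingMapper {
    private final int numRemotePartitions;
    private PartitionLookup lookup;

    PartitionTaggingMapper(int numRemotePartitions) {
        this.numRemotePartitions = numRemotePartitions;
    }

    void open() {
        lookup = new FakePartitionLookup(numRemotePartitions);
    }

    // Emits (partitionId, record), analogous to a Tuple2<Integer, T>.
    Map.Entry<Integer, String> map(String record) {
        return Map.entry(lookup.partitionFor(record), record);
    }

    void close() {
        if (lookup != null) {
            lookup.close();
            lookup = null;
        }
    }
}

// Mirrors a plain, stateless Partitioner<Integer>: it needs no connection
// because the partition id already travels with the record.
final class ModuloPartitioner {
    int partition(Integer partitionId, int numPartitions) {
        return Math.floorMod(partitionId.intValue(), numPartitions);
    }
}

final class Pipeline {
    // Tags, routes, then strips the tag so only the payload reaches a channel.
    static Map<Integer, List<String>> run(List<String> records, int numRemotePartitions, int parallelism) {
        PartitionTaggingMapper mapper = new PartitionTaggingMapper(numRemotePartitions);
        ModuloPartitioner partitioner = new ModuloPartitioner();
        Map<Integer, List<String>> channels = new HashMap<>();
        mapper.open();
        try {
            for (String record : records) {
                Map.Entry<Integer, String> tagged = mapper.map(record);
                int channel = partitioner.partition(tagged.getKey(), parallelism);
                channels.computeIfAbsent(channel, c -> new ArrayList<>()).add(tagged.getValue());
            }
        } finally {
            mapper.close();
        }
        return channels;
    }
}
```

For example, Pipeline.run(List.of("a", "b", "a"), 4, 2) opens the fake lookup once, closes it once, and delivers both "a" records to the same channel. The modulo routing corresponds to feeding the tagged stream into partitionCustom with a key selector on the partition id; Robert's keyBy variant also keeps all rows of one Kudu partition on the same sink subtask, but Flink hashes keys into key groups, so Kudu partition ids are not mapped one-to-one onto subtasks.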