My pleasure, happy to hear that it solved your problem. I was wondering: do more people have a similar structure in their types and utilities? If so, it may make sense to add a generic version of these utilities to the Flink codebase.
On Fri, May 1, 2015 at 11:51 AM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> Hi Stephan,
>
> Thanks a lot, that solves my problem. I am truly amazed by, and grateful
> for, the time you spent helping me on this subject.
>
> Greetings,
> Arnaud
>
> ------------------------------
> From: Stephan Ewen <se...@apache.org>
> Sent: 29/04/2015 17:19
> To: user@flink.apache.org
> Subject: Re: How to make a generic key for groupBy
>
> Hey Arnaud!
>
> I have made a quick sample implementation of how you can very efficiently
> support generic keys like yours. I put the code in this repository:
> https://github.com/StephanEwen/flink-generic-keys
>
> It implements a special key selector. You can use it the way you used to,
> and it internally does a bit of magic to expose type information to the
> Flink pre-flight phase:
> https://github.com/StephanEwen/flink-generic-keys/blob/master/src/main/java/com/dataartisans/flink/TupleKeySelector.java
>
> Here is a quick intro to what it does:
>
> - It exposes keys as Flink tuples, by wrapping your keys in a tuple in a
>   generic way. Tuples are the fastest keys in Flink, because they support
>   the best "on-binary-data" operations.
>
> - It figures out which types are going to be in the tuples (and in your
>   generic keys) by analyzing the method signature of your concrete
>   "getKey()" implementation.
>
> The requirement for the second part is that your concrete types declare
> the concrete key in their signature. Below is an example. Note that the
> "SomeType" class does not declare "Key" as the return type of "getKey()",
> but very specifically "Key2<String, Integer>". That way, the key selector
> can pick up the types (String and Integer) and expose them to Flink, so
> that Flink can generate the key comparators that do efficient binary
> operations on the keys.
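The signature-analysis step Stephan describes can be sketched without any Flink dependency. This is a minimal illustration, not the repository's actual code: `Key`, `Key2`, `TypeBase`, and `SomeType` are stand-ins mirroring the example in the mail, and plain `java.lang.reflect` recovers the key component types from the concrete `getKey()` signature.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

// Hypothetical stand-ins mirroring the example in the mail; the actual
// repository code works against Flink's type system instead.
class Key {}

class Key2<A, B> extends Key {
    final A f0; final B f1;
    Key2(A a, B b) { f0 = a; f1 = b; }
}

abstract class TypeBase {
    public abstract Key getKey();
}

class SomeType extends TypeBase {
    public String someString = "hello";
    public int anotherInteger = 42;

    // The concrete signature declares Key2<String, Integer>, not just Key,
    // which is what makes the reflective extraction below possible.
    @Override
    public Key2<String, Integer> getKey() {
        return new Key2<>(someString, anotherInteger);
    }
}

public class KeyTypeInference {
    // Look up the concrete getKey() and recover the generic type
    // arguments of its declared return type.
    public static Type[] keyFieldTypes(Class<? extends TypeBase> clazz) {
        try {
            Method m = clazz.getMethod("getKey");
            Type ret = m.getGenericReturnType();
            if (ret instanceof ParameterizedType) {
                return ((ParameterizedType) ret).getActualTypeArguments();
            }
            throw new IllegalArgumentException(
                "getKey() must declare a parameterized key type, found: " + ret);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(keyFieldTypes(SomeType.class)));
        // prints [class java.lang.String, class java.lang.Integer]
    }
}
```

The same mechanism shows why the concrete return type matters: if `SomeType` declared plain `Key`, `getGenericReturnType()` would yield no type arguments to hand to Flink.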
> public abstract class TypeBase {
>     public abstract Key getKey();
> }
>
> public class SomeType extends TypeBase {
>     public String someString;
>     public int anotherInteger;
>     public Date aDate;
>
>     @Override
>     public Key2<String, Integer> getKey() {
>         return new Key2<String, Integer>(someString, anotherInteger);
>     }
> }
>
> The good thing about exposing this information to Flink in the pre-flight
> phase (before the job runs in parallel) is that Flink pre-checks many
> things, preventing most memory and serialization surprises at runtime.
>
> Let me know if you have questions!
>
> Greetings,
> Stephan
>
> On Mon, Apr 27, 2015 at 6:38 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> I have some ideas, let me see if I can make them concrete until
>> tomorrow...
>>
>> Greetings,
>> Stephan
>>
>> On Mon, Apr 27, 2015 at 5:29 PM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
>>
>>> Hi,
>>>
>>> I see. My Key class is an abstract class whose subclasses are Key1<?>,
>>> Key2<?,?>, etc., so it is very much like a tuple. It is heavily used in
>>> "non-distributed" hash maps once the dataset is reduced to fit on a
>>> single JVM.
>>>
>>> It exposes the common contract that I need (such as getHeadKey(),
>>> getLastl(), or makeKey(Key, Object)) to "navigate" in the key space,
>>> and a cached hash code to make hash maps faster. My generic algorithms
>>> do not need to know how many fields are exposed in the Key, but they
>>> need to be able to construct another key from two keys.
>>>
>>> Arnaud
>>>
>>> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On behalf of
>>> Stephan Ewen
>>> Sent: Friday, April 24, 2015 11:14
>>> To: user@flink.apache.org
>>> Subject: Re: How to make a generic key for groupBy
>>>
>>> Hi Arnaud!
>>>
>>> Thank you for the warm words! Let's find a good way to get this to
>>> work...
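Arnaud's mention of a cached hash code can be sketched as follows. All names here are hypothetical stand-ins, not his actual framework code; the pattern simply computes the hash once and reuses it, which pays off when the same key object is probed repeatedly in hash maps.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of a key hierarchy with a cached hash code,
// as Arnaud describes; names are illustrative only.
abstract class Key {
    private int cachedHash;       // computed once, reused by hash maps
    private boolean hashComputed;

    protected abstract int computeHash();

    @Override
    public final int hashCode() {
        if (!hashComputed) {
            cachedHash = computeHash();
            hashComputed = true;
        }
        return cachedHash;
    }
}

class Key2<A, B> extends Key {
    final A first; final B second;
    Key2(A a, B b) { first = a; second = b; }

    @Override
    protected int computeHash() {
        return Objects.hash(first, second);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Key2)) return false;
        Key2<?, ?> k = (Key2<?, ?>) o;
        return Objects.equals(first, k.first) && Objects.equals(second, k.second);
    }
}

public class CachedKeyDemo {
    public static void main(String[] args) {
        Map<Key, String> m = new HashMap<>();
        m.put(new Key2<>("a", 1), "v");
        System.out.println(m.get(new Key2<>("a", 1))); // prints v
    }
}
```

Caching like this is only safe if the key's fields are effectively immutable after construction; a mutated key would keep its stale hash.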
>>> As a bit of background:
>>>
>>> In Flink, the API needs to know a bit about the types that go through
>>> the functions, because Flink pre-generates and configures serializers,
>>> and validates that things fit together.
>>>
>>> It is also important that keys are exposed rather specifically, because
>>> Flink internally tries to work on serialized data (that makes its
>>> in-memory operations predictable and robust).
>>>
>>> If you expose a key as a "String", "long", or "double", then Flink
>>> knows how to work on it in a binary fashion.
>>> Also, if you expose a key as a POJO, then Flink interprets the key as a
>>> combination of its fields, and can again work on the serialized data.
>>>
>>> If you only expose "Comparable" (which is the bare minimum for a key),
>>> you experience performance degradation (most notably for sorts),
>>> because every key operation involves serialization and deserialization.
>>>
>>> So the goal would be to expose the key properly. We can always hint to
>>> the API what the key type is, precisely for the cases where the
>>> inference cannot do it.
>>>
>>> To understand things a bit better: what is your "Key" type? Is it an
>>> abstract class, an interface, a generic parameter?
>>>
>>> Greetings,
>>> Stephan
>>>
>>> FYI: In Scala, this actually works quite a bit more easily, since Scala
>>> preserves generic types. In Java, we built a lot of reflection tooling,
>>> but there are cases, like yours, where it is impossible to infer the
>>> types via reflection.
>>>
>>> On Thu, Apr 23, 2015 at 6:35 PM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>>>
>>> Will you elaborate on your use case? It would help to find out where
>>> Flink shines. IMO, it's a great project, but it needs more
>>> differentiation from Spark.
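Stephan's point about working on serialized data can be illustrated with a small, Flink-free sketch. This mimics the idea behind order-preserving binary encodings (the idea underlying Flink's normalized keys for sorting), but it is not Flink's implementation: flip an int's sign bit and write it big-endian, and unsigned byte-wise comparison of the encodings then agrees with numeric comparison, so a sorter never has to deserialize the key.

```java
public class BinaryKeyCompare {
    // Encode an int so that unsigned byte-wise comparison of the
    // encodings matches Integer.compare on the originals: flip the
    // sign bit, then write the value big-endian.
    public static byte[] encode(int v) {
        int u = v ^ Integer.MIN_VALUE; // flip sign bit
        return new byte[] {
            (byte) (u >>> 24), (byte) (u >>> 16),
            (byte) (u >>> 8),  (byte) u
        };
    }

    // Compare two equal-length encodings as unsigned bytes.
    public static int compareBytes(byte[] a, byte[] b) {
        for (int i = 0; i < a.length; i++) {
            int d = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (d != 0) return d;
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(compareBytes(encode(-7), encode(3)) < 0);               // prints true
        System.out.println(compareBytes(encode(Integer.MIN_VALUE), encode(0)) < 0); // prints true
        System.out.println(compareBytes(encode(5), encode(5)) == 0);                // prints true
    }
}
```

The same principle is why tuple and primitive keys sort fast, while a key exposed only as `Comparable` forces a deserialize-then-compareTo round trip per comparison.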
>>> On Thu, Apr 23, 2015 at 7:25 AM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
>>>
>>> Hello,
>>>
>>> After a quite successful benchmark yesterday (Flink being about twice
>>> as fast as Spark on my use cases), I've instantly turned from Spark fan
>>> to Flink fan. Great job, committers!
>>>
>>> So I've decided to port my existing Spark tools to Flink. Happily, most
>>> of the difficulty was renaming classes, packages and variables with
>>> "spark" in them to something more neutral :)
>>>
>>> However, there is one thing that is easy in Spark which I'm still
>>> wondering how to do in Flink: generic keys.
>>>
>>> I'm trying to make a framework on which my applications are built. That
>>> framework thus manipulates "generic types" representing the data,
>>> inheriting from an abstract class with a common contract; let's call it
>>> "Bean".
>>>
>>> Among other things, Bean exposes an abstract method:
>>>
>>>     public Key getKey();
>>>
>>> Key being one of my core types, used in several Java algorithms.
>>>
>>> Let's say I have the class:
>>>
>>>     public class Framework<T extends Bean> implements Serializable {
>>>
>>>         public DataSet<T> doCoolStuff(final DataSet<T> inputDataset) {
>>>             // Group lines according to a key
>>>             final UnsortedGrouping<T> groupe = inputDataset.groupBy(
>>>                 new KeySelector<T, Key>() {
>>>                     @Override
>>>                     public Key getKey(T record) {
>>>                         return record.getKey();
>>>                     }
>>>                 });
>>>             (…)
>>>         }
>>>     }
>>>
>>> With Spark, a mapToPair works fine because all I have to do is
>>> implement hashCode() and equals() correctly on my Key type.
>>> With Flink, Key is not recognized as a POJO (well, it is not one), and
>>> that does not work.
>>>
>>> I have tried to expose something like public Tuple getKeyAsTuple(); in
>>> Key, but Flink does not accept generic Tuples.
>>> I've tried to parameterize my Tuple, but Flink does not know how to
>>> infer the generic type value.
>>>
>>> So I'm wondering what the best way to implement this is.
>>> For now I have exposed something like public String getKeyAsString();
>>> and turned my generic treatment into:
>>>
>>>     final UnsortedGrouping<T> groupe = inputDataset.groupBy(
>>>         new KeySelector<T, String>() {
>>>             @Override
>>>             public String getKey(T record) {
>>>                 return record.getKey().getKeyAsString();
>>>             }
>>>         });
>>>
>>> But that "ASCII" representation is suboptimal.
>>>
>>> I thought of passing a key-to-tuple conversion lambda upon creation of
>>> the Framework class, but that would be boilerplate code on the user's
>>> end, which I'm not fond of.
>>>
>>> So my questions are:
>>>
>>> - Is there a smarter way to do this?
>>> - What kind of objects can be passed as a Key? Is there an interface to
>>>   respect?
>>> - In the worst case, is byte[] OK as a Key? (I can code the
>>>   serialization on the framework side…)
>>>
>>> Best regards,
>>> Arnaud
>>>
>>> ------------------------------
>>>
>>> The integrity of this message cannot be guaranteed on the Internet. The
>>> company that sent this message cannot therefore be held liable for its
>>> content or attachments. Any unauthorized use or dissemination is
>>> prohibited. If you are not the intended recipient of this message,
>>> please delete it and notify the sender.
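On the byte[] question at the end of the thread: independent of what Flink accepts, raw byte[] keys are a trap in plain Java collections too, because arrays use identity-based equals()/hashCode(). A minimal sketch (names are illustrative) showing the failure and a small wrapper that restores value semantics:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Java arrays compare by identity, so raw byte[] keys do not behave
// as values in hash maps; wrapping them restores value semantics.
public class ByteKeyDemo {
    static final class ByteKey {
        private final byte[] bytes;
        ByteKey(byte[] b) { bytes = b.clone(); } // defensive copy
        @Override public boolean equals(Object o) {
            return o instanceof ByteKey
                && Arrays.equals(bytes, ((ByteKey) o).bytes);
        }
        @Override public int hashCode() { return Arrays.hashCode(bytes); }
    }

    public static void main(String[] args) {
        Map<byte[], String> raw = new HashMap<>();
        raw.put(new byte[] {1, 2}, "v");
        System.out.println(raw.get(new byte[] {1, 2})); // prints null

        Map<ByteKey, String> wrapped = new HashMap<>();
        wrapped.put(new ByteKey(new byte[] {1, 2}), "v");
        System.out.println(wrapped.get(new ByteKey(new byte[] {1, 2}))); // prints v
    }
}
```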