Hey Hequn & Fabian,

It seems like I found a reasonable way using both Row and my own TypeInfo:
- I started by using my own TypeInfo, following your example. I'm using a serializer that is basically a compound of the original event type serializer and a string array serializer (created with BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO.createSerializer). The rest is mostly boilerplate, similar to the MapSerializer (and its snapshot class). I extended TypeInformation<TaggedEvent<T>>, but the downside was that, unlike Row/POJO, I didn't have field names for the SQL query, so I went back to looking at Row.

- I realized that I can use Row without knowing the internals of the original event class in advance, by mapping to Row with an explicit RowTypeInfo:

    final TypeInformation<Row> rowTypeInformation = Types.ROW(
            new String[]{"originalEvent", "tags"},
            new TypeInformation[]{dataStream.getType(), BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO});

    final SingleOutputStreamOperator<Row> mappedStream = this.dataStream
            .map((MapFunction<T, Row>) value -> {
                final Row row = new Row(2);
                row.setField(0, value);          // the original event, kept as-is
                row.setField(1, new String[0]);  // empty tags, filled in by the SQL query
                return row;
            })
            .returns(rowTypeInformation);

    tableEnvironment.registerDataStream(ORIGIN_EVENT_STREAM_TABLE_NAME, mappedStream);

  That worked well.

- Giving it a last thought, I didn't want to leave my users with a Row stream but with a POJO instead, so I ended up keeping the TaggedEventTypeInfo as an additional mapping after the SQL query. So my process is:

  1. Map DataStream<T> to Row and register it as a table (code above).
  2. Run SQL over the table.
  3. Convert the result to an append stream of Row. (Trying to convert directly to my new type info fails with "Requested result type is an atomic type but result[..] has more or less than a single field"; not sure if there's a better way.)
  4. Map the Row object to TaggedEvent<T>, explicitly implementing ResultTypeQueryable.getProducedType to return the TaggedEventTypeInfo.

Would love to get feedback on whether this seems like the best solution or whether it can be made more efficient.

Thanks!
Shahar

On Mon, May 13, 2019 at 9:25 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

> Thanks for looking into it Hequn!
>
> I do not have a requirement to use TaggedEvent vs Row. But correct me if I
> am wrong: creating a Row will require me to know the internal fields of the
> original event at compile time, is that correct? I do have a requirement to
> support a generic original event type, so unless I can map T to a Row
> without knowing the object fields, I won't be able to use it. Can you
> confirm that?
> I will look at MapView and let you know, thanks again!
>
> On Sun, May 12, 2019 at 1:30 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
>> Hi Shahar,
>>
>> An easier way to solve your problem is to use a Row to store your data
>> instead of the `TaggedEvent`. I think this is what Fabian means. This
>> way, you don't have to define a user-defined TypeFactory; you use the Row
>> type directly. Taking `TaggedEvent<Car>` as an example, the corresponding
>> row type is `Types.ROW(Types.ROW(Types.INT, Types.STRING),
>> Types.OBJECT_ARRAY(Types.STRING))`, where Types is
>> `org.apache.flink.table.api.Types`. Furthermore, the row type is also
>> easier to use with the Table API & SQL.
>>
>> However, if the `TaggedEvent` is a must-have for you, you can take a look
>> at MapView[1] as an example of how to define a user-defined type
>> factory.
>>
>> Best, Hequn
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
>>
>> On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <
>> shahar.kobrin...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> I'm having trouble implementing the type for this operation; I wonder how
>>> I can do that.
>>> So given a generic type T, I want to create a TypeInformation for:
>>> class TaggedEvent<T> {
>>>     String[] tags;
>>>     T originalEvent;
>>> }
>>>
>>> I was trying a few different things but I'm not sure how to do it.
>>> It doesn't seem like I can use TypeHint, as I need to know the actual
>>> generic class for it, right?
>>> Do I need a TaggedEventTypeFactory? If so, how do I create the
>>> TaggedEventTypeInfo for it? Do you have an example? I was trying to
>>> follow this[1] but it doesn't seem to really work. I'm getting null as my
>>> genericParameter for some reason. Also, how would you create the serializer
>>> for the type info? Can I reuse some built-in Kryo functionality?
>>>
>>> Thanks
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>>>
>>> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <
>>> shahar.kobrin...@gmail.com> wrote:
>>>
>>>> Thanks Fabian,
>>>>
>>>> I'm looking into a way to enrich it without having to know the internal
>>>> fields of the original event type.
>>>> Right now what I managed to do is map Car into a TaggedEvent<Car>
>>>> prior to the SQL query, with tags empty, then run the SQL query selecting
>>>> *origin, enrich(..) as tags*
>>>> Not sure there's a better way, but I guess that works.
>>>>
>>>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> you can use the value construction function ROW to create a nested row
>>>>> (or object).
>>>>> However, you have to explicitly reference all attributes that you will
>>>>> add.
>>>>>
>>>>> If you have a table Cars with (year, modelName), a query could look
>>>>> like this:
>>>>>
>>>>> SELECT
>>>>>   ROW(year, modelName) AS car,
>>>>>   enrich(year, modelName) AS tags
>>>>> FROM Cars;
>>>>>
>>>>> Handling many attributes is always a bit painful in SQL.
>>>>> There is an effort to make the Table API easier to use for these use
>>>>> cases (for example Column Operations [1]).
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>>>
>>>>> On Thu, May 9, 2019 at 01:44, shkob1 <shahar.kobrin...@gmail.com> wrote:
>>>>>
>>>>>> Just to be clearer about my goal:
>>>>>> I'm trying to enrich the incoming stream with some meaningful tags
>>>>>> based on conditions from the event itself.
>>>>>> So the input stream could be an event that looks like:
>>>>>> class Car {
>>>>>>     int year;
>>>>>>     String modelName;
>>>>>> }
>>>>>>
>>>>>> I will have a config that defines tags as:
>>>>>> "NiceCar" -> "year > 2015 AND POSITION('Luxury' IN modelName) > 0"
>>>>>>
>>>>>> So ideally my output will have the structure of
>>>>>>
>>>>>> class TaggedEvent<Car> {
>>>>>>     Car origin;
>>>>>>     String[] tags;
>>>>>> }
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
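To make the tagging goal from the original question concrete outside of Flink, here is a minimal plain-Java sketch of the semantics only. It assumes each tag rule can be expressed as a `java.util.function.Predicate` keyed by tag name; in the thread the rules come from a config of SQL conditions evaluated by an `enrich(..)` function, so `TaggingSketch`, `tag`, and `carRules` are hypothetical helpers, not Flink API. The `NiceCar` predicate mirrors `year > 2015 AND POSITION('Luxury' IN modelName) > 0`, since `POSITION` returns 0 exactly when the substring is absent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class TaggingSketch {

    // Plain stand-in for the Car event from the original question.
    static final class Car {
        final int year;
        final String modelName;

        Car(int year, String modelName) {
            this.year = year;
            this.modelName = modelName;
        }
    }

    // Generic wrapper with the TaggedEvent<T> shape discussed in the thread.
    static final class TaggedEvent<T> {
        final T originalEvent;
        final String[] tags;

        TaggedEvent(T originalEvent, String[] tags) {
            this.originalEvent = originalEvent;
            this.tags = tags;
        }
    }

    // Hypothetical helper: evaluates every rule against the event and keeps the
    // names of the rules that match. The event itself is never inspected here,
    // so this works for any T without knowing its fields.
    static <T> TaggedEvent<T> tag(T event, Map<String, Predicate<T>> rules) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Predicate<T>> rule : rules.entrySet()) {
            if (rule.getValue().test(event)) {
                matched.add(rule.getKey());
            }
        }
        return new TaggedEvent<>(event, matched.toArray(new String[0]));
    }

    // Hypothetical rule set; LinkedHashMap keeps tag order deterministic.
    static Map<String, Predicate<Car>> carRules() {
        Map<String, Predicate<Car>> rules = new LinkedHashMap<>();
        // year > 2015 AND POSITION('Luxury' IN modelName) > 0
        rules.put("NiceCar", c -> c.year > 2015 && c.modelName.contains("Luxury"));
        return rules;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Car>> rules = carRules();
        TaggedEvent<Car> newer = tag(new Car(2018, "Sedan Luxury"), rules);
        TaggedEvent<Car> older = tag(new Car(2012, "Luxury Coupe"), rules);
        System.out.println(Arrays.toString(newer.tags)); // [NiceCar]
        System.out.println(Arrays.toString(older.tags)); // []
    }
}
```

The generic `tag` method is the plain-Java analogue of keeping the original event opaque and only computing the tags, which is what both the Row mapping and the `TaggedEvent<T>` wrapper in the thread aim for.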
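Shahar's first attempt describes a compound serializer for `TaggedEvent<T>`: the original event's serializer followed by a string-array serializer. The sketch below illustrates only that byte-layout idea in plain Java. It uses a hypothetical `EventCodec<T>` interface in place of Flink's `TypeSerializer`, so it is not Flink's API and omits what a real `TypeSerializer` also needs (copying, snapshots for state migration, null handling for tags).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class CompoundSerializerSketch {

    // Hypothetical stand-in for the original event type's serializer.
    interface EventCodec<T> {
        void write(T value, DataOutput out) throws IOException;

        T read(DataInput in) throws IOException;
    }

    static final class TaggedEvent<T> {
        final T originalEvent;
        final String[] tags;

        TaggedEvent(T originalEvent, String[] tags) {
            this.originalEvent = originalEvent;
            this.tags = tags;
        }
    }

    // Compound codec: delegates the wrapped event to the event codec, then writes
    // the tags as a length-prefixed array of UTF strings (tags assumed non-null).
    static final class TaggedEventCodec<T> {
        private final EventCodec<T> eventCodec;

        TaggedEventCodec(EventCodec<T> eventCodec) {
            this.eventCodec = eventCodec;
        }

        void write(TaggedEvent<T> value, DataOutput out) throws IOException {
            eventCodec.write(value.originalEvent, out);
            out.writeInt(value.tags.length);
            for (String tag : value.tags) {
                out.writeUTF(tag);
            }
        }

        // Reads the fields back in exactly the order write() produced them.
        TaggedEvent<T> read(DataInput in) throws IOException {
            T event = eventCodec.read(in);
            String[] tags = new String[in.readInt()];
            for (int i = 0; i < tags.length; i++) {
                tags[i] = in.readUTF();
            }
            return new TaggedEvent<>(event, tags);
        }
    }

    // Example event codec for a plain String event.
    static final EventCodec<String> STRING_CODEC = new EventCodec<String>() {
        @Override
        public void write(String value, DataOutput out) throws IOException {
            out.writeUTF(value);
        }

        @Override
        public String read(DataInput in) throws IOException {
            return in.readUTF();
        }
    };

    // Serializes to an in-memory buffer and deserializes it again.
    static <T> TaggedEvent<T> roundTrip(TaggedEventCodec<T> codec, TaggedEvent<T> value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            codec.write(value, new DataOutputStream(bytes));
            return codec.read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TaggedEventCodec<String> codec = new TaggedEventCodec<>(STRING_CODEC);
        TaggedEvent<String> copy = roundTrip(codec, new TaggedEvent<>("car-42", new String[] {"NiceCar"}));
        System.out.println(copy.originalEvent + " " + String.join(",", copy.tags));
    }
}
```

Because the compound codec never looks inside `T`, the same wrapper works for any event type as long as a codec for `T` is supplied, which is the property the thread needs for a generic original event.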