Hi Shahar,

An easier way to solve your problem is to store your data in a Row instead of a `TaggedEvent`. I think this is what Fabian meant. That way you don't have to define a user-defined TypeInfoFactory; you can use the Row type directly. Taking `TaggedEvent<Car>` as an example, the corresponding row type is `Types.ROW(Types.ROW(Types.INT, Types.STRING), Types.OBJECT_ARRAY(Types.STRING))`, where `Types` is `org.apache.flink.table.api.Types`. A row type is also easier to work with in the Table API & SQL.
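
To make the layout concrete, here is a minimal Java sketch of the row type and of one record in that shape. It assumes flink-core is on the classpath and uses the Java-side `org.apache.flink.api.common.typeinfo.Types` rather than the Table API `Types` named above. The class name `TaggedCarRow` and the helper `of` are made up for illustration and are not part of Flink.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class TaggedCarRow {

    // Row type mirroring TaggedEvent<Car>: (ROW(year, modelName), tags[]).
    public static final TypeInformation<Row> TAGGED_CAR_TYPE =
        Types.ROW(
            Types.ROW(Types.INT, Types.STRING),
            Types.OBJECT_ARRAY(Types.STRING));

    // Hypothetical helper: builds one record in that layout.
    // The field order must match the type declared above.
    public static Row of(int year, String modelName, String[] tags) {
        return Row.of(Row.of(year, modelName), tags);
    }

    public static void main(String[] args) {
        Row tagged = of(2018, "Luxury Sedan", new String[] {"NiceCar"});
        Row car = (Row) tagged.getField(0);             // nested (year, modelName)
        String[] tags = (String[]) tagged.getField(1);  // tag array
        System.out.println(car.getField(1) + " has " + tags.length + " tag(s)");
    }
}
```

Fields are addressed by position here, so the code that produces the rows and the declared type have to agree on the order (car first, tags second in this sketch).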

However, if `TaggedEvent` is a must-have for you, you can take a look at MapView [1] as an example of how to define a user-defined type info factory (TypeInfoFactory).

Best, Hequn

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala

On Sat, May 11, 2019 at 1:20 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:

> Hi Fabian,
>
> I'm having trouble implementing the type for this operation, and I wonder how I
> can do that.
> Given a generic type T, I want to create a TypeInformation for:
> class TaggedEvent<T> {
>     String[] tags;
>     T originalEvent;
> }
>
> I tried a few different things but I'm not sure how to do it.
> It doesn't seem like I can use TypeHint, as I need to know the actual generic
> class for it, right?
> Do I need a TaggedEventTypeFactory? If so, how do I create the
> TaggedEventTypeInfo for it? Do you have an example? I was trying to
> follow this [1] but it doesn't seem to really work. I'm getting null as my
> genericParameter for some reason. Also, how would you create the serializer
> for the type info? Can I reuse some built-in Kryo functionality?
>
> Thanks
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#creating-a-typeinformation-or-typeserializer
>
> On Thu, May 9, 2019 at 9:08 AM Shahar Cizer Kobrinsky <shahar.kobrin...@gmail.com> wrote:
>
>> Thanks Fabian,
>>
>> I'm looking for a way to enrich it without having to know the internal
>> fields of the original event type.
>> Right now what I managed to do is to map Car into a TaggedEvent<Car>
>> prior to the SQL query, with tags being empty, and then run the SQL query
>> selecting *origin, enrich(..) as tags*
>> Not sure there's a better way, but I guess that works.
>>
>> On Thu, May 9, 2019 at 12:50 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> you can use the value construction function ROW to create a nested row
>>> (or object).
>>> However, you have to explicitly reference all attributes that you will
>>> add.
>>>
>>> If you have a table Cars with (year, modelName), a query could look like
>>> this:
>>>
>>> SELECT
>>>   ROW(year, modelName) AS car,
>>>   enrich(year, modelName) AS tags
>>> FROM Cars;
>>>
>>> Handling many attributes is always a bit painful in SQL.
>>> There is an effort to make the Table API easier to use for these use
>>> cases (for example Column Operations [1]).
>>>
>>> Best, Fabian
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-11967
>>>
>>> On Thu, May 9, 2019 at 1:44 AM shkob1 <shahar.kobrin...@gmail.com> wrote:
>>>
>>>> Just to be more clear on my goal:
>>>> I'm trying to enrich the incoming stream with some meaningful tags based
>>>> on conditions from the event itself.
>>>> So the input stream could be an event that looks like:
>>>> class Car {
>>>>     int year;
>>>>     String modelName;
>>>> }
>>>>
>>>> I will have a config that defines tags as:
>>>> "NiceCar" -> "year > 2015 AND position('Luxury' in modelName) > 0"
>>>>
>>>> So ideally my output will have the structure of
>>>>
>>>> class TaggedEvent<Car> {
>>>>     Car origin;
>>>>     String[] tags;
>>>> }
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/