Even when I pass id through the map function and join the result back with the original table, the id that went through map does not seem to be recognized as a unique key. Is there any way to solve this while still preserving the primary key?
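For concreteness, here is a rough sketch of the kind of attempt described above, not the exact code. It assumes id is a BIGINT column, and SplatFruitsWithIdFunc, SplatFruitsSketch and the `${columnPrefix}_id` column name are hypothetical names made up for illustration. The function returns the id alongside the flags so that the flattened map output can be joined back to the original table. It only shows the shape of the attempt and does not by itself make the unique key get inferred.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Hypothetical variant of SplatFruitsFunc: it also takes the id and returns it
// as the first field of the row, so the map output still carries the key value.
// Assumption: id is a BIGINT column (java.lang.Long).
class SplatFruitsWithIdFunc extends ScalarFunction {
  def eval(id: java.lang.Long, fruits: Array[String]): Row = {
    val hasOrange: java.lang.Boolean = fruits.contains("orange")
    val hasBanana: java.lang.Boolean = fruits.contains("banana")
    val hasApple: java.lang.Boolean = fruits.contains("apple")
    val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
    Row.of(id, hasOrange, hasBanana, hasApple, hasWatermelon)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(Types.LONG, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
}

object SplatFruitsSketch {
  // table is assumed to have (at least) the columns id and fruits
  def splatFruits(table: Table, columnPrefix: String): Table = {
    val func = new SplatFruitsWithIdFunc()

    // map only emits the fields produced by the function, so the id is
    // passed in and comes back out as the first flattened column
    val splatted = table
      .map(func($"id", $"fruits"))
      .as(
        s"${columnPrefix}_id",
        s"${columnPrefix}_has_orange",
        s"${columnPrefix}_has_banana",
        s"${columnPrefix}_has_apple",
        s"${columnPrefix}_has_watermelon"
      )

    // join back to the original table on id, then drop the duplicate id column
    table
      .join(splatted, $"id" === $"${columnPrefix}_id")
      .dropColumns($"${columnPrefix}_id")
      .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
  }
}
```

Called as `SplatFruitsSketch.splatFruits(table, "prefix")`, this should give back the original columns plus the four `prefix_has_*` flags, with fruits renamed to `prefix_fruits`.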
On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley <r...@remind101.com> wrote:
> Even odder, if I pull the constructor of the function into its own
> variable it "works" (though it appears that map only passes through the
> fields mapped over which means I'll need an additional join, though now I
> think I'm on the right path).
>
> I.e.
> def splatFruits(table: Table, columnPrefix: String): Table = {
>   val func = new SplatFruitsFunc()
>   return table
>     .map(func($"fruits"))
>     .as(
>       s"${columnPrefix}_has_orange",
>       s"${columnPrefix}_has_banana",
>       s"${columnPrefix}_has_apple",
>       s"${columnPrefix}_has_watermelon"
>     )
>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
> }
>
> ends up giving me the following error instead
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Cannot resolve field [fruits], input field
> list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
> prefix_has_watermelon].
>
> which implies I'll need to join back to the original table like I was
> doing with the leftOuterJoinLateral originally I suppose.
>
>
> On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Looks like `as` needed to move outside of where it was before to fix that
>> error. Though now I'm receiving
>> >org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Aliasing more fields than we actually have.
>>
>> Example code now:
>>
>> // table will always have pk id
>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>   return table
>>     .map(
>>       new SplatFruitsFunc()(
>>         $"fruits"
>>       )
>>     )
>>     .as(
>>       s"${columnPrefix}_has_orange",
>>       s"${columnPrefix}_has_banana",
>>       s"${columnPrefix}_has_apple",
>>       s"${columnPrefix}_has_watermelon"
>>     )
>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>> }
>>
>> class SplatFruitsFunc extends ScalarFunction {
>>   def eval(fruits: Array[String]): Row = {
>>     val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>>     val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>>     val hasApple: java.lang.Boolean = fruits.contains("Apple")
>>     val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>>     Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>>   }
>>
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>     Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
>> }
>>
>> which afaict correctly follows the documentation.
>>
>> Anything here stand out?
>>
>> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> So I just instead tried changing SplatFruitsFunc to a ScalarFunction and
>>> leftOuterJoinLateral to a map and I'm receiving:
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Only a scalar function can be used in the map
>>> operator.
>>> which seems odd because documentation says
>>>
>>> > Performs a map operation with a user-defined scalar function or
>>> built-in scalar function. The output will be flattened if the output type
>>> is a composite type.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>>
>>> Shouldn't this work as an alternative?
>>>
>>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a TableFunction and wherever it is applied with a
>>>> leftOuterJoinLateral, my table loses any inference of there being a primary
>>>> key. I see this because all subsequent joins end up with "NoUniqueKey" when
>>>> I know a primary key of id should exist.
>>>>
>>>> I'm wondering if this is expected behavior and if it's possible to tell
>>>> a table directly what the primary key should be?
>>>>
>>>>
>>>> To demonstrate my example:
>>>> My table function checks if an element of a certain type is in a string
>>>> array, and depending on whether or not it is there, it appends a column
>>>> with value true or false. For example, if array "fruits" which could
>>>> possibly contain orange, banana, apple, and watermelon on a row contains
>>>> only `["orange", "apple"]` then it will append `has_orange: true,
>>>> has_banana: false, has_apple: true, has_watermelon: false` as columns to
>>>> the row. This example is essentially the same as my code, outside of having
>>>> a much larger set of keys and not dealing with fruits.
>>>>
>>>> Example code:
>>>>
>>>> // table will always have pk id
>>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>>   return table
>>>>     .leftOuterJoinLateral(
>>>>       new SplatFruitsFunc()(
>>>>         $"fruits"
>>>>       ) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
>>>>         s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
>>>>     )
>>>>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>>> }
>>>>
>>>> @FunctionHint(
>>>>   output = new DataTypeHint(
>>>>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
>>>>   )
>>>> )
>>>> class SplatFruitsFunc
>>>>     extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>>>
>>>>   def eval(fruits: Array[String]): Unit = {
>>>>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>>>>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>>>>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>>>>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>>   }
>>>> }
>>>>
>>>> Thanks!
--
Rex Fenley | Software Engineer - Mobile and Backend
Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>