It appears that even when I pass id through the map function and join back
with the original table, it does not seem to think that the id passed
through map is a unique key. Is there any way to solve this while still
preserving the primary key?

On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley <r...@remind101.com> wrote:

> Even odder, if I pull the constructor of the function into its own
> variable it "works" (though it appears that map only passes through the
> fields mapped over which means I'll need an additional join, though now I
> think I'm on the right path).
>
> I.e.
> def splatFruits(table: Table, columnPrefix: String): Table = {
>   val func = new SplatFruitsFunc()
>   return table
>     .map(func($"fruits"))
>     .as(
>       s"${columnPrefix}_has_orange",
>       s"${columnPrefix}_has_banana",
>       s"${columnPrefix}_has_apple",
>       s"${columnPrefix}_has_watermelon"
>    )
>    .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
> }
>
> ends up giving me the following error instead
> > org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: Cannot resolve field [fruits], input field
> list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
> prefix_has_watermelon].
>
> which implies I'll need to join back to the original table like I was
> doing with the leftOuterJoinLateral originally I suppose.
>
>
> On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Looks like `as` needed to move outside of where it was before to fix that
>> error. Though now I'm receiving
>> >org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: Aliasing more fields than we actually have.
>>
>> Example code now:
>>
>> // table will always have pk id
>> def splatFruits(table: Table, columnPrefix: String): Table = {
>> return table
>>  .map(
>>    new SplatFruitsFunc()(
>>      $"fruits"
>>    )
>>  )
>>  .as(
>>   s"${columnPrefix}_has_orange",
>>   s"${columnPrefix}_has_banana",
>>   s"${columnPrefix}_has_apple",
>>   s"${columnPrefix}_has_watermelon"
>>  )
>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>> }
>>
>> class SplatFruitsFunc extends ScalarFunction {
>>   def eval(fruits: Array[String]): Row = {
>>     val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>>     val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>>     val hasApple: java.lang.Boolean = fruits.contains("Apple")
>>     val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>>     Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>>   }
>>
>>   override def getResultType(signature: Array[Class[_]]):
>> TypeInformation[_] =
>>     Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
>> }
>>
>> which afaict correctly follows the documentation.
>>
>> Anything here stand out?
>>
>> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <r...@remind101.com> wrote:
>>
>>> So I just instead tried changing SplatFruitsFunc to a ScalaFunction and
>>> leftOuterJoinLateral to a map and I'm receiving:
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Only a scalar function can be used in the map
>>> operator.
>>> which seems odd because documentation says
>>>
>>> > Performs a map operation with a user-defined scalar function or
>>> built-in scalar function. The output will be flattened if the output type
>>> is a composite type.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>>
>>> Shouldn't this work as an alternative?
>>>
>>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a TableFunction and wherever it is applied with a
>>>> leftOuterJoinLateral, my table loses any inference of there being a primary
>>>> key. I see this because all subsequent joins end up with "NoUniqueKey" when
>>>> I know a primary key of id should exist.
>>>>
>>>> I'm wondering if this is expected behavior and if it's possible to tell
>>>> a table directly what the primary key should be?
>>>>
>>>>
>>>> To demonstrate my example:
>>>> My table function checks if an element of a certain type is in a string
>>>> array, and depending on whether or not it is there, it appends a column
>>>> with value true or false. For example, if array "fruits" which could
>>>> possibly contain orange, banana, apple, and watermelon on a row contains
>>>> only `["orange", "apple"]` then it will append `has_orange: true,
>>>> has_banana: false, has_apple: true, has_watermelon: false` as columns to
>>>> the row. This example is essentially the same as my code, outside of having
>>>> a much larger set of keys and not dealing with fruits.
>>>>
>>>> Example code:
>>>>
>>>> // table will always have pk id
>>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>> return table
>>>>  .leftOuterJoinLateral(
>>>>    new SplatFruitsFunc()(
>>>>      $"fruits"
>>>>    ) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
>>>> s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
>>>>  )
>>>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>>> }
>>>>
>>>> @FunctionHint(
>>>>   output = new DataTypeHint(
>>>>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN,
>>>> has_watermelon BOOLEAN)"
>>>>   )
>>>> )
>>>> class SplatFruitsFunc
>>>>     extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>>>
>>>>   def eval(fruits: Array[String]): Unit = {
>>>>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>>>>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>>>>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>>>>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>>   }
>>>> }
>>>>
>>>> Thanks!
>>>>
>>>> --
>>>>
>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>
>>>>
>>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>>> <https://www.facebook.com/remindhq>
>>>>
>>>
>>>
>>> --
>>>
>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>
>>>
>>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>>> <https://www.facebook.com/remindhq>
>>>
>>
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
>>  |  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
>> <https://www.facebook.com/remindhq>
>>
>
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>
 |  FOLLOW
US <https://twitter.com/remindhq>  |  LIKE US
<https://www.facebook.com/remindhq>

Reply via email to