I apologize, that was the wrong link. Here’s where the exception is thrown: 
https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L329-L331
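For the archives, here is a minimal sketch of the workaround Fabian describes below, assuming Flink 1.3's Scala API is on the classpath (the object name `RowJoinSketch` and the sample element values are illustrative only). The Scala DataSet API resolves `TypeInformation[R]` implicitly at the call site of `.map`, so the implicit must be visible there; declaring it inside the MapFunction, or implementing ResultTypeQueryable, is not picked up, and the API falls back to `GenericType<Row>`, which cannot be used as a join key.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object RowJoinSketch {

  class MakeRow extends MapFunction[(Integer, Integer), Row] {
    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()
    val rowFn = new MakeRow

    // In scope where .map is called: a local implicit takes precedence over
    // the macro-generated TypeInformation, so both DataSets get a proper
    // RowTypeInfo with named, keyable fields instead of GenericType<Row>.
    implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("id", "value")
    )

    val evens = env.fromElements((0: Integer, 0: Integer), (2: Integer, 1: Integer)).map(rowFn)
    val odds  = env.fromElements((1: Integer, 0: Integer), (3: Integer, 1: Integer)).map(rowFn)

    // "id" now resolves against the RowTypeInfo, so the ExpressionKeys
    // constructor no longer rejects the type as a key.
    evens.join(odds).where("id").equalTo("id").print()
  }
}
```

This is a sketch, not a definitive fix for the underlying bug: it simply moves the type declaration to the point where the Scala API actually looks for it.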

> On Jul 10, 2017, at 11:54 AM, Joshua Griffith <jgriff...@campuslabs.com> 
> wrote:
> 
> Hello Fabian,
> 
> Thank you for your response. I tried your recommendation but I’m getting the 
> same issue. Here’s the altered MakeRow MapFunction I tried:
> 
>   class MakeRow extends MapFunction[(Integer, Integer), Row] {
>     implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
>       Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>       Array("id", "value")
>     )
>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>       case (value, id) => Row.of(id, value)
>     }
>   }
> 
> In stepping through the code execution, it looks like the problem is that 
> Row.isKeyType() returns false 
> <https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L98-L100>.
>  Any recommendations?
> 
> Thanks,
> 
> Joshua
> 
> 
>> On Jul 10, 2017, at 11:42 AM, Fabian Hueske <fhue...@gmail.com 
>> <mailto:fhue...@gmail.com>> wrote:
>> 
>> Hi Joshua,
>> 
>> thanks for reporting this issue. Your code is fine, but IMO there is a bug 
>> in the Scala DataSet API: it simply does not respect the type information 
>> provided by the ResultTypeQueryable[Row] interface and defaults to a 
>> GenericType.
>> 
>> I think this should be fixed. I'll open a JIRA issue for it.
>> 
>> You can explicitly declare the type with an implicit by putting the 
>> following lines above the point where you apply rowFn to the DataSet.
>> 
>> implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>>   Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>>   Array("id", "value")
>> )
>> When you do this, you can also remove the ResultTypeQueryable interface 
>> from the MapFunction.
>> 
>> Cheers, Fabian
>> 
>> 
>> 
>> 2017-07-10 18:10 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com 
>> <mailto:jgriff...@campuslabs.com>>:
>> Thank you for your response Nico. Below is a simple case where I’m trying to 
>> join on Row fields:
>> 
>> package com.github.hadronzoo.rowerror
>> 
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
>> import org.apache.flink.api.scala._
>> import org.apache.flink.types.Row
>> 
>> object Main {
>> 
>>   class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
>>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>>       case (value, id) => Row.of(id, value)
>>     }
>> 
>>     override def getProducedType: TypeInformation[Row] =
>>       new RowTypeInfo(
>>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>>         Array("id", "value")
>>       )
>>   }
>>   }
>> 
>>   def integerTuple(intTuple: (Int, Int)): (Integer, Integer) =
>>     intTuple match { case (a, b) => (a, b) }
>> 
>>   def main(args: Array[String]): Unit = {
>>     val env = ExecutionEnvironment.createLocalEnvironment()
>>     val rowFn = new MakeRow
>> 
>>     val ints = 0 until 1000
>>     val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
>>     val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)
>> 
>>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
>> 
>>     evenRows.join(oddRows).where("id").equalTo("id").print()
>>   }
>> }
>> 
>> Executing the above yields the following error:
>> 
>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: 
>> This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
>>      at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>>      at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
>>      at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
>>      at com.github.hadronzoo.rowerror.Main.main(Main.scala)
>> 
>> For my application, TypeInformation is only available at runtime (before 
>> the execution graph is built). Is it possible to use Row fields in join 
>> operations, or is there an error in my implementation?
>> 
>> Thanks for your help,
>> 
>> Joshua
>> 
>>> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com 
>>> <mailto:n...@data-artisans.com>> wrote:
>>> 
>>> Can you show a minimal example of the query you are trying to run?
>>> Maybe Timo or Fabian (cc'd) can help.
>>> 
>>> 
>>> Nico
>>> 
>>> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>>>> Hello,
>>>> 
>>>> When using nested field expressions like "Account.Id" with nested rows, I
>>>> get the following error: "This type
>>>> (GenericType<org.apache.flink.types.Row>) cannot be used as key." Is there
>>>> a way to make nested field expressions work with nested rows?
>>>> 
>>>> Thanks,
>>>> 
>>>> Joshua
>>> 
>> 
>> 
> 
