Hello Fabian,

Thank you for your response. I tried your recommendation, but I’m getting the 
same issue. Here’s the altered MakeRow MapFunction I tried:

  class MakeRow extends MapFunction[(Integer, Integer), Row] {
    implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("id", "value")
    )

    override def map(tuple: (Integer, Integer)): Row = tuple match {
      case (value, id) => Row.of(id, value)
    }
  }

Stepping through the execution, it looks like the problem is that the 
TypeInformation used for Row reports isKeyType() as false 
<https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L98-L100>. 
Any recommendations?
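
For reference, here is a quick standalone check consistent with the GenericType 
fallback you described (just my own sketch, not code from the job): the generic 
type the Scala API derives for Row is not a valid key type, while the 
RowTypeInfo I want it to use is:

  import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
  import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo}
  import org.apache.flink.types.Row

  object KeyTypeCheck extends App {
    // What the Scala API appears to derive for Row when no suitable implicit is found:
    val genericRowType = new GenericTypeInfo(classOf[Row])
    println(genericRowType.isKeyType) // false -> "This type ... cannot be used as key."

    // The type information I'm trying to get it to use instead:
    val rowType: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("id", "value")
    )
    println(rowType.isKeyType) // true
  }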

Thanks,

Joshua


> On Jul 10, 2017, at 11:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi Joshua,
> 
> thanks for reporting this issue. Your code is fine, but IMO there is a bug in 
> the Scala DataSet API: it simply does not respect the type information 
> provided by the ResultTypeQueryable[Row] interface and defaults to a 
> GenericType.
> 
> I think this should be fixed. I'll open a JIRA issue for that.
> 
> You can explicitly declare the type with an implicit if you put the following 
> lines above the lines in which you apply rowFn to the DataSet.
> 
> implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>   Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>   Array("id", "value")
> )
> 
> When you do this, you can also remove the ResultTypeQueryable interface from 
> the MapFunction.
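> 
> In other words, with the implicit declared inside main(), directly above the 
> map calls, the relevant part of your program would look roughly like this (a 
> sketch reusing the names from your program; imports unchanged):
> 
>     implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>       Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>       Array("id", "value")
>     )
> 
>     // the implicit TypeInformation[Row] above is what the Scala API's map() resolves
>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
> 
>     evenRows.join(oddRows).where("id").equalTo("id").print()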
> 
> Cheers, Fabian
> 
> 
> 
> 2017-07-10 18:10 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
> Thank you for your response, Nico. Below is a simple case where I’m trying to 
> join on Row fields:
> 
> package com.github.hadronzoo.rowerror
> 
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
> import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.types.Row
> 
> object Main {
> 
>   class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>       case (value, id) => Row.of(id, value)
>     }
> 
>     override def getProducedType: TypeInformation[Row] =
>       new RowTypeInfo(
>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>         Array("id", "value")
>       )
>   }
> 
>   def integerTuple(intTuple: (Int, Int)): (Integer, Integer) =
>     intTuple match { case (a, b) => (a, b) }
> 
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.createLocalEnvironment()
>     val rowFn = new MakeRow
> 
>     val ints = 0 until 1000
>     val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
>     val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)
> 
>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
> 
>     evenRows.join(oddRows).where("id").equalTo("id").print()
>   }
> }
> 
> Executing the above yields the following error:
> 
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
>       at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>       at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
>       at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
>       at com.github.hadronzoo.rowerror.Main.main(Main.scala)
> 
> For my application I only have TypeInformation at runtime (before the 
> execution graph is built). Is it possible to use Row fields in join 
> operations or is there an error with my implementation?
> 
> Thanks for your help,
> 
> Joshua
> 
>> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
>> 
>> Can you show a minimal example of the query you are trying to run?
>> Maybe Timo or Fabian (cc'd) can help.
>> 
>> 
>> Nico
>> 
>> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>>> Hello,
>>> 
>>> When using nested field expressions like “Account.Id” with nested rows, I
>>> get the following error: “This type
>>> (GenericType<org.apache.flink.types.Row>) cannot be used as key.” Is there
>>> a way to make nested field expressions work with nested rows?
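>>> 
>>> For illustration, the nested type has roughly this shape (a simplified
>>> sketch; apart from "Account" and "Id" the field names are placeholders):
>>> 
>>>   val accountType = new RowTypeInfo(
>>>     Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
>>>     Array("Id", "Name"))
>>> 
>>>   val recordType = new RowTypeInfo(
>>>     Array[TypeInformation[_]](accountType, BasicTypeInfo.STRING_TYPE_INFO),
>>>     Array("Account", "Data"))
>>> 
>>>   // keying a join by the nested field:
>>>   //   rows.join(otherRows).where("Account.Id").equalTo("Account.Id")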
>>> 
>>> Thanks,
>>> 
>>> Joshua
>> 
> 
> 
