Hello Fabian,

Thank you for your response. I tried your recommendation but I’m getting the same issue. Here’s the altered MakeRow MapFunction I tried:
class MakeRow extends MapFunction[(Integer, Integer), Row] {
  implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
    Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
    Array("id", "value")
  )

  override def map(tuple: (Integer, Integer)): Row = tuple match {
    case (value, id) => Row.of(id, value)
  }
}

In stepping through the code execution, it looks like the problem is that isKeyType() returns false for the Row type's TypeInformation <https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L98-L100>. (In case the placement of the implicit is what I'm getting wrong, I've also pasted a variant with it declared in main() below the quoted thread.) Any recommendations?

Thanks,
Joshua

> On Jul 10, 2017, at 11:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Joshua,
>
> thanks for reporting this issue. Your code is fine, but IMO there is a bug in
> the Scala DataSet API: it simply does not respect the type information
> provided by the ResultTypeQueryable[Row] interface and defaults to a
> GenericType.
>
> I think this should be fixed. I'll open a JIRA issue for that.
>
> You can explicitly declare types with implicits if you put the following
> lines above the lines in which you apply the rowFn on the DataSet:
>
> implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>   Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>   Array("id", "value")
> )
>
> When you do this, you can also remove the ResultTypeQueryable interface
> from the MapFunction.
>
> Cheers,
> Fabian
>
>
> 2017-07-10 18:10 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
> Thank you for your response, Nico. Below is a simple case where I'm trying to
> join on Row fields:
>
> package com.github.hadronzoo.rowerror
>
> import org.apache.flink.api.common.functions.MapFunction
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
> import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
> import org.apache.flink.api.scala._
> import org.apache.flink.types.Row
>
> object Main {
>
>   class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>       case (value, id) => Row.of(id, value)
>     }
>
>     override def getProducedType: TypeInformation[Row] =
>       new RowTypeInfo(
>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>         Array("id", "value")
>       )
>   }
>
>   def integerTuple(intTuple: (Int, Int)): (Integer, Integer) =
>     intTuple match { case (a, b) => (a, b) }
>
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.createLocalEnvironment()
>     val rowFn = new MakeRow
>
>     val ints = 0 until 1000
>     val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
>     val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)
>
>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
>
>     evenRows.join(oddRows).where("id").equalTo("id").print()
>   }
> }
>
> Executing the above yields the following error:
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
>   at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>   at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
>   at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
>   at com.github.hadronzoo.rowerror.Main.main(Main.scala)
>
> For my application I only have TypeInformation at runtime (before the execution
> graph is built). Is it possible to use Row fields in join operations, or is there
> an error with my implementation?
>
> Thanks for your help,
>
> Joshua
>
>> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>
>> Can you show a minimal example of the query you are trying to run?
>> Maybe Timo or Fabian (cc'd) can help.
>>
>> Nico
>>
>> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>>> Hello,
>>>
>>> When using nested field expressions like "Account.Id" with nested rows, I get
>>> the following error: "This type (GenericType<org.apache.flink.types.Row>)
>>> cannot be used as key." Is there a way to make nested field expressions work
>>> with nested rows?
>>>
>>> Thanks,
>>>
>>> Joshua
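P.S. In case the placement of the implicit is what I'm getting wrong, here is the variant I plan to try next, with the implicit declared in main() directly above the map calls (as I read your note about putting it above the lines that apply rowFn). This is just a sketch reusing MakeRow and integerTuple from the example quoted above, with the rowTpe name from your snippet; I haven't run it yet, so I may still be misreading where the implicit needs to live:

// Same object Main, imports, MakeRow, and integerTuple as in the example
// quoted above; only main() changes.
def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.createLocalEnvironment()
  val rowFn = new MakeRow

  // TypeInformation[Row] in implicit scope *before* .map(rowFn) is applied,
  // so the Scala API can pick it up instead of falling back to a GenericType.
  implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
    Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
    Array("id", "value")
  )

  val ints = 0 until 1000
  val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
  val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

  val evenRows = env.fromCollection(evenIntegers).map(rowFn)
  val oddRows = env.fromCollection(oddIntegers).map(rowFn)

  evenRows.join(oddRows).where("id").equalTo("id").print()
}

If I'm reading the Scala DataSet.map signature correctly (TypeInformation and ClassTag in a second, implicit parameter list), I could presumably also pass the type explicitly, e.g. env.fromCollection(evenIntegers).map(rowFn)(rowTpe, scala.reflect.classTag[Row]), but I haven't tried that either.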