I still have the same problem even after I extended GValue with Comparable. I think the problem might be that Array[GValue] is not comparable, rather than the GValues themselves, but I do not know how to fix it in Flink; maybe some implicit ordering would work (and I do not understand why this field, Array[GValue], is compared at all). I appreciate any help. I still do not understand why this was not a problem in previous versions. Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>) cannot be used as key. at org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:308) at org.apache.flink.api.java.operators.DistinctOperator.<init>(DistinctOperator.java:56) at org.apache.flink.api.scala.DataSet.distinct(DataSet.scala:740) at it.polimi.genomics.flink.FlinkImplementation.operator.region.GenometricMap4$.execute(GenometricMap4.scala:71) sealed trait GValue extends Serializable with Comparable[GValue] with Ordered[GValue]{ def compare(o : GValue) : Int = { o match { case GDouble(v) => this.asInstanceOf[GDouble].v compare v case GString(v) => this.asInstanceOf[GString].v compare v case GInt(v) => this.asInstanceOf[GInt].v compare v case _ => 0 } } def equal(o : GValue) : Boolean = { o match { case GInt(value) => try{value.equals(o.asInstanceOf[GInt].v)} catch { case e : Throwable => false } case GDouble(value) => try{value.equals(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => false } case GString(value) => try{value.equals(o.asInstanceOf[GString].v)} catch { case e : Throwable => false } case GNull() => o.isInstanceOf[GNull] case _ => false } } override def compareTo(o: GValue): Int = { o match { case GInt(value) => try{value.compareTo(o.asInstanceOf[GInt].v)} catch { case e : Throwable => 0 } case GDouble(value) => try{value.compareTo(o.asInstanceOf[GDouble].v)} catch { case e : Throwable => 0 } case GString(value) => 
try{value.compareTo(o.asInstanceOf[GString].v)} catch { case e : Throwable => 0 } case GNull() => 0 case _ => 0 } } } ----------------------------------------------------------------- Abdulrahman Kaitoua ----------------------------------------------------------------- Ph.D. Candidate at Politecnico Di Milano
> Subject: Re: Serialisation problem > From: aljos...@apache.org > Date: Mon, 14 Dec 2015 10:42:22 +0100 > To: user@flink.apache.org > > Hi, > the problem could be that GValue is not Comparable. Could you try making it > extend Comparable (the Java Comparable)? > > Cheers, > Aljoscha > > On 12 Dec 2015, at 20:43, Robert Metzger <rmetz...@apache.org> wrote: > > > > Hi, > > > > Can you check the log output in your IDE or the log files of the Flink > > client (./bin/flink)? The TypeExtractor is logging why a POJO is not > > recognized as a POJO. > > > > The log statements look like this: > > > > 20:42:43,465 INFO org.apache.flink.api.java.typeutils.TypeExtractor > > - class com.dataartisans.debug.MyPojo must have a default constructor > > to be used as a POJO. > > > > > > > > On Thu, Dec 10, 2015 at 11:24 PM, Abdulrahman kaitoua > > <abdulrahman.kait...@outlook.com> wrote: > > > > > > Hello, > > > > I would like to have some directions to make my code work again (thanks in > > advance). My code used to work on versions less than or equal to 9.1, but when > > I included 10 or 10.1 I got the following exception. > > > > This type > > (ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>) > > cannot be used as key > > > > I understood that it is related to the serialisation of objects. I tried to > > follow the POJO building directions in > > https://cwiki.apache.org/confluence/display/FLINK/Type+System,+Type+Extraction,+Serialization > > but had no luck making it work. > > > > My dataset contains a set of tuples as key and one array of GValues; this > > is a snapshot of the GValue class. 
> > > > > > sealed trait GValue extends Serializable with Ordered[GValue]{ > > def compare(o : GValue) : Int = { > > o match { > > case GDouble(v) => this.asInstanceOf[GDouble].v compare v > > case GString(v) => this.asInstanceOf[GString].v compare v > > case GInt(v) => this.asInstanceOf[GInt].v compare v > > case GNull() => 0 > > } > > } > > def equal(o : GValue) : Boolean = { > > o match { > > case GInt(value) => try{value.equals(o.asInstanceOf[GInt].v)} catch { > > case e : Throwable => false } > > case GDouble(value) => try{value.equals(o.asInstanceOf[GDouble].v)} > > catch { case e : Throwable => false } > > case GString(value) => try{value.equals(o.asInstanceOf[GString].v)} > > catch { case e : Throwable => false } > > case GNull() => o.isInstanceOf[GNull] > > case _ => false > > } > > } > > } > > > > /** > > * Represents a @GValue that contains an integer > > * @deprecated > > * @param v > > */ > > case class GInt(v: Int) extends GValue{ > > def this() = this(0) > > override def toString() : String = { > > v.toString > > } > > override def equals(other : Any) : Boolean = { > > other match { > > case GInt(value) => value.equals(v) > > case _ => false > > } > > } > > } > > > > /** > > * Represents a @GValue that contains a number as a @Double > > * @param v number > > */ > > case class GDouble(v: Double) extends GValue {//with Ordered[GDouble]{ > > > > def this() = this(0.0) > > > > override def equals(other : Any) : Boolean = { > > other match { > > case GDouble(value) => value.equals(v) > > case _ => false > > } > > } > > } > > > > /** > > * Represents a @GValue that contains a @String > > * @param v string > > */ > > case class GString(v: String) extends GValue{ > > def this() = this(".") > > override def toString() : String = { > > v.toString > > } > > override def equals(other : Any) : Boolean = { > > other match { > > case GString(value) => value.equals(v) > > case _ => false > > } > > } > > } > > > > Regards, > > > > 
----------------------------------------------------------------- > > Abdulrahman Kaitoua > > ----------------------------------------------------------------- > > Ph.D. Candidate at Politecnico Di Milano > > > > >