I still have the same problem even when i extended GValue with comparable. I
think that the problem might be in the fact that Array[GValue] are not
compatible and not the GValues but i do not know how to fix it in flink, may be
some implicit ordering would work (and why the this field, Array[GValue], is
compared). I appreciate every and any help.
I still do not understand why this was not problem in previous versions.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
This type
(ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)
cannot be used as key. at
org.apache.flink.api.java.operators.Keys$ExpressionKeys.<init>(Keys.java:308)
at
org.apache.flink.api.java.operators.DistinctOperator.<init>(DistinctOperator.java:56)
at org.apache.flink.api.scala.DataSet.distinct(DataSet.scala:740)
at
it.polimi.genomics.flink.FlinkImplementation.operator.region.GenometricMap4$.execute(GenometricMap4.scala:71)
sealed trait GValue extends Serializable with Comparable[GValue] with
Ordered[GValue]{
def compare(o : GValue) : Int = {
o match {
case GDouble(v) => this.asInstanceOf[GDouble].v compare v
case GString(v) => this.asInstanceOf[GString].v compare v
case GInt(v) => this.asInstanceOf[GInt].v compare v
case _ => 0
}
}
def equal(o : GValue) : Boolean = {
o match {
case GInt(value) => try{value.equals(o.asInstanceOf[GInt].v)} catch {
case e : Throwable => false }
case GDouble(value) => try{value.equals(o.asInstanceOf[GDouble].v)} catch
{ case e : Throwable => false }
case GString(value) => try{value.equals(o.asInstanceOf[GString].v)} catch
{ case e : Throwable => false }
case GNull() => o.isInstanceOf[GNull]
case _ => false
}
}
override def compareTo(o: GValue): Int = {
o match {
case GInt(value) => try{value.compareTo(o.asInstanceOf[GInt].v)} catch {
case e : Throwable => 0 }
case GDouble(value) => try{value.compareTo(o.asInstanceOf[GDouble].v)}
catch { case e : Throwable => 0 }
case GString(value) => try{value.compareTo(o.asInstanceOf[GString].v)}
catch { case e : Throwable => 0 }
case GNull() => 0
case _ => 0
}
}
}
-----------------------------------------------------------------
Abdulrahman Kaitoua
-----------------------------------------------------------------
Ph.D. Candidate at Politecnico Di Milano
> Subject: Re: Serialisation problem
> From: [email protected]
> Date: Mon, 14 Dec 2015 10:42:22 +0100
> To: [email protected]
>
> Hi,
> the problem could be that GValue is not Comparable. Could you try making it
> extend Comparable (The Java Comparable).
>
> Cheers,
> Aljoscha
> > On 12 Dec 2015, at 20:43, Robert Metzger <[email protected]> wrote:
> >
> > Hi,
> >
> > Can you check the log output in your IDE or the log files of the Flink
> > client (./bin/flink). The TypeExtractor is logging why a POJO is not
> > recognized as a POJO.
> >
> > The log statements look like this:
> >
> > 20:42:43,465 INFO org.apache.flink.api.java.typeutils.TypeExtractor
> > - class com.dataartisans.debug.MyPojo must have a default constructor
> > to be used as a POJO.
> >
> >
> >
> > On Thu, Dec 10, 2015 at 11:24 PM, Abdulrahman kaitoua
> > <[email protected]> wrote:
> >
> >
> > Hello,
> >
> > I would like to hive directions to make my code work again (thanks in
> > advance). My code used to work on versions equal or less than 9.1 but when
> > i included 10 or 10.1 i got the following exception.
> >
> > This type
> > (ObjectArrayTypeInfo<GenericType<it.polimi.genomics.core.DataTypes.GValue>>)
> > cannot be used as key
> >
> > I understood that it is related to the serialisation of objects. I tried to
> > follow the POJO building directions in
> > https://cwiki.apache.org/confluence/display/FLINK/Type+System,+Type+Extraction,+Serialization
> > with no luck to make it work.
> >
> > my dataset contains a set of tuples as key and one array of GValues, this
> > is a snapshot of the GValue class.
> >
> >
> > sealed trait GValue extends Serializable with Ordered[GValue]{
> > def compare(o : GValue) : Int = {
> > o match {
> > case GDouble(v) => this.asInstanceOf[GDouble].v compare v
> > case GString(v) => this.asInstanceOf[GString].v compare v
> > case GInt(v) => this.asInstanceOf[GInt].v compare v
> > case GNull() => 0
> > }
> > }
> > def equal(o : GValue) : Boolean = {
> > o match {
> > case GInt(value) => try{value.equals(o.asInstanceOf[GInt].v)} catch {
> > case e : Throwable => false }
> > case GDouble(value) => try{value.equals(o.asInstanceOf[GDouble].v)}
> > catch { case e : Throwable => false }
> > case GString(value) => try{value.equals(o.asInstanceOf[GString].v)}
> > catch { case e : Throwable => false }
> > case GNull() => o.isInstanceOf[GNull]
> > case _ => false
> > }
> > }
> > }
> >
> > /**
> > * Represents a @GValue that contains an integer
> > * @deprecated
> > * @param v
> > */
> > case class GInt(v: Int) extends GValue{
> > def this() = this(0)
> > override def toString() : String = {
> > v.toString
> > }
> > override def equals(other : Any) : Boolean = {
> > other match {
> > case GInt(value) => value.equals(v)
> > case _ => false
> > }
> > }
> > }
> >
> > /**
> > * Represents a @GValue that contains a number as a @Double
> > * @param v number
> > */
> > case class GDouble(v: Double) extends GValue {//with Ordered[GDouble]{
> >
> > def this() = this(0.0)
> >
> > override def equals(other : Any) : Boolean = {
> > other match {
> > case GDouble(value) => value.equals(v)
> > case _ => false
> > }
> > }
> > }
> >
> > /**
> > * Represents a @GValue that contains a @String
> > * @param v string
> > */
> > case class GString(v: String) extends GValue{
> > def this() = this(".")
> > override def toString() : String = {
> > v.toString
> > }
> > override def equals(other : Any) : Boolean = {
> > other match {
> > case GString(value) => value.equals(v)
> > case _ => false
> > }
> > }
> > }
> >
> > Regards,
> >
> > -----------------------------------------------------------------
> > Abdulrahman Kaitoua
> > -----------------------------------------------------------------
> > Ph.D. Candidate at Politecnico Di Milano
> >
> >
>