Re: [re-cont] map and flatMap

2014-03-12 Thread Pascal Voitot Dev
On Wed, Mar 12, 2014 at 3:06 PM, andy petrella wrote:

> Folks,
>
> I just want to point something out...
> I haven't had time yet to sort it out or to think it through enough to give a
> rigorous explanation -- even though, intuitively, I feel there is a lot to it
> ===> need Spark people or time to move forward.
> But here is the thing regarding *flatMap*.
>
> Actually, it looks like (and again this intuitively makes sense) that RDDs (and
> of course DStreams) aren't monadic, and this is reflected in the implementation
> (and signature) of flatMap.
>
> >
> > *def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
> > new FlatMappedRDD(this, sc.clean(f))*
>
>
> There!? flatMap (or bind, >>=) should take a function that uses the same
> higher-level abstraction in order to be considered as such, right?
>
>
I had noticed exactly the same thing and asked myself the same question...

> In this case, it takes a function that returns a TraversableOnce of the type
> of the RDD's content, so what the output represents is more the content of
> the RDD than the RDD itself (still right?).
>
> This actually breaks the usual understanding of map and flatMap:
>
> > *def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this,
> > sc.clean(f))*
>
>
> Indeed, RDD is a functor, and the underlying reason for flatMap not taking
> A => RDD[B] doesn't show up in map.
>
> This actually has a lot of consequences, because at first one might want to
> write for-comprehensions over RDDs, or even Traversable[F[_]] functions
> like sequence -- and one will get stuck since the signatures aren't
> compliant.
> More importantly, Scala relies on a convention about the structure of a type
> to allow for-comprehensions... so where Traversable[F[_]] will fail on types,
> a for-comprehension will fail in weird ways.
>

+1
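
To make the point concrete, here is a small sketch (the rdd1/rdd2 values and
the example method are purely illustrative) of how a for-comprehension over
RDDs breaks against the actual flatMap signature:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical RDDs, only there to show the desugaring.
def example(sc: SparkContext): Unit = {
  val rdd1: RDD[Int]    = sc.parallelize(1 to 3)
  val rdd2: RDD[String] = sc.parallelize(Seq("a", "b"))

  // for { x <- rdd1; y <- rdd2 } yield (x, y)
  // desugars to:
  //   rdd1.flatMap(x => rdd2.map(y => (x, y)))
  // which does not compile: rdd2.map(...) is an RDD[(Int, String)],
  // not the TraversableOnce[(Int, String)] that RDD.flatMap expects.

  // A "monadic" flatMap would need a signature Spark does not have:
  //   def flatMap[U: ClassTag](f: T => RDD[U]): RDD[U]
}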


>
> Again, this signature sounds normal, because my intuitive feeling about RDDs
> is that they *can* be monadic, but the composition would depend on the
> use case and might have heavy consequences (unioning the RDDs, for instance
> => having this happen behind the scenes can be a big pain, since it wouldn't
> be efficient at all).
>
> So Yes, RDD could be monadic but with care.
>

At least we can say it is a Functor...
Actually, I had planned to study the monadic aspect of RDDs, but as you
said, it's not so easy...
So for now, I consider them as pseudo-monadic ;)
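
To illustrate the cost, here is a purely hypothetical sketch (monadicFlatMap
is not a Spark API) of what a "true" monadic bind over RDDs would have to do:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

// Hypothetical helper, not part of Spark: a "true" monadic bind for RDDs.
def monadicFlatMap[A, B: ClassTag](rdd: RDD[A])(f: A => RDD[B]): RDD[B] = {
  val sc: SparkContext = rdd.sparkContext
  // Each element yields a whole RDD; flattening means pulling the outer
  // elements back to the driver and unioning all the inner RDDs -- exactly
  // the kind of heavy, non-local composition mentioned above.
  val inner: Seq[RDD[B]] = rdd.collect().toSeq.map(f)
  sc.union(inner)
}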



> So what this signature exposes is a way to flatMap over the inner values,
> almost like flatMapValues does for Map-like (pair) RDDs.
>
> So, wouldn't it be better to rename flatMap to flatMapData (or whatever
> better name)? Or to have flatMap require a Monad instance for RDD?
>
>
renaming it to flatMapData or flatTraversableMap sounds good to me (even if
lots of people will hate it...)
flatMap requiring a Monad instance would certainly make it unusable in
for-comprehensions, no?


> Sorry for the prose, just dropped my thoughts and feelings at once :-/
>
>
I agree with you, in case it helps not to feel alone ;)

Pascal

Cheers,
> andy
>
> PS: and sorry for my English maybe; although my name's Andy, I'm a native Belgian ^^.
>


[spark-streaming] Is this LocalInputDStream useful to someone?

2014-03-20 Thread Pascal Voitot Dev
Hi guys,

In my recent blog post (http://mandubian.com/2014/03/08/zpark-ml-nio-1/), I
needed an InputDStream helper looking like NetworkInputDStream, to be able to
push my data into a DStream in an async way. But I didn't want the remoting
aspect, as my data source runs locally and nowhere else. I didn't want my
InputDStream to be treated as a NetworkInputDStream, since those get special
handling in the DStream scheduler so they can potentially be remoted.

So I've implemented this LocalInputDStream that provides a simple push
mechanism with a receiver based on an actor, creating BlockRDDs, but ensures
it won't be remoted:

https://github.com/mandubian/zpark-ztream/blob/master/src/main/scala/LocalInputDStream.scala

(the code is just a hack of NetworkInputDStream)

and an instance of it:
https://github.com/mandubian/zpark-ztream/blob/master/src/main/scala/ZparkInputDStream.scala
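
For reference, here is a rough sketch of the idea only (not the actual code
from the repo above, which goes through a receiver actor and BlockRDDs); the
class name and the push method are illustrative:

import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Data is pushed locally into a buffer and turned into an RDD at each batch
// interval, so nothing ever goes through the network receiver path.
class LocalPushInputDStream[T: ClassTag](ssc: StreamingContext)
    extends InputDStream[T](ssc) {

  private val buffer = mutable.ArrayBuffer.empty[T]

  // Called by the local producer (e.g. an actor) on the driver.
  def push(elems: Seq[T]): Unit = buffer.synchronized { buffer ++= elems }

  override def start(): Unit = ()  // nothing to start: data is pushed in-process
  override def stop(): Unit = ()

  override def compute(validTime: Time): Option[RDD[T]] = buffer.synchronized {
    val batch = buffer.toVector
    buffer.clear()
    Some(ssc.sparkContext.parallelize(batch))
  }
}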

Is this something useful that I could contribute to the spark-streaming
project (in a PR), or have I totally missed something in the current project
code that would do the same?

Best regards
Pascal


Re: Making RDDs Covariant

2014-03-22 Thread Pascal Voitot Dev
Hi,

Covariance always seems like a good idea at first, but you must be really
careful, as it always has unexpected consequences...
From my experience, covariance often becomes a pain when dealing with
serialization/deserialization (I've experienced a few cases while
developing play-json & datomisca).
Moreover, if you have implicits, variance often becomes a headache...

I'm not necessarily against it, but my experience has shown it isn't always
such a good idea. The solution is to test in depth...

Pascal



On Sat, Mar 22, 2014 at 6:31 AM, Jey Kottalam  wrote:

> That would be awesome. I support this!
>
> On Fri, Mar 21, 2014 at 7:28 PM, Michael Armbrust
>  wrote:
> > Hey Everyone,
> >
> > Here is a pretty major (but source compatible) change we are considering
> > making to the RDD API for 1.0.  Java and Python APIs would remain the
> same,
> > but users of Scala would likely need fewer casts.  This would be
> > especially true for libraries whose functions take RDDs as parameters.
>  Any
> > comments would be appreciated!
> >
> > https://spark-project.atlassian.net/browse/SPARK-1296
> >
> > Michael
>


Re: Making RDDs Covariant

2014-03-22 Thread Pascal Voitot Dev
On Sat, Mar 22, 2014 at 3:45 PM, Michael Armbrust wrote:

> >
> > From my experience, covariance often becomes a pain when dealing with
> > serialization/deserialization (I've experienced a few cases while
> > developing play-json & datomisca).
> > Moreover, if you have implicits, variance often becomes a headache...
>
>
> This is exactly the kind of feedback I was hoping for!  Can you be any more
> specific about the kinds of problems you ran into here?
>

I've been rethinking this topic since writing my first mail.

The problem I was talking about is when you try to use typeclass converters
and make them contravariant/covariant for input/output. Something like:

trait Reader[-I, +O] { def read(i: I): O }

Doing this, you soon have implicit collisions and philosophical concerns
about what it means to serialize/deserialize a Parent class and a Child
class...

For example, if you have a Reader[I, Dog], you also have a Reader[I, Mammal]
by covariance.
Then you use this Reader[I, Mammal] to read the representation of a Cat,
because a Cat is a Mammal.
But it fails, as the original Reader expects the representation of a full
Dog, not just the part of it corresponding to Mammal...

So you see here that the problem lies in the serialization/deserialization
mechanism itself.
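
Here is a minimal sketch of that pitfall (Dog/Mammal/Cat and the Map-based
representation are illustrative and have nothing to do with Spark):

object ReaderVarianceExample {
  trait Reader[-I, +O] { def read(i: I): O }

  sealed trait Mammal { def name: String }
  case class Dog(name: String, breed: String) extends Mammal
  case class Cat(name: String, lives: Int) extends Mammal

  // A reader that only knows how to decode the full representation of a Dog.
  val dogReader: Reader[Map[String, String], Dog] =
    new Reader[Map[String, String], Dog] {
      def read(m: Map[String, String]): Dog = Dog(m("name"), m("breed"))
    }

  // By covariance this is also a Reader[Map[String, String], Mammal]...
  val mammalReader: Reader[Map[String, String], Mammal] = dogReader

  // ...but feeding it the representation of a Cat fails at runtime, because
  // the underlying reader still expects a Dog's fields ("breed" is missing):
  // mammalReader.read(Map("name" -> "Felix", "lives" -> "9"))  // NoSuchElementException
}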

In your case, you don't have this kind of concern, as JavaSerializer and
KryoSerializer do basic marshalling operating on the low-level class
representation, and you don't rely on implicit typeclasses...

So let's consider what you really want, RDD[+T], and see whether it would
have bad impacts.

If you do:

val rddChild: RDD[Child] = sc.parallelize(Seq(Child(...), Child(...), ...))

If you perform map/reduce ops on this rddChild, then when remoting objects,
the Spark context will serialize all sequence elements as Child.

But if you then do:
val rddParent: RDD[Parent] = rddChild

If you perform ops on rddParent, I believe the serializer would serialize the
elements as Parent elements, certainly losing some data from Child.
On the remote nodes, they would be deserialized as Parent too, and they
wouldn't be Child elements anymore.
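
A small sketch of that scenario, assuming the hypothetical covariant RDD[+T]
(the Parent/Child hierarchy and the example method are illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Child carries more data than Parent.
class Parent(val id: Long) extends Serializable
class Child(id: Long, val extra: String) extends Parent(id)

def example(sc: SparkContext): Unit = {
  val rddChild: RDD[Child] =
    sc.parallelize(Seq(new Child(1L, "a"), new Child(2L, "b")))

  // With a covariant RDD[+T] this assignment would compile without a cast;
  // today it needs an explicit asInstanceOf or a map(identity):
  // val rddParent: RDD[Parent] = rddChild

  // The question raised above: when elements of rddParent are shipped to
  // remote nodes, are they written using the static type Parent (potentially
  // dropping `extra`) or using the runtime class Child?
}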

So, here, if it works as I say (I'm not sure), it would mean the following:
you have created an RDD from some data and, just by invoking covariance, you
might have lost data through the remoting mechanism.

Is it something bad in your opinion? (I'm thinking aloud)

Pascal


Re: Making RDDs Covariant

2014-03-22 Thread Pascal Voitot Dev
On Sat, Mar 22, 2014 at 8:38 PM, David Hall  wrote:

> On Sat, Mar 22, 2014 at 8:59 AM, Pascal Voitot Dev <
> pascal.voitot@gmail.com> wrote:
>
> > The problem I was talking about is when you try to use typeclass
> converters
> > and make them contravariant/covariant for input/output. Something like:
> >
> > Reader[-I, +O] { def read(i:I): O }
> >
> > Doing this, you soon have implicit collisions and philosophical concerns
> > about what it means to serialize/deserialize a Parent class and a Child
> > class...
> >
>
>
> You should (almost) never make a typeclass param contravariant. It's almost
> certainly not what you want:
>
> https://issues.scala-lang.org/browse/SI-2509
>
> -- David
>

I can confirm that it's a pain, and I must say I never do it myself, but I've
inherited historical code that did it :)
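
A minimal sketch of the SI-2509 surprise David refers to (Show/Animal/Dog are
illustrative):

object ContravariantTypeclassExample {
  trait Show[-A] { def show(a: A): String }

  class Animal
  class Dog extends Animal

  implicit val animalShow: Show[Animal] =
    new Show[Animal] { def show(a: Animal) = "an animal" }
  implicit val dogShow: Show[Dog] =
    new Show[Dog] { def show(a: Dog) = "a dog" }

  def describe[A](a: A)(implicit s: Show[A]): String = s.show(a)

  // Prints "an animal": with a contravariant type parameter, Show[Animal] is
  // a subtype of Show[Dog], so implicit resolution considers the general
  // instance more specific than the dedicated one and silently picks it.
  def main(args: Array[String]): Unit = println(describe(new Dog))
}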


Re: Making RDDs Covariant

2014-03-22 Thread Pascal Voitot Dev
OK guys, I've read the Spark code a bit more deeply on the serialization side...
You're right, Java & Kryo serialization are runtime-only, so this isn't
really a problem.

A few weeks ago, I studied a bit how we could integrate Pickling into Spark,
but currently it's not really possible as Pickling is based on macros.
In that context, types would be meaningful at compile time and covariance
could have an impact too.

So for now, I don't see anything against trying RDD[+T].

Pascal

On Sat, Mar 22, 2014 at 9:01 PM, Michael Armbrust wrote:

> Hi Pascal,
>
> Thanks for the input.  I think we are going to be okay here since, as Koert
> said, the current serializers use runtime type information.  We could also
> keep a ClassTag around for the original type when the RDD was created.
>  Good things to be aware of though.
>
> Michael
>
> On Sat, Mar 22, 2014 at 12:42 PM, Pascal Voitot Dev <
> pascal.voitot@gmail.com> wrote:
>
> > On Sat, Mar 22, 2014 at 8:38 PM, David Hall 
> wrote:
> >
> > > On Sat, Mar 22, 2014 at 8:59 AM, Pascal Voitot Dev <
> > > pascal.voitot@gmail.com> wrote:
> > >
> > > > The problem I was talking about is when you try to use typeclass
> > > converters
> > > > and make them contravariant/covariant for input/output. Something
> like:
> > > >
> > > > Reader[-I, +O] { def read(i:I): O }
> > > >
> > > > Doing this, you soon have implicit collisions and philosophical
> > concerns
> > > > about what it means to serialize/deserialize a Parent class and a
> Child
> > > > class...
> > > >
> > >
> > >
> > > You should (almost) never make a typeclass param contravariant. It's
> > almost
> > > certainly not what you want:
> > >
> > > https://issues.scala-lang.org/browse/SI-2509
> > >
> > > -- David
> > >
> >
> > I can confirm that it's a pain, and I must say I never do it myself, but
> > I've inherited historical code that did it :)
> >
>


Re: Making RDDs Covariant

2014-03-22 Thread Pascal Voitot Dev
On Sat, Mar 22, 2014 at 7:45 PM, andy petrella wrote:

> Dear,
> I'm pretty much following Pascal's advice, since I've myself
> encountered some problems with implicits (when playing the same kind of
> game with my Neo4J Scala API).
>
> Nevertheless, one remark regarding serialization: the loss of data
> shouldn't occur when implicit typeclasses aren't involved. Of course,
> using typeclasses means that the instance will be chosen at compile
> time. Without them it will behave like the classical use cases, where the
> serializer will do the dirty work at runtime using the current class
> :/.
>
> Now, imho, I'd be interested in having RDD covariant in its content type,
> because I have an API that I should be able to share with you soon or
> sooner, where we are trying to bind the two worlds (rdd+SparkCtx and
> dstream+StreamingCtx) and also to combine and chain job components.
> In a nutshell, it will be able to define Sources, Processes and Sinks of
> Containers of Wagons (RDDs/DStreams themselves) to compose a Job using a
> (to be defined) DSL.
>

You can't give out information like that and then stop so soon :)
You know that I've been struggling for some time playing with spark &
scalaz-stream, and I'm curious ;)


So without covariance I cannot for now define a generic noop Sink.
>
> My 0.02c
> Andy
>
> Sent from Tab, sorry for the typos...
>  On 22 March 2014 at 17:00, "Pascal Voitot Dev" wrote:
>
> > On Sat, Mar 22, 2014 at 3:45 PM, Michael Armbrust <
> mich...@databricks.com
> > >wrote:
> >
> > > >
> > > > From my experience, covariance often becomes a pain when dealing with
> > > > serialization/deserialization (I've experienced a few cases while
> > > > developing play-json & datomisca).
> > > > Moreover, if you have implicits, variance often becomes a headache...
> > >
> > >
> > > This is exactly the kind of feedback I was hoping for!  Can you be any
> > more
> > > specific about the kinds of problems you ran into here?
> > >
> >
> > I've been rethinking about this topic after writing my first mail.
> >
> > The problem I was talking about is when you try to use typeclass
> > converters and make them contravariant/covariant for input/output.
> > Something like:
> >
> > trait Reader[-I, +O] { def read(i: I): O }
> >
> > Doing this, you soon have implicit collisions and philosophical concerns
> > about what it means to serialize/deserialize a Parent class and a Child
> > class...
> >
> > For example, if you have a Reader[I, Dog], you also have a Reader[I,
> > Mammal] by covariance.
> > Then you use this Reader[I, Mammal] to read the representation of a Cat,
> > because a Cat is a Mammal.
> > But it fails, as the original Reader expects the representation of a full
> > Dog, not just the part of it corresponding to Mammal...
> >
> > So you see here that the problem lies in the serialization/deserialization
> > mechanism itself.
> >
> > In your case, you don't have this kind of concern, as JavaSerializer and
> > KryoSerializer do basic marshalling operating on the low-level class
> > representation, and you don't rely on implicit typeclasses...
> >
> > So let's consider what you really want, RDD[+T], and see whether it would
> > have bad impacts.
> >
> > If you do:
> >
> > val rddChild: RDD[Child] = sc.parallelize(Seq(Child(...), Child(...), ...))
> >
> > If you perform map/reduce ops on this rddChild, then when remoting objects,
> > the Spark context will serialize all sequence elements as Child.
> >
> > But if you then do:
> > val rddParent: RDD[Parent] = rddChild
> >
> > If you perform ops on rddParent, I believe the serializer would serialize
> > the elements as Parent elements, certainly losing some data from Child.
> > On the remote nodes, they would be deserialized as Parent too, and they
> > wouldn't be Child elements anymore.
> >
> > So, here, if it works as I say (I'm not sure), it would mean the following:
> > you have created an RDD from some data and, just by invoking covariance,
> > you might have lost data through the remoting mechanism.
> >
> > Is it something bad in your opinion? (I'm thinking aloud)
> >
> > Pascal
> >
>


Re: [spark-streaming] Is this LocalInputDStream useful to someone?

2014-03-27 Thread Pascal Voitot Dev
Hi guys,
Nobody saw/answered my question, so I'm resending it ;)

Thanks
Pascal


On Thu, Mar 20, 2014 at 9:17 AM, Pascal Voitot Dev <
pascal.voitot@gmail.com> wrote:

> Hi guys,
>
> In my recent blog post (http://mandubian.com/2014/03/08/zpark-ml-nio-1/),
> I needed an InputDStream helper looking like NetworkInputDStream, to be able
> to push my data into a DStream in an async way. But I didn't want the
> remoting aspect, as my data source runs locally and nowhere else. I didn't
> want my InputDStream to be treated as a NetworkInputDStream, since those get
> special handling in the DStream scheduler so they can potentially be remoted.
>
> So I've implemented this LocalInputDStream that provides a simple push
> mechanism with a receiver based on an actor, creating BlockRDDs, but ensures
> it won't be remoted:
>
>
> https://github.com/mandubian/zpark-ztream/blob/master/src/main/scala/LocalInputDStream.scala
>
> (the code is just a hack of NetworkInputDStream)
>
> and an instance of it:
>
> https://github.com/mandubian/zpark-ztream/blob/master/src/main/scala/ZparkInputDStream.scala
>
> Is this something useful that I could contribute to the spark-streaming
> project (in a PR), or have I totally missed something in the current project
> code that would do the same?
>
> Best regards
> Pascal
>