Yes, this might be nice. In fact, Till and I had similar ideas about using
the pattern to make broadcast variables more usable in Scala. :D

On Fri, 24 Jul 2015 at 17:39 Gyula Fóra <gyf...@apache.org> wrote:

> Hey,
>
> I would like to propose a way to extend the standard Streaming Scala API
> methods (map, flatMap, filter, etc.) with versions that take stateful
> functions as lambdas. I think this would eliminate the awkwardness of
> implementing RichFunctions in Scala and make statefulness more explicit:
>
> *For example:*
> def map( statefulMap: (I, Option[S]) => (O, Option[S]) )
> def flatMap( statefulFlatMap: (I, Option[S]) => (Traversable[O], Option[S]) )
>
> This would be translated into RichMapFunctions and RichFlatMapFunctions
> that store Option[S] as OperatorState for fault tolerance.
>
> *Example rolling sum by key:*
> val input: DataStream[Long] = ...
> val sumByKey: DataStream[Long] =
>   input.keyBy(...).map( (next: Long, sum: Option[Long]) =>
>     sum match {
>       case Some(s) => (next + s, Some(next + s))
>       case None    => (next, Some(next))
>     })
>
> What do you think?
>
> Gyula
>
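
To make the proposed translation concrete, here is a minimal plain-Scala
sketch with no Flink dependency. StatefulMapper and StatefulMapSketch are
hypothetical names invented for illustration, not existing API. The wrapper
keeps the Option[S] in an ordinary field, whereas the proposal would store it
as OperatorState, and it holds one state only rather than one state per key.

```scala
// Hypothetical helper, not part of any Flink API: wraps a stateful lambda
// and carries the Option[S] from one call to the next, roughly as the
// generated rich function in the proposal would.
final class StatefulMapper[I, O, S](f: (I, Option[S]) => (O, Option[S])) {
  // Assumption: a plain field stands in for fault-tolerant OperatorState.
  private var state: Option[S] = None

  def map(in: I): O = {
    val (out, next) = f(in, state) // apply the user lambda to input and state
    state = next                   // remember the updated state
    out
  }
}

object StatefulMapSketch {
  // The rolling-sum lambda from the proposal.
  val rollingSum: (Long, Option[Long]) => (Long, Option[Long]) =
    (next, sum) => sum match {
      case Some(s) => (next + s, Some(next + s))
      case None    => (next, Some(next))
    }

  val mapper = new StatefulMapper(rollingSum)

  // Feed a few elements through the wrapper, in order.
  val sums: List[Long] = List(1L, 2L, 3L, 4L).map(in => mapper.map(in))
}
```

Here StatefulMapSketch.sums holds the running totals of the input list. A
real implementation would also need to scope the state to the current key.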
