[ https://issues.apache.org/jira/browse/FLINK-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531680#comment-16531680 ]
Josh Lemer commented on FLINK-9221:
-----------------------------------

[~yanghua] You may be right. One reason it may be wrong to put the method on `SinkFunction` is that the opposite methods (`map`, `flatMap`) are not defined on `SourceFunction` or `MapFunction`; they are defined on `DataStream`. However, there isn't really any other abstraction in Flink where `contramap` could live, since there is no equivalent `DataSink` abstraction like there is in Akka Streams or similar. Putting it on `SinkFunction` may therefore be the best option, unless we come up with a proper `DataSink` abstraction.

> Add method SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]
> -------------------------------------------------------------------
>
>                 Key: FLINK-9221
>                 URL: https://issues.apache.org/jira/browse/FLINK-9221
>             Project: Flink
>          Issue Type: Task
>          Components: DataSet API, DataStream API
>    Affects Versions: 1.5.0
>            Reporter: Josh Lemer
>            Assignee: vinoyang
>            Priority: Minor
>              Labels: flink
>
> Just as it is very useful to treat `DataStream[T]` as a sort of Functor or
> Monad with `map`/`flatMap`/`filter` methods, it would be extremely handy to
> have a `SinkFunction[A]#contramap[B](f: B => A): SinkFunction[B]` on
> `SinkFunction`, so that you can reuse existing complex sink functions, but
> with a different input type. For example:
> {code}
> val bucketingStringSink: SinkFunction[String] =
>   new BucketingSink[String]("...")
>     .setBucketer(new DateTimeBucketer("yyyy-MM-dd-HHmm"))
> val bucketingIntListSink: SinkFunction[List[Int]] =
>   bucketingStringSink.contramap[List[Int]](_.mkString(","))
> {code}
> For some more formal motivation behind this,
> https://typelevel.org/cats/typeclasses/contravariant.html is definitely a
> great place to start!

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
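
To make the proposed semantics concrete, here is a minimal sketch of what `contramap` would do. It does not use Flink: `Sink`, `CollectingSink`, and `ContramapSketch` are hypothetical stand-ins invented for illustration, with `Sink` reduced to a single `invoke` method so the example runs on its own. A wrapper around a real `SinkFunction` would presumably also have to forward lifecycle and checkpointing calls to the wrapped sink, which this sketch ignores.

```scala
// Hypothetical stand-in for Flink's SinkFunction, reduced to one method
// so the sketch runs without Flink on the classpath.
trait Sink[A] { self =>
  def invoke(value: A): Unit

  // contramap adapts the input side: convert each B to an A with f,
  // then hand the result to the existing sink.
  def contramap[B](f: B => A): Sink[B] = new Sink[B] {
    def invoke(value: B): Unit = self.invoke(f(value))
  }
}

// Toy sink that records the strings it receives (illustration only).
final class CollectingSink extends Sink[String] {
  val received = scala.collection.mutable.ArrayBuffer.empty[String]
  def invoke(value: String): Unit = received += value
}

object ContramapSketch {
  // Feeds two lists through a contramapped sink and returns what the
  // underlying string sink actually received.
  def demo(): List[String] = {
    val stringSink = new CollectingSink
    val intListSink: Sink[List[Int]] =
      stringSink.contramap[List[Int]](_.mkString(","))

    intListSink.invoke(List(1, 2, 3))
    intListSink.invoke(Nil)
    stringSink.received.toList
  }

  def main(args: Array[String]): Unit =
    demo().foreach(s => println(s"received: '$s'"))
}
```

The point of the sketch is that the existing sink is reused unchanged; only the function `f` applied in front of it differs per input type.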