[ 
https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291993#comment-17291993
 ] 

Iaroslav Zeigerman commented on FLINK-21507:
--------------------------------------------

The alternative (simpler) workaround is to insert a dummy map() operator right 
before reinterpreting the stream as keyed stream:

{code:scala}
val firstPartitioning = env
      .addSource(testRecordSoruce)
      .partitionCustom(randomPartitioner, _.key)
      .map(identity(_))
      .name("dummy-map")
      .uid("dummy-map")

    val keyedStream = new KeyedStream(
      DataStreamUtils.reinterpretAsKeyedStream(
        firstPartitioning.javaStream,
        new KeySelector[TestRecord, String] {
          override def getKey(value: TestRecord): String = value.key
        }
      )
    )
{code}


> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
>                 Key: FLINK-21507
>                 URL: https://issues.apache.org/jira/browse/FLINK-21507
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Scala
>    Affects Versions: 1.11.0
>            Reporter: Iaroslav Zeigerman
>            Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:java}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ....{code}
> I see that only last partitioning operation (custom_partitioner2) is 
> reflected in the DAG while the 1st one is ignored entirely.
> I've also confirmed that the 1st partitioning wasn't applied at runtime from 
> application logs.
> *UPD*
> Seems like the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
> {code:scala}
>     case class TestRecord(key: String)
>     
>     val env = StreamExecutionEnvironment.getExecutionEnvironment 
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
>     env.setParallelism(1)
>     val testRecordSoruce = ...
>     val randomPartitioner = new Partitioner[String] {
>       override def partition(key: String, numPartitions: Int): Int = 
> math.abs(key.hashCode) % numPartitions
>     }
>     val firstPartitioning = env
>       .addSource(testRecordSoruce)
>       .partitionCustom(randomPartitioner, _.key)
>     val keyedStream = new KeyedStream(
>       DataStreamUtils.reinterpretAsKeyedStream(
>         firstPartitioning.javaStream,
>         new KeySelector[TestRecord, String] {
>           override def getKey(value: TestRecord): String = value.key
>         }
>       )
>     )
>     keyedStream
>       .map(identity(_))
>       .partitionCustom(randomPartitioner, _.key)
>       .map(identity(_))
>  
>  {code}
> This code produces the following DAG:
> {code:javascript}
> {
>   "nodes" : [ {
>     "id" : 22,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 25,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 22,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 27,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 25,
>       "ship_strategy" : "CUSTOM",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The expected behavior to have CUSTOM connection in both cases vs FORWARD then 
> CUSTOM.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to