[ https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291993#comment-17291993 ]
Iaroslav Zeigerman commented on FLINK-21507:
--------------------------------------------

The alternative (simpler) workaround is to insert a dummy map() operator right before reinterpreting the stream as a keyed stream:

{code:scala}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._ // Scala KeyedStream + implicit TypeInformation

val firstPartitioning = env
  .addSource(testRecordSoruce)
  .partitionCustom(randomPartitioner, _.key)
  // dummy pass-through operator placed before reinterpretAsKeyedStream
  .map(identity(_))
  .name("dummy-map")
  .uid("dummy-map")

val keyedStream = new KeyedStream(
  DataStreamUtils.reinterpretAsKeyedStream(
    firstPartitioning.javaStream,
    new KeySelector[TestRecord, String] {
      override def getKey(value: TestRecord): String = value.key
    }
  )
)
{code}

> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
> Key: FLINK-21507
> URL: https://issues.apache.org/jira/browse/FLINK-21507
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Scala
> Affects Versions: 1.11.0
> Reporter: Iaroslav Zeigerman
> Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:scala}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ....
> {code}
> I see that only the last partitioning operation (custom_partitioner2) is
> reflected in the DAG, while the first one is ignored entirely.
> I've also confirmed from the application logs that the first partitioning
> wasn't applied at runtime.
> *UPD*
> It seems the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
> {code:scala}
> case class TestRecord(key: String)
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setParallelism(1)
> val testRecordSoruce = ...
> val randomPartitioner = new Partitioner[String] {
>   override def partition(key: String, numPartitions: Int): Int =
>     math.abs(key.hashCode) % numPartitions
> }
> val firstPartitioning = env
>   .addSource(testRecordSoruce)
>   .partitionCustom(randomPartitioner, _.key)
> val keyedStream = new KeyedStream(
>   DataStreamUtils.reinterpretAsKeyedStream(
>     firstPartitioning.javaStream,
>     new KeySelector[TestRecord, String] {
>       override def getKey(value: TestRecord): String = value.key
>     }
>   )
> )
> keyedStream
>   .map(identity(_))
>   .partitionCustom(randomPartitioner, _.key)
>   .map(identity(_))
>
> {code}
> This code produces the following DAG:
> {code:javascript}
> {
>   "nodes" : [ {
>     "id" : 22,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 25,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 22,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 27,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 25,
>       "ship_strategy" : "CUSTOM",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The expected behavior is to have a CUSTOM connection in both cases, rather
> than FORWARD followed by CUSTOM.
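The first edge of the reported DAG (Source to Map, FORWARD) is consistent with a simple rule: when partitioning steps are stacked with no real operator between them, only the step closest to the consuming operator ends up on the edge. reinterpretAsKeyedStream appears to add a FORWARD partitioning on top of its input, which would then shadow the CUSTOM one, while the dummy map() in the workaround gives the CUSTOM partitioning an edge of its own. Below is a toy model of that rule in plain Scala, written only to mirror the reported DAG; Node, Source, Op, Partition, PartitionCollapse.edges and the FORWARD default are hypothetical, and this is not Flink's actual StreamGraph code.

{code:scala}
// Toy model (hypothetical, not Flink's StreamGraph code): partitioning steps
// stacked without a real operator between them collapse into one edge, and
// the step nearest the consuming operator decides the ship strategy.
sealed trait Node
final case class Source(name: String) extends Node
final case class Op(name: String, input: Node) extends Node
final case class Partition(strategy: String, input: Node) extends Node

object PartitionCollapse {
  private def nameOf(n: Node): String = n match {
    case Source(name)     => name
    case Op(name, _)      => name
    case Partition(_, in) => nameOf(in)
  }

  // Walk upstream through partition steps to the nearest real node, keeping
  // only the first strategy seen (the one closest to the consumer).
  // FORWARD is assumed when there is no partition step at all.
  @annotation.tailrec
  private def resolve(n: Node, strategy: Option[String]): (Node, String) = n match {
    case Partition(s, in) => resolve(in, strategy.orElse(Some(s)))
    case real             => (real, strategy.getOrElse("FORWARD"))
  }

  // All (from, to, strategy) edges between real nodes feeding into `sink`.
  def edges(sink: Node): List[(String, String, String)] = sink match {
    case Source(_)        => Nil
    case Partition(_, in) => edges(in)
    case Op(name, input)  =>
      val (upstream, strategy) = resolve(input, None)
      edges(upstream) :+ ((nameOf(upstream), name, strategy))
  }

  def main(args: Array[String]): Unit = {
    // source -> partitionCustom -> reinterpretAsKeyedStream (FORWARD) -> map
    val reported =
      Op("Map", Partition("FORWARD", Partition("CUSTOM", Source("Source"))))
    // same pipeline with the dummy map() inserted before the reinterpretation
    val workaround =
      Op("Map", Partition("FORWARD", Op("dummy-map", Partition("CUSTOM", Source("Source")))))

    println(edges(reported))   // List((Source,Map,FORWARD))
    println(edges(workaround)) // List((Source,dummy-map,CUSTOM), (dummy-map,Map,FORWARD))
  }
}
{code}

Under this model the CUSTOM edge survives in the workaround because it now ends at dummy-map, and the FORWARD edge after it keeps each record on the subtask the custom partitioner picked, which is presumably why reinterpreting at that point is safe.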
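A side note on the repro's randomPartitioner: it computes math.abs(key.hashCode) % numPartitions, and because math.abs(Int.MinValue) is still Int.MinValue, a key whose hash is exactly Int.MinValue would get a negative partition index. The sketch below uses Math.floorMod instead, which stays in [0, numPartitions) for any positive numPartitions. SafeHashPartitioner and its method names are hypothetical, and it is kept free of Flink imports so it runs standalone; in a real job the body of partition would go inside Partitioner[String].partition.

{code:scala}
// Hypothetical helper, standalone (no Flink dependency).
object SafeHashPartitioner {
  // Approach from the repro: can return a negative index, because
  // math.abs(Int.MinValue) overflows back to Int.MinValue.
  def absModIndex(hash: Int, numPartitions: Int): Int =
    math.abs(hash) % numPartitions

  // Math.floorMod returns a value in [0, numPartitions) for any hash
  // when numPartitions > 0.
  def floorModIndex(hash: Int, numPartitions: Int): Int =
    Math.floorMod(hash, numPartitions)

  // Candidate body for Partitioner[String].partition(key, numPartitions).
  def partition(key: String, numPartitions: Int): Int =
    floorModIndex(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    println(absModIndex(Int.MinValue, 3))   // -2
    println(floorModIndex(Int.MinValue, 3)) // 1
  }
}
{code}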