rkhachatryan commented on a change in pull request #13209:
URL: https://github.com/apache/flink/pull/13209#discussion_r477537338



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -285,11 +285,7 @@ public StreamGraph generate() {
                        alreadyTransformed.put(transform, transformedIds);
                }
 
-               if (transform.getBufferTimeout() >= 0) {
-                       streamGraph.setBufferTimeout(transform.getId(), 
transform.getBufferTimeout());
-               } else {
-                       streamGraph.setBufferTimeout(transform.getId(), 
defaultBufferTimeout);

Review comment:
       Can you explain the reasoning behind removing this branch?
   
   I think we still need it, because if the user doesn't set a timeout for 
transformation then default job timeout should be used.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -591,6 +593,13 @@ private void connect(Integer headOfChain, StreamEdge edge) 
{
                }
        }
 
+       private void checkCompatible(ResultPartitionType type, long 
bufferTimeout) {
+               if (type.isBlocking() && bufferTimeout != -1) {
+                       throw new UnsupportedOperationException("Blocking 
partition does not support buffer timeout at the" +

Review comment:
       I think adding the timeout value and `edge.toString` could help to find 
out misconfguration to the user.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -609,6 +609,33 @@ public void testShuffleModeUndefined() {
                        
sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
        }
 
+       @Test(expected = UnsupportedOperationException.class)
+       public void testConflictShuffleModeWithBufferTimeout() {
+               testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH);
+       }
+
+       @Test
+       public void testNormalShuffleModeWithBufferTimeout() {
+               
testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.PIPELINED);
+       }
+
+       private void testCompatibleShuffleModeWithBufferTimeout(ShuffleMode 
shuffleMode) {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 
3);
+               sourceDataStream.getTransformation().setBufferTimeout(100);

Review comment:
       If we keep using the default timeout (discussion above),
   then the issue also happens with 
`StreamExecutionEnvironment.setBufferTimeout(100)` or even default, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to