rkhachatryan commented on a change in pull request #13209:
URL: https://github.com/apache/flink/pull/13209#discussion_r483057672



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
##########
@@ -76,32 +76,68 @@
 
        private final ShuffleMode shuffleMode;
 
-       public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int 
typeNumber,
-                       List<String> selectedNames, StreamPartitioner<?> 
outputPartitioner, OutputTag outputTag) {
-               this(sourceVertex,
-                               targetVertex,
-                               typeNumber,
-                               selectedNames,
-                               outputPartitioner,
-                               outputTag,
-                               ShuffleMode.UNDEFINED);
-       }
-
-       public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int 
typeNumber,
-                       List<String> selectedNames, StreamPartitioner<?> 
outputPartitioner, OutputTag outputTag,
-                       ShuffleMode shuffleMode) {
+       private long bufferTimeout;
+
+       public StreamEdge(
+               StreamNode sourceVertex,
+               StreamNode targetVertex,
+               int typeNumber,
+               List<String> selectedNames,
+               StreamPartitioner<?> outputPartitioner,
+               OutputTag outputTag) {

Review comment:
       nit: indentation (other constructors too).

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -609,6 +609,32 @@ public void testShuffleModeUndefined() {
                        
sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
        }
 
+       @Test(expected = UnsupportedOperationException.class)
+       public void testConflictShuffleModeWithBufferTimeout() {
+               testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH);
+       }
+
+       @Test
+       public void testNormalShuffleModeWithBufferTimeout() {
+               
testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.PIPELINED);
+       }
+
+       private void testCompatibleShuffleModeWithBufferTimeout(ShuffleMode 
shuffleMode) {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setBufferTimeout(100);

Review comment:
       I'd also test for `0` timeout as it is treated specially.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -127,7 +124,7 @@
 
        private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
 
-       private long defaultBufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
+       private long defaultBufferTimeout = -1L;

Review comment:
       nit: use constant? (already defined below)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
##########
@@ -76,32 +76,68 @@
 
        private final ShuffleMode shuffleMode;
 
-       public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int 
typeNumber,
-                       List<String> selectedNames, StreamPartitioner<?> 
outputPartitioner, OutputTag outputTag) {
-               this(sourceVertex,
-                               targetVertex,
-                               typeNumber,
-                               selectedNames,
-                               outputPartitioner,
-                               outputTag,
-                               ShuffleMode.UNDEFINED);
-       }
-
-       public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int 
typeNumber,
-                       List<String> selectedNames, StreamPartitioner<?> 
outputPartitioner, OutputTag outputTag,
-                       ShuffleMode shuffleMode) {
+       private long bufferTimeout;
+
+       public StreamEdge(
+               StreamNode sourceVertex,
+               StreamNode targetVertex,
+               int typeNumber,
+               List<String> selectedNames,
+               StreamPartitioner<?> outputPartitioner,
+               OutputTag outputTag) {
+
+               this(
+                       sourceVertex,
+                       targetVertex,
+                       typeNumber,
+                       0,

Review comment:
       nit: define constant, e.g. `FLUSH_ALWAYS`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
##########
@@ -136,6 +172,14 @@ public void setPartitioner(StreamPartitioner<?> 
partitioner) {
                this.outputPartitioner = partitioner;
        }
 
+       public void setBufferTimeout(long bufferTimeout) {
+               this.bufferTimeout = bufferTimeout;

Review comment:
       nit: check that current buffer timeout is not set (-1)? or alternatively 
move `checkAndResetBufferTimeout` logic into `StreamEdge`

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##########
@@ -99,6 +99,10 @@
 
        private static final int MANAGED_MEMORY_FRACTION_SCALE = 16;
 
+       private static final long DEFAULT_PIPELINED_BUFFER_TIMEOUT = 100L;
+
+       private static final long DEFAULT_BLOCKING_BUFFER_TIMEOUT = -1L;

Review comment:
       nit: rename to `UNSPECIFIED` or `ON_BUFFER_FULL`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to