[ 
https://issues.apache.org/jira/browse/FLINK-13992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930355#comment-16930355
 ] 

TisonKun commented on FLINK-13992:
----------------------------------

Thanks for your clarification [~azagrebin]! Now I understand the trade-off here 
and agree to mark {{@Nonnull}} only in a context where one would expect a 
nullable value.

> Refactor Optional parameter in InputGateWithMetrics#updateMetrics
> -----------------------------------------------------------------
>
>                 Key: FLINK-13992
>                 URL: https://issues.apache.org/jira/browse/FLINK-13992
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.10.0
>            Reporter: TisonKun
>            Assignee: TisonKun
>            Priority: Major
>             Fix For: 1.10.0
>
>
> Following the consensus from the community code style discussion, we can 
> refactor {{InputGateWithMetrics#updateMetrics}} to reduce the usage of 
> Optional parameters.
> cc [~azagrebin]
> {code:java}
> diff --git 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
>  
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> index 5d2cfd95c4..e548fbf02b 100644
> --- 
> a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> +++ 
> b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/InputGateWithMetrics.java
> @@ -24,6 +24,8 @@ import 
> org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
>  import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
>  import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
>  
> +import javax.annotation.Nonnull;
> +
>  import java.io.IOException;
>  import java.util.Optional;
>  import java.util.concurrent.CompletableFuture;
> @@ -67,12 +69,12 @@ public class InputGateWithMetrics extends InputGate {
>  
>       @Override
>       public Optional<BufferOrEvent> getNext() throws IOException, 
> InterruptedException {
> -             return updateMetrics(inputGate.getNext());
> +             return inputGate.getNext().map(this::updateMetrics);
>       }
>  
>       @Override
>       public Optional<BufferOrEvent> pollNext() throws IOException, 
> InterruptedException {
> -             return updateMetrics(inputGate.pollNext());
> +             return inputGate.pollNext().map(this::updateMetrics);
>       }
>  
>       @Override
> @@ -85,8 +87,8 @@ public class InputGateWithMetrics extends InputGate {
>               inputGate.close();
>       }
>  
> -     private Optional<BufferOrEvent> updateMetrics(Optional<BufferOrEvent> 
> bufferOrEvent) {
> -             bufferOrEvent.ifPresent(b -> numBytesIn.inc(b.getSize()));
> +     private BufferOrEvent updateMetrics(@Nonnull BufferOrEvent 
> bufferOrEvent) {
> +             numBytesIn.inc(bufferOrEvent.getSize());
>               return bufferOrEvent;
>       }
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to