[ https://issues.apache.org/jira/browse/FLINK-6439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010454#comment-16010454 ]
ASF GitHub Bot commented on FLINK-6439: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3904#discussion_r116483708 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java --- @@ -46,111 +46,113 @@ public static String getResourceFilename(String filename) { public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException { FileOutputStream out = new FileOutputStream(path); - DataOutputStream dos = new DataOutputStream(out); - - dos.writeInt(state.getOperatorChainIndex()); - - SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos); - - Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState(); - if (rawOperatorState != null) { - dos.writeInt(rawOperatorState.size()); - for (OperatorStateHandle operatorStateHandle : rawOperatorState) { - SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + + try(DataOutputStream dos = new DataOutputStream(out)) { + + dos.writeInt(state.getOperatorChainIndex()); + + SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos); + + Collection<OperatorStateHandle> rawOperatorState = state.getRawOperatorState(); + if (rawOperatorState != null) { + dos.writeInt(rawOperatorState.size()); + for (OperatorStateHandle operatorStateHandle : rawOperatorState) { + SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + } + } else { + // this means no states, not even an empty list + dos.writeInt(-1); } - } else { - // this means no states, not even an empty list - dos.writeInt(-1); - } - Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState(); - if (managedOperatorState != null) { - dos.writeInt(managedOperatorState.size()); - for (OperatorStateHandle operatorStateHandle : managedOperatorState) { - 
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + Collection<OperatorStateHandle> managedOperatorState = state.getManagedOperatorState(); + if (managedOperatorState != null) { + dos.writeInt(managedOperatorState.size()); + for (OperatorStateHandle operatorStateHandle : managedOperatorState) { + SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos); + } + } else { + // this means no states, not even an empty list + dos.writeInt(-1); } - } else { - // this means no states, not even an empty list - dos.writeInt(-1); - } - Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState(); - if (rawKeyedState != null) { - dos.writeInt(rawKeyedState.size()); - for (KeyedStateHandle keyedStateHandle : rawKeyedState) { - SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + Collection<KeyedStateHandle> rawKeyedState = state.getRawKeyedState(); + if (rawKeyedState != null) { + dos.writeInt(rawKeyedState.size()); + for (KeyedStateHandle keyedStateHandle : rawKeyedState) { + SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + } + } else { + // this means no operator states, not even an empty list + dos.writeInt(-1); } - } else { - // this means no operator states, not even an empty list - dos.writeInt(-1); - } - Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState(); - if (managedKeyedState != null) { - dos.writeInt(managedKeyedState.size()); - for (KeyedStateHandle keyedStateHandle : managedKeyedState) { - SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + Collection<KeyedStateHandle> managedKeyedState = state.getManagedKeyedState(); + if (managedKeyedState != null) { + dos.writeInt(managedKeyedState.size()); + for (KeyedStateHandle keyedStateHandle : managedKeyedState) { + SavepointV1Serializer.serializeKeyedStateHandle(keyedStateHandle, dos); + } + } else { + // this means no operator states, not even an empty list + 
dos.writeInt(-1); } - } else { - // this means no operator states, not even an empty list - dos.writeInt(-1); - } - dos.flush(); - out.close(); + dos.flush(); + } } public static OperatorStateHandles readStateHandle(String path) throws IOException, ClassNotFoundException { FileInputStream in = new FileInputStream(path); - DataInputStream dis = new DataInputStream(in); - int index = dis.readInt(); + try(DataInputStream dis = new DataInputStream(in)) { --- End diff -- missing space after "try". > Unclosed InputStream in OperatorSnapshotUtil#readStateHandle() > -------------------------------------------------------------- > > Key: FLINK-6439 > URL: https://issues.apache.org/jira/browse/FLINK-6439 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing > Reporter: Ted Yu > Assignee: Fang Yong > Priority: Minor > > {code} > FileInputStream in = new FileInputStream(path); > DataInputStream dis = new DataInputStream(in); > {code} > Neither stream (in / dis) is closed upon return from the method. > In writeStateHandle(), the OutputStream should be closed in a finally block. -- This message was sent by Atlassian JIRA (v6.3.15#6346)