[ https://issues.apache.org/jira/browse/FLINK-19325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Arvid Heise reassigned FLINK-19325:
-----------------------------------

Assignee: Arvid Heise

Optimize the consumed time for checkpoint completion
----------------------------------------------------

Key: FLINK-19325
URL: https://issues.apache.org/jira/browse/FLINK-19325
Project: Flink
Issue Type: Improvement
Components: Runtime / Checkpointing
Reporter: Congxian Qiu
Assignee: Arvid Heise
Priority: Major
Labels: pull-request-available

Currently, when completing a checkpoint, we write out each state handle in {{MetadataV2V3SerializerBase.java#serializeStreamStateHandle}}:

{code:java}
static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
    if (stateHandle == null) {
        dos.writeByte(NULL_HANDLE);
    } else if (stateHandle instanceof RelativeFileStateHandle) {
        dos.writeByte(RELATIVE_STREAM_STATE_HANDLE);
        RelativeFileStateHandle relativeFileStateHandle = (RelativeFileStateHandle) stateHandle;
        dos.writeUTF(relativeFileStateHandle.getRelativePath());
        dos.writeLong(relativeFileStateHandle.getStateSize());
    } else if (stateHandle instanceof FileStateHandle) {
        dos.writeByte(FILE_STREAM_STATE_HANDLE);
        FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
        dos.writeLong(stateHandle.getStateSize());
        dos.writeUTF(fileStateHandle.getFilePath().toString());
    } else if (stateHandle instanceof ByteStreamStateHandle) {
        dos.writeByte(BYTE_STREAM_STATE_HANDLE);
        ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
        dos.writeUTF(byteStreamStateHandle.getHandleName());
        byte[] internalData = byteStreamStateHandle.getData();
        dos.writeInt(internalData.length);
        dos.write(byteStreamStateHandle.getData());
    } else {
        throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
    }

    dos.flush();
}
{code}

We call {{dos.flush()}} after every state handle is written out.
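To illustrate what a per-handle flush does, here is a minimal, hypothetical sketch. It assumes the metadata stream sits behind a buffer (modelled here as a {{BufferedOutputStream}} with a 64 KiB buffer, which is an assumption, not Flink's actual stream setup), and it replaces Flink's handle classes with a simplified record (type tag, state size, path). The {{CountingSink}} class and the {{serialize}} helper are illustrative names, not Flink APIs. The sketch compares flushing after every record against relying on {{close()}} alone.

{code:java}
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class FlushCostSketch {

    /** Hypothetical in-memory stand-in for the real metadata stream; counts the calls it receives. */
    static final class CountingSink extends OutputStream {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        int writeCalls;
        int flushCalls;

        @Override
        public void write(int b) {
            writeCalls++;
            bytes.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) {
            writeCalls++;
            bytes.write(b, off, len);
        }

        @Override
        public void flush() {
            flushCalls++;
        }
    }

    /** Writes simplified file-handle records, optionally flushing after each one. */
    static CountingSink serialize(int handles, boolean flushPerHandle) {
        CountingSink sink = new CountingSink();
        // Assumption: a 64 KiB buffer between the DataOutputStream and the sink.
        try (DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(sink, 64 * 1024))) {
            for (int i = 0; i < handles; i++) {
                dos.writeByte(2);                              // hypothetical type tag
                dos.writeLong(1024L * i);                      // state size
                dos.writeUTF("file:///tmp/chk-1/state-" + i);  // hypothetical file path
                if (flushPerHandle) {
                    dos.flush(); // pushes this record's buffered bytes to the sink immediately
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // close() (via try-with-resources) has already flushed whatever was still buffered.
        return sink;
    }

    public static void main(String[] args) {
        CountingSink perHandle = serialize(100, true);
        CountingSink onClose = serialize(100, false);
        System.out.println("identical bytes: "
                + Arrays.equals(perHandle.bytes.toByteArray(), onClose.bytes.toByteArray()));
        System.out.println("flush per handle: writes=" + perHandle.writeCalls + ", flushes=" + perHandle.flushCalls);
        System.out.println("flush on close:   writes=" + onClose.writeCalls + ", flushes=" + onClose.flushCalls);
    }
}
{code}

Both variants produce the same bytes. With a flush after every record, the sink receives one write (plus one flush) per record. Without it, the buffered data reaches the sink in a single write when the stream is closed. How much each flush actually costs depends on the underlying file system stream, which this sketch does not model.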
Flushing after every state handle may consume too much time and is not needed, because we close the output stream after everything has been written out, and closing it flushes any remaining buffered data anyway.

I propose removing the {{dos.flush()}} call here to reduce the time consumed by checkpoint completion.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)