Github user gyfora commented on a diff in the pull request: https://github.com/apache/flink/pull/1668#discussion_r107968436 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java --- @@ -119,13 +183,154 @@ protected void cleanup() throws Exception { * Creates the identification string with which head and tail task find the shared blocking * queue for the back channel. The identification string is unique per parallel head/tail pair * per iteration per job. - * - * @param jid The job ID. - * @param iterationID The id of the iteration in the job. + * + * @param jid The job ID. + * @param iterationID The id of the iteration in the job. * @param subtaskIndex The parallel subtask number * @return The identification string. */ public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) { return jid + "-" + iterationID + "-" + subtaskIndex; } + + /** + * An internal operator that solely serves as a state logging facility for persisting, + * partitioning and restoring output logs for dataflow cycles consistently. To support concurrency, + * logs are being sliced proportionally to the number of concurrent snapshots. This allows committed + * output logs to be uniquely identified and cleared after each complete checkpoint. + * <p> + * The design is based on the following assumptions: + * <p> + * - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution. + * - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order. + * - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that + * gives a singular view of the log. + * <p> + * TODO it seems that ListState.clear does not unregister state. We need to put a hook for that. 
+ * + * @param <IN> + */ + public static class UpstreamLogger<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> { + + private final StreamConfig config; + + private LinkedList<ListState<StreamRecord<IN>>> slicedLog = new LinkedList<>(); + + private UpstreamLogger(StreamConfig config) { + this.config = config; + } + + public void logRecord(StreamRecord<IN> record) throws Exception { + if (!slicedLog.isEmpty()) { + slicedLog.getLast().add(record); + } + } + + public void createSlice(String sliceID) throws Exception { + ListState<StreamRecord<IN>> nextSlice = + getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID, + config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))); + slicedLog.addLast(nextSlice); + } + + public void discardSlice() { + ListState<StreamRecord<IN>> logToEvict = slicedLog.pollFirst(); + logToEvict.clear(); + } + + public Iterable<StreamRecord<IN>> getReplayLog() throws Exception { + final List<String> logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames()); + Collections.sort(logSlices, new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return Long.valueOf(o1).compareTo(Long.valueOf(o2)); + } + }); + + final List<Iterator<StreamRecord<IN>>> wrappedIterators = new ArrayList<>(); + for (String splitID : logSlices) { + wrappedIterators.add(getOperatorStateBackend() + .getOperatorState(new ListStateDescriptor<>(splitID, + config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator()); + } + + if (wrappedIterators.size() == 0) { + return new Iterable<StreamRecord<IN>>() { + @Override + public Iterator<StreamRecord<IN>> iterator() { + return Collections.emptyListIterator(); + } + }; + } + + return new Iterable<StreamRecord<IN>>() { + @Override + public Iterator<StreamRecord<IN>> iterator() { + + return new Iterator<StreamRecord<IN>>() { + int indx = 0; + Iterator<StreamRecord<IN>> 
currentIterator = wrappedIterators.get(0); + + @Override + public boolean hasNext() { + if (!currentIterator.hasNext()) { + progressLog(); + } + return currentIterator.hasNext(); + } + + @Override + public StreamRecord<IN> next() { + if (!currentIterator.hasNext() && indx < wrappedIterators.size()) { + progressLog(); + } + return currentIterator.next(); + } + + private void progressLog() { + while (!currentIterator.hasNext() && ++indx < wrappedIterators.size()) { --- End diff -- You do a lot of checking against !currentIterator.hasNext(). You could probably keep that check only in progressLog and call it without the surrounding if in the other places (maybe rename it to progressIfNecessary).
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---