Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r107968567
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
    @@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
         * Creates the identification string with which head and tail task find 
the shared blocking
         * queue for the back channel. The identification string is unique per 
parallel head/tail pair
         * per iteration per job.
    -    * 
    -    * @param jid The job ID.
    -    * @param iterationID The id of the iteration in the job.
    +    *
    +    * @param jid          The job ID.
    +    * @param iterationID  The id of the iteration in the job.
         * @param subtaskIndex The parallel subtask number
         * @return The identification string.
         */
        public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
                // Join the three components with '-' separators; identical output to
                // plain string concatenation (null-safe via %s -> String.valueOf).
                return String.format("%s-%s-%s", jid, iterationID, subtaskIndex);
        }
    +
    +   /**
    +    * An internal operator that solely serves as a state logging facility 
for persisting,
    +    * partitioning and restoring output logs for dataflow cycles 
consistently. To support concurrency,
    +    * logs are being sliced proportionally to the number of concurrent 
snapshots. This allows committed
    +    * output logs to be uniquely identified and cleared after each 
complete checkpoint.
    +    * <p>
    +    * The design is based on the following assumptions:
    +    * <p>
    +    * - A slice is named after a checkpoint ID. Checkpoint IDs are 
numerically ordered within an execution.
    +    * - Each checkpoint barrier arrives back in FIFO order, thus we 
discard log slices in respective FIFO order.
    +    * - Upon restoration the logger sorts sliced logs in the same FIFO 
order and returns an Iterable that
    +    * gives a singular view of the log.
    +    * <p>
    +    * TODO it seems that ListState.clear does not unregister state. We 
need to put a hook for that.
    +    *
    +    * @param <IN>
    +    */
    +   public static class UpstreamLogger<IN> extends 
AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +
    +           // Stream task configuration; supplies the output type serializer
    +           // used when registering new log-slice states.
    +           private final StreamConfig config;
    +
    +           // Open log slices in checkpoint (FIFO) order; records are always
    +           // appended to the last (most recently created) slice.
    +           private LinkedList<ListState<StreamRecord<IN>>> slicedLog = new 
LinkedList<>();
    +
    +           // Private: instances are created only by the enclosing head task.
    +           private UpstreamLogger(StreamConfig config) {
    +                   this.config = config;
    +           }
    +
    +           public void logRecord(StreamRecord<IN> record) throws Exception 
{
    +                   if (!slicedLog.isEmpty()) {
    +                           slicedLog.getLast().add(record);
    +                   }
    +           }
    +
    +           public void createSlice(String sliceID) throws Exception {
    +                   ListState<StreamRecord<IN>> nextSlice =
    +                           getOperatorStateBackend().getOperatorState(new 
ListStateDescriptor<>(sliceID,
    +                                   
config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader())));
    +                   slicedLog.addLast(nextSlice);
    +           }
    +
    +           public void discardSlice() {
    +                   ListState<StreamRecord<IN>> logToEvict = 
slicedLog.pollFirst();
    +                   logToEvict.clear();
    +           }
    +
    +           public Iterable<StreamRecord<IN>> getReplayLog() throws 
Exception {
    +                   final List<String> logSlices = new 
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
    +                   Collections.sort(logSlices, new Comparator<String>() {
    +                           @Override
    +                           public int compare(String o1, String o2) {
    +                                   return 
Long.valueOf(o1).compareTo(Long.valueOf(o2));
    +                           }
    +                   });
    +
    +                   final List<Iterator<StreamRecord<IN>>> wrappedIterators 
= new ArrayList<>();
    +                   for (String splitID : logSlices) {
    +                           wrappedIterators.add(getOperatorStateBackend()
    +                                   .getOperatorState(new 
ListStateDescriptor<>(splitID,
    +                                           
config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
    +                   }
    +
    +                   if (wrappedIterators.size() == 0) {
    +                           return new Iterable<StreamRecord<IN>>() {
    +                                   @Override
    +                                   public Iterator<StreamRecord<IN>> 
iterator() {
    +                                           return 
Collections.emptyListIterator();
    +                                   }
    +                           };
    +                   }
    +
    +                   return new Iterable<StreamRecord<IN>>() {
    +                           @Override
    +                           public Iterator<StreamRecord<IN>> iterator() {
    +
    +                                   return new Iterator<StreamRecord<IN>>() 
{
    +                                           int indx = 0;
    +                                           Iterator<StreamRecord<IN>> 
currentIterator = wrappedIterators.get(0);
    +
    +                                           @Override
    +                                           public boolean hasNext() {
    +                                                   if 
(!currentIterator.hasNext()) {
    +                                                           progressLog();
    +                                                   }
    +                                                   return 
currentIterator.hasNext();
    +                                           }
    +
    +                                           @Override
    +                                           public StreamRecord<IN> next() {
    +                                                   if 
(!currentIterator.hasNext() && indx < wrappedIterators.size()) {
    +                                                           progressLog();
    +                                                   }
    +                                                   return 
currentIterator.next();
    +                                           }
    +
    +                                           private void progressLog() {
    +                                                   while 
(!currentIterator.hasNext() && ++indx < wrappedIterators.size()) {
    +                                                           currentIterator 
= wrappedIterators.get(indx);
    +                                                   }
    +                                           }
    +
    +                                           @Override
    +                                           public void remove() {
    +                                                   throw new 
UnsupportedOperationException();
    +                                           }
    +
    +                                   };
    +                           }
    +                   };
    +           }
    +
    +           public void clearLog() throws Exception {
    +                   for (String outputLogs : 
getOperatorStateBackend().getRegisteredStateNames()) {
    --- End diff --
    
    It's kind of bad that we can't remove the state completely and still keep 
iterating over it when replaying the log...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to