[ https://issues.apache.org/jira/browse/FLINK-4391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15642862#comment-15642862 ]
ASF GitHub Bot commented on FLINK-4391: --------------------------------------- Github user bjlovegithub commented on a diff in the pull request: https://github.com/apache/flink/pull/2629#discussion_r86706002 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncCollectorBuffer.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.async; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamElement; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * AsyncCollectorBuffer will hold all {@link AsyncCollector} in its internal buffer, + * and emit results from {@link AsyncCollector} to the next operators following it by + * calling {@link Output#collect(Object)} + */ +@Internal +public class AsyncCollectorBuffer<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(AsyncCollectorBuffer.class); + + /** + * Max number of {@link AsyncCollector} in the buffer. + */ + private final int bufferSize; + + private final AsyncDataStream.OutputMode mode; + + private final AsyncWaitOperator<IN, OUT> operator; + + /** + * {@link AsyncCollector} queue. + */ + private final SimpleLinkedList<AsyncCollector<IN, OUT>> queue = new SimpleLinkedList<>(); + /** + * A hash map keeping {@link AsyncCollector} and their corresponding {@link StreamElement} + */ + private final Map<AsyncCollector<IN, OUT>, StreamElement> collectorToStreamElement = new HashMap<>(); + /** + * A hash map keeping {@link AsyncCollector} and their node references in the #queue. 
+ */ + private final Map<AsyncCollector<IN, OUT>, SimpleLinkedList.Node> collectorToQueue = new HashMap<>(); + + private final LinkedList<AsyncCollector> finishedCollectors = new LinkedList<>(); + + /** + * {@link TimestampedCollector} and {@link Output} to collect results and watermarks. + */ + private TimestampedCollector<OUT> timestampedCollector; + private Output<StreamRecord<OUT>> output; + + /** + * Locks and conditions to synchronize with main thread and emitter thread. + */ + private final Lock lock; + private final Condition notFull; + private final Condition taskDone; + private final Condition isEmpty; + + /** + * Error from user codes. + */ + private volatile Exception error; + + private final Emitter emitter; + private final Thread emitThread; + + private boolean isCheckpointing; + + public AsyncCollectorBuffer(int maxSize, AsyncDataStream.OutputMode mode, AsyncWaitOperator operator) { + Preconditions.checkArgument(maxSize > 0, "Future buffer size should be greater than 0."); + + this.bufferSize = maxSize; + this.mode = mode; + this.operator = operator; + + this.lock = new ReentrantLock(true); + this.notFull = this.lock.newCondition(); + this.taskDone = this.lock.newCondition(); + this.isEmpty = this.lock.newCondition(); + + this.emitter = new Emitter(); + this.emitThread = new Thread(emitter); + } + + /** + * Add an {@link StreamRecord} into the buffer. A new {@link AsyncCollector} will be created and returned + * corresponding to the input StreamRecord. + * <p> + * If buffer is full, caller will wait until new space is available. + * + * @param record StreamRecord + * @return An AsyncCollector + * @throws Exception InterruptedException or exceptions from AsyncCollector. 
+ */ + public AsyncCollector<IN, OUT> add(StreamRecord<IN> record) throws Exception { + try { + lock.lock(); + + notifyCheckpointDone(); + + while (queue.size() >= bufferSize) { + notFull.await(); + } + + // propagate error to the main thread + if (error != null) { + throw error; + } + + AsyncCollector<IN, OUT> collector = new AsyncCollector(this); + + collectorToQueue.put(collector, queue.add(collector)); + collectorToStreamElement.put(collector, record); + + return collector; + } + finally { + lock.unlock(); + } + } + + /** + * Add a {@link Watermark} into queue. A new AsyncCollector will be created and returned. + * <p> + * If queue is full, caller will be blocked here. + * + * @param watermark Watermark + * @return AsyncCollector + * @throws Exception Exceptions from async operation. + */ + public AsyncCollector<IN, OUT> add(Watermark watermark) throws Exception { + return processMark(watermark); + } + + /** + * Add a {@link LatencyMarker} into queue. A new AsyncCollector will be created and returned. + * <p> + * If queue is full, caller will be blocked here. + * + * @param latencyMarker LatencyMarker + * @return AsyncCollector + * @throws Exception Exceptions from async operation. + */ + public AsyncCollector<IN, OUT> add(LatencyMarker latencyMarker) throws Exception { + return processMark(latencyMarker); + } + + private AsyncCollector<IN, OUT> processMark(StreamElement mark) throws Exception { + try { + lock.lock(); + + notifyCheckpointDone(); + + while (queue.size() >= bufferSize) + notFull.await(); + + if (error != null) { + throw error; + } + + AsyncCollector<IN, OUT> collector = new AsyncCollector(this, true); + + collectorToQueue.put(collector, queue.add(collector)); + collectorToStreamElement.put(collector, mark); + + // signal emitter thread that current collector is ready + mark(collector); + + return collector; + } + finally { + lock.unlock(); + } + } + + /** + * Notify the Emitter Thread that an AsyncCollector has completed. 
+ * + * @param collector Completed AsyncCollector + */ + void mark(AsyncCollector<IN, OUT> collector) { + try { + lock.lock(); + + if (mode == AsyncDataStream.OutputMode.UNORDERED) { + finishedCollectors.add(collector); + } + + taskDone.signal(); + } + finally { + lock.unlock(); + } + } + + /** + * Caller will wait here if buffer is not empty, meaning that not all async i/o tasks have returned yet. + * + * @throws Exception InterruptedException or Exceptions from AsyncCollector. + */ + void waitEmpty() throws Exception { + try { + lock.lock(); + + notifyCheckpointDone(); + + while (queue.size() != 0) + isEmpty.await(); + + if (error != null) { + throw error; + } + } + finally { + lock.unlock(); + } + } + + public void startEmitterThread() { + this.emitThread.start(); + } + + public void stopEmitterThread() { + emitter.stop(); + + emitThread.interrupt(); + } + + /** + * Get all StreamElements in the AsyncCollector queue. + * <p> + * Emitter Thread can not output records and will wait for a while due to isCheckpointing flag + * until doing checkpoint has done. + * + * @return A List containing StreamElements. + */ + public List<StreamElement> getStreamElementsInBuffer() { + try { + lock.lock(); + + // stop emitter thread + isCheckpointing = true; + + List<StreamElement> ret = new ArrayList<>(); + for (int i = 0; i < queue.size(); ++i) { + AsyncCollector<IN, OUT> collector = queue.get(i); + ret.add(collectorToStreamElement.get(collector)); + } + + return ret; + } + finally { + lock.unlock(); + } + } + + public void setOutput(TimestampedCollector<OUT> collector, Output<StreamRecord<OUT>> output) { + this.timestampedCollector = collector; + this.output = output; + } + + public void notifyCheckpointDone() { + this.isCheckpointing = false; + this.taskDone.signalAll(); + } + + /** + * A working thread to output results from {@link AsyncCollector} to the next operator. 
+ */ + private class Emitter implements Runnable { + private volatile boolean running = true; + + private void output(AsyncCollector collector) throws Exception { + List<OUT> result = collector.getResult(); + + // update timestamp for output stream records based on the input stream record. + StreamElement element = collectorToStreamElement.get(collector); + if (element == null) { + throw new Exception("No input stream record or watermark for current AsyncCollector: "+collector); + } + + if (element.isRecord()) { + if (result == null) { + throw new Exception("Result for stream record "+element+" is null"); + } + + timestampedCollector.setTimestamp(element.asRecord()); + for (OUT val : result) { + timestampedCollector.collect(val); + } + } + else if (element.isWatermark()) { + output.emitWatermark(element.asWatermark()); + } + else if (element.isLatencyMarker()) { + operator.sendLatencyMarker(element.asLatencyMarker()); + } + else { + throw new Exception("Unknown input record: "+element); + } + } + + private void clearInfoInMaps(AsyncCollector collector) { + collectorToStreamElement.remove(collector); + collectorToQueue.remove(collector); + } + + /** + * Emit results from the finished head collector and its following finished ones. + */ + private void orderedProcess() { + while (queue.size() > 0) { + try { + AsyncCollector collector = queue.get(0); + if (!collector.isDone()) { + break; + } + + output(collector); + + queue.remove(0); + clearInfoInMaps(collector); + + notFull.signal(); + } + catch (Exception e) { + error = e; + break; + } + } + } + + /** + * Emit results for each finished collector. 
+ */ + private void unorderedProcess() { + AsyncCollector collector = finishedCollectors.pollFirst(); + while (collector != null) { + try { + output(collector); + + queue.remove(collectorToQueue.get(collector)); + clearInfoInMaps(collector); + + notFull.signal(); + + collector = finishedCollectors.pollFirst(); + } + catch (Exception e) { + error = e; + break; + } + } + } + + /** + * If some bad things happened(like exceptions from async i/o), the operator tries to fail + * itself at: + * {@link AsyncWaitOperator#processElement}, triggered by calling {@link AsyncCollectorBuffer#add}. + * {@link AsyncWaitOperator#snapshotState} + * {@link AsyncWaitOperator#close} while calling {@link AsyncCollectorBuffer#waitEmpty} + * + * It is necessary for Emitter Thread to notify methods blocking on notFull/isEmpty. + */ + private void processError() { + queue.clear(); + finishedCollectors.clear(); + collectorToQueue.clear(); + collectorToStreamElement.clear(); + + notFull.signalAll(); + isEmpty.signalAll(); + } + + /** + * If + * In ordered mode, there are some finished async collectors, and one of them is the first element in + * the queue. + * or + * In unordered mode, there are some finished async collectors. + * or + * The first element in the queue is Watermark. + * Then, the emitter thread should keep waiting, rather than waiting on the condition. + * + * Otherwise, the thread should stop for a while until being signalled. + */ + private boolean nothingToDo() { + if (queue.size() == 0) { + isEmpty.signalAll(); + return true; + } + + // while doing checkpoints, emitter thread should not try to output records. 
+ if (isCheckpointing) { + return true; + } + + // check head element of the queue, it is OK to process Watermark or LatencyMarker + if (collectorToStreamElement.get(queue.get(0)).isWatermark() || + collectorToStreamElement.get(queue.get(0)).isLatencyMarker()) + { + return false; + } + + if (mode == AsyncDataStream.OutputMode.UNORDERED) { + // no finished async collector... + if (finishedCollectors.size() == 0) { + return true; + } + else { + return false; + } + } + else { + // for ORDERED mode, make sure the first collector in the queue has been done. + AsyncCollector collector = queue.get(0); + if (collector.isDone() == false) { + return true; + } + else { + return false; + } + } + } + + @Override + public void run() { + while (running) { + try { + lock.lock(); + + if (error != null) { + // stop processing finished async collectors, and try to wake up blocked main + // thread or checkpoint thread repeatedly. + processError(); + Thread.sleep(1000); --- End diff -- Sorry for the delayed response. Currently, the main thread detects errors in three places: 1. `AsyncWaitOperator.processElement()`, thrown by `AsyncCollectorBuffer.addStreamRecord()`. 2. `AsyncWaitOperator.processWatermark()` and `AsyncWaitOperator.processLatencyMarker()`, thrown by `AsyncCollectorBuffer.processMark()`. 3. `AsyncWaitOperator.close()`, thrown by `AsyncCollectorBuffer.waitEmpty()`. The first two cases apply while not all inputs have arrived yet. The last one applies when the operator is closed while processing a finite streaming job. > Provide support for asynchronous operations over streams > -------------------------------------------------------- > > Key: FLINK-4391 > URL: https://issues.apache.org/jira/browse/FLINK-4391 > Project: Flink > Issue Type: New Feature > Components: DataStream API > Reporter: Jamie Grier > Assignee: david.wang > > Many Flink users need to do asynchronous processing driven by data from a > DataStream. 
The classic example would be joining against an external > database in order to enrich a stream with extra information. > It would be nice to add general support for this type of operation in the > Flink API. Ideally this could simply take the form of a new operator that > manages async operations, keeps so many of them in flight, and then emits > results to downstream operators as the async operations complete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)