sjwiesman commented on a change in pull request #13752: URL: https://github.com/apache/flink/pull/13752#discussion_r511957050
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ########## @@ -1294,6 +1303,92 @@ public ExecutionConfig getExecutionConfig() { return sink; } + /** + * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements + * of the given DataStream. + * + * <p>The DataStream application is executed in the regular distributed manner on the target environment, + * and the events from the stream are polled back to this application process and thread through + * Flink's REST API. + * + *<p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources. + */ + public CloseableIterator<T> executeAndCollect() throws Exception { + return executeAndCollect("DataStream Collect"); + } + + /** + * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements + * of the given DataStream. + * + * <p>The DataStream application is executed in the regular distributed manner on the target environment, + * and the events from the stream are polled back to this application process and thread through + * Flink's REST API. + * + *<p><b>IMPORTANT</b> The returned iterator must be closed to free all cluster resources. + */ + public CloseableIterator<T> executeAndCollect(String jobExecutionName) throws Exception { + return executeAndCollectWithClient(jobExecutionName).iterator; + } + + /** + * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements + * of the given DataStream. + * + * <p>The DataStream application is executed in the regular distributed manner on the target environment, + * and the events from the stream are polled back to this application process and thread through + * Flink's REST API. 
+ */ + public List<T> executeAndCollect(int limit) throws Exception { + return executeAndCollect("DataStream Collect", limit); + } + + /** + * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements + * of the given DataStream. + * + * <p>The DataStream application is executed in the regular distributed manner on the target environment, + * and the events from the stream are polled back to this application process and thread through + * Flink's REST API. + */ + public List<T> executeAndCollect(String jobExecutionName, int limit) throws Exception { + Preconditions.checkState(limit > 0, "Limit must be greater than 0"); + + ClientAndIterator<T> clientAndIterator = executeAndCollectWithClient(jobExecutionName); + + try { + List<T> results = new ArrayList<>(limit); + while (clientAndIterator.iterator.hasNext() && limit > 0) { + results.add(clientAndIterator.iterator.next()); + limit--; + } + + return results; + } finally { + clientAndIterator.iterator.close(); + clientAndIterator.client.cancel(); + } + } + + private ClientAndIterator<T> executeAndCollectWithClient(String jobExecutionName) throws Exception { Review comment: Having `DataStream` depend on `DataStreamUtils` felt like the wrong dependency direction to me. I realized I could invert it, so that `DataStreamUtils` depends on `DataStream` instead, which let me remove the duplication. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org