Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205108561 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.tests; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer) + * <ul> + * <li>String: key, can be repeated.</li> + * <li>Integer: uniformly distributed int between 0 and 127</li> + * </ul> + * + * <p>If control path was provided, as long as this file is empty dummy elements with value equal to -1 will be emitted. 
+ */ +public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> { + + // total number of records + private final long numRecords; + // total number of keys + private final long numKeys; + + // records emitted per partition + private long recordsPerPartition; + // number of keys per partition + private long keysPerPartition; + + // number of currently emitted records + private long recordCnt; + + // id of current partition + private int partitionId; + + private final boolean infinite; + + public Generator(long numKeys, int recordsPerKey) { + this(numKeys, recordsPerKey, false); + } + + private Generator(long numKeys, int recordsPerKey, boolean infinite) { + this.numKeys = numKeys; + this.numRecords = numKeys * recordsPerKey; + this.infinite = infinite; + } + + public static Generator infinite() { + return new Generator(Long.MAX_VALUE, 1, true); + } + + @Override + public void configure(Configuration parameters) { } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return null; + } + + @Override + public GenericInputSplit[] createInputSplits(int minNumSplits) { + + GenericInputSplit[] splits = new GenericInputSplit[minNumSplits]; + for (int i = 0; i < minNumSplits; i++) { + splits[i] = new GenericInputSplit(i, minNumSplits); + } + return splits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void open(GenericInputSplit split) throws IOException { + this.partitionId = split.getSplitNumber(); + // total number of partitions + int numPartitions = split.getTotalNumberOfSplits(); + + // ensure even distribution of records and keys + Preconditions.checkArgument( + numRecords % numPartitions == 0, + "Records cannot be evenly distributed among partitions"); + Preconditions.checkArgument( + numKeys % numPartitions == 0, + "Keys cannot be evenly distributed among 
partitions"); + + this.recordsPerPartition = numRecords / numPartitions; + this.keysPerPartition = numKeys / numPartitions; + + this.recordCnt = 0; + } + + @Override + public boolean reachedEnd() { + return this.recordCnt >= this.recordsPerPartition; + } + + @Override + public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) throws IOException { + + if (infinite) { --- End diff -- I think we should either modify `reachedEnd` to `this.recordCnt >= this.recordsPerPartition || infinite` or introduce a separate generator class, since in the current state the infinite and finite behaviors are entirely different.
---