Hi to all,

I'm trying to test a streaming job, but the files written by the BucketingSink are never finalized (they remain in the pending state). Is this because the job finishes before the first checkpoint completes? Shouldn't the sink close and finalize its files properly anyway?
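For example, after the job exits the output directories contain only files like these (the names are illustrative; they assume BucketingSink's default DateTimeBucketer and its default "_" pending prefix / ".pending" suffix):

/tmp/bucket/gray/2017-01-01--12/_part-0-0.pending
/tmp/bucket/white/2017-01-01--12/_part-0-0.pending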
This is my code:

@Test
public void testBucketingSink() throws Exception {
    final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(senv);
    senv.enableCheckpointing(5000);

    DataStream<String> testStream = senv.fromElements(
        "1,aaa,white",
        "2,bbb,gray",
        "3,ccc,white",
        "4,bbb,gray",
        "5,bbb,gray");

    final RowTypeInfo rtf = new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);

    // Parse every CSV line into a Row of 3 string fields
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
        private static final long serialVersionUID = 1L;

        @Override
        public Row map(String str) throws Exception {
            String[] split = str.split(Pattern.quote(","));
            Row ret = new Row(3);
            ret.setField(0, split[0]);
            ret.setField(1, split[1]);
            ret.setField(2, split[2]);
            return ret;
        }
    }).returns(rtf);

    String columnNames = "id,value,state";
    final String dsName = "test";
    tEnv.registerDataStream(dsName, rows, columnNames);

    // Split the stream into a white area (rows matching the filter)
    // and a gray area (all the other rows)
    final String whiteAreaFilter = "state = 'white'";
    DataStream<Row> grayArea = rows;
    DataStream<Row> whiteArea = null;
    if (whiteAreaFilter != null) {
        String sql = "SELECT *, (%s) as _WHITE FROM %s";
        sql = String.format(sql, whiteAreaFilter, dsName);
        Table table = tEnv.sql(sql);
        grayArea = tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
        DataStream<Row> nw = tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
        whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
    }

    // Both sinks share the same writer template: BucketingSink clones it
    // through Writer#duplicate() for every bucket it opens
    Writer<Row> bucketSinkWriter = new RowCsvWriter("UTF-8", "\t", "\n");

    String datasetWhiteDir = "/tmp/bucket/white";
    BucketingSink<Row> whiteAreaSink = new BucketingSink<>(datasetWhiteDir);
    whiteAreaSink.setWriter(bucketSinkWriter);
    whiteAreaSink.setBatchSize(10);
    whiteArea.addSink(whiteAreaSink);

    String datasetGrayDir = "/tmp/bucket/gray";
    BucketingSink<Row> grayAreaSink = new BucketingSink<>(datasetGrayDir);
    grayAreaSink.setWriter(bucketSinkWriter);
    grayAreaSink.setBatchSize(10);
    grayArea.addSink(grayAreaSink);

    JobExecutionResult jobInfo = senv.execute("Bucketing sink test");
    System.out.printf("Job took %d minutes%n", jobInfo.getNetRuntime(TimeUnit.MINUTES));
}

public class RowCsvWriter extends StreamWriterBase<Row> {
    private static final long serialVersionUID = 1L;

    private final String charsetName;
    private transient Charset charset;
    private String fieldDelimiter;
    private String recordDelimiter;
    private boolean allowNullValues = true;
    private boolean quoteStrings = false;

    /**
     * Creates a new {@code RowCsvWriter} that uses the {@code "UTF-8"} charset to convert
     * rows to bytes.
     */
    public RowCsvWriter() {
        this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, CsvOutputFormat.DEFAULT_LINE_DELIMITER);
    }

    /**
     * Creates a new {@code RowCsvWriter} that uses the given charset to convert rows to bytes.
     *
     * @param charsetName Name of the charset to be used, must be valid input for
     *        {@code Charset.forName(charsetName)}
     */
    public RowCsvWriter(String charsetName, String fieldDelimiter, String recordDelimiter) {
        this.charsetName = charsetName;
        this.fieldDelimiter = fieldDelimiter;
        this.recordDelimiter = recordDelimiter;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        try {
            this.charset = Charset.forName(charsetName);
        } catch (IllegalCharsetNameException ex) {
            throw new IOException("The charset " + charsetName + " is not valid.", ex);
        } catch (UnsupportedCharsetException ex) {
            throw new IOException("The charset " + charsetName + " is not supported.", ex);
        }
    }

    @Override
    public void write(Row element) throws IOException {
        FSDataOutputStream outputStream = getStream();
        writeRow(element, outputStream);
    }

    private void writeRow(Row element, FSDataOutputStream out) throws IOException {
        int numFields = element.getArity();

        for (int i = 0; i < numFields; i++) {
            Object obj = element.getField(i);
            if (obj != null) {
                if (i != 0) {
                    out.write(this.fieldDelimiter.getBytes(charset));
                }
                if (quoteStrings && (obj instanceof String || obj instanceof StringValue)) {
                    out.write('"');
                    out.write(obj.toString().getBytes(charset));
                    out.write('"');
                } else {
                    out.write(obj.toString().getBytes(charset));
                }
            } else if (this.allowNullValues) {
                // null fields are written as empty values
                if (i != 0) {
                    out.write(this.fieldDelimiter.getBytes(charset));
                }
            } else {
                throw new RuntimeException("Cannot write tuple with <null> value at position: " + i);
            }
        }

        // add the record delimiter
        out.write(this.recordDelimiter.getBytes(charset));
    }

    @Override
    public Writer<Row> duplicate() {
        return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
    }
}

Any help is appreciated,
Flavio
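PS: for completeness, these are the sink settings that look related to the pending handling (a minimal sketch, assuming I'm reading the BucketingSink javadocs correctly; as far as I understand, closing an inactive bucket only moves its in-progress file to the pending state, and pending files become final only when a checkpoint completes, which would match my suspicion above):

BucketingSink<Row> sink = new BucketingSink<>("/tmp/bucket/white");
// check for inactive buckets every second instead of the 60s default
sink.setInactiveBucketCheckInterval(1000L);
// buckets without writes for this long are closed, i.e. their
// in-progress file is renamed to a pending file
sink.setInactiveBucketThreshold(1000L);
// ".pending" is already the default suffix; set it only to be explicit
sink.setPendingSuffix(".pending");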