Hi, Expanding a bit on Kostas' answer. Yes, your analysis is correct: the problem is that the job is shutting down before a last checkpoint can "confirm" the written bucket data by moving it to the final state. The problem, as Kostas noted, is that a user function (and thus also BucketingSink) does not know whether close() is being called because of a failure or because of a normal job shutdown. Therefore, we cannot move data to the final state there.
Once the issue that Kostas posted is resolved, we can also resolve this problem for the BucketingSink. Best, Aljoscha > On 8. Sep 2017, at 16:48, Kostas Kloudas <k.klou...@data-artisans.com> wrote: > > Hi Flavio, > > If I understand correctly, I think you bumped into this issue: > https://issues.apache.org/jira/browse/FLINK-2646 > <https://issues.apache.org/jira/browse/FLINK-2646> > > There is also a similar discussion on the BucketingSink here: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468 > > <http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468> > > Kostas > >> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it >> <mailto:pomperma...@okkam.it>> wrote: >> >> Hi to all, >> I'm trying to test a streaming job but the files written by the >> BucketingSink are never finalized (they remain in the pending state). >> Is this caused by the fact that the job finishes before the checkpoint? >> Shouldn't the sink properly close anyway? 
>> >> This is my code: >> >> @Test >> public void testBucketingSink() throws Exception { >> final StreamExecutionEnvironment senv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> final StreamTableEnvironment tEnv = >> TableEnvironment.getTableEnvironment(senv); >> senv.enableCheckpointing(5000); >> DataStream<String> testStream = senv.fromElements(// >> "1,aaa,white", // >> "2,bbb,gray", // >> "3,ccc,white", // >> "4,bbb,gray", // >> "5,bbb,gray" // >> ); >> final RowTypeInfo rtf = new RowTypeInfo( >> BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.STRING_TYPE_INFO, >> BasicTypeInfo.STRING_TYPE_INFO); >> DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() { >> >> private static final long serialVersionUID = 1L; >> >> @Override >> public Row map(String str) throws Exception { >> String[] split = str.split(Pattern.quote(",")); >> Row ret = new Row(3); >> ret.setField(0, split[0]); >> ret.setField(1, split[1]); >> ret.setField(2, split[2]); >> return ret; >> } >> }).returns(rtf); >> >> String columnNames = "id,value,state"; >> final String dsName = "test"; >> tEnv.registerDataStream(dsName, rows, columnNames); >> final String whiteAreaFilter = "state = 'white'"; >> DataStream<Row> grayArea = rows; >> DataStream<Row> whiteArea = null; >> if (whiteAreaFilter != null) { >> String sql = "SELECT *, (%s) as _WHITE FROM %s"; >> sql = String.format(sql, whiteAreaFilter, dsName); >> Table table = tEnv.sql(sql); >> grayArea = >> tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf); >> DataStream<Row> nw = >> tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf); >> whiteArea = whiteArea == null ? 
nw : whiteArea.union(nw); >> } >> Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n"); >> >> String datasetWhiteDir = "/tmp/bucket/white"; >> BucketingSink<Row> whiteAreaSink = new >> BucketingSink<>(datasetWhiteDir.toString()); >> whiteAreaSink.setWriter(bucketSinkwriter); >> whiteAreaSink.setBatchSize(10); >> whiteArea.addSink(whiteAreaSink); >> >> String datasetGrayDir = "/tmp/bucket/gray"; >> BucketingSink<Row> grayAreaSink = new >> BucketingSink<>(datasetGrayDir.toString()); >> grayAreaSink.setWriter(bucketSinkwriter); >> grayAreaSink.setBatchSize(10); >> grayArea.addSink(grayAreaSink); >> >> JobExecutionResult jobInfo = senv.execute("Buketing sink test "); >> System.out.printf("Job took %s minutes", >> jobInfo.getNetRuntime(TimeUnit.MINUTES)); >> } >> >> >> >> >> >> >> >> public class RowCsvWriter extends StreamWriterBase<Row> { >> private static final long serialVersionUID = 1L; >> >> private final String charsetName; >> private transient Charset charset; >> private String fieldDelimiter; >> private String recordDelimiter; >> private boolean allowNullValues = true; >> private boolean quoteStrings = false; >> >> /** >> * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to >> convert strings to >> * bytes. >> */ >> public RowCsvWriter() { >> this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, >> CsvOutputFormat.DEFAULT_LINE_DELIMITER); >> } >> >> /** >> * Creates a new {@code StringWriter} that uses the given charset to >> convert strings to bytes. 
>> * >> * @param charsetName Name of the charset to be used, must be valid input >> for >> * {@code Charset.forName(charsetName)} >> */ >> public RowCsvWriter(String charsetName, String fieldDelimiter, String >> recordDelimiter) { >> this.charsetName = charsetName; >> this.fieldDelimiter = fieldDelimiter; >> this.recordDelimiter = recordDelimiter; >> } >> >> @Override >> public void open(FileSystem fs, Path path) throws IOException { >> super.open(fs, path); >> >> try { >> this.charset = Charset.forName(charsetName); >> } catch (IllegalCharsetNameException ex) { >> throw new IOException("The charset " + charsetName + " is not valid.", >> ex); >> } catch (UnsupportedCharsetException ex) { >> throw new IOException("The charset " + charsetName + " is not >> supported.", ex); >> } >> } >> >> @Override >> public void write(Row element) throws IOException { >> FSDataOutputStream outputStream = getStream(); >> writeRow(element, outputStream); >> } >> >> private void writeRow(Row element, FSDataOutputStream out) throws >> IOException { >> int numFields = element.getArity(); >> >> for (int i = 0; i < numFields; i++) { >> Object obj = element.getField(i); >> if (obj != null) { >> if (i != 0) { >> out.write(this.fieldDelimiter.getBytes(charset)); >> } >> >> if (quoteStrings) { >> if (obj instanceof String || obj instanceof StringValue) { >> out.write('"'); >> out.write(obj.toString().getBytes(charset)); >> out.write('"'); >> } else { >> out.write(obj.toString().getBytes(charset)); >> } >> } else { >> out.write(obj.toString().getBytes(charset)); >> } >> } else { >> if (this.allowNullValues) { >> if (i != 0) { >> out.write(this.fieldDelimiter.getBytes(charset)); >> } >> } else { >> throw new RuntimeException("Cannot write tuple with <null> value >> at position: " + i); >> } >> } >> } >> >> // add the record delimiter >> out.write(this.recordDelimiter.getBytes(charset)); >> } >> >> @Override >> public Writer<Row> duplicate() { >> return new RowCsvWriter(charsetName, 
fieldDelimiter, recordDelimiter); >> } >> } >> >> >> >> Any help is appreciated, >> Flavio >