Hi to all,
I'm trying to test a streaming job but the files written by
the BucketingSink are never finalized (remains into the pending state).
Is this caused by the fact that the job finishes before the checkpoint?
Shouldn't the sink properly close anyway?

This is my code:

  @Test
  public void testBucketingSink() throws Exception {
    final StreamExecutionEnvironment senv =
StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tEnv =
TableEnvironment.getTableEnvironment(senv);
    senv.enableCheckpointing(5000);
    DataStream<String> testStream = senv.fromElements(//
        "1,aaa,white", //
        "2,bbb,gray", //
        "3,ccc,white", //
        "4,bbb,gray", //
        "5,bbb,gray" //
    );
    final RowTypeInfo rtf = new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Row map(String str) throws Exception {
        String[] split = str.split(Pattern.quote(","));
        Row ret = new Row(3);
        ret.setField(0, split[0]);
        ret.setField(1, split[1]);
        ret.setField(2, split[2]);
        return ret;
      }
    }).returns(rtf);

    String columnNames = "id,value,state";
    final String dsName = "test";
    tEnv.registerDataStream(dsName, rows, columnNames);
    final String whiteAreaFilter = "state = 'white'";
    DataStream<Row> grayArea = rows;
    DataStream<Row> whiteArea = null;
    if (whiteAreaFilter != null) {
      String sql = "SELECT *, (%s) as _WHITE FROM %s";
      sql = String.format(sql, whiteAreaFilter, dsName);
      Table table = tEnv.sql(sql);
      grayArea =
tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
      DataStream<Row> nw =
tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
      whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
    }
    Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");

    String datasetWhiteDir = "/tmp/bucket/white";
    BucketingSink<Row> whiteAreaSink = new
BucketingSink<>(datasetWhiteDir.toString());
    whiteAreaSink.setWriter(bucketSinkwriter);
    whiteAreaSink.setBatchSize(10);
    whiteArea.addSink(whiteAreaSink);

    String datasetGrayDir = "/tmp/bucket/gray";
    BucketingSink<Row> grayAreaSink = new
BucketingSink<>(datasetGrayDir.toString());
    grayAreaSink.setWriter(bucketSinkwriter);
    grayAreaSink.setBatchSize(10);
    grayArea.addSink(grayAreaSink);

    JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
    System.out.printf("Job took %s minutes",
jobInfo.getNetRuntime(TimeUnit.MINUTES));
  }







public class RowCsvWriter extends StreamWriterBase<Row> {
  private static final long serialVersionUID = 1L;

  private final String charsetName;
  private transient Charset charset;
  private String fieldDelimiter;
  private String recordDelimiter;
  private boolean allowNullValues = true;
  private boolean quoteStrings = false;

  /**
   * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset
to convert strings to
   * bytes.
   */
  public RowCsvWriter() {
    this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER,
CsvOutputFormat.DEFAULT_LINE_DELIMITER);
  }

  /**
   * Creates a new {@code StringWriter} that uses the given charset to
convert strings to bytes.
   *
   * @param charsetName Name of the charset to be used, must be valid input
for
   *        {@code Charset.forName(charsetName)}
   */
  public RowCsvWriter(String charsetName, String fieldDelimiter, String
recordDelimiter) {
    this.charsetName = charsetName;
    this.fieldDelimiter = fieldDelimiter;
    this.recordDelimiter = recordDelimiter;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    super.open(fs, path);

    try {
      this.charset = Charset.forName(charsetName);
    } catch (IllegalCharsetNameException ex) {
      throw new IOException("The charset " + charsetName + " is not
valid.", ex);
    } catch (UnsupportedCharsetException ex) {
      throw new IOException("The charset " + charsetName + " is not
supported.", ex);
    }
  }

  @Override
  public void write(Row element) throws IOException {
    FSDataOutputStream outputStream = getStream();
    writeRow(element, outputStream);
  }

  private void writeRow(Row element, FSDataOutputStream out) throws
IOException {
    int numFields = element.getArity();

    for (int i = 0; i < numFields; i++) {
      Object obj = element.getField(i);
      if (obj != null) {
        if (i != 0) {
          out.write(this.fieldDelimiter.getBytes(charset));
        }

        if (quoteStrings) {
          if (obj instanceof String || obj instanceof StringValue) {
            out.write('"');
            out.write(obj.toString().getBytes(charset));
            out.write('"');
          } else {
            out.write(obj.toString().getBytes(charset));
          }
        } else {
          out.write(obj.toString().getBytes(charset));
        }
      } else {
        if (this.allowNullValues) {
          if (i != 0) {
            out.write(this.fieldDelimiter.getBytes(charset));
          }
        } else {
          throw new RuntimeException("Cannot write tuple with <null> value
at position: " + i);
        }
      }
    }

    // add the record delimiter
    out.write(this.recordDelimiter.getBytes(charset));
  }

  @Override
  public Writer<Row> duplicate() {
    return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
  }
}



Any help is appreciated,
Flavio

Reply via email to