Ah, sorry, one correction. Just realized there’s already some analysis of the 
BucketingSink closing issue in this mail thread.
Please ignore my request for relevant logs :)


On 13 September 2017 at 10:56:10 AM, Tzu-Li (Gordon) Tai (tzuli...@apache.org) 
wrote:

Hi Flavio,

Let me try to understand / look at some of the problems you have encountered.
checkpointing: it's not clear which checkpointing system to use and how to 
tune/monitor it and avoid OOM exceptions.
What do you mean be which "checkpointing system” to use? Do you mean state 
backends? Typically, you would only get OOM exceptions for memory-backed state 
backends if the state size exceeds the memory capacity. State sizes can be 
queried from the REST APIs / Web UI.
cleanup: BucketingSink doesn't always move to final state
This sounds like a bug that we should look into. Do you have any logs on which 
you observed this?

missing output formats: parquet support to write generic Rows not very well 
supported (at least out of the box) [1]
Would you be up to opening up JIRAs for what you think is missing (if there 
isn’t one already)?

progress monitoring: for example in the ES connector there's no way (apart from 
using accumulators) to monitor the progress of the indexing
Maybe we can add some built-in metric in the ES sink connector that tracks the 
number of successfully indexed elements, which can then be queried from the 
REST API / Web UI. That wouldn’t be too much effort. What do you think, would 
that be useful for your case?

Would be happy to hear your thoughts on this!

Cheers,
Gordon


On 12 September 2017 at 11:36:27 AM, Flavio Pompermaier (pomperma...@okkam.it) 
wrote:

For the moment I give up with streaming...too many missing/unclear features wrt 
batch. 
For example:
checkpointing: it's not clear which checkpointing system to use and how to 
tune/monitor it and avoid OOM exceptions. Moreover is it really necessary to 
use it? For example if I read a file from HDFS and I don't have a checkpoint it 
could be ok to re-run the job on all the data in case of errors (i.e. the 
stream is managed like a batch)
cleanup: BucketingSink doesn't always move to final state
missing output formats: parquet support to write generic Rows not very well 
supported (at least out of the box) [1]
progress monitoring: for example in the ES connector there's no way (apart from 
using accumulators) to monitor the progress of the indexing
[1] 
https://stackoverflow.com/questions/41144659/flink-avro-parquet-writer-in-rollingsink

Maybe I'm wrong with those points but the attempt to replace my current batch 
system with a streaming one had no luck with those points.

Best,
Flavio

On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
Hi,

Expanding a bit on Kostas' answer. Yes, your analysis is correct, the problem 
is that the job is shutting down before a last checkpoint can "confirm" the 
written bucket data by moving it to the final state. The problem, as Kostas 
noted is that a user function (and thus also BucketingSink) does not know 
whether close() is being called because of a failure or because normal job 
shutdown. Therefore, we cannot move data to the final stage there.

Once we have the issue that Kostas posted resolve we can also resolve this 
problem for the BucketingSink.

Best,
Aljoscha

On 8. Sep 2017, at 16:48, Kostas Kloudas <k.klou...@data-artisans.com> wrote:

Hi Flavio,

If I understand correctly, I think you bumped into this issue: 
https://issues.apache.org/jira/browse/FLINK-2646

There is also a similar discussion on the BucketingSink here: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468

Kostas

On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
I'm trying to test a streaming job but the files written by the BucketingSink 
are never finalized (remains into the pending state).
Is this caused by the fact that the job finishes before the checkpoint?
Shouldn't the sink properly close anyway?

This is my code:

  @Test
  public void testBucketingSink() throws Exception {
    final StreamExecutionEnvironment senv = 
StreamExecutionEnvironment.getExecutionEnvironment();
    final StreamTableEnvironment tEnv = 
TableEnvironment.getTableEnvironment(senv);
    senv.enableCheckpointing(5000);
    DataStream<String> testStream = senv.fromElements(//
        "1,aaa,white", //
        "2,bbb,gray", //
        "3,ccc,white", //
        "4,bbb,gray", //
        "5,bbb,gray" //
    );
    final RowTypeInfo rtf = new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO, 
        BasicTypeInfo.STRING_TYPE_INFO);
    DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {

      private static final long serialVersionUID = 1L;

      @Override
      public Row map(String str) throws Exception {
        String[] split = str.split(Pattern.quote(","));
        Row ret = new Row(3);
        ret.setField(0, split[0]);
        ret.setField(1, split[1]);
        ret.setField(2, split[2]);
        return ret;
      }
    }).returns(rtf);

    String columnNames = "id,value,state";
    final String dsName = "test";
    tEnv.registerDataStream(dsName, rows, columnNames);
    final String whiteAreaFilter = "state = 'white'";
    DataStream<Row> grayArea = rows;
    DataStream<Row> whiteArea = null;
    if (whiteAreaFilter != null) {
      String sql = "SELECT *, (%s) as _WHITE FROM %s";
      sql = String.format(sql, whiteAreaFilter, dsName);
      Table table = tEnv.sql(sql);
      grayArea = tEnv.toDataStream(table.where("!_WHITE").select(columnNames), 
rtf);
      DataStream<Row> nw = 
tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
      whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
    }
    Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");

    String datasetWhiteDir = "/tmp/bucket/white";
    BucketingSink<Row> whiteAreaSink = new 
BucketingSink<>(datasetWhiteDir.toString());
    whiteAreaSink.setWriter(bucketSinkwriter);
    whiteAreaSink.setBatchSize(10);
    whiteArea.addSink(whiteAreaSink);

    String datasetGrayDir = "/tmp/bucket/gray";
    BucketingSink<Row> grayAreaSink = new 
BucketingSink<>(datasetGrayDir.toString());
    grayAreaSink.setWriter(bucketSinkwriter);
    grayAreaSink.setBatchSize(10);
    grayArea.addSink(grayAreaSink);

    JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
    System.out.printf("Job took %s minutes", 
jobInfo.getNetRuntime(TimeUnit.MINUTES));
  }







public class RowCsvWriter extends StreamWriterBase<Row> {
  private static final long serialVersionUID = 1L;

  private final String charsetName;
  private transient Charset charset;
  private String fieldDelimiter;
  private String recordDelimiter;
  private boolean allowNullValues = true;
  private boolean quoteStrings = false;

  /**
   * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to 
convert strings to
   * bytes.
   */
  public RowCsvWriter() {
    this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, 
CsvOutputFormat.DEFAULT_LINE_DELIMITER);
  }

  /**
   * Creates a new {@code StringWriter} that uses the given charset to convert 
strings to bytes.
   *
   * @param charsetName Name of the charset to be used, must be valid input for
   *        {@code Charset.forName(charsetName)}
   */
  public RowCsvWriter(String charsetName, String fieldDelimiter, String 
recordDelimiter) {
    this.charsetName = charsetName;
    this.fieldDelimiter = fieldDelimiter;
    this.recordDelimiter = recordDelimiter;
  }

  @Override
  public void open(FileSystem fs, Path path) throws IOException {
    super.open(fs, path);

    try {
      this.charset = Charset.forName(charsetName);
    } catch (IllegalCharsetNameException ex) {
      throw new IOException("The charset " + charsetName + " is not valid.", 
ex);
    } catch (UnsupportedCharsetException ex) {
      throw new IOException("The charset " + charsetName + " is not 
supported.", ex);
    }
  }

  @Override
  public void write(Row element) throws IOException {
    FSDataOutputStream outputStream = getStream();
    writeRow(element, outputStream);
  }

  private void writeRow(Row element, FSDataOutputStream out) throws IOException 
{
    int numFields = element.getArity();

    for (int i = 0; i < numFields; i++) {
      Object obj = element.getField(i);
      if (obj != null) {
        if (i != 0) {
          out.write(this.fieldDelimiter.getBytes(charset));
        }

        if (quoteStrings) {
          if (obj instanceof String || obj instanceof StringValue) {
            out.write('"');
            out.write(obj.toString().getBytes(charset));
            out.write('"');
          } else {
            out.write(obj.toString().getBytes(charset));
          }
        } else {
          out.write(obj.toString().getBytes(charset));
        }
      } else {
        if (this.allowNullValues) {
          if (i != 0) {
            out.write(this.fieldDelimiter.getBytes(charset));
          }
        } else {
          throw new RuntimeException("Cannot write tuple with <null> value at 
position: " + i);
        }
      }
    }

    // add the record delimiter
    out.write(this.recordDelimiter.getBytes(charset));
  }

  @Override
  public Writer<Row> duplicate() {
    return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
  }
}



Any help is appreciated,
Flavio



Reply via email to