I've just watched Robert's presentation at FF [1] and that's exactly what I
was waiting for regarding streaming planning/training...
Very useful ;)

[1] https://www.youtube.com/watch?v=8l8dCKMMWkw

On Wed, Sep 13, 2017 at 12:04 PM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Hi Gordon,
> thanks for your feedback. The main problem for me is that moving from
> batch to streaming should be much easier, IMHO.
>
> Rows should be first-class citizens in Flink and it should be VERY easy to
> read/write them, while at the moment it seems that Tuples are the
> dominating type... I don't want to write a serializer/outputFormat to
> persist Rows as Parquet, Avro, Thrift, ORC, Kudu, Hive, etc. I expect
> some already existing (and maintained) connector to be available
> somewhere. The case of the Parquet rolling sink is just an example.
>
> Regarding state backends, I think it's not so easy to understand how to
> design and monitor them properly: there are many parameters/variables to take
> into account and it would be helpful to have a proper hands-on training
> course/certification about this...
>
> About ES indexing monitoring, see my discussion with Chesnay at
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-job-monitoring-td13583.html:
> what I need is just to have recordsIn/recordsOut reflect the real values.
>
> Best,
> Flavio
>
> On Wed, Sep 13, 2017 at 10:56 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Flavio,
>>
>> Let me try to understand / look at some of the problems you have
>> encountered.
>>
>>
>>    - checkpointing: it's not clear which checkpointing system to use and
>>    how to tune/monitor it and avoid OOM exceptions.
>>
>> What do you mean by which "checkpointing system" to use? Do you mean
>> state backends? Typically, you would only get OOM exceptions for
>> memory-backed state backends if the state size exceeds the memory capacity.
>> State sizes can be queried from the REST APIs / Web UI.
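>>
>> As a minimal sketch only (the HDFS path and the intervals below are made-up
>> example values, and it assumes the flink-statebackend-rocksdb dependency is
>> on the classpath), switching to the RocksDB state backend keeps working state
>> on local disk instead of the JVM heap, which avoids most of those OOMs:
>>
>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>> public class CheckpointConfigSketch {
>>   public static void main(String[] args) throws Exception {
>>     final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     // Snapshot state every 60 seconds.
>>     env.enableCheckpointing(60_000);
>>     // RocksDB spills state to local disk and checkpoints it to the given URI
>>     // (example path), so state size is not bounded by the heap.
>>     env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
>>     // Example tuning knobs; good values depend on state size and load.
>>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
>>     env.getCheckpointConfig().setCheckpointTimeout(10 * 60_000);
>>     // ... build the streaming pipeline and call env.execute() here.
>>   }
>> }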
>>
>>
>>    - cleanup: BucketingSink doesn't always move files to the final state
>>
>> This sounds like a bug that we should look into. Do you have any logs from
>> a run where you observed this?
>>
>>
>>    - missing output formats: writing generic Rows as Parquet is not
>>    very well supported (at least out of the box) [1]
>>
>> Would you be up for opening JIRAs for what you think is missing (if
>> there isn’t one already)?
>>
>>
>>    - progress monitoring: for example in the ES connector there's no way
>>    (apart from using accumulators) to monitor the progress of the indexing
>>
>> Maybe we can add some built-in metric in the ES sink connector that
>> tracks the number of successfully indexed elements, which can then be
>> queried from the REST API / Web UI. That wouldn’t be too much effort. What
>> do you think, would that be useful for your case?
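>>
>> In the meantime, one user-side workaround (just a sketch; the metric name
>> "recordsToEs" and the class name are made up for illustration) is to put a
>> pass-through RichMapFunction right in front of the sink and count the records
>> sent to it, so the value shows up in the metrics system / Web UI:
>>
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.metrics.Counter;
>>
>> /** Pass-through operator that counts the records handed to the downstream sink. */
>> public class CountingPassThrough<T> extends RichMapFunction<T, T> {
>>   private static final long serialVersionUID = 1L;
>>   private transient Counter counter;
>>
>>   @Override
>>   public void open(Configuration parameters) throws Exception {
>>     // Exposed per subtask under this operator's metric group.
>>     counter = getRuntimeContext().getMetricGroup().counter("recordsToEs");
>>   }
>>
>>   @Override
>>   public T map(T value) throws Exception {
>>     counter.inc();
>>     return value;
>>   }
>> }
>>
>> // Usage: rows.map(new CountingPassThrough<Row>()).addSink(esSink);
>>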
>> Would be happy to hear your thoughts on this!
>>
>> Cheers,
>> Gordon
>>
>>
>> On 12 September 2017 at 11:36:27 AM, Flavio Pompermaier (
>> pomperma...@okkam.it) wrote:
>>
>> For the moment I'm giving up on streaming... too many missing/unclear
>> features compared to batch.
>> For example:
>>
>>    - checkpointing: it's not clear which checkpointing system to use and
>>    how to tune/monitor it and avoid OOM exceptions. Moreover, is it really
>>    necessary? For example, if I read a file from HDFS and don't have a
>>    checkpoint, it could be OK to re-run the job on all the data in case
>>    of errors (i.e. the stream is handled like a batch)
>>    - cleanup: BucketingSink doesn't always move files to the final state
>>    - missing output formats: writing generic Rows as Parquet is not
>>    very well supported (at least out of the box) [1]
>>    - progress monitoring: for example in the ES connector there's no way
>>    (apart from using accumulators) to monitor the progress of the indexing
>>
>> [1] https://stackoverflow.com/questions/41144659/flink-avro-parquet-writer-in-rollingsink
>>
>> Maybe I'm wrong about those points, but because of them my attempt to
>> replace my current batch system with a streaming one hasn't succeeded so far.
>>
>> Best,
>> Flavio
>>
>> On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Expanding a bit on Kostas' answer: yes, your analysis is correct. The
>>> problem is that the job is shutting down before a last checkpoint can
>>> "confirm" the written bucket data by moving it to the final state. The
>>> problem, as Kostas noted, is that a user function (and thus also the
>>> BucketingSink) does not know whether close() is being called because of a
>>> failure or because of a normal job shutdown. Therefore, we cannot move data
>>> to the final state there.
>>>
>>> Once the issue that Kostas posted is resolved, we can also resolve
>>> this problem for the BucketingSink.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 8. Sep 2017, at 16:48, Kostas Kloudas <k.klou...@data-artisans.com>
>>> wrote:
>>>
>>> Hi Flavio,
>>>
>>> If I understand correctly, I think you bumped into this issue:
>>> https://issues.apache.org/jira/browse/FLINK-2646
>>>
>>> There is also a similar discussion on the BucketingSink here:
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
>>>
>>> Kostas
>>>
>>> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>> Hi to all,
>>> I'm trying to test a streaming job, but the files written by
>>> the BucketingSink are never finalized (they remain in the pending state).
>>> Is this caused by the fact that the job finishes before the checkpoint?
>>> Shouldn't the sink close properly anyway?
>>>
>>> This is my code:
>>>
>>>   @Test
>>>   public void testBucketingSink() throws Exception {
>>>     final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     final StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(senv);
>>>     senv.enableCheckpointing(5000);
>>>     DataStream<String> testStream = senv.fromElements(//
>>>         "1,aaa,white", //
>>>         "2,bbb,gray", //
>>>         "3,ccc,white", //
>>>         "4,bbb,gray", //
>>>         "5,bbb,gray" //
>>>     );
>>>     final RowTypeInfo rtf = new RowTypeInfo(
>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>         BasicTypeInfo.STRING_TYPE_INFO);
>>>     DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>>>
>>>       private static final long serialVersionUID = 1L;
>>>
>>>       @Override
>>>       public Row map(String str) throws Exception {
>>>         String[] split = str.split(Pattern.quote(","));
>>>         Row ret = new Row(3);
>>>         ret.setField(0, split[0]);
>>>         ret.setField(1, split[1]);
>>>         ret.setField(2, split[2]);
>>>         return ret;
>>>       }
>>>     }).returns(rtf);
>>>
>>>     String columnNames = "id,value,state";
>>>     final String dsName = "test";
>>>     tEnv.registerDataStream(dsName, rows, columnNames);
>>>     final String whiteAreaFilter = "state = 'white'";
>>>     DataStream<Row> grayArea = rows;
>>>     DataStream<Row> whiteArea = null;
>>>     if (whiteAreaFilter != null) {
>>>       String sql = "SELECT *, (%s) as _WHITE FROM %s";
>>>       sql = String.format(sql, whiteAreaFilter, dsName);
>>>       Table table = tEnv.sql(sql);
>>>       grayArea = tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>>>       DataStream<Row> nw = tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
>>>       whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
>>>     }
>>>     Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
>>>
>>>     String datasetWhiteDir = "/tmp/bucket/white";
>>>     BucketingSink<Row> whiteAreaSink = new BucketingSink<>(datasetWhiteDir);
>>>     whiteAreaSink.setWriter(bucketSinkwriter);
>>>     whiteAreaSink.setBatchSize(10);
>>>     whiteArea.addSink(whiteAreaSink);
>>>
>>>     String datasetGrayDir = "/tmp/bucket/gray";
>>>     BucketingSink<Row> grayAreaSink = new BucketingSink<>(datasetGrayDir);
>>>     grayAreaSink.setWriter(bucketSinkwriter);
>>>     grayAreaSink.setBatchSize(10);
>>>     grayArea.addSink(grayAreaSink);
>>>
>>>     JobExecutionResult jobInfo = senv.execute("Buketing sink test ");
>>>     System.out.printf("Job took %s minutes",
>>> jobInfo.getNetRuntime(TimeUnit.MINUTES));
>>>   }
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> public class RowCsvWriter extends StreamWriterBase<Row> {
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   private final String charsetName;
>>>   private transient Charset charset;
>>>   private String fieldDelimiter;
>>>   private String recordDelimiter;
>>>   private boolean allowNullValues = true;
>>>   private boolean quoteStrings = false;
>>>
>>>   /**
>>>    * Creates a new {@code RowCsvWriter} that uses the {@code "UTF-8"} charset
>>>    * to convert strings to bytes.
>>>    */
>>>   public RowCsvWriter() {
>>>     this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER,
>>> CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>>>   }
>>>
>>>   /**
>>>    * Creates a new {@code RowCsvWriter} that uses the given charset to
>>>    * convert strings to bytes.
>>>    *
>>>    * @param charsetName Name of the charset to be used, must be valid input
>>>    *        for {@code Charset.forName(charsetName)}
>>>    * @param fieldDelimiter Delimiter written between fields
>>>    * @param recordDelimiter Delimiter written after each record
>>>    */
>>>   public RowCsvWriter(String charsetName, String fieldDelimiter, String recordDelimiter) {
>>>     this.charsetName = charsetName;
>>>     this.fieldDelimiter = fieldDelimiter;
>>>     this.recordDelimiter = recordDelimiter;
>>>   }
>>>
>>>   @Override
>>>   public void open(FileSystem fs, Path path) throws IOException {
>>>     super.open(fs, path);
>>>
>>>     try {
>>>       this.charset = Charset.forName(charsetName);
>>>     } catch (IllegalCharsetNameException ex) {
>>>       throw new IOException("The charset " + charsetName + " is not
>>> valid.", ex);
>>>     } catch (UnsupportedCharsetException ex) {
>>>       throw new IOException("The charset " + charsetName + " is not
>>> supported.", ex);
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void write(Row element) throws IOException {
>>>     FSDataOutputStream outputStream = getStream();
>>>     writeRow(element, outputStream);
>>>   }
>>>
>>>   private void writeRow(Row element, FSDataOutputStream out) throws IOException {
>>>     int numFields = element.getArity();
>>>
>>>     for (int i = 0; i < numFields; i++) {
>>>       Object obj = element.getField(i);
>>>       if (obj != null) {
>>>         if (i != 0) {
>>>           out.write(this.fieldDelimiter.getBytes(charset));
>>>         }
>>>
>>>         if (quoteStrings) {
>>>           if (obj instanceof String || obj instanceof StringValue) {
>>>             out.write('"');
>>>             out.write(obj.toString().getBytes(charset));
>>>             out.write('"');
>>>           } else {
>>>             out.write(obj.toString().getBytes(charset));
>>>           }
>>>         } else {
>>>           out.write(obj.toString().getBytes(charset));
>>>         }
>>>       } else {
>>>         if (this.allowNullValues) {
>>>           if (i != 0) {
>>>             out.write(this.fieldDelimiter.getBytes(charset));
>>>           }
>>>         } else {
>>>           throw new RuntimeException("Cannot write tuple with <null>
>>> value at position: " + i);
>>>         }
>>>       }
>>>     }
>>>
>>>     // add the record delimiter
>>>     out.write(this.recordDelimiter.getBytes(charset));
>>>   }
>>>
>>>   @Override
>>>   public Writer<Row> duplicate() {
>>>     return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
>>>   }
>>> }
>>>
>>>
>>>
>>> Any help is appreciated,
>>> Flavio
>>>
>>>
>>>
>>>
>>
>
