I've just looked at Robert's presentation at FF [1] and that's exactly what I was waiting for regarding streaming planning/training... Very useful ;)
[1] https://www.youtube.com/watch?v=8l8dCKMMWkw

On Wed, Sep 13, 2017 at 12:04 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Hi Gordon,
> thanks for your feedback. The main problem for me is that moving from batch
> to streaming should be much easier, IMHO.
>
> Rows should be a first-class citizen in Flink and it should be VERY easy to
> read/write them, while at the moment it seems that Tuples are the dominating
> type. I don't want to write a serializer/OutputFormat to persist Rows as
> Parquet, Avro, Thrift, ORC, Kudu, Hive, etc. I expect some already existing
> (and maintained) connectors to be available somewhere. The Parquet rolling
> sink is just one example.
>
> Regarding state backends, I think it's not so easy to understand how to
> design and monitor them properly: there are many parameters/variables to
> take into account, and it would be helpful to have a proper hands-on
> training course/certification about this...
>
> About ES indexing monitoring, see my discussion with Chesnay at
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-job-monitoring-td13583.html:
> what I need is just to have recordsIn/recordsOut reflecting real values.
>
> Best,
> Flavio
>
> On Wed, Sep 13, 2017 at 10:56 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Flavio,
>>
>> Let me try to understand / look at some of the problems you have
>> encountered.
>>
>> - checkpointing: it's not clear which checkpointing system to use and
>> how to tune/monitor it and avoid OOM exceptions.
>>
>> What do you mean by which "checkpointing system" to use? Do you mean
>> state backends? Typically, you would only get OOM exceptions with
>> memory-backed state backends if the state size exceeds the memory
>> capacity. State sizes can be queried from the REST API / Web UI.
>>
>> - cleanup: BucketingSink doesn't always move to final state
>>
>> This sounds like a bug that we should look into. Do you have any logs
>> from when you observed this?
>>
>> - missing output formats: parquet support to write generic Rows not
>> very well supported (at least out of the box) [1]
>>
>> Would you be up to opening JIRAs for what you think is missing (if
>> there isn't one already)?
>>
>> - progress monitoring: for example in the ES connector there's no way
>> (apart from using accumulators) to monitor the progress of the indexing
>>
>> Maybe we can add a built-in metric in the ES sink connector that tracks
>> the number of successfully indexed elements, which could then be queried
>> from the REST API / Web UI. That wouldn't be too much effort. What do you
>> think, would that be useful for your case?
>> Would be happy to hear your thoughts on this!
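>>
>> In the meantime, something similar can already be approximated on the user
>> side with the metrics API. Below is a rough, untested sketch: the
>> DataStream<Row> "rows", the wrapping function and the "recordsToEs" metric
>> name are only placeholders, and it counts records handed to the sink, not
>> what ES actually acknowledged:
>>
>> // Counts records flowing towards the ES sink via a user-defined Counter.
>> // Needs org.apache.flink.metrics.Counter, RichMapFunction and Configuration.
>> DataStream<Row> counted = rows.map(new RichMapFunction<Row, Row>() {
>>   private transient Counter recordsToEs;
>>
>>   @Override
>>   public void open(Configuration parameters) {
>>     recordsToEs = getRuntimeContext().getMetricGroup().counter("recordsToEs");
>>   }
>>
>>   @Override
>>   public Row map(Row value) {
>>     recordsToEs.inc(); // visible in the Web UI / REST metrics
>>     return value;
>>   }
>> });
>> // counted.addSink(esSink); // esSink = your ElasticsearchSink, wired up as before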
>> Cheers,
>> Gordon
>>
>> On 12 September 2017 at 11:36:27 AM, Flavio Pompermaier (pomperma...@okkam.it) wrote:
>>
>> For the moment I give up on streaming... too many missing/unclear features
>> compared to batch. For example:
>>
>> - checkpointing: it's not clear which checkpointing system to use and how
>> to tune/monitor it and avoid OOM exceptions. Moreover, is it really
>> necessary to use it? For example, if I read a file from HDFS and I don't
>> have a checkpoint, it could be OK to re-run the job on all the data in case
>> of errors (i.e. the stream is managed like a batch)
>> - cleanup: BucketingSink doesn't always move files to the final state
>> - missing output formats: parquet support for writing generic Rows is not
>> very well supported (at least out of the box) [1]
>> - progress monitoring: for example, in the ES connector there's no way
>> (apart from using accumulators) to monitor the progress of the indexing
>>
>> [1] https://stackoverflow.com/questions/41144659/flink-avro-parquet-writer-in-rollingsink
>>
>> Maybe I'm wrong on those points, but my attempt to replace my current batch
>> system with a streaming one failed because of them.
>>
>> Best,
>> Flavio
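>>
>> For reference, the basic checkpointing wiring that the first bullet above
>> refers to boils down to a few lines. A minimal, untested sketch follows;
>> the interval, the backend choice and the HDFS path are only placeholders:
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> // Option A: checkpointing with an explicit file-based state backend, so
>> // snapshots go to HDFS instead of living only on the JobManager heap.
>> env.enableCheckpointing(60_000); // one checkpoint per minute (placeholder)
>> env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // placeholder path
>>
>> // Option B: no checkpointing at all. A restart strategy then simply re-runs
>> // the whole job from the sources on failure, i.e. the "re-read everything
>> // from HDFS like a batch" behaviour described in the first bullet.
>> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10_000));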
>>
>> On Fri, Sep 8, 2017 at 5:29 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Expanding a bit on Kostas' answer: yes, your analysis is correct. The
>>> problem is that the job shuts down before a last checkpoint can "confirm"
>>> the written bucket data by moving it to the final state. The problem, as
>>> Kostas noted, is that a user function (and thus also the BucketingSink)
>>> does not know whether close() is being called because of a failure or
>>> because of a normal job shutdown. Therefore, we cannot move data to the
>>> final state there.
>>>
>>> Once the issue that Kostas posted is resolved, we can also resolve this
>>> problem for the BucketingSink.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 8. Sep 2017, at 16:48, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>>>
>>> Hi Flavio,
>>>
>>> If I understand correctly, I think you bumped into this issue:
>>> https://issues.apache.org/jira/browse/FLINK-2646
>>>
>>> There is also a similar discussion on the BucketingSink here:
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Adding-a-dispose-method-in-the-RichFunction-td14466.html#a14468
>>>
>>> Kostas
>>>
>>> On Sep 8, 2017, at 4:27 PM, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>> Hi to all,
>>> I'm trying to test a streaming job, but the files written by the
>>> BucketingSink are never finalized (they remain in the pending state).
>>> Is this caused by the fact that the job finishes before the checkpoint?
>>> Shouldn't the sink close properly anyway?
>>>
>>> This is my code:
>>>
>>> @Test
>>> public void testBucketingSink() throws Exception {
>>>   final StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
>>>   final StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(senv);
>>>   // Checkpoint every 5 seconds (the BucketingSink finalizes buckets on completed checkpoints).
>>>   senv.enableCheckpointing(5000);
>>>   DataStream<String> testStream = senv.fromElements(
>>>       "1,aaa,white",
>>>       "2,bbb,gray",
>>>       "3,ccc,white",
>>>       "4,bbb,gray",
>>>       "5,bbb,gray");
>>>   final RowTypeInfo rtf = new RowTypeInfo(
>>>       BasicTypeInfo.STRING_TYPE_INFO,
>>>       BasicTypeInfo.STRING_TYPE_INFO,
>>>       BasicTypeInfo.STRING_TYPE_INFO);
>>>   // Parse each CSV line into a 3-field Row.
>>>   DataStream<Row> rows = testStream.map(new MapFunction<String, Row>() {
>>>
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public Row map(String str) throws Exception {
>>>       String[] split = str.split(Pattern.quote(","));
>>>       Row ret = new Row(3);
>>>       ret.setField(0, split[0]);
>>>       ret.setField(1, split[1]);
>>>       ret.setField(2, split[2]);
>>>       return ret;
>>>     }
>>>   }).returns(rtf);
>>>
>>>   String columnNames = "id,value,state";
>>>   final String dsName = "test";
>>>   tEnv.registerDataStream(dsName, rows, columnNames);
>>>   final String whiteAreaFilter = "state = 'white'";
>>>   DataStream<Row> grayArea = rows;
>>>   DataStream<Row> whiteArea = null;
>>>   if (whiteAreaFilter != null) {
>>>     // Split the stream into a "white" and a "gray" area via SQL.
>>>     String sql = "SELECT *, (%s) as _WHITE FROM %s";
>>>     sql = String.format(sql, whiteAreaFilter, dsName);
>>>     Table table = tEnv.sql(sql);
>>>     grayArea = tEnv.toDataStream(table.where("!_WHITE").select(columnNames), rtf);
>>>     DataStream<Row> nw = tEnv.toDataStream(table.where("_WHITE").select(columnNames), rtf);
>>>     whiteArea = whiteArea == null ? nw : whiteArea.union(nw);
>>>   }
>>>   Writer<Row> bucketSinkwriter = new RowCsvWriter("UTF-8", "\t", "\n");
>>>
>>>   String datasetWhiteDir = "/tmp/bucket/white";
>>>   BucketingSink<Row> whiteAreaSink = new BucketingSink<>(datasetWhiteDir);
>>>   whiteAreaSink.setWriter(bucketSinkwriter);
>>>   whiteAreaSink.setBatchSize(10);
>>>   whiteArea.addSink(whiteAreaSink);
>>>
>>>   String datasetGrayDir = "/tmp/bucket/gray";
>>>   BucketingSink<Row> grayAreaSink = new BucketingSink<>(datasetGrayDir);
>>>   grayAreaSink.setWriter(bucketSinkwriter);
>>>   grayAreaSink.setBatchSize(10);
>>>   grayArea.addSink(grayAreaSink);
>>>
>>>   JobExecutionResult jobInfo = senv.execute("Bucketing sink test");
>>>   System.out.printf("Job took %s minutes", jobInfo.getNetRuntime(TimeUnit.MINUTES));
>>> }
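>>>
>>> // Note, based on Aljoscha's explanation above (not verified against this exact setup):
>>> // with enableCheckpointing(5000) and a bounded fromElements() source, the job will
>>> // usually finish before the first checkpoint ever completes, so the BucketingSink
>>> // never gets a completed checkpoint to "confirm" its buckets and the part files stay
>>> // pending. A shorter interval only helps for data written before the last completed
>>> // checkpoint; anything written after it still stays pending until the issue Kostas
>>> // linked (FLINK-2646) is resolved.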
>>>
>>> public class RowCsvWriter extends StreamWriterBase<Row> {
>>>   private static final long serialVersionUID = 1L;
>>>
>>>   private final String charsetName;
>>>   private transient Charset charset;
>>>   private String fieldDelimiter;
>>>   private String recordDelimiter;
>>>   private boolean allowNullValues = true;
>>>   private boolean quoteStrings = false;
>>>
>>>   /**
>>>    * Creates a new {@code RowCsvWriter} that uses the {@code "UTF-8"} charset to
>>>    * convert rows to bytes.
>>>    */
>>>   public RowCsvWriter() {
>>>     this("UTF-8", CsvOutputFormat.DEFAULT_FIELD_DELIMITER, CsvOutputFormat.DEFAULT_LINE_DELIMITER);
>>>   }
>>>
>>>   /**
>>>    * Creates a new {@code RowCsvWriter} that uses the given charset to convert rows to bytes.
>>>    *
>>>    * @param charsetName Name of the charset to be used, must be valid input for
>>>    *                    {@code Charset.forName(charsetName)}
>>>    */
>>>   public RowCsvWriter(String charsetName, String fieldDelimiter, String recordDelimiter) {
>>>     this.charsetName = charsetName;
>>>     this.fieldDelimiter = fieldDelimiter;
>>>     this.recordDelimiter = recordDelimiter;
>>>   }
>>>
>>>   @Override
>>>   public void open(FileSystem fs, Path path) throws IOException {
>>>     super.open(fs, path);
>>>
>>>     try {
>>>       this.charset = Charset.forName(charsetName);
>>>     } catch (IllegalCharsetNameException ex) {
>>>       throw new IOException("The charset " + charsetName + " is not valid.", ex);
>>>     } catch (UnsupportedCharsetException ex) {
>>>       throw new IOException("The charset " + charsetName + " is not supported.", ex);
>>>     }
>>>   }
>>>
>>>   @Override
>>>   public void write(Row element) throws IOException {
>>>     FSDataOutputStream outputStream = getStream();
>>>     writeRow(element, outputStream);
>>>   }
>>>
>>>   private void writeRow(Row element, FSDataOutputStream out) throws IOException {
>>>     int numFields = element.getArity();
>>>
>>>     for (int i = 0; i < numFields; i++) {
>>>       Object obj = element.getField(i);
>>>       if (obj != null) {
>>>         if (i != 0) {
>>>           out.write(this.fieldDelimiter.getBytes(charset));
>>>         }
>>>
>>>         if (quoteStrings && (obj instanceof String || obj instanceof StringValue)) {
>>>           out.write('"');
>>>           out.write(obj.toString().getBytes(charset));
>>>           out.write('"');
>>>         } else {
>>>           out.write(obj.toString().getBytes(charset));
>>>         }
>>>       } else {
>>>         if (this.allowNullValues) {
>>>           // Null fields are written as empty values; only emit the delimiter.
>>>           if (i != 0) {
>>>             out.write(this.fieldDelimiter.getBytes(charset));
>>>           }
>>>         } else {
>>>           throw new RuntimeException("Cannot write row with <null> value at position: " + i);
>>>         }
>>>       }
>>>     }
>>>
>>>     // add the record delimiter
>>>     out.write(this.recordDelimiter.getBytes(charset));
>>>   }
>>>
>>>   @Override
>>>   public Writer<Row> duplicate() {
>>>     return new RowCsvWriter(charsetName, fieldDelimiter, recordDelimiter);
>>>   }
>>> }
>>>
>>> Any help is appreciated,
>>> Flavio