Re: Dataset sampling

2019-02-18 Thread Fabian Hueske
Hi Flavio, I'm not aware of any particular plan to add sampling operators to the Table API or SQL. However, I agree. It would be a good feature. Best, Fabian Am Mo., 18. Feb. 2019 um 15:44 Uhr schrieb Flavio Pompermaier < pomperma...@okkam.it>: > Hi to all, > is there any plan to support differ

Re: subscribe

2019-02-18 Thread Fabian Hueske
Hi Artur, In order to subscribe to Flink's user mailing list you need to send a mail to user-subscr...@flink.apache.org Best, Fabian Am Mo., 18. Feb. 2019 um 20:34 Uhr schrieb Artur Mrozowski : > art...@gmail.com >

Re: Assigning timestamps and watermarks several times, several datastreams?

2019-02-20 Thread Fabian Hueske
Hi, Watermarks of streams are independent as long as the streams are not connected with each other. When you union, join, or connect two streams in any other way, their watermarks are fused, which means that they are synced to the "slower" stream, i.e., the stream with the earlier watermarks. Bes

Re: KeyBy distribution across taskslots

2019-02-28 Thread Fabian Hueske
Hi, The answer is in fact no. Flink hash-partitions keys into Key Groups [1] which are uniformly assigned to tasks, i.e., a task can process more than one key group. AFAIK, there are no plans to change this behavior. Stefan (in CC) might be able to give more details on this. Something that might
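
For illustration, a minimal Java sketch of how a key maps to a key group and then to a task, using Flink's public KeyGroupRangeAssignment utility (the key, max parallelism, and parallelism values are made up):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyGroupDemo {
        public static void main(String[] args) {
            int maxParallelism = 128; // number of key groups (default max parallelism)
            int parallelism = 4;      // hypothetical operator parallelism
            String key = "user-42";   // hypothetical key

            // hash-partition the key into one of the key groups
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            // key groups are assigned to tasks in contiguous ranges
            int taskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                    maxParallelism, parallelism, keyGroup);
            System.out.println("key group " + keyGroup + " -> task " + taskIndex);
        }
    }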

Re: Using Flink in an university course

2019-03-04 Thread Fabian Hueske
Hi Wouter, We are using Docker Compose (Flink JM, Flink TM, Kafka, Zookeeper) setups for our trainings and it is working very well. We have an additional container that feeds a Kafka topic via the commandline producer to simulate a somewhat realistic behavior. Of course, you can do it without Kafk

Re: EventCountJob for Flink 1.7.2

2019-03-05 Thread Fabian Hueske
Thanks Flavio! Am Di., 5. März 2019 um 11:23 Uhr schrieb Flavio Pompermaier < pomperma...@okkam.it>: > I discovered that now (in Flink 1.7.2) queryable state server is enabled if > queryable state client is found on the classpath, i.e.: > > > org.apache.flink > flink-queryable-state-client-java_$

Re: Joining two streams of different priorities

2019-03-11 Thread Fabian Hueske
Hi, This is not possible with Flink. Events in transport channels cannot be reordered and functions cannot pick which input to read from. There are some upcoming changes for the unified batch-stream integration that enable choosing which input to read from, but this is not there yet, AFAIK. Best,

Re: Set partition number of Flink DataSet

2019-03-15 Thread Fabian Hueske
Hi, Flink works a bit differently than Spark. By default, Flink uses pipelined shuffles which push results of the sender immediately to the receivers (btw. this is one of the building blocks for stream processing). However, pipelined shuffles require that all receivers are online. Hence, there num

Re: Schema Evolution on Dynamic Schema

2019-03-19 Thread Fabian Hueske
Hi, Restarting a changed query from a savepoint is currently not supported. In general this is a very difficult problem as new queries might result in completely different execution plans. The special case of adding and removing aggregates is easier to solve, but the schema of the stored state cha

Re: Set partition number of Flink DataSet

2019-03-20 Thread Fabian Hueske
mber in > Flink may be compelling in batch processing. > > Could you help explain a bit more on what work needs to be done, so > Flink can support custom partition numbers? We would be willing to > help improve this area. > > Thanks, > Qi > > On Mar 15, 2019,

Re: Schema Evolution on Dynamic Schema

2019-03-20 Thread Fabian Hueske
p doesn't work for me. >> Trying something like >> Select a, map(sum(b) as metric_sum_c ,sum(c) as metric_sum_c) as >> metric_map >> group by a >> >> results with "Non-query expression encountered in illegal context" >> is my train of thought the rig

Re: How to split tuple2 returned by UDAF into two columns in a result table

2019-03-20 Thread Fabian Hueske
Hi Dongwon, Couldn't you just return a tuple from the aggregation function and extract the fields from the nested tuple using a value access function [1]? Table table2 = table1 .window(Slide.over("3.rows").every("1.rows").on("time").as("w")) .groupBy("w, name") .select("name, my
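
A hedged sketch of that suggestion in the Java Table API, assuming a registered UDAF myUdaf that returns a Tuple2 (table, column, and function names are made up); get() is the value access function, and t.flatten() would expand all fields at once:

    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;

    // first select computes the aggregate, second splits the nested tuple
    Table table2 = table1
        .window(Slide.over("3.rows").every("1.rows").on("time").as("w"))
        .groupBy("w, name")
        .select("name, myUdaf(v) as t")
        .select("name, t.get('f0') as sumB, t.get('f1') as sumC");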

[REMINDER] Flink Forward San Francisco in a few days

2019-03-20 Thread Fabian Hueske
Hi everyone, *Flink Forward San Francisco 2019 will take place in a few days on April 1st and 2nd.* If you haven't done so already and are planning to attend, you should register soon at: -> https://sf-2019.flink-forward.org/register Don't forget to use the 25% discount code *MailingList* for ma

Re: Support for custom triggers in Table / SQL

2019-03-29 Thread Fabian Hueske
Hi Piyush, Custom triggers (or early firing) is currently not supported by SQL or the Table API. It is also not on the roadmap [1]. Currently, most efforts on the relational API are focused on restructuring the code and working towards the integration of the Blink contribution [2]. AFAIK, there a

Re: Source reinterpretAsKeyedStream

2019-04-05 Thread Fabian Hueske
Hi, Konstantin is right. reinterpretAsKeyedStream only works if you call it on a DataStream that was keyBy'ed before (with the same parallelism). Flink cannot reuse the partitioning of another system like Kafka. Best, Fabian Adrienne Kole schrieb am Do., 4. Apr. 2019, 14:33: > Thanks a lot for
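
For reference, a hedged sketch of the (experimental) API; it assumes a made-up Event POJO with a getKey() method and an EnrichMap function, and relies on map() preserving the partitioning established by keyBy():

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // valid: the forward-chained map() keeps the keyBy() partitioning intact
    DataStream<Event> enriched = input.keyBy(Event::getKey).map(new EnrichMap());
    KeyedStream<Event, String> keyed =
        DataStreamUtils.reinterpretAsKeyedStream(enriched, Event::getKey);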

Re: InvalidProgramException when trying to sort a group within a dataset

2019-04-05 Thread Fabian Hueske
Hi, Your POJO should implement the Serializable interface. Otherwise it's not considered to be serializable. Best, Fabian Papadopoulos, Konstantinos schrieb am Mi., 3. Apr. 2019, 07:22: > Hi Chesnay, > > > > Thanks for your support. ThresholdAcvFact class is a simple POJO with the > following d

Re: Partitioning key range

2019-04-08 Thread Fabian Hueske
Hi Davood, Flink uses hash partitioning to assign keys to key groups. Each key group is then assigned to a task for processing (a task might process multiple key groups). There is no way to directly assign a key to a particular key group or task. All you can do is to experiment with different cust
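
A custom partitioner gives full control over the target channel, at the price of not producing a KeyedStream (so no keyed state or windows downstream); a sketch with made-up types:

    import org.apache.flink.api.common.functions.Partitioner;
    import org.apache.flink.streaming.api.datastream.DataStream;

    DataStream<Event> partitioned = events.partitionCustom(
        new Partitioner<String>() {
            @Override
            public int partition(String key, int numPartitions) {
                // full control over which channel each key is sent to
                return Math.abs(key.hashCode()) % numPartitions;
            }
        },
        e -> e.getKey());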

Re: Latency caused by Flink Checkpoint on EXACTLY_ONCE mode

2019-04-08 Thread Fabian Hueske
Hi Min, Guowei is right, the comment in the documentation about exactly-once in embarrassingly parallel data flows refers to exactly-once *state consistency*, not *end-to-end* exactly-once. However, in strictly forwarding pipelines, enabling exactly-once checkpoints should not have drawbacks compa

Re: InvalidProgramException when trying to sort a group within a dataset

2019-04-08 Thread Fabian Hueske
o success. > > I got the same NotSerializableException. > > > > Best, > > Konstantinos > > > > *From:* Fabian Hueske > *Sent:* Σάββατο, 6 Απριλίου 2019 2:26 πμ > *To:* Papadopoulos, Konstantinos > > *Cc:* Chesnay Schepler ; user > *Subject:* Re: InvalidProgramExcep

Re: Schema Evolution on Dynamic Schema

2019-04-08 Thread Fabian Hueske
the map elements > when doing *SELECT map['a', sum(a), 'b', sum(b).. ] FROM.. group by .. *? > > On Wed, Mar 20, 2019 at 2:00 AM Fabian Hueske wrote: > >> Hi, >> >> I think this would work. >> However, you should be aware that all keys

Re: End-to-end exactly-once semantics in simple streaming app

2019-04-08 Thread Fabian Hueske
Hi Patrick, In general, you could also implement the 2PC logic in a regular operator. It does not have to be a sink. You would need to add the logic of TwoPhaseCommitSinkFunction to your operator. However, the TwoPhaseCommitSinkFunction does not work well with JDBC. The problem is that you would n
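
A hedged skeleton of the hooks such an operator or sink has to provide, modeled after TwoPhaseCommitSinkFunction (the MyTxn transaction type and the buffering logic are made up; real JDBC would need XA or a comparable mechanism to keep a transaction open between preCommit and commit):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

    import java.util.ArrayList;
    import java.util.List;

    public class TwoPhaseCommitSketch
            extends TwoPhaseCommitSinkFunction<String, TwoPhaseCommitSketch.MyTxn, Void> {

        public static class MyTxn {
            final List<String> buffered = new ArrayList<>();
        }

        public TwoPhaseCommitSketch() {
            super(new KryoSerializer<>(MyTxn.class, new ExecutionConfig()),
                  VoidSerializer.INSTANCE);
        }

        @Override
        protected MyTxn beginTransaction() {
            return new MyTxn(); // open a fresh transaction scope
        }

        @Override
        protected void invoke(MyTxn txn, String value, Context ctx) {
            txn.buffered.add(value); // write into the open transaction
        }

        @Override
        protected void preCommit(MyTxn txn) {
            // flush pending writes; after this, the transaction must be committable
        }

        @Override
        protected void commit(MyTxn txn) {
            // make the writes visible; must tolerate retries after recovery
        }

        @Override
        protected void abort(MyTxn txn) {
            txn.buffered.clear(); // discard the failed transaction
        }
    }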

Re: Flink SQL hangs in StreamTableEnvironment.sqlUpdate, keeps executing and seems never stop, finally lead to java.lang.OutOfMemoryError: GC overhead limit exceeded

2019-04-08 Thread Fabian Hueske
Hi Henry, It seems that the optimizer is not handling this case well. The search space might be too large (or rather the optimizer explores too much of the search space). Can you share the query? Did you add any optimization rules? Best, Fabian Am Mi., 3. Apr. 2019 um 12:36 Uhr schrieb 徐涛 : > Hi

Re: HA and zookeeper

2019-04-08 Thread Fabian Hueske
Hi Boris, ZooKeeper is also used by the JobManager to store metadata about the running job. The JM writes information like the JobGraph, JAR file, checkpoint metadata to a persistent storage (like HDFS, S3, ...) and a pointer to this information to ZooKeeper. In case of a recovery, the new JM look

Re: Schema Evolution on Dynamic Schema

2019-04-09 Thread Fabian Hueske
s again, and congrats on an awesome conference, I had learned a lot > Shahar > > From: Fabian Hueske > Sent: Monday, April 8, 02:54 > Subject: Re: Schema Evolution on Dynamic Schema > To: Shahar Cizer Kobrinsky > Cc: Rong Rong, user > > > Hi Shahar, > > Sorry for

Re: [ANNOUNCE] Apache Flink 1.8.0 released

2019-04-10 Thread Fabian Hueske
Congrats to everyone! Thanks Aljoscha and all contributors. Cheers, Fabian Am Mi., 10. Apr. 2019 um 11:54 Uhr schrieb Congxian Qiu < qcx978132...@gmail.com>: > Cool! > > Thanks Aljoscha a lot for being our release manager, and all the others > who make this release possible. > > Best, Congxian

Re: Is copying flink-hadoop-compatibility jar to FLINK_HOME/lib the only way to make it work?

2019-04-10 Thread Fabian Hueske
Hi, Packaging the flink-hadoop-compatibility dependency with your code into a "fat" job jar should work as well. Best, Fabian Am Mi., 10. Apr. 2019 um 15:08 Uhr schrieb Morven Huang < morven.hu...@gmail.com>: > Hi, > > > > I’m using Flink 1.5.6 and Hadoop 2.7.1. > > > > *My requirement is to re

Re: Does HadoopOutputFormat create MapReduce job internally?

2019-04-10 Thread Fabian Hueske
Hi, Flink's Hadoop compatibility functions just wrap functions that were implemented against Hadoop's interfaces in wrapper functions that are implemented against Flink's interfaces. There is no Hadoop cluster started or MapReduce job being executed. Job is just a class of the Hadoop API. It does
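
For illustration, roughly how a Hadoop OutputFormat is wrapped for a DataSet program; the Job object only carries configuration and is never submitted (the output path and the result DataSet<Tuple2<Text, IntWritable>> are assumptions):

    import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    Job job = Job.getInstance(); // configuration holder only, no MR job is started
    HadoopOutputFormat<Text, IntWritable> hadoopOF =
        new HadoopOutputFormat<>(new TextOutputFormat<Text, IntWritable>(), job);
    TextOutputFormat.setOutputPath(job, new Path("hdfs:///tmp/out")); // made-up path
    result.output(hadoopOF); // result: DataSet<Tuple2<Text, IntWritable>>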

Re: What is the best way to handle data skew processing in Data Stream applications?

2019-04-11 Thread Fabian Hueske
Hi Felipe, three comments: 1) applying rebalance(), shuffle(), or rescale() before a keyBy() has no effect: keyBy() introduces a hash partitioning such that any data partitioning that you do immediately before keyBy() is destroyed. You only change the distribution for the call of the key extracto

Re: Default Kafka producers pool size for FlinkKafkaProducer.Semantic.EXACTLY_ONCE

2019-04-11 Thread Fabian Hueske
Hi Min, I think the pool size is per parallel sink task, i.e., it should be independent of the parallelism of the sink operator. From my understanding a pool size of 5 should be fine if the maximum number of concurrent checkpoints is 1. Running out of connections would mean that there are 5 in-fl

Re: Apache Flink - Question about broadcast state pattern usage

2019-04-11 Thread Fabian Hueske
Hi, you would simply pass multiple MapStateDescriptors to the broadcast method: MapStateDescriptor bcState1 = ... MapStateDescriptor bcState2 = ... DataStream stream = ... BroadcastStream bcStream = stream.broadcast(bcState1, bcState2); Best, Fabian Am Mi., 10. Apr. 2019 um 19:44 Uhr schrieb
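
A hedged, slightly fuller sketch of the same pattern (Rule, Config, and the stream names are made up):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;

    MapStateDescriptor<String, Rule> bcState1 =
        new MapStateDescriptor<>("rules", Types.STRING, Types.POJO(Rule.class));
    MapStateDescriptor<String, Config> bcState2 =
        new MapStateDescriptor<>("configs", Types.STRING, Types.POJO(Config.class));

    // both descriptors are registered on the same broadcast stream
    BroadcastStream<Control> bcStream = controlStream.broadcast(bcState1, bcState2);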

Re: Apache Flink - How to destroy global window and release it's resources

2019-04-11 Thread Fabian Hueske
Hi, As far as I know, a window is only completely removed when time (event or processing time, depending on the window type) passes the window's end timestamp. Since GlobalWindow's end timestamp is Long.MAX_VALUE, it is never completely removed. I'm not 100% sure what state is kept around. It mig

Re: FlinkCEP and SQL?

2019-04-11 Thread Fabian Hueske
Hi Esa, Flink's implementation of SQL MATCH_RECOGNIZE is based on its CEP library, i.e., they share the same implementation. Best, Fabian Am Do., 11. Apr. 2019 um 10:29 Uhr schrieb Esa Heikkinen (TAU) < esa.heikki...@tuni.fi>: > Hi > > > > Is SQL CEP based (old) FlinkCEP at all and are SQL CEP

Flink Forward Europe 2019 - Call for Presentations open until 17th May

2019-04-11 Thread Fabian Hueske
Hi all, Flink Forward Europe returns to Berlin on October 7-9th, 2019. We are happy to announce that the Call for Presentations is open! Please submit a proposal if you'd like to present your Apache Flink experience, best practices, new features, or use cases in front of an international audience

Re: Does HadoopOutputFormat create MapReduce job internally?

2019-04-15 Thread Fabian Hueske
Hi Morven, You posted the same question a few days ago and it was also answered correctly. Please do not repost the same question again. You can reply to the earlier thread if you have a follow up question. To answer your question briefly: No, Flink does not trigger a MapReduce job. The whole job

Re: Apache Flink - How to destroy global window and release it's resources

2019-04-15 Thread Fabian Hueske
is Long.MAX_VALUE, and that is my concern. So, is > there any other way of clean up the now purged global windows ? > > Thanks again. > > > > On Thursday, April 11, 2019, 4:16:24 AM EDT, Fabian Hueske < > fhue...@gmail.com> wrote: > > > Hi, > > As far as I

Re: Apache Flink - Question about broadcast state pattern usage

2019-04-15 Thread Fabian Hueske
s-release-1.8/dev/stream/state/broadcast_state.html) > and still not sure how that will help ? > > Thanks for your help. > > Mans > > > > > > On Thursday, April 11, 2019, 3:53:59 AM EDT, Fabian Hueske < > fhue...@gmail.com> wrote: > > > Hi, > > you would s

Re: Join of DataStream and DataSet

2019-04-15 Thread Fabian Hueske
Hi Reminia, What Hequn said is correct. However, I would *not* use a regular join but model the problem as a time-versioned table join. A regular join will materialize both inputs which is probably not what you want to do for a stream. For a time-versioned table join, only the time-versioned table wou

Re: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Fabian Hueske
Hi Konstantinos, This sounds like a useful extension to me. Would you like to create a Jira issue and contribute the improvement? In the meantime, you can just fork the code of JDBCInputFormat and adjust it to your needs. Best, Fabian Am Mo., 15. Apr. 2019 um 08:53 Uhr schrieb Papadopoulos, Kon

Re: Hbase Connector failed when deployed to yarn

2019-04-15 Thread Fabian Hueske
Hi, The Jira issue is still unassigned. Would you be up to work on a fix? Best, Fabian Am Fr., 12. Apr. 2019 um 05:07 Uhr schrieb hai : > Hi, Tang: > > > Thanks for your reply, will this issue be fixed soon? I don’t think putting the > flink-hadoop-compatibility > jar under FLINK_HOME/lib is an elegant soluti

Re: Hbase Connector failed when deployed to yarn

2019-04-15 Thread Fabian Hueske
That's great! Thank you. Let me know if you have any questions. Fabian Am Mo., 15. Apr. 2019 um 11:32 Uhr schrieb Hai : > Hi Fabian: > > > OK ,I am glad to do that. > > > Regards > > Original Message > *Sender:* Fabian Hueske > *Recipient:* hai > *Cc:

Re: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Fabian Hueske
se/FLINK-12198 > > > > Best, > > Konstantinos > > > > *From:* Papadopoulos, Konstantinos > > *Sent:* Δευτέρα, 15 Απριλίου 2019 12:30 μμ > *To:* Fabian Hueske > *Cc:* Rong Rong ; user > *Subject:* RE: Flink JDBC: Disable auto-commit mode > > > > Hi F

Re: How to calculate moving average result using flink sql ?

2019-04-16 Thread Fabian Hueske
Hi Lifei, This sounds to me like you need an OVER window aggregation. OVER is a standard SQL clause to compute aggregates for each row over a group of surrounding rows (defined by ordering and partitioning). Check out the documentation [1]. The example only shows ROW based windows, but Flink also
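
A hedged example of a moving average as an OVER aggregation, assuming tableEnv is a StreamTableEnvironment and a Ticks table with a proctime time attribute (all names are made up):

    // 10-row moving average per symbol
    Table movingAvg = tableEnv.sqlQuery(
        "SELECT symbol, price, " +
        "  AVG(price) OVER (" +
        "    PARTITION BY symbol " +
        "    ORDER BY proctime " +
        "    ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS avg_price " +
        "FROM Ticks");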

Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-29 Thread Fabian Hueske
Hi Juan, count() and collect() trigger the execution of a job. Since Flink does not cache intermediate results (yet), all operations from the sink (count()/collect()) to the sources are executed. So in a sense a DataSet is immutable (given that the input of the sources does not change) but completel

Re: Working around lack of SQL triggers

2019-04-29 Thread Fabian Hueske
Hi, I don't think that (the current state of) Flink SQL is a good fit for your requirements. Each query will be executed as an independent job. So there won't be any sharing of intermediate results. You can do some of this manually if you use the Table API, but even then it won't allow for early r

Re: Emitting current state to a sink

2019-04-29 Thread Fabian Hueske
Hi Avi, I'm not sure that you cannot emit data from the keyed state when you receive a broadcasted message. The Context parameter of the processBroadcastElement() method in the KeyedBroadcastProcessFunction has the applyToKeyedState() method. The method takes a KeyedStateFunction that is applied to
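
A hedged sketch of the processBroadcastElement() method of such a KeyedBroadcastProcessFunction (the state name and types are made up):

    @Override
    public void processBroadcastElement(String trigger, Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {
        // iterate over the keyed state of all keys and emit the current values
        ctx.applyToKeyedState(
            new ValueStateDescriptor<>("counts", Long.class),
            (String key, ValueState<Long> state) -> {
                if (state.value() != null) {
                    out.collect(Tuple2.of(key, state.value()));
                }
            });
    }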

Re: FileInputFormat that processes files in chronological order

2019-04-29 Thread Fabian Hueske
Hi Sergei, It depends whether you want to process the file with the DataSet (batch) or DataStream (stream) API. Averell's answer was addressing the DataStream API part. The DataSet API does not have any built-in support to distinguish files (or file splits) by folders and process them in order. F

Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-29 Thread Fabian Hueske
Hi Mans, I don't know if that would work or not. Would need to dig into the source code for that. TBH, I would recommend to check if you can implement the logic using a (Keyed-)ProcessFunction. IMO, process functions are a lot easier to reason about than Flink's windowing framework. You can manag

Re: Data Locality in Flink

2019-04-29 Thread Fabian Hueske
from local file system I guess every line of the file will > be read by a slot (according to the job parallelism) for applying the map > logic. > > In reading from HDFS I read this > <https://stackoverflow.com/a/39153402/8110607> answer by Fabian Hueske > <https://stackover

Re: Emitting current state to a sink

2019-04-29 Thread Fabian Hueske
Nice! Thanks for the confirmation :-) Am Mo., 29. Apr. 2019 um 13:21 Uhr schrieb Avi Levi : > Thanks! Works like a charm :) > > On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske wrote: > >> *This Message originated outside your organization.* >> ---

Re: Data Locality in Flink

2019-04-29 Thread Fabian Hueske
> will be read by a slot (according to the job parallelism) for applying the >>> map logic. >>> >>> In reading from HDFS I read this >>> <https://stackoverflow.com/a/39153402/8110607> answer by Fabian Hueske >>> <https://stackoverflow.com/users/3609571/fabian-hueske> and I want to >>> know is that still the Flink strategy for reading from distributed system >>> file? >>> >>> thanks >>> >> > >

Re: kafka partitions, data locality

2019-04-30 Thread Fabian Hueske
Hi Sergey, You are right, keys are managed in key groups. Each key belongs to a key group and one or more key groups are assigned to each parallel task of an operator. Key groups are not exposed to users and the assignments of keys -> key-groups and key-groups -> tasks cannot be changed without ch

Re: Data Locality in Flink

2019-04-30 Thread Fabian Hueske
sing whether a Dataset could benefit from a > rebalance or not could be VERY nice (at least for batch) but I fear this > would be very hard to implement..am I wrong? > > On Mon, Apr 29, 2019 at 3:10 PM Fabian Hueske wrote: > >> Hi Flavio, >> >> These types of race c

Re: Working around lack of SQL triggers

2019-04-30 Thread Fabian Hueske
You could implement aggregation functions that just do AVG, COUNT, etc. and a parameterizable aggregation function that can be configured to call the avg, count, etc. functions. When configuring, you would specify the input and output, for example like this: input: [int, int, double] key: input.1

Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-30 Thread Fabian Hueske
An operator task broadcasts its current watermark to all downstream tasks that might receive its records. If you have the following code: DataStream a = ... a.map(A).map(B).keyBy().window(C) and execute this with parallelism 2, your plan looks like this A.1 -- B.1 --\--/-- C.1

Re: How to verify what maxParallelism is set to?

2019-04-30 Thread Fabian Hueske
Hi Sean, I was looking for the max-parallelism value in the UI, but couldn't find it. Also the REST API does not seem to provide it. Would you mind opening a Jira issue for adding it to the REST API and the Web UI? Thank you, Fabian Am Di., 30. Apr. 2019 um 06:36 Uhr schrieb Sean Bollin : > Tha

Re: can we do Flink CEP on event stream or batch or both?

2019-04-30 Thread Fabian Hueske
Hi, Stateful streaming applications are typically designed to run continuously (i.e., until forever or until they are not needed anymore or replaced). Many jobs run for weeks or months. IMO, using CEP for "simple" equality matches would add too much complexity for a use case that can be easily sol

Re: Timestamp and key preservation over operators

2019-04-30 Thread Fabian Hueske
Hi, Actually all operators should preserve record timestamps if you set the correct TimeCharacteristic to event time. A window operator will set the timestamp of all emitted records to the end-timestamp of the window. Not sure what happens if you use a processing time window in an event time applicati

Re: Flink orc file write example

2019-04-30 Thread Fabian Hueske
Hi, I had a look but couldn't find an ORC writer in flink-orc, only an InputFormat and TableSource to read ORC data into DataSet programs or Table / SQL queries. Where did you find the ORC writer? Thanks, Fabian Am Di., 30. Apr. 2019 um 09:09 Uhr schrieb Hai : > Hi, > > > I found flink now supp

Re: Flink dashboard+ program arguments

2019-04-30 Thread Fabian Hueske
Hi, With Flink 1.5.0, we introduced a new distributed architecture (see release announcement [1] and FLIP-6 [2]). From what you describe, I cannot tell what is going wrong. How do you submit your application? Which action resulted in the error message you shared? Btw. why do you go for Flink 1.

Re: Preserve accumulators after failure in DataStream API

2019-05-02 Thread Fabian Hueske
Hi Wouter, The DataStream API accumulators of the AggregateFunction [1] are stored in state and should be recovered in case of a failure as well. If this does not work, it would be a serious bug. What's the type of your accumulator? Can you maybe share the code? How do you apply the AggregateFunc

Re: Flink dashboard+ program arguments

2019-05-02 Thread Fabian Hueske
Hi, The SQL client can be started with > ./bin/sql-client.sh embedded Best, Fabian Am Di., 30. Apr. 2019 um 20:13 Uhr schrieb Rad Rad : > Thanks, Fabian. > > The problem was incorrect java path. Now, everything works fine. > > I would ask about the command for running sql-client.sh > > These

Re: Filter push-down not working for a custom BatchTableSource

2019-05-02 Thread Fabian Hueske
Hi Josh, Does your TableSource also implement ProjectableTableSource? If yes, you need to make sure that the filter information is also forwarded if ProjectableTableSource.projectFields() is called after FilterableTableSource.applyPredicate(). Also make sure to correctly implement FilterableTableS

Re: Timestamp and key preservation over operators

2019-05-02 Thread Fabian Hueske
Hi Averell, The watermark of a stream is always the low watermark of all its input streams. If one of the input streams does not have watermarks, Flink does not compute a watermark for the merged stream. If you do not need time-based operations on streams 3 and 4, setting the watermark to MAX_WATE

Re: Preserve accumulators after failure in DataStream API

2019-05-02 Thread Fabian Hueske
e40762e98f8d010/src/main/scala/org/codefeedr/experimental/stats/CommitsStatsProcess.scala#L34 > > > > Op do 2 mei 2019 om 09:36 schreef Fabian Hueske : > >> Hi Wouter, >> >> The DataStream API accumulators of the AggregateFunction [1] are stored >> in state and

Re: Preserve accumulators after failure in DataStream API

2019-05-02 Thread Fabian Hueske
> Hi Wouter, > > I've met the same issue and finally managed to use operator states to back > the accumulators, so they can be restored after restarts. > The downside is that we have to update the values in both accumulators and > states to make them consistent. FYI. > >

Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-05-03 Thread Fabian Hueske
ortant rule documented anywhere in the official document? > On 2019/04/30 08:47:29, Fabian Hueske wrote: > > An operator task broadcasts its current watermark to all downstream tasks > > that might receive its records. > > If you have the following code: > >

Re: Timestamp and key preservation over operators

2019-05-03 Thread Fabian Hueske
Hi Averell, Yes, timestamps and watermarks do not (completely) move together. The watermark should always be lower than the timestamps of the currently processed records. Otherwise, the records might be processed as late records (depending on the logic). The easiest way to check the timestamp of

Re: Filter push-down not working for a custom BatchTableSource

2019-05-03 Thread Fabian Hueske
@Override > public DataSet getDataSet(ExecutionEnvironment execEnv) { > return execEnv.fromCollection(resourceIterator, modelClass); > } > > @Override > public TypeInformation getReturnType() { > return TypeInformation.of(modelClass); > } > > @Overri

Re: Timestamp and key preservation over operators

2019-05-03 Thread Fabian Hueske
The window operator cannot be configured to use the max timestamp of the events in the window as the timestamp of the output record. The reason is that such a behavior can produce late records. If you want to do that, you have to track the max timestamp and assign it yourself with a timestamp assigne

Re: taskmanager.tmp.dirs vs io.tmp.dirs

2019-05-09 Thread Fabian Hueske
Hi, I created FLINK-12460 to update the documentation. Cheers, Fabian Am Mi., 8. Mai 2019 um 17:48 Uhr schrieb Flavio Pompermaier < pomperma...@okkam.it>: > Great, thanks Till! > > On Wed, May 8, 2019 at 4:20 PM Till Rohrmann wrote: > >> Hi Flavio, >> >> taskmanager.tmp.dirs is the deprecated

Re: I want to use MapState on an unkeyed stream

2019-05-09 Thread Fabian Hueske
Hi, Yes, IMO it is more clear. However, you should be aware that operator state is maintained on heap only (not in RocksDB). Best, Fabian Am Mi., 8. Mai 2019 um 20:44 Uhr schrieb an0 : > I switched to using operator list state. It is more clear. It is also > supported by RocksDBKeyedStateBacke

Re: Reconstruct object through partial select query

2019-05-09 Thread Fabian Hueske
Hi, you can use the value construction function ROW to create a nested row (or object). However, you have to explicitly reference all attributes that you will add. If you have a table Cars with (year, modelName) a query could look like this: SELECT ROW(year, modelName) AS car, enrich(year, m
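
A hedged version of that query, assuming tableEnv is a TableEnvironment with the Cars table registered and enrich() registered as a scalar UDF; note that year is a reserved keyword in Flink SQL, so it is escaped with backticks here:

    Table result = tableEnv.sqlQuery(
        "SELECT " +
        "  ROW(`year`, modelName) AS car, " +      // nested row (object)
        "  enrich(`year`, modelName) AS extra " +  // additional derived column
        "FROM Cars");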

Re: How to export all not-null keyed ValueState

2019-05-09 Thread Fabian Hueske
Hi, The KeyedBroadcastProcessFunction has a method to iterate over all keys of a keyed state. This function is available via the Context object of the processBroadcast() method. Hence you need a broadcasted message to trigger the operation. Best, Fabian Am Do., 9. Mai 2019 um 08:46 Uhr schrieb C

Re: How to export all not-null keyed ValueState

2019-05-09 Thread Fabian Hueske
Hi, Passing a Context through a DataStream definitely does not work. You'd need to have the keyed state that you want to scan over in the KeyedBroadcastProcessFunction. For the toggle filter use case, you would need to have a unioned stream with Toggle and StateReport events. For the output, you

Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-05-09 Thread Fabian Hueske
guishing receiving (different) watermarks and emitting (the same) watermarks. Best, Fabian > On 2019/05/03 07:32:07, Fabian Hueske wrote: > > Hi, > > > > this should be covered here: > > > https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/event_time

Re: Checkpointing and save pointing

2019-05-10 Thread Fabian Hueske
Hi Boris, Is your question in the context of replacing ZooKeeper with a different service for highly-available setups or are you setting up a regular Flink cluster? Best, Fabian Am Mi., 8. Mai 2019 um 06:20 Uhr schrieb Congxian Qiu < qcx978132...@gmail.com>: > Hi, Boris > > TM will also need

Re: I want to use MapState on an unkeyed stream

2019-05-10 Thread Fabian Hueske
reatment of operator state documented anywhere? > > On 2019/05/09 07:39:34, Fabian Hueske wrote: > > Hi, > > > > Yes, IMO it is more clear. > > However, you should be aware that operator state is maintained on heap > only > > (not in RocksDB). > >

Re: Reduce key state

2019-05-10 Thread Fabian Hueske
Hi Frank, By default, Flink does not remove any state. It is the responsibility of the developer to ensure that an application does not leak state. Typically, you would use timers [1] to discard state that expired and is not useful anymore. In the last release 1.8, we added lazy cleanup strategie
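
A hedged sketch of timer-based cleanup in a KeyedProcessFunction, assuming event-time timestamps are assigned (Event, the state names, and the one-hour TTL are made up):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class ExpiringCount extends KeyedProcessFunction<String, Event, Long> {
        private static final long TTL = 60 * 60 * 1000; // 1 hour, made up
        private transient ValueState<Long> count;
        private transient ValueState<Long> lastSeen;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
            lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Long.class));
        }

        @Override
        public void processElement(Event e, Context ctx, Collector<Long> out)
                throws Exception {
            Long c = count.value();
            count.update(c == null ? 1L : c + 1);
            lastSeen.update(ctx.timestamp()); // assumes timestamps are assigned
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TTL);
        }

        @Override
        public void onTimer(long ts, OnTimerContext ctx, Collector<Long> out)
                throws Exception {
            // only discard state if no newer event re-armed the timer
            if (lastSeen.value() != null && ts >= lastSeen.value() + TTL) {
                count.clear();
                lastSeen.clear();
            }
        }
    }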

Re:

2019-05-10 Thread Fabian Hueske
any point in time. Also Flink does not give any guarantees about how keys (or rather key groups) are assigned to tasks. If you rescale the application to a parallelism of 3, the active key group might be scheduled to C.2 or C.3. Long story short, D makes progress in event time because watermarks

Re: How to export all not-null keyed ValueState

2019-05-10 Thread Fabian Hueske
context.applyToKeyedState(toggleStateDescriptor, (k: Key, s: > ValueState[Boolean]) => > if (s != null) context.output(outputTag, (k, s.value( >} > } > > Thanks for your help. > Regards, > Averell > > On Thu, May 9, 2019 at 7:31 PM Fabian Hueske wrote: &g

Re: How to export all not-null keyed ValueState

2019-05-10 Thread Fabian Hueske
method? > > Thanks a lot for your help. > > Regards, > Averell > > > On Fri, May 10, 2019 at 8:52 PM Fabian Hueske wrote: > >> Hi Averell, >> >> Ah, sorry. I had assumed the toggle events where broadcasted anyway. >> Since you had both streams keyed, your c

Re: Checkpointing and save pointing

2019-05-10 Thread Fabian Hueske
g checkpointing location. > > > Boris Lublinsky > FDP Architect > boris.lublin...@lightbend.com > https://www.lightbend.com/ > > On May 10, 2019, at 2:47 AM, Fabian Hueske wrote: > > Hi Boris, > > Is your question is in the context of replacing Zookeeper by a different &

Re:

2019-05-13 Thread Fabian Hueske
marks, it creates watermarks from its received data. Since it doesn't > receive any data, it doesn't create any watermarks. D couldn't make > progress because one of its inputs, C2, doesn't make progress. Is this > understanding correct? > > Yes, I think that

Re: Reconstruct object through partial select query

2019-05-14 Thread Fabian Hueske
cParameter for some reason. Also, how would you create the serializer >>>> for the type info? can i reuse some builtin Kryo functionality? >>>> >>>> Thanks >>>> >>>> >>>> [1] >>>> https://ci.apache.org/projects/flink/flink-d

Re: RichAsyncFunction for Scala?

2019-05-16 Thread Fabian Hueske
Hi Shannon, That's a good observation. To be honest, I don't know why the Scala AsyncFunction does not implement RichFunction. Maybe this was not intentional and just overlooked when porting the functionality to Scala. Would you mind creating a Jira ticket for this? Thank you, Fabian Am Di., 14. Mai

Re: Flink and Prometheus setup in K8s

2019-05-16 Thread Fabian Hueske
Thanks for sharing your solution Wouter! Best, Fabian Am Mi., 15. Mai 2019 um 15:28 Uhr schrieb Wouter Zorgdrager < w.d.zorgdra...@tudelft.nl>: > Hi all, > > To answer my own questions I worked on the following solution: > > 1) Custom Docker image which pulls the Flink image and moves Prometheus

Re: FlinkSQL fails when rowtime meets dirty data

2019-05-16 Thread Fabian Hueske
Hi, I'm afraid I don't see another solution than touching the Flink code for this and adding a try catch block around the timestamp conversion. It would be great if you could create a Jira issue reporting this problem. IMO, we should have a configuration switch (either per Table or query) to eith

Re: Flink ML Use cases

2019-05-25 Thread Fabian Hueske
Hi Abhishek, Your observation is correct. Right now, the Flink ML module is in a half-baked state and is only supported in batch mode. It is not integrated with the DataStream API. FLIP-23 proposes a feature that allows evaluating an externally trained model (stored as PMML) on a stream of data.

Re: Question regarding date conditions/row expirations on Dynamic Tables

2019-05-27 Thread Fabian Hueske
Hi Wayne, Long story short, this is not possible with Flink yet. I posted a more detailed answer to your question on SO. Best, Fabian Am Di., 21. Mai 2019 um 19:24 Uhr schrieb Wayne Heaney < wayne.hea...@gmail.com>: > I'm trying to build a Dynamic table that will be updated when records > haven

Re: FileInputFormat that processes files in chronological order

2019-05-27 Thread Fabian Hueske
Configuring the split assigner wasn't a common requirement so far. You can just implement your own format extending from FileInputFormat (or any of its subclasses) and override the getInputSplitAssigner() method. Best, Fabian Am Mo., 27. Mai 2019 um 15:30 Uhr schrieb spoganshev : > Why is FileIn

Re: FileInputFormat that processes files in chronological order

2019-05-27 Thread Fabian Hueske
I see, that's unfortunate. Both classes are also tagged with @Public, making them unchangeable until Flink 2.0. Nonetheless, feel free to open a Jira issue to improve the situation for a future release. Best, Fabian Am Mo., 27. Mai 2019 um 16:55 Uhr schrieb spoganshev : > I've tried that, but t

[ANNOUNCE] Munich meetup: Let's talk about "Stream Processing with Apache Flink"

2019-05-28 Thread Fabian Hueske
Hi folks, Next Tuesday (June 4th), Vasia and I will be speaking at a meetup in Munich about Flink and how we wrote our book "Stream Processing with Apache Flink". We will also raffle a few copies of the book. Please RSVP if you'd like to attend: -> https://www.meetup.com/inovex-munich/events/26

Re: count(DISTINCT) in flink SQL

2019-06-03 Thread Fabian Hueske
Hi Vinod, IIRC, support for DISTINCT aggregates was added in Flink 1.6.0 (released August 9th, 2018) [1]. Also note that by default, this query will accumulate more and more state, i.e., for each grouping key it will hold all unique event_ids. You could configure an idle state retention time to cl
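
A hedged sketch of the Flink 1.8-era configuration, assuming tableEnv is a StreamTableEnvironment (the table, column names, and retention times are made up):

    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.table.api.StreamQueryConfig;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.types.Row;

    StreamQueryConfig qConfig = tableEnv.queryConfig();
    // state for a grouping key is dropped after 12-24h without updates
    qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

    Table counts = tableEnv.sqlQuery(
        "SELECT user_id, COUNT(DISTINCT event_id) AS cnt FROM events GROUP BY user_id");
    tableEnv.toRetractStream(counts, Row.class, qConfig);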

Re: can flink sql handle udf-generated timestamp field

2019-06-06 Thread Fabian Hueske
Hi Yu, When you register a DataStream as a Table, you can create a new attribute that contains the event timestamp of the DataStream records. For that, you would need to assign timestamps and generate watermarks before registering the stream: FlinkKafkaConsumer kafkaConsumer = new FlinkKa
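
A hedged sketch of that setup (MyEvent and its fields are made up): assign timestamps and watermarks on the DataStream first, then append a .rowtime attribute when registering the table:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    DataStream<MyEvent> withTs = stream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent e) {
                return e.eventTimeMillis; // epoch millis from the payload
            }
        });
    // "ts" becomes an event-time attribute usable in windows and joins
    tableEnv.registerDataStream("Events", withTs, "user, payload, ts.rowtime");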

Re: Weird behavior with CoFlatMapFunction

2019-06-06 Thread Fabian Hueske
Hi, There are a few things to point out about your example: 1. The CoFlatMapFunction is probably executed in parallel. The configuration is only applied to one of the parallel function instances. You probably want to broadcast the configuration changes to all function instances. Have a look a

Re: Does Flink Kafka connector has max_pending_offsets concept?

2019-06-06 Thread Fabian Hueske
Hi Ben, Flink correctly maintains the offsets of all partitions that are read by a Kafka consumer. A checkpoint is only complete when all functions successfully checkpoint their state. For a Kafka consumer, this state is the current reading offset. In case of a failure the offsets and the state of a

Re: Change sink topology

2019-06-06 Thread Fabian Hueske
Hi Sergey, I would not consider this to be a topology change (the sink operator would still be a Kafka producer). It seems that dynamic topic selection is possible with a KeyedSerializationSchema (check "Advanced Serialization Schema" [1]). Best, Fabian [1] https://ci.apache.org/projects/flink/f

Re: Ipv6 supported?

2019-06-07 Thread Fabian Hueske
Hi, The networking libraries that Flink uses (Netty & Akka) seem to support IPv6. So, it might work. However, I'm not aware of anybody running Flink on IPv6. Maybe somebody with more info could help out here? Best, Fabian Am Do., 6. Juni 2019 um 16:25 Uhr schrieb Siew Wai Yow : > Hi gu

Re: Flink 1.7.1 flink-s3-fs-hadoop-1.7.1 doesn't delete older chk- directories

2019-06-07 Thread Fabian Hueske
Hi, I found a few issues in Jira that are related to not deleted checkpoint directories, but only FLINK-10855 [1] seems to be a possible reason in your case. Is it possible that the checkpoints of the remaining directories failed? If that's not the case, would you mind creating a Jira issue and d

Re: NotSerializableException: DropwizardHistogramWrapper inside AggregateFunction

2019-06-07 Thread Fabian Hueske
Hi, There are two ways: 1. make the non-serializable member variable transient (meaning that it won't be serialized) and check in the aggregate call if it has been initialized or not. 2. implement your own serialization logic by overriding readObject() and writeObject() [1]. Best, Fabian [1] ht
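
A hedged sketch of option 1 with a Dropwizard histogram (the DropwizardHistogramWrapper setup from the thread is elided; class and field names are made up):

    import com.codahale.metrics.ExponentiallyDecayingReservoir;
    import com.codahale.metrics.Histogram;
    import java.io.Serializable;

    public class LatencyAgg implements Serializable {
        // transient: skipped by Java serialization, keeping the function serializable
        private transient Histogram histogram;

        private Histogram histogram() {
            if (histogram == null) { // re-create after deserialization on the worker
                histogram = new Histogram(new ExponentiallyDecayingReservoir());
            }
            return histogram;
        }

        public void add(long value) {
            histogram().update(value);
        }
    }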
