Re: Flink Checkpoint runs slow for low load stream

2017-01-04 Thread Fabian Hueske
>>> >>>>>>>>> >>>>>>>>> What Flink should fix: >>>>>>>>> - The KafkaConsumer should run the commit operations >>>>>>>>> asynchronously, to not block the "notifyCheckp

Re: Som question about Flink stream sql

2017-01-05 Thread Fabian Hueske
Hi Yuhong, as you noticed, FLIP-11 is about the window operations on the Table API and does not include SQL. The reason is that the Table API is completely in Flink's domain, i.e., we can design and implement the API. For SQL we have a dependency on Calcite. You are right that Calcite's JIRA issue fo

Re: Caching collected objects in .apply()

2017-01-05 Thread Fabian Hueske
Hi Matt, I think your approach should be fine. Although the second keyBy is logically a shuffle, the data will not be sent over the wire to a different machine if the parallelism of the first and second window operators is identical. It only costs one serialization / deserialization step. I would be

Re: Speedup of Flink Applications

2017-01-05 Thread Fabian Hueske
Hi Hanna, I assume you are asking about the possible speed up of batch analysis programs and not about streaming applications (please correct me if I'm wrong). Timur raised very good points about data size and skew. Given evenly distributed data (no skewed key distribution for a grouping or join

Re: 答复: Som question about Flink stream sql

2017-01-05 Thread Fabian Hueske
he reply. > > As you noticed, row windows are already supported by Calcite and FLIP-11 > has planned, > > Can you tell something about the progress of the row windows in Table API? > > > > Regards. > > Yuhong > > > > > > > > > > *From

Re: Regarding ordering of events

2017-01-05 Thread Fabian Hueske
Flink is a distributed system and does not preserve order across partitions. The number prefix (e.g., 1>, 2>, ...) tells you the parallel instance of the printing operator. You can set the parallelism to 1 to have the stream in order. Fabian 2017-01-05 12:16 GMT+01:00 Kostas Kloudas : > Hi Abdu

Re: Sequential/ordered map

2017-01-05 Thread Fabian Hueske
Please avoid collecting the data to the client using collect(). This operation looks convenient but is only meant for super small data and would be a lot slower and less robust even if it would work for large data sets. Rather set the parallelism of the operator to 1. Fabian 2017-01-05 13:18 GMT+

Re: Changing parallelism

2017-01-10 Thread Fabian Hueske
Hi Abhishek, state can be emitted from functions as regular records. There is no way to share the local state of a task with other tasks of the same operator or with other operators. Flink's key-partitioned state is always scoped to the key of the current record. It is not possible to iterate

Re: manual scaling with savepoint

2017-01-11 Thread Fabian Hueske
Hi, Flink supports two types of state: 1) Key-partitioned state 2) Non-partitioned operator state (Checkpointed interface) Key-partitioned state is internally organized by key and can be "simply" rehashed. The actual implementation is more involved to make this efficient. This document contains d

Re: Custom writer with Rollingsink

2017-01-11 Thread Fabian Hueske
Hi, the exception says "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to CREATE_FILE /y=2017/m=01/d=10/H=18/M=12/_part-0-0.in-progress for DFSClient_NONMAPREDUCE_1062142735_3". I would assume that your output format tries to create a file that already exists. Maybe you need

Re: How to get help on ClassCastException when re-submitting a job

2017-01-11 Thread Fabian Hueske
Hi Giuliano, thanks for bringing up this issue. A "ClassCastException: X cannot be cast to X" often points to a classloader issue. So it might actually be a bug in Flink. I assume you submit the same application (same jar file) with the same command, right? Did you cancel the job before resubmitti

Re: Making batches of small messages

2017-01-11 Thread Fabian Hueske
Hi, I think this is a case for the ProcessFunction that was recently added and will be included in Flink 1.2. ProcessFunction allows you to register timers (so the 5 secs timeout can be addressed). You can maintain the fault tolerance guarantees if you collect the records in managed state. That way th
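
The buffer-plus-timer idea described above can be sketched in plain Java without any Flink dependency. This is only an illustration under assumptions: the class TimedBatcher, its method names, and the maxSize limit are hypothetical, the in-memory list stands in for managed state, and the deadline field stands in for a registered timer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not a Flink class: collects small messages into batches.
final class TimedBatcher {
    private final int maxSize;          // assumed size limit, not mentioned in the thread
    private final long timeoutMillis;   // e.g. 5000 for the 5 secs timeout
    private final List<String> buffer = new ArrayList<>(); // stands in for managed state
    private long deadline = Long.MAX_VALUE;                // stands in for a registered timer

    TimedBatcher(int maxSize, long timeoutMillis) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Buffers one element; returns a full batch once maxSize is reached, else an empty list. */
    List<String> onElement(String value, long now) {
        if (buffer.isEmpty()) {
            deadline = now + timeoutMillis; // the first element of a batch starts the timeout
        }
        buffer.add(value);
        return buffer.size() >= maxSize ? flush() : Collections.<String>emptyList();
    }

    /** Called when time advances; emits the partial batch once the deadline has passed. */
    List<String> onTimer(long now) {
        return (!buffer.isEmpty() && now >= deadline) ? flush() : Collections.<String>emptyList();
    }

    private List<String> flush() {
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        deadline = Long.MAX_VALUE;
        return batch;
    }

    public static void main(String[] args) {
        TimedBatcher batcher = new TimedBatcher(3, 5000L);
        batcher.onElement("a", 0L);
        batcher.onElement("b", 100L);
        System.out.println(batcher.onTimer(5000L)); // partial batch emitted by the timeout
    }
}
```

In an actual job the buffer would have to live in Flink's managed state so that it is covered by checkpoints; the sketch keeps it on the heap only to stay self-contained.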

Re: 答复: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-12 Thread Fabian Hueske
I have another bugfix for 1.2.: https://issues.apache.org/jira/browse/FLINK-2662 (pending PR) 2017-01-10 15:16 GMT+01:00 Robert Metzger : > Hi, > > this depends a lot on the number of issues we find during the testing. > > > These are the issues I found so far: > > https://issues.apache.org/jira

Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Fabian Hueske
Hi Yuhong, I assume that OrderA is a table of POJO objects and you are expecting the order of the attribute to be as the order in which the fields of the POJO are defined in the source code. Flink accepts fields which are either public members or accessible via a getter and setter. This makes it

Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Fabian Hueske
.fields = fields.toArray(new PojoField[fields.size()]); > > *Arrays.sort(this.fields, new Comparator() {* > > * @Override* > > * public int compare(PojoField o1, PojoField o2) {* > > * return o1.getField().getName().compareTo(o2.getField().getName());* > > * }* >
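
The effect of sorting fields by name, as in the quoted snippet, can be reproduced with plain Java reflection. This is a minimal sketch, not Flink's implementation: the POJO OrderA below and its fields (user, product, amount) are made up for illustration, and PojoFieldOrder is a hypothetical helper.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical POJO; the field names are assumptions, not taken from the thread.
class OrderA {
    public long user;
    public String product;
    public int amount;
}

final class PojoFieldOrder {
    /** Returns the declared field names of a class in alphabetical order. */
    static List<String> sortedFieldNames(Class<?> clazz) {
        Field[] fields = clazz.getDeclaredFields();
        // Reflection does not guarantee declaration order, so sorting by name
        // gives a deterministic attribute order.
        Arrays.sort(fields, Comparator.comparing(Field::getName));
        List<String> names = new ArrayList<>();
        for (Field field : fields) {
            if (!field.isSynthetic()) { // skip compiler- or tool-generated fields
                names.add(field.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Declared as user, product, amount; printed as [amount, product, user].
        System.out.println(sortedFieldNames(OrderA.class));
    }
}
```

With such a sort in place, the attribute order seen by 'select *' follows the field names rather than the order in which the fields appear in the source code.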

Re: 答复: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-13 Thread Fabian Hueske
5 > >> https://issues.apache.org/jira/browse/FLINK-5462 > >> https://issues.apache.org/jira/browse/FLINK-5464 > >> https://issues.apache.org/jira/browse/FLINK-5463 > >> > >> > >> On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske &l

Re: Terminology: Split, Group and Partition

2017-01-13 Thread Fabian Hueske
Hi Robert, let me first describe what splits, groups, and partitions are. * Partition: This is basically all data that goes through the same task instance. If you have an operator with a parallelism of 80, you have 80 partitions. When you call sortPartition() you'll have 80 sorted streams, if you

Re: Terminology: Split, Group and Partition

2017-01-13 Thread Fabian Hueske
>> >> thanks for the quick and comprehensive reply. I'll have a look at the >> ExecutionPlan using your suggestion to check what actually gets computed, >> and I'll use the properties as well. If I stumble across something else >> I'll let you know. >>

Re: Objects accessible from all Flink nodes

2017-01-13 Thread Fabian Hueske
Hi Matt, it is not possible to share an object across different tasks of the same operator or even different operators. This would be globally mutable state which is in general hard to make efficient in distributed systems. Something that might work is to use a CoFlatMapOperator with one input bein

Re: Strategies for Complex Event Processing with guaranteed data consistency

2017-01-13 Thread Fabian Hueske
Hi Kat, I did not understand the difference between a case and a trace. If I got it right, the goal of your first job is to assemble the individual events into cases. Is a case here the last event for a case-id or all events of a case-id? If a case is the collection of all events (which I assume)

Re: Strategies for Complex Event Processing with guaranteed data consistency

2017-01-13 Thread Fabian Hueske
:02 GMT+01:00 Fabian Hueske : > Hi Kat, > > I did not understand the difference between a case and a trace. > If I got it right, the goal of your first job is to assemble the > individual events into cases. Is a case here the last event for a case-id > or all events of a case-id?

Re: Terminology: Split, Group and Partition

2017-01-15 Thread Fabian Hueske
ike > you suggested before? Because my guess is that the grouping information is > lost when going from T to U. > > Best and thanks for the great help! > Robert > > On Fri, Jan 13, 2017 at 8:54 PM, Fabian Hueske wrote: > >> I think so far getExecutionPlan() was only

Re: Kafka KeyedStream source

2017-01-16 Thread Fabian Hueske
Hi Niels, I think the biggest problem for keyed sources is that Flink must be able to co-locate key-partitioned state with the pre-partitioned data. This might work, if the key is the partition ID, i.e., not the original key attribute that was hashed to assign events to partitions. Flink could need

Re: 答复: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-16 Thread Fabian Hueske
LINK-5495 > https://issues.apache.org/jira/browse/FLINK-5496 > > On Fri, Jan 13, 2017 at 11:29 AM, Fabian Hueske wrote: > > > I tested the Table API / SQL a bit. > > > > I implemented a windowed aggregation with the streaming Table API and it > > produced the sa

Re: Can serialization be disabled between chains?

2017-01-16 Thread Fabian Hueske
One of the reasons is to ensure that data cannot be modified after it left a thread. A function that emits the same object several times (in order to reduce object creation & GC) might accidentally modify emitted records if they would be put as object in a queue. Moreover, it is easier to control t
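
The hazard described above (a function re-emitting the same mutable object) can be shown with a small plain-Java sketch. The Event type and the emit method are hypothetical; the copy() call merely stands in for the isolation that a serialization step provides.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ObjectReuseHazard {
    // Hypothetical mutable record type, for illustration only.
    static final class Event {
        int value;
        Event(int value) { this.value = value; }
        Event copy() { return new Event(value); }
    }

    /** Emits the values 1..n by mutating and re-emitting one Event instance. */
    static List<Integer> emit(int n, boolean copyOnEmit) {
        Queue<Event> queue = new ArrayDeque<>();
        Event reused = new Event(0);
        for (int i = 1; i <= n; i++) {
            reused.value = i;
            // Without a copy, every queue entry refers to the same object,
            // so later mutations change records that were already emitted.
            queue.add(copyOnEmit ? reused.copy() : reused);
        }
        List<Integer> seen = new ArrayList<>();
        for (Event event : queue) {
            seen.add(event.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(emit(3, false)); // [3, 3, 3]
        System.out.println(emit(3, true));  // [1, 2, 3]
    }
}
```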

Re: Zeppelin: Flink Kafka Connector

2017-01-17 Thread Fabian Hueske
One thing to add: Flink 1.2.0 has not been released yet. The FlinkKafkaConsumer010 is only available in a SNAPSHOT release or the first release candidate (RC0). Best, Fabian 2017-01-17 16:08 GMT+01:00 Timo Walther : > You are using an old version of Flink (0.10.2). A FlinkKafkaConsumer010 > was n

Re: Zeppelin: Flink Kafka Connector

2017-01-17 Thread Fabian Hueske
fka > specifically? > > > > *From: *Fabian Hueske > *Reply-To: *"user@flink.apache.org" > *Date: *Tuesday, January 17, 2017 at 7:10 AM > *To: *"user@flink.apache.org" > *Subject: *Re: Zeppelin: Flink Kafka Connector > > > >

Re: Zeppelin: Flink Kafka Connector

2017-01-18 Thread Fabian Hueske
Ah, OK :-) Thanks for reporting back! Cheers, Fabian 2017-01-17 17:50 GMT+01:00 Neil Derraugh < neil.derra...@intellifylearning.com>: > I re-read that enough times and it finally made sense. I wasn’t paying > attention and thought 0.10.2 was the Kafka version —which hasn’t been > released yet ei

Re: Strategies for Complex Event Processing with guaranteed data consistency

2017-01-18 Thread Fabian Hueske
best approach > here. > > Thanks a lot, > Kat > > On Fri, Jan 13, 2017 at 10:45 PM, Fabian Hueske wrote: > > One thing to add: the Flink KafkaProducer provides only at-least-once if > > flush-on-checkpoint is enabled [1]. > > > > [1] > > https://ci

Re: Window limitations on groupBy

2017-01-18 Thread Fabian Hueske
Hi Raman, I would approach this issue as follows. You key the input stream on the sourceId and apply a stateful FlatMapFunction. The FlatMapFunction has a key-partitioned state and stores for each key (sourceId) the latest event as state. When a new event arrives, you can compute the time spent in
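
The per-key logic outlined above can be sketched in plain Java. The message is cut off before it says exactly what is computed, so the sketch assumes the quantity of interest is the time elapsed since the previous event of the same sourceId. The class LatestEventPerKey is hypothetical, and the HashMap only stands in for Flink's key-partitioned state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper, not a Flink class.
final class LatestEventPerKey {
    // One entry per sourceId; in a Flink job this would be keyed state.
    private final Map<String, Long> lastTimestamp = new HashMap<>();

    /**
     * Stores the timestamp of the newest event for the key and returns the time
     * elapsed since the previous event of that key, if there was one.
     */
    OptionalLong onEvent(String sourceId, long timestampMillis) {
        Long previous = lastTimestamp.put(sourceId, timestampMillis);
        return previous == null
                ? OptionalLong.empty()
                : OptionalLong.of(timestampMillis - previous);
    }

    public static void main(String[] args) {
        LatestEventPerKey state = new LatestEventPerKey();
        state.onEvent("sensor-1", 1000L);
        System.out.println(state.onEvent("sensor-1", 4000L).getAsLong()); // 3000
    }
}
```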

Re: Kafka KeyedStream source

2017-01-18 Thread Fabian Hueske
a and thenit is probably > a hashing function IN kafka that does the magic. > I'm not sure if we can control that enough with Kafka right now. > > > Niels > > On Mon, Jan 16, 2017 at 10:15 AM, Fabian Hueske wrote: > >> Hi Niels, >> >> I think the big

Re: Kafka Fetch Failed / DisconnectException

2017-01-18 Thread Fabian Hueske
Hi Jonas, your mail did not include the error message. Can you send it again? Thanks, Fabian 2017-01-18 17:37 GMT+01:00 Jonas : > Hi! > > According to the output, I'm having some problems with the KafkaConsumer09. > It reports the following on stdout: > > > > Is that something I should worry abo

Re: seeding a stream job

2017-01-19 Thread Fabian Hueske
Hi Jared, I think both approaches should work. The source that integrates the finite batch input and the stream might be more comfortable to use. As you said, the challenge would be to identify the exact point when to switch from one input to the other. One thing to consider when reading finite b

Re: Window limitations on groupBy

2017-01-19 Thread Fabian Hueske
? Would this be another way of having Flink keep this > information persistently without having to implement it manually? > > Thanks, > Raman > > On 18/01/17 11:22 AM, Fabian Hueske wrote: > > Hi Raman, > > > > I would approach this issues as follows. > >

Re: Keep bootstrapped config updated with stream from Kafka

2017-01-19 Thread Fabian Hueske
Hi Nihat, you could implement the stateful function as a RichFunction and load the data in the open() method. Best, Fabian 2017-01-19 2:53 GMT+01:00 Nihat Hosgur : > Hi all, > > We bootstrap data from some DB and then like to keep it updated with > updates coming through Kafka. At spark it was

Re: Flink SQL on JSON data without schema

2017-01-19 Thread Fabian Hueske
Hi Nihat, at the current state, Flink's SQL and Table APIs require a static schema. You could use a JSON object as value and implement scalar functions to extract fields, but that would not be very usable. Best, Fabian 2017-01-19 2:59 GMT+01:00 Nihat Hosgur : > Hi there, > We are evaluating fl

Re: How to get help on ClassCastException when re-submitting a job

2017-01-19 Thread Fabian Hueske
Hi Giuliano, I think it would be good to document this behavior, not sure though what the best place would be. It would be nice, if you could open a JIRA and describe the issue there (basically copy Yuri's analysis). Thank you, Fabian 2017-01-19 8:35 GMT+01:00 Giuliano Caliari : > Hello, > > Yu

Re: Operational concerns with state (was Re: Window limitations on groupBy)

2017-01-19 Thread Fabian Hueske
ally and is > restarted. I was able to take an explicit savepoint and then restart the > job with it. > > Is the correct approach as of now to take savepoints periodically via > cron, and use those to re-run jobs in case of flink failure or restart? > > Regards, > Raman >

Re: Flink SQL on JSON data without schema

2017-01-19 Thread Fabian Hueske
t I've > understood from your response is that regardless table source is KafkaTable > or not we need to provide static schema. > Best, > Nihat > > > On Thu, Jan 19, 2017 at 2:50 AM Fabian Hueske wrote: > > Hi Nihat, > > at the current state, Flink's SQL and Ta

Re: Count window on partition

2017-01-23 Thread Fabian Hueske
Hi Dmitry, the third version is the way to go, IMO. You might want to have a larger number of partitions if you are planning to later increase the parallelism of the job. Also note, that it is not guaranteed that 4 keys are uniformly distributed to 4 tasks. It might happen that one task ends up wi

Re: Custom Partitioning and windowing questions/concerns

2017-01-24 Thread Fabian Hueske
Hi Nikos, Flink's windows require a KeyedStream because they use the keys to manage their internal state (each in-progress window has some state that needs to be persisted and checkpointed). Moreover, Flink's event-time window operators return a deterministic result. In your use-case, the result o

Re: multi tenant workflow execution

2017-01-24 Thread Fabian Hueske
Hi Chen, if you plan to implement your application on top of the upcoming Flink 1.2.0 release, you might find the new AsyncFunction [1] and the ProcessFunction [2] helpful. AsyncFunction can be used for non-blocking calls to external services and maintains the checkpointing semantics. ProcessFunct

Re: How to get top N elements in a DataSet?

2017-01-24 Thread Fabian Hueske
Hi Ivan, I think you can use MapPartition for that. So basically: dataset // assuming some partitioning that can be reused to avoid a shuffle .sortPartition(1, Order.DESCENDING) .mapPartition(new ReturnFirstTen()) .sortPartition(1, Order.DESCENDING).parallelism(1) .mapPartition(new Return

Re: How to get top N elements in a DataSet?

2017-01-24 Thread Fabian Hueske
g the entire partitions. > > And if I remember correctly, this question comes up from time to time > on the mailing list. > > Best, > Gábor > > > > 2017-01-24 11:35 GMT+01:00 Fabian Hueske : > > Hi Ivan, > > > > I think you can use MapPartition for that. &

Re: How to get top N elements in a DataSet?

2017-01-24 Thread Fabian Hueske
=1 > > > On Tue, 24 Jan 2017 at 11:57 Fabian Hueske wrote: > >> You are of course right Gabor. >> @Ivan, you can use a heap in the MapPartitionFunction to collect the top >> 10 elements (note that you need to create deep-copies if object reuse is >> enabled [1])
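
The heap-based approach mentioned above can be sketched in plain Java. TopN is a hypothetical helper, not the ReturnFirstTen function from the thread. Because it works on immutable Integer values, no deep copies are needed here; with mutable records and object reuse enabled, each retained element would have to be copied.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not a Flink class.
final class TopN {
    /** Returns the n largest values in descending order using a bounded min-heap. */
    static List<Integer> topN(Iterable<Integer> values, int n) {
        if (n <= 0) {
            return new ArrayList<>();
        }
        PriorityQueue<Integer> heap = new PriorityQueue<>(n); // smallest retained value on top
        for (Integer value : values) {
            if (heap.size() < n) {
                heap.add(value);
            } else if (value > heap.peek()) {
                heap.poll();     // evict the smallest of the current top n
                heap.add(value);
            }
        }
        List<Integer> result = new ArrayList<>(heap);
        result.sort(Collections.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        // Phase 1: top 2 of each partition; phase 2: top 2 of the merged partial results.
        List<Integer> merged = new ArrayList<>(topN(Arrays.asList(5, 1, 9), 2));
        merged.addAll(topN(Arrays.asList(3, 7, 8), 2));
        System.out.println(topN(merged, 2)); // [9, 8]
    }
}
```

The heap keeps at most n elements per partition, so a partition does not need to be fully sorted before its top elements are known.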

Re: multi tenant workflow execution

2017-01-25 Thread Fabian Hueske
on TPS > event source. I would like to understand checkpoint size and speed > implications. > > How about checkpointing iteration stream? Can we achieve at least once > semantic in 1.2 on integration jobs? > > Thanks, > Chen > > On Tue, Jan 24, 2017 at 2:26 AM, Fabian Hues

Re: How to get help on ClassCastException when re-submitting a job

2017-01-25 Thread Fabian Hueske
Thank you Giuliano! 2017-01-25 6:54 GMT+01:00 Giuliano Caliari : > Issue reported: > > https://issues.apache.org/jira/browse/FLINK-5633 > > Sorry for taking so long > > > > -- > View this message in context: http://apache-flink-user- > mailing-list-archive.2336050.n4.nabble.com/How-to-get-help- >

Re: Custom Partitioning and windowing questions/concerns

2017-01-25 Thread Fabian Hueske
hat the intentions for my use-case were not > quite clear. Please, do not hesitate to ask me for any clarifications. > > > > Again, thank you very much for your interest and your time. > > > > Kind Regards, > > > > Nikos R. Katsipoulakis, > > Department of Co

Re: Custom Partitioning and windowing questions/concerns

2017-01-25 Thread Fabian Hueske
It > looks like an easier way compared to the one I described above and I will > try to dive into its implementation details. > > > > Again, thank you very much for your help and your constructive comments. > > > > Kind Regards, > > > > Nikos R. Katsipoulaki

Re: State Descriptors / Queryable State Question

2017-01-26 Thread Fabian Hueske
Hi Joe, working on a KeyedStream means that the records are partitioned by that key, i.e., all records with the same key are processed by the same thread. Therefore, only one thread accesses the state for a particular key. Other tasks do not have read or write access to the state of other tasks. B

Re: Applying the same operator twice on a windowed stream

2017-01-27 Thread Fabian Hueske
Hi, the window operation is completed after you called apply the first time. The result is a regular DataStream. I assume your TrafficWindow emits multiple records. Otherwise, you'd probably apply a simple MapFunction after the window. So you are looking for a way to iterate over all values retur

Re: Calling external services/databases from DataStream API

2017-01-31 Thread Fabian Hueske
Hi Diego, you can also broadcast a changelog stream: DataStream mainStream = ... DataStream changeStream = ... mainStream.connect(changeStream.broadcast()).flatMap(new YourCoFlatMapFunction()); All records of the changeStream will be forwarded to each instance of the flatmap operator. Best, Fa
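
What a function like YourCoFlatMapFunction might do can be sketched without Flink. In Flink, a CoFlatMapFunction has one callback per input (flatMap1 and flatMap2); the two methods below play those roles. Everything else is an assumption for illustration: the class ChangelogEnricher, the String record types, and the plain HashMap, which is not checkpointed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one parallel instance of the connected operator.
final class ChangelogEnricher {
    private final Map<String, String> lookup = new HashMap<>();

    /** Role of flatMap1: enrich a record of the main stream from the local lookup table. */
    List<String> onMainRecord(String key) {
        String value = lookup.get(key);
        return value == null
                ? Collections.<String>emptyList()
                : Collections.singletonList(key + "=" + value);
    }

    /** Role of flatMap2: apply a changelog record to the local lookup table. */
    void onChange(String key, String value) {
        lookup.put(key, value);
    }

    public static void main(String[] args) {
        // Broadcasting means every parallel instance receives every change.
        List<ChangelogEnricher> instances =
                new ArrayList<>(Arrays.asList(new ChangelogEnricher(), new ChangelogEnricher()));
        for (ChangelogEnricher instance : instances) {
            instance.onChange("user-1", "gold");
        }
        System.out.println(instances.get(1).onMainRecord("user-1")); // [user-1=gold]
    }
}
```

Since every instance applies the full changelog, a record of the main stream can be enriched locally no matter which instance it is routed to.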

Re: Bug in Table api CsvTableSink

2017-01-31 Thread Fabian Hueske
Hi Flavio, I do not remember that such a bug was fixed. Maybe by chance, but I guess not. Can you open a JIRA and maybe provide input data to reproduce the problem? Thank you, Fabian 2017-01-31 16:25 GMT+01:00 Flavio Pompermaier : > Hi to all, > I'm trying to read from a db and then writing to

Re: allowed lateness on windowed join?

2017-02-06 Thread Fabian Hueske
Hi, Union is a super cheap operator in Flink. It does not scan the records, but just merges the streams. So the effort is very low. The built-in join operator works in the same way but does not expose allowed lateness. Cheers, Fabian

Re: Improving Flink Performance

2017-02-06 Thread Fabian Hueske
Hi Jonas, thanks for reporting back! Glad you solved the issue. Cheers, Fabian 2017-02-05 22:07 GMT+01:00 Jonas : > Using a profiler I found out that the main performance problem (80%) was > spent in a domain specific data structure. After implementing it with a > more > efficient one, the perfo

Re: How about Discourse (https://www.discourse.org/) for this mailing list

2017-02-06 Thread Fabian Hueske
Hi Jonas, thanks for the suggestion. Critical infrastructure (repository, dev mailing list) of Apache projects must be hosted on Apache infrastructure. For example, Github is just mirroring the ASF git repositories. We integrated the mailing lists with Nabble (user [1], dev [2]) and there is also

Re: Bug in Table api CsvTableSink

2017-02-06 Thread Fabian Hueske
e someone else could give it a try in the meantime.. >> >> Best, >> Flavio >> >> On Tue, Jan 31, 2017 at 4:49 PM, Fabian Hueske wrote: >> >>> Hi Flavio, >>> >>> I do not remember that such a bug was fixed. Maybe by chance, but I >>> g

Re: Table API: java.sql.DateTime is not supported;

2017-02-06 Thread Fabian Hueske
Hi, you can also use the CsvTableSource and read the DateTime fields as String. This will directly give you a table. You can implement a user-defined scalar function [1] to parse the String into a DateTime type. The benefit is that you stay in the Table API / SQL and don't have to deal with the D

Re: Strange filter performance with parquet

2017-02-07 Thread Fabian Hueske
Hi Billy, this might depend on what you are doing with the live and dead DataSets later on. For example, if you join both data sets, Flink might need to spill one of them to disk and read it back to avoid a deadlock. This happens for instance if the join strategy is a HashJoin which blocks one inp

Re: Strange filter performance with parquet

2017-02-07 Thread Fabian Hueske
ult to parquet. > > > > BTW on another point, > > Reading parquet files seems very slow to me. Writing is very fast in > comparison. It takes 60 slots 10 minutes to read 550million records from a > parquet file. We have MR jobs finishing processing in 8.5 minutes with 33 > cores s

Re: Cogroup hints/performance

2017-02-07 Thread Fabian Hueske
Hi Billy, A CoGroup does not have any freedom in its execution strategy. It requires that both inputs are partitioned on the grouping keys and then performs a local sort-merge join, i.e., both inputs are sorted. Existing partitioning or sort orders can be reused. Since there is only one execut

Re: Strange filter performance with parquet

2017-02-07 Thread Fabian Hueske
aSet> d = > getExecutionEnvironment().readHadoopFile(inputFormat, Void.*class*, > GenericRecord.*class*, path.toString(), job).filter(*new* > *FilterFunction>()* { this does the live/dead > filtering… > > > > > > > > > > *From:* Fabian Hueske [mailto:f

Re: FieldForwarding hints

2017-02-07 Thread Fabian Hueske
The correct annotation would be: @ForwardedFields("*->f1") The asterisk / wildcard addresses the complete input type. The DataSet API also performs a type-based validation. If the types of the fields on the left and right are not correct, it should fail. Best, Fabian 2017-02-07 23:13 GMT+01:00 N

Re: Watermarks per key

2017-02-15 Thread Fabian Hueske
Hi Jordan, it is not possible to generate watermarks per key. This feature has been requested a couple of times but I think there are no plans to implement that. As far as I understand, the management of watermarks would be quite expensive (maintaining several watermarks, purging watermarks of exp

Re: Unable to use Scala's BeanProperty with classes

2017-02-15 Thread Fabian Hueske
Hi Adarsh, I think this is the same bug. I'm afraid you have to wait until the problem is fixed. The only workaround would be to use a different data type, for example a case class. Best, Fabian 2017-02-15 6:08 GMT+01:00 Adarsh Jain : > Any help will be highly appreciable, am stuck on this one.

Re: Trapping Streaming Errors

2017-02-15 Thread Fabian Hueske
Hi Joe, you can also insert a MapFunction between the Kafka source and the keyBy to validate the IDs. The mapper will be chained and should add only minimal overhead. If you want to keep the events which were incorrectly deserialized, you can use split() to move them somewhere. Validation in

Re: Flink jdbc

2017-02-16 Thread Fabian Hueske
The JdbcOutputFormat was originally meant for batch jobs. It should be possible to use it for streaming jobs as well, however, you should be aware that it is not integrated with Flink's checkpointing mechanism. So, you might have duplicate data in case of failures. I also don't know if or how well i

Re: CSV sink partitioning and bucketing

2017-02-17 Thread Fabian Hueske
Hi Flavio, Flink does not come with an OutputFormat that creates buckets. It should not be too hard to implement this in Flink though. However, if you want a solution fast, I would try the following approach: 1) Search for a Hadoop OutputFormat that buckets Strings based on a key (). 2) Implement

Re: Aggregation problem.

2017-02-17 Thread Fabian Hueske
Hi, this looks like a bug to me. Can you open a JIRA and maybe a small testcase to reproduce the issue? Thank you, Fabian 2017-02-18 1:06 GMT+01:00 Kürşat Kurt : > Hi; > > > > I have a Dataset like this: > > > > *(**0,Auto,0.4,1,5.8317538999854194E-5)* > > *(0,Computer,0.2,1,4.8828125E-5)* > >

Re: Aggregation problem.

2017-02-18 Thread Fabian Hueske
rşat Kurt" wrote: > > Ok, i have opened the issue with the test case. > > Thanks. > > > > https://issues.apache.org/jira/browse/FLINK-5840 > > > > > > *From:* Fabian Hueske [mailto:fhue...@gmail.com] > *Sent:* Saturday, February 18, 2017 3:33 AM >

Re: Cross operation on two huge datasets

2017-02-22 Thread Fabian Hueske
Hi Gwen, Flink usually performs a block nested loop join to cross two data sets. This algorithm spills one input to disk and streams the other input. For each input it fills a memory buffer to perform the cross. Then the buffer of the spilled input is refilled with spilled records and records
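
The general shape of a block nested loop cross can be sketched in plain Java. This is a simplified textbook version, not Flink's implementation: the in-memory lists stand in for the streamed and the spilled input, blockSize stands in for the memory buffer, and the class name and String pairs are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not a Flink class.
final class BlockNestedLoopCross {
    /**
     * Crosses two inputs block by block: the streamed input is consumed once in
     * blocks of blockSize, and the spilled input is re-read once per block.
     */
    static List<String> cross(List<String> streamed, List<String> spilled, int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<String> pairs = new ArrayList<>();
        for (int start = 0; start < streamed.size(); start += blockSize) {
            int end = Math.min(start + blockSize, streamed.size());
            List<String> block = streamed.subList(start, end); // current in-memory buffer
            for (String spilledRecord : spilled) {              // one full pass over the spilled input
                for (String buffered : block) {
                    pairs.add(buffered + "-" + spilledRecord);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(cross(Arrays.asList("a", "b", "c"), Arrays.asList("1", "2"), 2));
    }
}
```

The number of passes over the spilled input equals the number of blocks, so a larger buffer means fewer re-reads from disk.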

Re: Flink jdbc

2017-02-22 Thread Fabian Hueske
gt; kinda a little hard to figure whats going wrong here. > > Thanks > > On 02/16/2017 02:02 PM, Fabian Hueske wrote: > > The JdbcOutputFormat was originally meant for batch jobs. > It should be possible to use it for streaming jobs as well, however, you > should be aware

Re: problem with increase job parallelism

2017-10-20 Thread Fabian Hueske
Hi Lei, setting explicit operator IDs should solve this issue. As far as I know, the auto-generated operator id also depended on the operator parallelism in previous versions of Flink (not sure until which point). Which version are you running? Best, Fabian 2017-10-17 3:15 GMT+02:00 Lei Chen :

Re: Monitoring folder in flink

2017-10-24 Thread Fabian Hueske
Hi, with PROCESS_CONTINUOUSLY the application monitors the directory and processes newly arriving files or files that have been modified. In this case the application never terminates because it is waiting for new files to appear. With PROCESS_ONCE, the content of a directory is processed as it was

Re: Delta iteration not spilling to disk

2017-10-25 Thread Fabian Hueske
Hi Joshua, that is correct. Delta iterations cannot spill to disk. The solution set is managed in an in-memory hash table. Spilling that hash table to disk would have a significant impact on the performance. By default the hash table is organized in Flink's managed memory. You can try to increase

Re: Delta iteration not spilling to disk

2017-10-25 Thread Fabian Hueske
to a null being present in the solution > set tuple so I added assertions to ensure that tuple values were never > null. However, I’m still getting the above error. Did changing it to > unmanaged cause the tuples to be serialized? Is there another reason aside > from null values that this er

Re: Case Class TypeInformation

2017-10-25 Thread Fabian Hueske
ses: > https://issues.apache.org/jira/browse/FLINK-7859 > > Joshua > > > On Oct 17, 2017, at 3:01 AM, Fabian Hueske wrote: > > Hi Joshua, > > that's a limitation of the Scala API. > Row requires to explicitly specify a TypeInformation[Row] but it is not > possible to in

Re: State snapshotting when source is finite

2017-10-26 Thread Fabian Hueske
Hi Flavio, Thanks for bringing up this topic. I think running periodic jobs with state that gets restored and persisted in a savepoint is a very valid use case and would fit the "stream is a superset of batch" story quite well. I'm not sure if this behavior is already supported, but I think this would

Re: Local combiner on each mapper in Flink

2017-10-26 Thread Fabian Hueske
Hi, in a MapReduce context, combiners are used to reduce the amount of data 1) to shuffle and fully sort (to group the data by key) and 2) to reduce the impact of skewed data. The question is, why do you need a combiner in your use case. - To reduce the data to shuffle: You should not use a windo

Re: Tasks, slots, and partitioned joins

2017-10-26 Thread Fabian Hueske
Hi David, Flink's DataSet API schedules one slice of a program to a task slot. A program slice is one parallel instance of each operator of a program. When all operators of your program run with a parallelism of 1, you end up with only 1 slice that runs on a single slot. Flink's DataSet API leverag

Re: Passing Configuration & State

2017-10-26 Thread Fabian Hueske
Hi Navneeth, configuring a user function using a Configuration object and setting the parameters in the open() method of a RichFunction is no longer recommended. In fact, that only works for the DataSet API and has not been added for the DataStream API. The open() method with the Configuration p

Re: Tasks, slots, and partitioned joins

2017-10-26 Thread Fabian Hueske
Hi David, please find my answers below: 1. For high utilization, all slots should be filled. Each slot will process a slice of the program on a slice of the data. In case of partitioning or changed parallelism, the data is shuffled accordingly. 2. That's a good question. I think the default log

Re: Not enough free slots to run the job

2017-10-27 Thread Fabian Hueske
Hi David, that's correct. A TM is a single process. A slot is just a virtual concept in the TM process and runs its program slice in multiple threads. Besides managed memory (which is split into chunks and assigned to slots) all other resources (CPU, heap, network, disk) are not isolated and free

Re: Help on RowTypeInfo?

2017-11-01 Thread Fabian Hueske
Hi Paul, The *.scala.StreamTableEnvironment is for Scala programs, the *.java.StreamTableEnvironment for Java programs and the third is the common basis of the Scala and Java environment. TableEnvironment.getTableEnvironment automatically creates the appropriate TableEnvironment based on the provi

Re: Batch job per stream message?

2017-11-01 Thread Fabian Hueske
Hi Tomas, triggering a batch DataSet job from a DataStream program for each input record doesn't sound like a good idea to me. You would have to make sure that the cluster always has sufficient resources and handle failures. It would be preferable to have all data processing in a DataStream job.

Re: Reprocessing the data after config change

2017-11-01 Thread Fabian Hueske
Hi Tomasz, that sounds like a sound design. You have to make sure that the output of the application is idempotent such that the reprocessing job overrides all output data of the earlier job. Best, Fabian 2017-10-23 16:24 GMT+02:00 Tomasz Dobrzycki : > Hi all, > > I'm currently working on a

Re: Batch job per stream message?

2017-11-02 Thread Fabian Hueske
basically there has to be an accumulator implemented inside > AsyncFunction to gather up all results and return them in a single > .collect() call. > but how to know when to do so? or I am completely off track here > > > > On Wed, 1 Nov 2017 at 03:57 Fabian Hueske wrote: >

Re: Facing issues with Logback

2017-11-06 Thread Fabian Hueske
Hi Teena, thanks for reaching out to the mailing list for this issue. This sounds indeed like a bug in Flink and should be investigated. We are currently working on a new release 1.4 and the testing phase will start soon. So it would make sense to include this problem in the testing and hopefully i

Re: DataStream to Table Api idioms

2017-11-06 Thread Fabian Hueske
Hi Seth, I think the Table API is not there yet to address your use case. 1. Allowed lateness cannot be configured but it is on the list of features that we plan to add in the future. 2. Custom triggers are not supported. We are planning to add an option to support your use case (early firing and

Re: Kafka Consumer fetch-size/rate and Producer queue timeout

2017-11-07 Thread Fabian Hueske
Hi Ashish, Gordon (in CC) might be able to help you. Cheers, Fabian 2017-11-05 16:24 GMT+01:00 Ashish Pokharel : > All, > > I am starting to notice a strange behavior in a particular streaming app. > I initially thought it was a Producer issue as I was seeing timeout > exceptions (records expir

Re: ExecutionGraph not serializable

2017-11-07 Thread Fabian Hueske
Hi XiangWei, I don't think this is a public interface, but Till (in CC) might know better. Best, Fabian 2017-11-06 3:27 GMT+01:00 XiangWei Huang : > Hi Flink users, > Flink Jobmanager throw a NotSerializableException when i used > JobMasterGateway to get ExecutionGraph of a specific job with >

Re: Testing / Configuring event windows with Table API and SQL

2017-11-10 Thread Fabian Hueske
Hi Colin, Flink's SQL runner does not support handling of late data yet. At the moment, late events are simply dropped. We plan to add support for late data in a future release. The "withIdleStateRetentionTime" parameter only applies to non-windowed aggregation functions and controls when they ca

Re: Metric Registry Warnings

2017-11-13 Thread Fabian Hueske
Hi Ashish, this is a known issue and has been fixed for the next version [1]. Best, Fabian [1] https://issues.apache.org/jira/browse/FLINK-7100 2017-11-11 16:02 GMT+01:00 Ashish Pokharel : > All, > > Hopefully this is a quick one. I enabled Graphite reporter in my App and I > started to see th

Re: Flink drops messages?

2017-11-13 Thread Fabian Hueske
Hi Andrea, you are right. Flink's window operators can drop messages which are too late, i.e., have a timestamp smaller than the last watermark. This is expected behavior and documented in several places [1] [2]. There are a couple of options for dealing with late elements: 1. Use more conservat
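
The dropping rule stated above (a timestamp smaller than the last watermark counts as late) can be illustrated with a simplified, self-contained model. This is not Flink's window operator code: the class name, the `split` helper, and the assumption that the watermark trails the largest timestamp seen so far by a fixed bound are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;

public class LateEventSketch {

    /**
     * Splits timestamps into on-time (index 0) and late (index 1) elements.
     * Assumption: the watermark is the maximum timestamp seen so far minus
     * a fixed out-of-orderness bound, and never moves backwards.
     */
    static List<List<Long>> split(long[] timestamps, long maxOutOfOrderness) {
        List<Long> onTime = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        long watermark = Long.MIN_VALUE;

        for (long ts : timestamps) {
            if (ts < watermark) {
                late.add(ts);   // smaller than the last watermark: too late
            } else {
                onTime.add(ts);
            }
            // Advance the watermark; it trails the largest timestamp by the bound.
            watermark = Math.max(watermark, ts - maxOutOfOrderness);
        }

        List<List<Long>> result = new ArrayList<>();
        result.add(onTime);
        result.add(late);
        return result;
    }

    public static void main(String[] args) {
        long[] timestamps = {10, 20, 12, 30, 5};
        System.out.println(split(timestamps, 5));
        System.out.println(split(timestamps, 10));
    }
}
```

In this model a larger bound keeps more out-of-order elements on time, at the price of a watermark that advances later.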

Re: AvroParquetWriter may cause task managers to get lost

2017-11-13 Thread Fabian Hueske
Hi Ivan, I don't have much experience with Avro, but extracting the schema and creating a writer for each record sounds like a pretty expensive approach. This might result in significant load and increased GC activity. Do all records have a different schema or might it make sense to cache the wri
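
The caching idea raised above can be sketched as follows, assuming records arrive with only a handful of distinct schemas. The `Cache` class and the use of a `StringBuilder` as a stand-in for an expensive writer are hypothetical; no Avro or Parquet API is used here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class WriterCacheSketch {

    /** Creates one expensive object per key (e.g., one writer per schema) and reuses it. */
    static final class Cache<K, V> {
        private final Map<K, V> entries = new HashMap<>();
        private final Function<K, V> factory;
        private int creations = 0;

        Cache(Function<K, V> factory) {
            this.factory = factory;
        }

        V get(K key) {
            V value = entries.get(key);
            if (value == null) {
                value = factory.apply(key); // the expensive step runs once per key
                entries.put(key, value);
                creations++;
            }
            return value;
        }

        int creations() {
            return creations;
        }
    }

    /** Returns how many writers are created for a sequence of per-record schema keys. */
    static int countCreations(String[] schemaKeys) {
        // StringBuilder is a placeholder for a real writer.
        Cache<String, StringBuilder> writers = new Cache<>(schema -> new StringBuilder());
        for (String schema : schemaKeys) {
            writers.get(schema).append("record\n");
        }
        return writers.creations();
    }

    public static void main(String[] args) {
        System.out.println(countCreations(new String[]{"a", "b", "a", "a", "b"}));
    }
}
```

With a cache like this, the number of writer instantiations depends on the number of distinct schemas rather than the number of records. A real implementation would also need to close the cached writers when the task finishes.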

Re: Flink drops messages?

2017-11-13 Thread Fabian Hueske
Thanks for the correction! :-) 2017-11-13 13:05 GMT+01:00 Kien Truong : > Getting late elements from side-output is already available with Flink 1.3 > :) > > Regards, > > Kien > On 11/13/2017 5:00 PM, Fabian Hueske wrote: > > Hi Andrea, > > you are right.

Re: Streaming User-defined Aggregate Function gives exception when using Table API jars

2017-11-15 Thread Fabian Hueske
Hi Colin, thanks for reporting the bug. I had a look at it and it seems that the wrong classloader is used when compiling the code (both for the batch as well as the streaming queries). I have a fix that I need to verify. It's not necessary to open a new JIRA for that. We can cover all cases unde

Re: How to write dataset as parquet format

2017-11-22 Thread Fabian Hueske
Hi Ebru, AvroParquetOutputFormat seems to implement Hadoop's OutputFormat interface. Flink provides a wrapper for Hadoop's OutputFormat [1], so you can try to wrap AvroParquetOutputFormat in Flink's HadoopOutputFormat. Hope this helps, Fabian [1] https://ci.apache.org/projects/flink/flink-docs-r

Re: external checkpoints

2017-11-24 Thread Fabian Hueske
Hi Aviad, sorry for the late reply. You can configure the checkpoint directory (which is also used for externalized checkpoints) when you create the state backend: env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/")); This configures the checkpoint directory to be hdfs:///che

Re: Accessing Cassandra for reading and writing

2017-11-24 Thread Fabian Hueske
Hi Andre, Do you have a batch or streaming use case? Flink provides Cassandra Input and OutputFormats for DataSet (batch) jobs and a Cassandra Sink for DataStream applications. There is no Cassandra source for DataStream applications. Regarding your error, this looks more like a Zeppelin configurati
