Re: coordinate watermarks between jobs?

2018-05-04 Thread Eron Wright
It might be possible to apply backpressure to the channels that are significantly ahead in event time. Tao, it would not be trivial, but if you'd like to investigate more deeply, take a look at the Flink runtime's `StatusWatermarkValve` and the associated stream input processors to see how an oper
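
To make the idea concrete, here is a plain-Java sketch of how an input valve can combine per-channel watermarks. The operator's watermark is the minimum over all channels, and any channel whose watermark runs more than a threshold ahead of that minimum is a candidate for being paused. The class `WatermarkAlignmentSketch`, its `maxDrift` parameter, and the pause decision are hypothetical illustrations. This is not Flink's `StatusWatermarkValve`, which among other things also tracks idle channels through stream status.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: combine per-channel watermarks and flag channels that run too far ahead.
class WatermarkAlignmentSketch {
    private final long[] channelWatermarks;
    private final long maxDrift; // hypothetical threshold, in event-time milliseconds

    WatermarkAlignmentSketch(int numChannels, long maxDrift) {
        this.channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        this.maxDrift = maxDrift;
    }

    // Watermarks only move forward per channel, so a smaller value is ignored.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The operator can only advance to the slowest channel's watermark.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // Channels more than maxDrift ahead of the combined watermark would be back-pressured.
    List<Integer> channelsToPause() {
        long combined = combinedWatermark();
        List<Integer> ahead = new ArrayList<>();
        if (combined == Long.MIN_VALUE) {
            return ahead; // some channel has not reported yet; nothing to compare against
        }
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (channelWatermarks[i] - combined > maxDrift) {
                ahead.add(i);
            }
        }
        return ahead;
    }
}
```

A real implementation would also have to resume paused channels once the slowest ones catch up, which this sketch leaves out.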

Use of AggregateFunction's merge() method

2018-05-04 Thread Ken Krugler
I’m trying to figure out when/why the AggregateFunction.merge() method is called in a streaming job, to ensure I’ve implemented it properly. The docu
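
As background, in the DataStream API `AggregateFunction.merge()` is needed when windows themselves merge, as with session windows. In that case two partial windows, each with its own accumulator, are combined once an element bridges the gap between them. The plain-Java sketch below mirrors the four-method shape of `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`) using an average. The `AverageAggregate` class and its `long[]` accumulator are illustrative assumptions, not Flink code.

```java
// Plain-Java stand-in mirroring AggregateFunction<IN, ACC, OUT>; not the Flink interface itself.
interface AggregateSketch<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Average of longs; the accumulator is {sum, count}.
class AverageAggregate implements AggregateSketch<Long, long[], Double> {
    public long[] createAccumulator() {
        return new long[] {0L, 0L};
    }

    public long[] add(Long value, long[] acc) {
        acc[0] += value;
        acc[1] += 1;
        return acc;
    }

    public Double getResult(long[] acc) {
        return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1];
    }

    // Invoked when two windows (e.g. two session windows) collapse into one: combining
    // sums and counts gives the same result as aggregating all inputs in a single window.
    public long[] merge(long[] a, long[] b) {
        return new long[] {a[0] + b[0], a[1] + b[1]};
    }
}
```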

Re: Stashing key with AggregateFunction

2018-05-04 Thread Ken Krugler
Hi Fabian & Stefan, Thanks, and yes that does work more like what I’d expect. Regards, — Ken PS - Just FYI the Java code examples in the documentation referenced below have a number of bugs, see FLINK-9299. > On May 4, 2018, at 7:35 AM, Fab

Re: Flink + Marathon (Mesos) Memory Issues

2018-05-04 Thread hao gao
Hi, since you mentioned BucketingSink, I think it may be related to your bucketer. Let's say you bucket by hour. If, at some moment, the timestamps of the records in your stream range from hour 00 to hour 23, your task needs 24 writers, one dedicated to each bucket. If you have 4 task slots in a ta
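
The arithmetic can be sketched in plain Java. With an hourly bucketer, every distinct hour among the in-flight records keeps its own open writer, so the writer count per sink subtask equals the number of distinct hour buckets. Several sink subtasks on one TaskManager multiply that figure. The `yyyy-MM-dd--HH` bucket format, the UTC zone, and the assumption that every subtask sees the same spread of hours are all hypothetical simplifications.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BucketWriterEstimate {
    // Hypothetical hourly bucket path; a real bucketer's format and time zone are configurable.
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    static String hourBucket(long epochMillis) {
        return HOURLY.format(Instant.ofEpochMilli(epochMillis));
    }

    // One open writer per distinct bucket seen by a single sink subtask.
    static int openWritersPerSubtask(List<Long> timestamps) {
        Set<String> buckets = new HashSet<>();
        for (long ts : timestamps) {
            buckets.add(hourBucket(ts));
        }
        return buckets.size();
    }

    // Assumption: every sink subtask on the TaskManager sees the same spread of hours.
    static int openWritersPerTaskManager(List<Long> timestamps, int sinkSubtasksPerTm) {
        return openWritersPerSubtask(timestamps) * sinkSubtasksPerTm;
    }
}
```

Each open writer holds its own buffers, which is why a wide spread of event times (for example, during a replay) can translate directly into memory pressure.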

Why FoldFunction deprecated?

2018-05-04 Thread 陈梓立
I just wrote a code snippet like ``` .fold(new Tuple2<>("", 0L), new FoldFunction<WikipediaEditEvent, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> fold(Tuple2<String, Long> acc, WikipediaEditEvent event) { acc.f0 = event.getUser(); acc.f1 += event.getByteDiff();
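
Flink's `FoldFunction` javadoc points to `AggregateFunction` as its replacement. Below is a plain-Java sketch of the same per-user byte-diff sum rewritten in the `AggregateFunction` shape, which has the advantage that two partial results can be merged. `EditEvent` is a hypothetical stand-in for `WikipediaEditEvent` that models only the user and the byte diff. `Aggregator` is a local mirror of the method shape, not the Flink type. The sketch assumes the stream is keyed by user, as the fold implies.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical stand-in for WikipediaEditEvent with just the fields the fold used.
class EditEvent {
    final String user;
    final long byteDiff;

    EditEvent(String user, long byteDiff) {
        this.user = user;
        this.byteDiff = byteDiff;
    }
}

// Local mirror of the AggregateFunction<IN, ACC, OUT> method shape.
interface Aggregator<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC acc);
    OUT getResult(ACC acc);
    ACC merge(ACC a, ACC b);
}

// Same logic as the fold: remember the user and sum the byte diffs.
class ByteDiffPerUser implements Aggregator<EditEvent, Map.Entry<String, Long>, Map.Entry<String, Long>> {
    public Map.Entry<String, Long> createAccumulator() {
        return new SimpleEntry<>("", 0L);
    }

    public Map.Entry<String, Long> add(EditEvent e, Map.Entry<String, Long> acc) {
        return new SimpleEntry<>(e.user, acc.getValue() + e.byteDiff);
    }

    public Map.Entry<String, Long> getResult(Map.Entry<String, Long> acc) {
        return acc;
    }

    // Unlike a fold, two partial results can be combined. With the stream keyed by user,
    // both accumulators refer to the same user; an empty accumulator still carries "".
    public Map.Entry<String, Long> merge(Map.Entry<String, Long> a, Map.Entry<String, Long> b) {
        String user = a.getKey().isEmpty() ? b.getKey() : a.getKey();
        return new SimpleEntry<>(user, a.getValue() + b.getValue());
    }
}
```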

Re: Init RocksDB state backend during startup

2018-05-04 Thread Tao Xia
I would also like to know how to do this, if it is possible. On Fri, May 4, 2018 at 9:31 AM, Peter Zende wrote: > Hi, > > We use RocksDB with FsStateBackend (HDFS) to store state used by the > mapWithState operator. Is it possible to initialize / populate this state > during the streaming applicati

Re: coordinate watermarks between jobs?

2018-05-04 Thread Tao Xia
Without throttling, it will eventually run out of memory. I think this is a very common use case for Flink users during stream replay or reprocessing. Is any feature planned for it? I would like to contribute to the initiative. On Wed, May 2, 2018 at 2:43 AM, Fabian Hueske wrote: > Hi Tao,

Re: Wrong joda lib

2018-05-04 Thread Stefan Richter
Then that is the jar from which it currently takes the code for your DateTime class at that point in the code. > On 04.05.2018 at 18:29, Flavio Pompermaier wrote: > > The output of that code is > file:/opt/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars/jruby-cloudera-1.0.0.jar > > On Fri,

Init RocksDB state backend during startup

2018-05-04 Thread Peter Zende
Hi, We use RocksDB with FsStateBackend (HDFS) to store state used by the mapWithState operator. Is it possible to initialize / populate this state during the streaming application startup? Our intention is to reprocess the historical data from HDFS in a batch job and save the latest state of the
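
The batch step described here, reprocessing history and keeping only the latest state per key, comes down to a reduce by key that keeps the record with the highest timestamp. Below is a plain-Java sketch using a hypothetical `HistoricalRecord(key, timestamp, value)` type. How the result is then loaded into the RocksDB backend is the open question of this thread, and the sketch does not show it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical historical record: a key, an event timestamp and the state value at that time.
class HistoricalRecord {
    final String key;
    final long timestamp;
    final String value;

    HistoricalRecord(String key, long timestamp, String value) {
        this.key = key;
        this.timestamp = timestamp;
        this.value = value;
    }
}

class LatestStatePerKey {
    // Keeps, for each key, the value of the record with the largest timestamp
    // (on a tie, the record seen first wins).
    static Map<String, String> latest(List<HistoricalRecord> history) {
        Map<String, HistoricalRecord> newest = new HashMap<>();
        for (HistoricalRecord r : history) {
            newest.merge(r.key, r, (old, cur) -> cur.timestamp > old.timestamp ? cur : old);
        }
        Map<String, String> state = new HashMap<>();
        newest.forEach((k, r) -> state.put(k, r.value));
        return state;
    }
}
```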

Re: Wrong joda lib

2018-05-04 Thread Flavio Pompermaier
The output of that code is file:/opt/cloudera/parcels/CDH-5.11.2-1.cdh5.11.2.p0.4/jars/jruby-cloudera-1.0.0.jar On Fri, May 4, 2018 at 6:11 PM, Stefan Richter wrote: > Hi, > > you can try to figure out the jar with org.joda.time.DateTime.class. > getProtectionDomain().getCodeSource().getLocation

Re: Wrong joda lib

2018-05-04 Thread Stefan Richter
Why is that a problem? You could also run the method in a static block and move the initialization of your variable to the bottom of that static block (or comment it out). > On 04.05.2018 at 18:14, Flavio Pompermaier wrote: > > The problem is that the problem occurs during the initiali
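
Stefan's suggestion relies on the fact that Java runs static field initializers and static blocks in the order they appear in the class. In the minimal sketch below, the diagnostic block comes first, so it records where the date class was loaded from even if the next initializer throws. `java.time.LocalDate` stands in for Joda's `DateTime` so that the example compiles without Joda. In the real job the lookup would target `org.joda.time.DateTime.class`, and `DateConstants` is a hypothetical class name.

```java
import java.security.CodeSource;
import java.time.LocalDate;

class DateConstants {
    static final String DATE_CLASS_SOURCE;

    // Runs first: static initializers execute in textual order.
    static {
        CodeSource src = LocalDate.class.getProtectionDomain().getCodeSource();
        // Classes loaded by the bootstrap loader (e.g. JDK classes) have no CodeSource.
        DATE_CLASS_SOURCE = src == null ? "bootstrap" : String.valueOf(src.getLocation());
        System.err.println("Date class loaded from: " + DATE_CLASS_SOURCE);
    }

    // Runs second. In the original job, the equivalent Joda line is what fails with NoSuchMethodError.
    static final LocalDate MIN_DATE = LocalDate.of(1850, 1, 1);
}
```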

Re: Wrong joda lib

2018-05-04 Thread Flavio Pompermaier
The problem is that the problem occurs during the initialization of a static variable: private static final DateTime MIN_DATE = new DateTime(1850, 01, 01, 0, 0); On Fri, May 4, 2018 at 6:11 PM, Stefan Richter wrote: > Hi, > > you can try to figure out the jar with org.joda.time.DateTime.class.g

Re: Wrong joda lib

2018-05-04 Thread Stefan Richter
Hi, you can try to figure out the jar with org.joda.time.DateTime.class.getProtectionDomain().getCodeSource().getLocation() in the right context. Best, Stefan > On 04.05.2018 at 18:02, Flavio Pompermaier wrote: > > Hi to all, > I'm trying to run a job on a test cluster with Flink 1.3.1 but
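
As a complement to `getCodeSource().getLocation()`, which reports only the copy that actually won, `ClassLoader.getResources` lists every location on the class path that contains the class file. That makes jars bundling their own copy of a library visible. The helper name `ClassOrigins.allCopies` is illustrative.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

class ClassOrigins {
    // Every location visible to the class loader that contains the class file.
    // More than one entry means several jars ship (possibly different versions of) the class.
    static List<URL> allCopies(ClassLoader loader, String className) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the failing job, one could call `ClassOrigins.allCopies(Thread.currentThread().getContextClassLoader(), "org.joda.time.DateTime")` and log the result.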

Re: Question about datasource replication

2018-05-04 Thread Fabian Hueske
The spilling will only happen when joining the branched data sets. If you keep them separate and eventually emit them, no intermediate data will be spilled. 2018-05-04 18:05 GMT+02:00 Flavio Pompermaier : > Does this duplication happen when I write directly to disk after the > flatMaps? > > On Fr

Re: Question about datasource replication

2018-05-04 Thread Flavio Pompermaier
Does this duplication happen when I write directly to disk after the flatMaps? On Fri, May 4, 2018 at 6:02 PM, Fabian Hueske wrote: > That will happen if you join (or coGroup) the branched DataSets, i.e., you > have branching and merging pattern in your stream. > > The problem in that case is th

Wrong joda lib

2018-05-04 Thread Flavio Pompermaier
Hi to all, I'm trying to run a job on a test cluster with Flink 1.3.1 but my job fails with the following error: java.lang.NoSuchMethodError: org.joda.time.DateTime.<init>(I)V The job works when I run it from the IDE and in our production environment... I've looked into all jars within libs and tha

Re: Question about datasource replication

2018-05-04 Thread Fabian Hueske
That will happen if you join (or coGroup) the branched DataSets, i.e., you have branching and merging pattern in your stream. The problem in that case is that one of the inputs is pipelined (e.g., the probe side of a hash join) and the other one is blocking. In order to execute such a plan, we mus
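
To see why branching and re-joining forces materialization, here is a plain-Java sketch of a hash join. The build side must be consumed completely into a hash table before the first probe record can be matched, while the probe side is streamed. If both sides derive from the same source, the records headed for the probe side have to be held back, and possibly spilled, while the build side is drained. `HashJoinSketch` and its `key=value` string records are illustrative only. Flink's actual hybrid hash join is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class HashJoinSketch {
    // Joins well-formed "key=value" strings on their key and emits "key:buildValue,probeValue".
    static List<String> join(Iterator<String> build, Iterator<String> probe) {
        // Blocking phase: the whole build side is read before any output is produced.
        Map<String, List<String>> table = new HashMap<>();
        while (build.hasNext()) {
            String[] kv = build.next().split("=", 2);
            table.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        // Pipelined phase: each probe record is matched and emitted as soon as it arrives.
        List<String> out = new ArrayList<>();
        while (probe.hasNext()) {
            String[] kv = probe.next().split("=", 2);
            for (String b : table.getOrDefault(kv[0], Collections.emptyList())) {
                out.add(kv[0] + ":" + b + "," + kv[1]);
            }
        }
        return out;
    }
}
```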

Re: This server is not the leader for that topic-partition

2018-05-04 Thread Alexander Smirnov
Thanks for the quick turnaround, Stefan, Piotr. This is a rarely reproducible issue and I will keep an eye on it. Searching on Stack Overflow, I found https://stackoverflow.com/questions/43378664/kafka-leader-election-causes-kafka-streams-crash They say that the problem is fixed in 0.10.2.1 of kafka p

Re: Question about datasource replication

2018-05-04 Thread Flavio Pompermaier
Hi Fabian, thanks for the detailed reply. The problem I see is that the source dataset is huge and, since it doesn't fit in memory, it's spilled twice to disk (I checked the increasing disk usage during the job and it corresponded exactly to the size estimated by the Flink UI, that is twice it

Re: Question about datasource replication

2018-05-04 Thread Fabian Hueske
Hi Flavio, No, there's no way around it. DataSets that are processed by more than one operator cannot be processed by chained operators. The records need to be copied to avoid concurrent modifications. However, the data should not be shipped over the network if all operators have the same parallel
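
The copy requirement comes from object reuse. If two operators received the same mutable record instance, a change made by one would be visible to the other. Below is a plain-Java illustration using a hypothetical mutable `MutableEvent` type and a hypothetical `fanOut` helper. Flink handles the equivalent internally, and this is not its code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical mutable record, as produced by a source that reuses objects.
class MutableEvent {
    long value;

    MutableEvent(long value) {
        this.value = value;
    }

    MutableEvent copy() {
        return new MutableEvent(value);
    }
}

class FanOutSketch {
    // Hands one record to every downstream function, optionally copying it first.
    static List<Long> fanOut(MutableEvent record, List<UnaryOperator<MutableEvent>> consumers, boolean copy) {
        List<Long> results = new ArrayList<>();
        for (UnaryOperator<MutableEvent> f : consumers) {
            MutableEvent input = copy ? record.copy() : record;
            results.add(f.apply(input).value);
        }
        return results;
    }
}
```

The example uses two in-place functions: one multiplies by 10 and the other adds 1. Without copying, the second function sees the first function's modification.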

Re: Stashing key with AggregateFunction

2018-05-04 Thread Fabian Hueske
Hi Ken, You can also use an additional ProcessWindowFunction [1] that is applied on the result of the AggregateFunction to set the key. Since the PWF is only applied on the final result, there is no overhead (actually, it might even be slightly cheaper because the AggregateFunction can be simpler).
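
The split described here can be modeled in plain Java. The incremental aggregate only ever sees values. A final per-window step, which in Flink would be the `ProcessWindowFunction` receiving the key, pairs that key with the single pre-aggregated result. `KeyedResult`, the counting aggregate, and the `window` driver are hypothetical names used only for this sketch.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class KeyedResult {
    final String key;
    final long count;

    KeyedResult(String key, long count) {
        this.key = key;
        this.count = count;
    }
}

class AggregateThenAttachKey {
    // Incremental part: knows nothing about the key and just counts.
    static long addToCount(long acc, String value) {
        return acc + 1;
    }

    // Final part: like a ProcessWindowFunction, it gets the key plus an iterable
    // that holds exactly one pre-aggregated element.
    static KeyedResult attachKey(String key, Iterable<Long> preAggregated) {
        return new KeyedResult(key, preAggregated.iterator().next());
    }

    // Drives both steps for one window's contents, grouped by key.
    static Map<String, KeyedResult> window(Map<String, List<String>> elementsByKey) {
        Map<String, KeyedResult> out = new HashMap<>();
        elementsByKey.forEach((key, values) -> {
            long acc = 0;
            for (String v : values) {
                acc = addToCount(acc, v);
            }
            out.put(key, attachKey(key, Collections.singletonList(acc)));
        });
        return out;
    }
}
```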

Re: Stashing key with AggregateFunction

2018-05-04 Thread Stefan Richter
Hi, I have two possible options to achieve this. The first option is that you could obviously always derive the key again from the input of the aggregate function. The second option is combining an AggregateFunction with a ProcessWindowFunction. With the second solution you get incremental aggr
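
The first option, deriving the key from the input inside the aggregate, amounts to carrying the key in the accumulator. Below is a plain-Java sketch using a hypothetical `Click` input and `KeyedCount` accumulator. It mirrors the `AggregateFunction` method shape but is not Flink code.

```java
// Hypothetical input type whose key can be re-derived from the record itself.
class Click {
    final String userId;

    Click(String userId) {
        this.userId = userId;
    }
}

// Accumulator that stashes the key next to the running count.
class KeyedCount {
    String key; // null until the first element arrives
    long count;
}

class KeyedCountAggregate {
    KeyedCount createAccumulator() {
        return new KeyedCount();
    }

    KeyedCount add(Click click, KeyedCount acc) {
        acc.key = click.userId; // the same value for every element of a keyed window
        acc.count++;
        return acc;
    }

    String getResult(KeyedCount acc) {
        return acc.key + ":" + acc.count;
    }

    KeyedCount merge(KeyedCount a, KeyedCount b) {
        KeyedCount m = new KeyedCount();
        m.key = a.key != null ? a.key : b.key;
        m.count = a.count + b.count;
        return m;
    }
}
```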

Re: This server is not the leader for that topic-partition

2018-05-04 Thread Piotr Nowojski
Hi, I think Stefan is right. Quick google search points to this: https://stackoverflow.com/questions/47767169/kafka-this-server-is-not-the-leader-for-that-topic-partition Please let us k

Re: This server is not the leader for that topic-partition

2018-05-04 Thread Stefan Richter
Hi, I think in general this means that your producer client does not connect to the correct broker (the leader) but to a broker that is just a follower, and the follower cannot execute that request. However, I am not sure what causes this in the context of the FlinkKafkaProducer. Maybe Piotr (i
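
A leader change is transient, so one thing worth checking is the producer's retry setting. This is an assumption, not a confirmed diagnosis for this thread. The Kafka producer treats "not the leader for that topic-partition" as a retriable error, but in Kafka clients before 2.1 the default for `retries` was 0, so a leader election shows up as a failed send. The sketch builds extra producer properties that could be passed to the `FlinkKafkaProducer011` constructor. The keys are standard Kafka producer configuration names. The values are illustrative, not tuned recommendations.

```java
import java.util.Properties;

class ProducerRetryConfig {
    // Illustrative values only; the keys are standard Kafka producer settings.
    static Properties withRetries(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Retry retriable errors, such as a leader change, instead of failing the send.
        props.setProperty("retries", "3");
        // Wait between retries so the client can refresh its partition-leader metadata.
        props.setProperty("retry.backoff.ms", "500");
        // Keep per-partition ordering intact when retries are enabled.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```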

This server is not the leader for that topic-partition

2018-05-04 Thread Alexander Smirnov
Hi, what could cause the following exception? org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaPro

Re: PartitionNotFoundException after deployment

2018-05-04 Thread Gyula Fóra
It looks pretty clear that one operator takes too long to start (even the UI shows it in the created state for far too long). Any idea what might cause this delay? It actually often crashes on an Akka ask timeout while scheduling the node. Gyula Piotr Nowojski wrote (time: May 4, 2018,

Re: PartitionNotFoundException after deployment

2018-05-04 Thread Piotr Nowojski
Ufuk: I don’t know why. +1 for your other suggestions. Piotrek > On 4 May 2018, at 14:52, Ufuk Celebi wrote: > > Hey Gyula! > > I'm including Piotr and Nico (cc'd) who have worked on the network > stack in the last releases. > > Registering the network structures including the intermediate r

Re: Question about datasource replication

2018-05-04 Thread Flavio Pompermaier
Flink 1.3.1 (I'm waiting for 1.5 before upgrading..) On Fri, May 4, 2018 at 2:50 PM, Amit Jain wrote: > Hi Flavio, > > Which version of Flink are you using? > > -- > Thanks, > Amit > > On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier > wrote: > > Hi all, > > I've a Flink batch job that reads a pa

Re: PartitionNotFoundException after deployment

2018-05-04 Thread Ufuk Celebi
Hey Gyula! I'm including Piotr and Nico (cc'd) who have worked on the network stack in the last releases. Registering the network structures including the intermediate results actually happens **before** any state is restored. I'm not sure why this reproducibly happens when you restore state. @Ni

Re: Question about datasource replication

2018-05-04 Thread Amit Jain
Hi Flavio, Which version of Flink are you using? -- Thanks, Amit On Fri, May 4, 2018 at 6:14 PM, Flavio Pompermaier wrote: > Hi all, > I've a Flink batch job that reads a parquet dataset and then applies 2 > flatMap to it (see pseudocode below). > The problem is that this dataset is quite big a

Question about datasource replication

2018-05-04 Thread Flavio Pompermaier
Hi all, I have a Flink batch job that reads a Parquet dataset and then applies 2 flatMaps to it (see pseudocode below). The problem is that this dataset is quite big and Flink duplicates it before sending the data to these 2 operators (I guessed this from the doubled amount of sent bytes). Is the

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-05-04 Thread Stefan Richter
Cool, that is good news! Thanks for sharing this information with us, Best, Stefan > On 04.05.2018 at 12:27, Tony Wei wrote: > > have replaced to local SSDs and enabled incremental checkpoint mechanism as > well. Our job has run healthily for more than two weeks.

Re: checkpoint stuck with rocksdb statebackend and s3 filesystem

2018-05-04 Thread Tony Wei
Hi Stefan, Sihua, We finally found out the root cause. Just as you said, the performance degradation was due to EBS. My team and I weren't familiar with EBS before. We didn't expect its performance to be as weak as the monitoring showed us. But we visited this page [1]

Dynamically deleting kafka topics does not remove partitions from kafkaConsumer

2018-05-04 Thread Edward Rojas
Hello all, We have a Kafka consumer listening to a topic pattern "topic-*" with a partition discovery interval. We occasionally add new topics and this works perfectly: the consumer discovers the new topics (and partitions) and listens to them. But we also occasionally remove topics, and in this ca
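
One detail worth double-checking concerns how the pattern is interpreted. This assumes the pattern is handed to the consumer as a `java.util.regex.Pattern`, which is what the topic-pattern constructors take, and is matched against whole topic names. As a regular expression, `topic-*` means "topic" followed by zero or more hyphens, not "topic-" followed by anything. The sketch checks both spellings. It also shows the set difference that discovery would need to compute in order to notice removed topics. `TopicPatternCheck.removedTopics` is a hypothetical helper, not part of the Flink consumer.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

class TopicPatternCheck {
    // Full-string match, as a subscription pattern would be applied to a topic name.
    static boolean matches(String regex, String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }

    // Topics that were subscribed before but are absent from the latest metadata.
    static Set<String> removedTopics(Set<String> previous, Set<String> current) {
        Set<String> removed = new TreeSet<>(previous);
        removed.removeAll(current);
        return removed;
    }
}
```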

Re: intentional back-pressure (or a poor man's side-input)

2018-05-04 Thread Piotr Nowojski
> running a batch or "bounded stream" job first to generate a "cache state", > and then launching the main streaming job, which loads this initial state > load in open()... not sure how to work out the keying. > This is the recommended workaround for this issue - first start a job to precompute so
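
Below is a plain-Java sketch of the second half of this workaround: a function whose `open()` loads the state precomputed by the first job and whose `map()` consults it. The class follows the `open()`/`map()` lifecycle of a Flink rich function but is not one. The `key=value` snapshot format and the `InitialStateLookup` name are hypothetical. The keying question raised above is left open: every instance here simply loads the whole snapshot.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class InitialStateLookup {
    private final Reader snapshot; // e.g. a reader over the file written by the batch job
    private Map<String, String> initialState;

    InitialStateLookup(Reader snapshot) {
        this.snapshot = snapshot;
    }

    // Called once per parallel instance, before any record is processed.
    void open() {
        initialState = new HashMap<>();
        try (BufferedReader in = new BufferedReader(snapshot)) {
            String line;
            while ((line = in.readLine()) != null) {
                int eq = line.indexOf('=');
                if (eq > 0) {
                    initialState.put(line.substring(0, eq), line.substring(eq + 1));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Enriches a key with its precomputed value, or "none" if the batch job produced nothing for it.
    String map(String key) {
        return key + "->" + initialState.getOrDefault(key, "none");
    }
}
```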

Re: Flink + Marathon (Mesos) Memory Issues

2018-05-04 Thread Stefan Richter
Hi, besides your configured heap size, there is also some off-heap memory used in the JVM process, in particular by RocksDB. Each keyed operator instance on a TM has its own RocksDB instance, so the question is how many are running in one container and what is their configuration? For RocksDB f
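
A back-of-the-envelope version of this point in plain Java follows. Off-heap usage grows with the number of RocksDB instances in the container. Within each instance, it grows with the number of column families (roughly one per registered state) times the memtable budget, plus the block cache. The formula is a simplification that ignores index/filter blocks and other overheads. The numbers in the test are hypothetical, not RocksDB or Flink defaults.

```java
class RocksDbMemoryEstimate {
    // Rough bound in bytes:
    // instances * (columnFamilies * writeBufferSize * maxWriteBufferNumber + blockCacheSize)
    static long estimateBytes(int rocksDbInstances, int columnFamiliesPerInstance,
                              long writeBufferSize, int maxWriteBufferNumber, long blockCacheSize) {
        long memtables = (long) columnFamiliesPerInstance * writeBufferSize * maxWriteBufferNumber;
        return rocksDbInstances * (memtables + blockCacheSize);
    }
}
```

For example, 4 keyed operator instances in a container, 2 states each, 64 MiB write buffers, 2 buffers per column family and an 8 MiB block cache give 4 × (2 × 64 × 2 + 8) = 1056 MiB outside the heap.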

RE: use of values of previously accepted event

2018-05-04 Thread Esa Heikkinen
That would be enough, but I would appreciate full source (Scala) code examples of using IterativeConditions. How do I find the correct imports for getEventsForPattern? Best, Esa From: Dawid Wysakowicz Sent: Thursday, May 3, 2018 2:53 PM To: Esa Heikkinen Subject: Re: use of values of previousl
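
A plain-Java model of what an iterative condition does may help. When deciding whether to accept a new event, the condition can look at the events already accepted for a named pattern. In the Java CEP API this is `ctx.getEventsForPattern("name")` on the `Context` passed to `IterativeCondition.filter`, so it is reached through the context rather than imported on its own; `IterativeCondition` lives in `org.apache.flink.cep.pattern.conditions`. The `IterativeConditionSketch` class and its rising-price rule are hypothetical. In real CEP the NFA records accepted events, whereas here `filter` records them itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IterativeConditionSketch {
    // Events accepted so far, per pattern name (a stand-in for the CEP Context).
    private final Map<String, List<Double>> accepted = new HashMap<>();

    Iterable<Double> getEventsForPattern(String name) {
        return accepted.getOrDefault(name, new ArrayList<>());
    }

    // Hypothetical rule: accept a price into "rising" only if it exceeds every price accepted before.
    boolean filter(double price) {
        for (double previous : getEventsForPattern("rising")) {
            if (price <= previous) {
                return false;
            }
        }
        accepted.computeIfAbsent("rising", k -> new ArrayList<>()).add(price);
        return true;
    }
}
```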

PartitionNotFoundException after deployment

2018-05-04 Thread Gyula Fóra
Hi Ufuk, Do you have any quick idea what could cause these problems in Flink 1.4.2? It seems like one operator takes too long to deploy and downstream tasks error out on partition not found. This only seems to happen when the job is restored from state and in fact that operator has some keyed and oper