RE: Lower Parallelism derives better latency

2018-01-03 Thread Netzer, Liron
Hi Stefan, Thanks for replying. All of the tests below were executed with a buffer timeout of zero: env.setBufferTimeout(0); so this means that the buffers were flushed after each record. Any other explanation? :) Thanks, Liron From: Stefan Richter [mailto:s.rich...@data-artisans.com] Sent:

RE: Lower Parallelism derives better latency

2018-01-03 Thread Netzer, Liron
Hi Aljoscha, The latency is measured with Flink MetricGroup (specifically with "DropwizardHistogram"). The latency is measured from message read time (i.e. from when the message is pulled from Kafka source) until the last operator completes the processing (there is no Kafka sink). Thanks, Liron

Exception on running an Elasticpipe flink connector

2018-01-03 Thread vipul singh
Hello, We are working on a Flink ES connector, sourcing from a Kafka stream and sinking data into Elasticsearch. The code works fine in IntelliJ, but while running the code on EMR (version 5.9, which uses Flink 1.3.2) using flink-yarn-session, we are seeing this exception Using the parallelism pr

Re: JobManager not receiving resource offers from Mesos

2018-01-03 Thread 김동원
Hi Stefan, I don't want to introduce Hadoop in Flink clusters. I think the exception is not that serious, as it is shown only when the log level is set to DEBUG. Do I have to set HADOOP_HOME to use Flink on DC/OS? Regards, Dongwon > On 3 Jan 2018, at 7:34 PM, Stefan Richter wrote: > > Hi, > > did you see

Can't call getProducedType on Avro messages with array types

2018-01-03 Thread Kyle Hamlin
Hi, It appears that Kryo can't properly extract/deserialize Avro array types. I have a very simple Avro schema that has an array type and when I remove the array field the error is not thrown. Is there any way around this without using a specific type? *Avro Schema:* { "type": "record", "

BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

2018-01-03 Thread Kyle Hamlin
Hello, After moving to Flink 1.4.0 I'm getting the following error. I can't find anything online that addresses it. Is it a Hadoop dependency issue? Here are my project dependencies: libraryDependencies ++= Seq( "org.apache.flink" %% "flink-scala" % flinkVersion % Provided, "org.apache.flink"

Re: Flink CEP with event time

2018-01-03 Thread shashank agarwal
@Dawid, I was using 1.3.2; I have also checked on 1.4.0 and am still facing the same issue. @Aljoscha, I have to cover the case where B can come after A from Kafka. How can I achieve this, as event time is not working? How should I implement this? A followedBy B. As I am using kafka source and my even

Re: Job Manager not able to fetch job info when restarted

2018-01-03 Thread Sushil Ks
Hi Stefan, It was just run without HA, via yarn. Will try running a yarn session with HA and test, thanks for your time. Also, is there a way we can get alerts if a pipeline restarts or gets cancelled in Flink? Regards, Sushil Ks On Jan 3, 2018 7:07 PM, "Stefan Richter" wrote: > Hi,

Re: Apache Flink - Connected Stream with different number of partitions

2018-01-03 Thread M Singh
Thanks Aljoscha and Timo for your answers.  I will try to digest the pointers you provided. Mans On Wednesday, January 3, 2018 3:01 AM, Aljoscha Krettek wrote: Hi, The answer is correct but I'll try and elaborate a bit: the way data is sent to downstream operations depends on a couple

Re: Apache Flink - Question about rolling window function on KeyedStream

2018-01-03 Thread M Singh
Hi Stefan: Thanks for your response. A follow up question - In a streaming environment, we invoke the operation reduce and then output results to the sink. Does this mean reduce will be executed once on every trigger per partition with all the items in each partition ? Thanks On Wednesday

Re: Pending parquet file with Bucking Sink

2018-01-03 Thread Aljoscha Krettek
Hi, Your analysis is correct. If the program ends before we can do a checkpoint, files will never be moved to the "final" state. We could move all files to the "final" state when the Sink is closing, but the problem here is that Flink currently doesn't provide a way for user functions (which Sinks are) t

Re: scala 2.12 support/cross-compile

2018-01-03 Thread Hao Sun
Thanks Stephan and Aljoscha for the info! On Wed, Jan 3, 2018 at 2:41 AM Aljoscha Krettek wrote: > Hi, > > This is the umbrella issue for Scala 2.12 support. As Stephan pointed out, > the ClosureCleaner and SAMs are currently the main problems. The first is > also a problem for Spark, which trac

Re: state.checkpoints.dir not configured

2018-01-03 Thread Aljoscha Krettek
Hi, I think what might have happened is that you had an earlier JobManager still running that had the old configuration loaded. Then you tried starting a new JobManager. Could that be the case? Best, Aljoscha > On 21. Dec 2017, at 16:34, Plamen Paskov > wrote: > > I'm sorry but i already cl

Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Kyle Hamlin
Hello Stephan & Nico, Here is the full stacktrace, it's not much more than what I originally posted. I remember seeing an XMLInputFactory input error at one point, but I haven't seen that again. Is there any other information I can provide that will help resolve? [flink-1.4.0] ./bin/flink run ~/st

Re: Flink CEP with event time

2018-01-03 Thread Dawid Wysakowicz
Hi shashank, What version of Flink are you using? Is it possible that you are hitting this issue: https://issues.apache.org/jira/browse/FLINK-7563 ? Watermark semantics in CEP were buggy: an event was processed only if its timestamp was lower than the current watermark, while it should be lower or

Re: Flink CEP with event time

2018-01-03 Thread Aljoscha Krettek
What are the actual timestamps? If your BoundedOutOfOrderness extractor is lagging by 10 seconds then only seeing Event 1.B would not trigger execution. Only the later Event 2.A is sufficiently far ahead to trigger execution, which you actually get. > On 3. Jan 2018, at 17:05, shashank agarwal
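The effect described in this reply can be illustrated with a small plain-Python model (not the Flink API). It assumes that a bounded-out-of-orderness extractor emits a watermark equal to the highest timestamp seen so far minus the allowed lag, and that a buffered event is released once the watermark reaches its timestamp. The 10-second lag comes from the message; the event timestamps and the function name are hypothetical.

```python
LAG_MS = 10_000  # allowed out-of-orderness, as in the message above


def released_events(events, lag_ms=LAG_MS):
    """Return, for each arriving event, the labels that become ready.

    events: list of (label, timestamp_ms) in arrival order.
    """
    max_ts = None
    pending = []  # (timestamp, label) pairs still waiting for the watermark
    released_per_step = []
    for label, ts in events:
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - lag_ms  # assumed watermark rule
        pending.append((ts, label))
        ready = sorted(p for p in pending if p[0] <= watermark)
        pending = [p for p in pending if p[0] > watermark]
        released_per_step.append([lbl for _, lbl in ready])
    return released_per_step


# Hypothetical timestamps: 1.B arrives before 1.A, and 2.A is 20 s later.
steps = released_events([("1.B", 1_000), ("1.A", 500), ("2.A", 21_000)])
print(steps)
```

In this model nothing is released while only events 1.B and 1.A have arrived; both become ready, in timestamp order, only when 2.A pushes the watermark far enough ahead.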

Re: Flink CEP with event time

2018-01-03 Thread shashank agarwal
Low Watermark is showing the same value which I am passing in event "1514994744412" for all the tasks related to that stream, (No watermark) is showing for Kafka source in UI. So the pattern is following for CEP A followedBy B : Event 1 - I passed A with origTimestamp X. (Low watermark updated to

Re: periodic trigger

2018-01-03 Thread Piotr Nowojski
Hi, Sorry for the late response (because of the holiday period). You didn’t mention lateness previously, that’s why I proposed such a solution. Another approach would be to calculate max session length per user on the first aggregation level and at the same time remember what was the previously emi

Re: Separate checkpoint directories

2018-01-03 Thread Kyle Hamlin
Hi Stefan, In the past, I ran four separate Flink apps to sink data from four separate Kafka topics to s3 without any transformations applied. For each Flink app, I would set the checkpoint directory to s3://some-bucket/checkpoints/topic-name. It appears that with Flink 1.4 I can just use a regex

Re: BackPressure handling

2018-01-03 Thread Vishal Santoshi
Absolutely. But without a view into a global WM at the source level, the --> would require sources to wait for sources that are "slower in time" --> is not possible for folks creating custom sources or extending existing ones. I would have loved to use a more scientific/data drive approa

Fwd: Queryable State Client - Actor Not found Exception

2018-01-03 Thread Velu Mitwa
Hi, I am running a Flink Job which uses the Queryable State feature of Apache Flink (1.3.2). I was able to do that in local mode. When I try to do that in Cluster mode (Yarn Session), I am getting an Actor not found Exception. Please help me to understand what is missing. *Exception Trace* Query fa

Re: Flink CEP with event time

2018-01-03 Thread Aljoscha Krettek
Can you please check what the input watermark of your operations is? There is a metric called "currentLowWatermark" for this. Best, Aljoscha > On 3. Jan 2018, at 15:54, shashank agarwal wrote: > > Actually, In Kafka there are other topics also (around 5-6 topics) I am > consuming particular t

Re: A question about Triggers

2018-01-03 Thread Vishal Santoshi
Dear Fabian, I was able to create a pretty functional ProcessFunction; here is the synopsis, please see if it makes sense. Sessionization is unique in that it entails windows of dynamic length. The way Flink approaches it is pretty simple. It will create a TimeWindow of size "gap" rel

Re: BackPressure handling

2018-01-03 Thread Aljoscha Krettek
Hi, I think your analysis is very thorough and accurate. However, I don't think that https://issues.apache.org/jira/browse/FLINK-7282 will solve this problem. We're dealing with "time back-pressure" here and not traditional processing back-pre

Re: does the flink sink only support bio?

2018-01-03 Thread Stefan Richter
I think mixing async UPDATES and exactly-once might be tricky, and the typical use case for async IO is more about reads. So let’s take a step back: what would you like to achieve with this? Do you want a read-modify-update (e.g. a map function that queries and updates a DB) or just

Re: Flink CEP with event time

2018-01-03 Thread shashank agarwal
Actually, in Kafka there are other topics as well (around 5-6 topics). I am consuming a particular topic 'x', which only contains events. Other topics have different data. I am using two consumers in my program for 2 different topics. In the first topic x I am extracting the timestamp from origintimestamp v

Re: RichAsyncFunction in scala

2018-01-03 Thread Aljoscha Krettek
Hi, There is this Jira issue: https://issues.apache.org/jira/browse/FLINK-6756 Best, Aljoscha > On 28. Dec 2017, at 17:10, Antoine Philippot > wrote: > > Hi Ufuk, > > I don't think it is possible as I use this function as a parameter of >

Re: BackPressure handling

2018-01-03 Thread Vishal Santoshi
To add an interim solution to the issue: I extended the source, based on the advice "custom source/adapt an existing source", and put in a RateLimiter (Guava) that effectively puts a cap on each Kafka consumer (x times the expected incident rqs). That solved the issue as in it stabilized the flow

Re: MergingWindow

2018-01-03 Thread Aljoscha Krettek
Yes, this is a very good description! To see this in action you can run MergingWindowSetTest and comment out the check in MergingWindowSet, then you will see test failures and can trace what situations lead to problematic behaviour without that check. > On 29. Dec 2017, at 15:07, jincheng sun

Re: Flink CEP with event time

2018-01-03 Thread Aljoscha Krettek
Ok, but will there be events in all Kafka partitions/topics? > On 3. Jan 2018, at 15:33, shashank agarwal wrote: > > Hi, > > Yes, Events will always carry a variable OriginTimestamp which I am using in > the extractor. I have used fallback also in case of data missing will put > System curren

Re: Flink CEP with event time

2018-01-03 Thread shashank agarwal
Hi, Yes, events will always carry a variable OriginTimestamp, which I am using in the extractor. I have also used a fallback: in case the data is missing, it will put the System current millis. Still, it's not printing results. Best, Shashank On Wed, Jan 3, 2018 at 7:40 PM, Aljoscha Krettek wrote: > H

Re: Separate checkpoint directories

2018-01-03 Thread Kyle Hamlin
> On Jan 3, 2018, at 5:51 AM, Stefan Richter > wrote: > > Hi, > > first, let my ask why you want to have a different checkpoint directory per > topic? It is perfectly ok to have just a single checkpoint directory, so I > wonder what the intention is? Flink will already create proper subdire

Re: keyby() issue

2018-01-03 Thread Aljoscha Krettek
Side note: Sliding windows can be quite expensive if the slide is small compared to the size. Flink will treat each "slide" as a separate window, so in your case you will get 60 * num_keys windows, which can become quite big. Best, Aljoscha > On 2. Jan 2018, at 17:41, Timo Walther wrote: > >
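The arithmetic behind this side note can be sketched in plain Python (a model, not the Flink API). It assumes windows of the form [start, start + size) aligned to multiples of the slide. The message only implies a size-to-slide ratio of 60; the 60-second size, 1-second slide, key count, and function name below are hypothetical.

```python
def windows_for(timestamp_ms, size_ms, slide_ms):
    """Start times of all sliding windows [start, start + size) containing the timestamp."""
    last_start = timestamp_ms - (timestamp_ms % slide_ms)
    starts = []
    start = last_start
    while start > timestamp_ms - size_ms:
        starts.append(start)
        start -= slide_ms
    return starts


size, slide = 60_000, 1_000   # hypothetical: 60 s window sliding by 1 s
per_element = len(windows_for(123_456, size, slide))
num_keys = 1_000              # hypothetical key count
print(per_element, per_element * num_keys)
```

Each element belongs to size / slide windows, so the number of windows kept at once grows with that ratio multiplied by the number of keys. With size equal to slide (a tumbling window) the same function returns a single window.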

Re: Flink CEP with event time

2018-01-03 Thread Aljoscha Krettek
Hi, Are all the partitions always carrying data that has advancing timestamps? When using Event-time the Kafka source (and Flink in general) needs to have steady progress in all partitions, otherwise the watermark does not advance, which in turn means that processing will be stalled downstream.
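Why one quiet partition stalls everything can be seen in a minimal plain-Python model (not the Flink API): the watermark seen downstream is the minimum over all inputs. The function name and the sample values are hypothetical, and `None` is used here to stand for a partition that has not produced a watermark yet.

```python
def operator_watermark(partition_watermarks):
    """Watermark a downstream operator sees: the minimum over all inputs.

    None stands for a partition that has not emitted any watermark yet.
    """
    if any(w is None for w in partition_watermarks):
        return None  # no progress can be declared at all
    return min(partition_watermarks)


active = operator_watermark([5_000, 7_000, 6_500])
one_stalled = operator_watermark([5_000, 90_000, 95_000])
one_idle = operator_watermark([None, 90_000, 95_000])
print(active, one_stalled, one_idle)
```

In the second call two partitions are far ahead, but the result stays at the timestamp of the slowest one, which is the stall described in the message.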

Re: does the flink sink only support bio?

2018-01-03 Thread Jinhua Luo
No, I mean how to implement an exactly-once DB commit (given that our async IO target is MySQL), not the state used by Flink. As mentioned in my previous mail, if I commit to the DB in notifyCheckpointComplete, we risk losing data (lost commit, and a Flink restart would not trigger notifyCheckpointComplete for

Re: How to stop FlinkKafkaConsumer and make job finished?

2018-01-03 Thread Aljoscha Krettek
Hi, There is a discussion about this in the Jira Issue: https://issues.apache.org/jira/browse/FLINK-7883 . See especially my last comment there. Best, Aljoscha > On 2. Jan 2018, at 17:19, Timo Walther wrote: > > Hi Arnaud, > > thanks for le

Re: does the flink sink only support bio?

2018-01-03 Thread Stefan Richter
Hi, > > Then how to implement exactly-once async io? That is, neither missing > data or duplicating data. From the docs about async IO here https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

Re: Job Manager not able to fetch job info when restarted

2018-01-03 Thread Stefan Richter
Hi, did you configure high availability with Zookeeper, as described here: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html ? This should bri

Flink CEP with event time

2018-01-03 Thread shashank agarwal
Hello, I have some patterns in my program. For example, A followedBy B. As I am using a Kafka source and my event APIs use load balancers, sometimes B comes before A. So my CEP doesn't generate any result for those events. I have then tried event time and applied "BoundedOutOfOrdernessTi

Re: Lower Parallelism derives better latency

2018-01-03 Thread Stefan Richter
Hi, one possible explanation that I see is the following: in a shuffle, there are input and output buffers for each parallel subtask to which data could be shuffled. Those buffers are flushed either when full or after a timeout interval. If you increase the parallelism, there are more buff
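A rough plain-Python model of the flush rule in this reply (not Flink's actual network stack) is sketched below. It assumes records are spread evenly over the output buffers and that a buffer is flushed when it is full or when the timeout elapses; a timeout of zero is treated as a flush after every record, as stated elsewhere in this thread. All numbers and the function name are hypothetical.

```python
def max_wait_ms(records_per_sec, num_buffers, buffer_capacity, timeout_ms):
    """Upper bound on how long the first record in a buffer waits for a flush.

    Assumes an even spread of records over num_buffers output buffers.
    """
    if timeout_ms == 0:
        return 0.0  # flushed after every record
    per_buffer_rate = records_per_sec / num_buffers
    fill_time_ms = 1000.0 * buffer_capacity / per_buffer_rate
    return min(fill_time_ms, float(timeout_ms))


# Hypothetical load: 10,000 records/s, 100 records per buffer, 100 ms timeout.
waits = [max_wait_ms(10_000, n, 100, 100) for n in (1, 4, 16)]
print(waits)
```

Under these assumptions, more buffers at the same total rate means each buffer fills more slowly, so records wait longer until the timeout becomes the limit.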

Re: does the flink sink only support bio?

2018-01-03 Thread Jinhua Luo
Then how to implement exactly-once async io? That is, neither missing data nor duplicating data. Is there some way to index data by checkpoint id and record which checkpoints were already committed to the db? But that means we need MapState, right? However, the async-io operator normally follows other operat

Job Manager not able to fetch job info when restarted

2018-01-03 Thread Sushil Ks
Hi, We are launching a Flink job on Yarn. Whenever the job manager gets restarted, the previously running task doesn't get restarted; instead it shows no jobs running. Is there a way to resume the previously running task when the job manager restarts? Regards, Sushil Ks

Re: Lower Parallelism derives better latency

2018-01-03 Thread Aljoscha Krettek
Hi, How are you measuring latency? Is it latency within a Flink Job or from Kafka to Kafka? The first seems more likely but I'm still interested in the details. Best, Aljoscha > On 3. Jan 2018, at 08:13, Netzer, Liron wrote: > > Hi group, > > We have a standalone Flink cluster that is runni

Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Stephan Ewen
The error is not the missing class "S3ErrorResponseHandler", but the initialization of that class. It could be missing classes that are statically referenced by the "S3ErrorResponseHandler". The ones I see are "XMLInputFactory" (part of Java itself, should always be there) and "org.apache.common

Re: Apache Flink - Connected Stream with different number of partitions

2018-01-03 Thread Aljoscha Krettek
Hi, The answer is correct but I'll try and elaborate a bit: the way data is sent to downstream operations depends on a couple of things in this case: - parallelism of first input operation - parallelism of second input operation - parallelism of co-operation - transmission pattern on first i

Re: Separate checkpoint directories

2018-01-03 Thread Stefan Richter
Hi, first, let me ask why you want to have a different checkpoint directory per topic? It is perfectly ok to have just a single checkpoint directory, so I wonder what the intention is? Flink will already create proper subdirectories and filenames and can identify the right checkpoint data for e

Re: Apache Flink - Question about rolling window function on KeyedStream

2018-01-03 Thread Stefan Richter
Hi, I would interpret this as: the reduce produces an output for every new reduce call, emitting the updated value. There is no need for a window because it kicks in on every single invocation. Best, Stefan > On 31.12.2017, at 22:28, M Singh wrote: > > Hi: > > Apache Flink documentation >
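The interpretation given in this reply can be sketched in plain Python (a model, not the Flink API): a rolling reduce keeps one value per key and emits the updated value after every input element, with no window involved. The function name and sample events are hypothetical, and emitting the first element of a key unchanged is an assumption of this sketch.

```python
def rolling_reduce(keyed_events, reduce_fn):
    """Yield (key, reduced_value) after every input element, per key."""
    state = {}  # one running value per key
    for key, value in keyed_events:
        if key in state:
            state[key] = reduce_fn(state[key], value)
        else:
            state[key] = value  # first element of a key is passed through
        yield key, state[key]


events = [("a", 1), ("b", 10), ("a", 2), ("a", 3), ("b", 5)]
out = list(rolling_reduce(events, lambda x, y: x + y))
print(out)
```

Five inputs produce five outputs here, one per invocation, each carrying the running result for its key.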

Re: does the flink sink only support bio?

2018-01-03 Thread Stefan Richter
> On 01.01.2018, at 15:22, Jinhua Luo wrote: > > 2017-12-08 18:25 GMT+08:00 Stefan Richter : >> You need to be a bit careful if your sink needs exactly-once semantics. In >> this case things should either be idempotent or the db must support rolling >> back changes between checkpoints, e.g. v

Re: scala 2.12 support/cross-compile

2018-01-03 Thread Aljoscha Krettek
Hi, This is the umbrella issue for Scala 2.12 support. As Stephan pointed out, the ClosureCleaner and SAMs are currently the main problems. The first is also a problem for Spark, which tracks its progress here: https://issues.apache.org/jira/browse/SPARK-14540

Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Nico Kruber
Hi Kyle, except for putting the jar into the lib/ folder and setting up credentials, nothing else should be required [1]. The S3ErrorResponseHandler class itself is in the jar, as you can see with jar tf flink-s3-fs-presto-1.4.0.jar | grep org.apache.flink.fs.s3presto.shaded.com.amazonaws.services

Re: JobManager not receiving resource offers from Mesos

2018-01-03 Thread Stefan Richter
Hi, did you see this exception right at the head of your log? java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set. at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:265) at org.apache.hadoop.util.Shell.(Shell.java:290) at org.apache.hadoop.util.StringU

Re: Flink on K8s job submission best practices

2018-01-03 Thread Robert Metzger
Hi, For the future, the FLIP-6 [1] work will solve the job submission problem in a nice way: You'll be able to build a docker image containing the job and the jobmanager. It's basically a jobmanager configured to only ever run this job. This way, by starting this image, you'll automatically also l

Re: Apache Flink - Connected Stream with different number of partitions

2018-01-03 Thread Timo Walther
Hi Mans, I did a quick test on my PC where I simply set breakpoints in map1 and map2 (someStream has parallelism 1, otherStream 5, my CoMapFunction 8). Elements of someStream end up in different CoMapTasks (2/8, 7/8 etc.). So I guess the distribution is round-robin partitioning. @Aljoscha mig

Re: scala 2.12 support/cross-compile

2018-01-03 Thread Stephan Ewen
Hi Hao Sun! This is work in progress, but Scala 2.12 is a bit tricky. I think the Scala folks have messed this version up a bit, to be honest. The main blocker is that Scala 2.12 breaks some classes through its addition of SAM interface lambdas (similar to Java). Many of the DataStream API class

Re: Testing Flink 1.4 unable to write to s3 locally

2018-01-03 Thread Stephan Ewen
Hi Kyle! Is there more of the stack trace available, like an original exception cause? Best, Stephan On Sun, Dec 31, 2017 at 5:10 PM, Kyle Hamlin wrote: > Hi, > > When testing Flink 1.4 locally the error below keeps getting thrown. I've > followed the setup by moving the flink-s3-fs-presto.ja

Re: Two operators consuming from same stream

2018-01-03 Thread Timo Walther
Hi Tovi, I think your code without duplication performs two separate shuffle operations whereas the other code only performs one shuffle. Further latency impacts might be due to the overhead involved in maintaining the partitioning for a keyed stream/key groups and switching key contexts in