RE: Flink to get historical data from kafka between timespan t1 & t2

2021-01-12 Thread VINAY.RAICHUR
Hi Aljoscha, Not sure about your proposal regarding Point 3: * firstly, how is it ensured that the stream is closed? If I understand the doc correctly, the stream will be established starting with the latest timestamp (hmm... is that not the standard behaviour?) and will never finish (UNBOUNDED)

Re: Idea import Flink source code

2021-01-12 Thread Matthias Pohl
Hi, you might want to move these kinds of questions to user@flink.apache.org, which is the mailing list for community support questions [1]. Coming back to your question: Is it just me or is the image not accessible? Could you provide a textual description of your problem? Best, Matthias [1

RE: Timestamp Issue with OutputTags

2021-01-12 Thread Priyanka Kalra A
Hi Till, I’m using Flink 1.11.2 version. Yes, FlinkKafkaProducer011.setWriteTimestampToKafka(true) was set and causing the issue. Thank you for your help! Regards, Priyanka From: Till Rohrmann Sent: Tuesday, January 12, 2021 3:10 PM To: Taher Koitawala Cc: Priyanka Kalra A ; user Subject

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-12 Thread Dongwon Kim
Hi Arvid, Okay, I'd better get back to Avro as you suggested! Thanks for the tips regarding Avro. Best, Dongwon On Wed, Jan 13, 2021 at 3:28 AM Arvid Heise wrote: > Do you think Scala is a better option in that regard? > > I'm not sure if scala is better in this regard. Sure you could use sea

Re: Simplest way to deploy flink job on k8s for e2e testing purposes

2021-01-12 Thread Salva Alcántara
Hi Yang, Many thanks for the summary of the different options. For now, as I mentioned, I am interested in the simplest approach since my purpose is to run some smoke (e2e) tests. It is not entirely clear to me how to run flink using option 1. I'm using the official scala template now (https://git
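For what it's worth, if option 1 is read as running the job in-process, a minimal JUnit-style sketch using Flink's test utilities (flink-test-utils) looks roughly like the following; class names follow the Flink testing docs but are worth verifying against the version in use. Jobs built in the tests with a regular StreamExecutionEnvironment then execute against this embedded cluster, which is often enough for smoke tests.

    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.junit.ClassRule;

    public class SmokeTestBase {

        // Spins up an embedded Flink cluster (JobManager + TaskManagers in one JVM)
        // for the duration of the test class; jobs submitted from the tests run on it.
        @ClassRule
        public static final MiniClusterWithClientResource FLINK =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(2)
                                .build());
    }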

Dynamically send SQL statements to running flink app

2021-01-12 Thread Ahmad Alkilani
Is it possible to dynamically, as the flink application is running, inject new SQL to be executed against a stream? Thank you! >

Re: Simplest way to deploy flink job on k8s for e2e testing purposes

2021-01-12 Thread Yang Wang
Hi Salva, I think we could have the following options to make Flink application run on a Kubernetes cluster. 1. Local cluster This is what you have in mind. Flink now is really like a common java application, which you could start easily. 2. Standalone cluster on K8s By applying some yaml files,

Re: Configuring Elasticsearch Timeouts

2021-01-12 Thread Rex Fenley
For further clarity, we're on Elasticsearch 7. On Tue, Jan 12, 2021 at 4:53 PM Rex Fenley wrote: > Correction, by HTTP timeout I mean BulkProcessor timeout. > > On Tue, Jan 12, 2021 at 4:40 PM Rex Fenley wrote: > >> Hello, >> >> For the past number of days we've been investigating continuous >>

Re: Configuring Elasticsearch Timeouts

2021-01-12 Thread Rex Fenley
Correction, by HTTP timeout I mean BulkProcessor timeout. On Tue, Jan 12, 2021 at 4:40 PM Rex Fenley wrote: > Hello, > > For the past number of days we've been investigating continuous > SocketTimeoutException like the following: > > 2021-01-12 20:53:42,105 DEBUG org.elasticsearch.client.RestCli

Configuring Elasticsearch Timeouts

2021-01-12 Thread Rex Fenley
Hello, For the past number of days we've been investigating continuous SocketTimeoutException like the following: 2021-01-12 20:53:42,105 DEBUG org.elasticsearch.client.RestClient - request [POST :/_bulk?timeout=1m ] failed
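For context, a minimal sketch of one commonly mentioned way to raise the underlying HTTP timeouts when using the DataStream ElasticsearchSink (the SQL/Table connector does not expose this hook directly): a custom RestClientFactory passed to the sink builder. Class names follow the elasticsearch7 connector; the timeout values are purely illustrative.

    import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;

    public class EsTimeoutConfig {

        // Applies longer HTTP timeouts to an existing ElasticsearchSink builder.
        // Timeout values are placeholders, not recommendations.
        public static <T> void applyTimeouts(ElasticsearchSink.Builder<T> builder) {
            builder.setRestClientFactory(restClientBuilder ->
                    restClientBuilder.setRequestConfigCallback(requestConfigBuilder ->
                            requestConfigBuilder
                                    .setConnectTimeout(5_000)       // ms
                                    .setSocketTimeout(120_000)));   // ms
        }
    }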

Re: Testing Flink Jobs

2021-01-12 Thread Chesnay Schepler
In that case you will have to query the REST API instead; you can retrieve the address via MiniCluster#getRestAddress. Something along these lines should work: try (final RestClient restClient = new RestClient( RestClientConfiguration.fromConfiguration(new Configuration()), E
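For reference, a rough sketch of the plain-HTTP variant of this idea: rather than wiring up Flink's internal RestClient, call the documented REST endpoint GET /jobs/<jobid>, whose response contains a "vertices" array with the current status of each task. The rest address is assumed to come from MiniCluster#getRestAddress, and JSON parsing is left to the caller.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class JobStatusProbe {

        // Fetches the JSON returned by GET /jobs/<jobId>; restAddress would come
        // from MiniCluster#getRestAddress(), jobId from the submitted job.
        public static String fetchJobDetails(String restAddress, String jobId) throws Exception {
            URL url = new URL(restAddress + "/jobs/" + jobId);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            StringBuilder body = new StringBuilder();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    body.append(line);
                }
            }
            return body.toString(); // parse with the JSON library of your choice
        }
    }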

Dead code in ES Sink

2021-01-12 Thread Rex Fenley
Hi, I was looking through ES options trying to diagnose a SocketTimeOut we're receiving on the ES TableAPI connector. A bug report on elasticsearch's github[1] indicated I might want to set max retry timeout really high, but from what I can tell it's not even consumed anywhere [2]. [1] https://gi

Re: Testing Flink Jobs

2021-01-12 Thread KristoffSC
Hi, that helped, however there is a problem with JobStatus. Please refer to [1]. In my case JobStatus is already Running but not all tasks are running. Any idea how to get task status from MiniCluster? [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issue-with-job-status-td

Simplest way to deploy flink job on k8s for e2e testing purposes

2021-01-12 Thread Salva Alcántara
I would like to deploy flink on a local cluster built with KIND for the purposes of e2e testing. The flink app is one of the components running within the system, which consists of other components (mostly written in Golang). I was wondering what would be the simplest way for me to deploy the flink

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-12 Thread Arvid Heise
> > Do you think Scala is a better option in that regard? I'm not sure if scala is better in this regard. Sure you could use sealed classes but I don't know if the schema inference is using it. Maybe @Timo Walther knows more? I used to define Avro records in .avro files and generate Java classes

Re: Getting Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler

2021-01-12 Thread Arvid Heise
Can you post the full dependencies of sbt/maven/gradle whatever? On Tue, Jan 12, 2021 at 3:54 AM Avi Levi wrote: > Hi Arvid, > using : > > flinkVersion = '1.12.0' > scalaBinaryVersion = '2.12' > > I simplified the example to (same exception) : > > object Flinktest extends App { > private val

Re: Stateful Functions: Dynamically define and load remote modules

2021-01-12 Thread Ahmad Alkilani
That's awesome, thank you! JIRA I can follow? On Tue, Jan 12, 2021 at 9:01 AM Igal Shilman wrote: > Hello! > > Is it possible to dynamically associate Remote Modules with Remote >> Function Endpoints to an existing/already running Flink stateful >> application? >> > > Yes, in the upcoming StateF

Re: Log length

2021-01-12 Thread Rex Fenley
Got it. Thanks! On Tue, Jan 12, 2021 at 2:12 AM Chesnay Schepler wrote: > A normal FileAppender that does not do any rolling or limiting of the log > file. > > On 1/12/2021 3:09 AM, Rex Fenley wrote: > > Thanks, I'll check them out. What's the default in 1.11.2? > > On Mon, Jan 11, 2021 at 4:26

Re: How does at least once checkpointing work

2021-01-12 Thread Rex Fenley
Thanks! On Tue, Jan 12, 2021 at 1:56 AM Yuan Mei wrote: > >> It sounds like any state which does not have some form of uniqueness >> could end up being incorrect. >> >> at least once usually works if the use case can tolerate a certain level > of duplication or the computation is idempotent. > >

Re: Stateful Functions: Dynamically define and load remote modules

2021-01-12 Thread Igal Shilman
Hello! Is it possible to dynamically associate Remote Modules with Remote Function > Endpoints to an existing/already running Flink stateful application? > Yes, in the upcoming StateFun release we are introducing exactly that :) As of the upcoming version, we are adding a capability to dynamicall

Enrich stream with SQL api

2021-01-12 Thread Marek Maj
Hello, I am trying to use Flink SQL api to join two tables. My stream data source is kafka (defined through catalog and schema registry) and my enrichment data is located in relational database (JDBC connector). I think this setup reflects quite a common use case. Enrichment table definition looks li
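For context, a hedged sketch of what such an enrichment join can look like with the Flink 1.12 SQL API: a JDBC-backed dimension table plus a processing-time lookup join. Table names, columns, and connector options below are placeholders, not Marek's actual setup; the Kafka-backed orders table is assumed to be defined via the catalog with a processing-time attribute proc_time.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class EnrichmentJoinSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Dimension table backed by the JDBC connector (placeholder names/options).
            tEnv.executeSql(
                "CREATE TABLE customers (" +
                "  customer_id BIGINT," +
                "  segment STRING," +
                "  PRIMARY KEY (customer_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:postgresql://db-host:5432/crm'," +
                "  'table-name' = 'customers'" +
                ")");

            // Lookup join: each row of the Kafka-backed 'orders' table is enriched
            // against the current content of the JDBC table at processing time.
            tEnv.executeSql(
                "SELECT o.order_id, o.customer_id, c.segment " +
                "FROM orders AS o " +
                "JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c " +
                "ON o.customer_id = c.customer_id").print();
        }
    }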

Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread Timo Walther
See here: https://issues.apache.org/jira/browse/FLINK-20942 On 12.01.21 16:04, Timo Walther wrote: Hi, it seems this is a bug that is located in the Apache Calcite code. I will open an issue for it. Thanks for reporting this. Regards, Timo On 12.01.21 11:08, jy l wrote: Hi: Flink SQL filt

Re: FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread Timo Walther
Hi, it seems this is a bug that is located in the Apache Calcite code. I will open an issue for it. Thanks for reporting this. Regards, Timo On 12.01.21 11:08, jy l wrote: Hi: Flink SQL filter data throw an exception, code: def main(args: Array[String]): Unit = {     val env = StreamExecut

PyFlink Table API and connecting streams with SQL

2021-01-12 Thread meneldor
Hello, I'm a beginner in Flink and after trying to solve my problems for several days i decided to ask in the list. My goal is to connect two kafka topics which have a common ID field then produce the enriched object to a third topic based on a Tumble Window because the result has to be applied in

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Manish G
Prometheus provides avg_over_time for a range vector. That seems to be better suited for this use case. On Tue, Jan 12, 2021 at 6:53 PM Chesnay Schepler wrote: > The cumulative time probably isn't that useful to detect changes in the > behavior of the application. > > On 1/12/2021 12:30 PM, Chesn

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Chesnay Schepler
The cumulative time probably isn't that useful to detect changes in the behavior of the application. On 1/12/2021 12:30 PM, Chesnay Schepler wrote: I mean the difference itself, not cumulative. On 1/12/2021 12:08 PM, Manish G wrote: Can you elaborate the second approach more? Currently I am e

Re: Flink kafka exceptions handling

2021-01-12 Thread Aljoscha Krettek
On 2021/01/07 14:36, BELGHITH Amira (EXT) wrote: --> Our processing System is supposed to continue streaming data even though there is some Kafka errors, we are expecting that the KafkaConsumer fails but not the Flink job, do you think it is possible? I'm afraid that's not possible with Flink

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Chesnay Schepler
I mean the difference itself, not cumulative. On 1/12/2021 12:08 PM, Manish G wrote: Can you elaborate the second approach more? Currently I am exposing the difference itself. OR do you mean the cumulative difference?ie I maintain a member variable, say timeSoFar, and update it with time consu

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Manish G
Can you elaborate the second approach more? Currently I am exposing the difference itself. OR do you mean the cumulative difference? i.e. I maintain a member variable, say timeSoFar, and update it with time consumed by each method call and then expose it. Something like this: timeSoFar += timeConsume

RE: Flink to get historical data from kafka between timespan t1 & t2

2021-01-12 Thread VINAY.RAICHUR
Hi Team & Aljoscha Any update on below please? -Original Message- From: RAICHUR, VINAY Sent: 11 January 2021 19:43 To: 'Aljoscha Krettek' ; user@flink.apache.org Subject: RE: Flink to get historical data from kafka between timespan t1 & t2 Importance: High Hi Aljoscha, Currently in our

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Chesnay Schepler
That approach will generally not work for jobs that run for a long time, because it will be nigh impossible for anomalies to affect the average. You want to look into exponential moving averages. Alternatively, just expose the diff as an absolute value and calculate the average in prometheus.
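As a rough illustration of the exponential-moving-average suggestion, a sketch of a RichFlatMapFunction that exposes a smoothed per-record processing time through a gauge. The class name, metric name, smoothing factor, and String types are placeholders; as noted elsewhere in the thread, the two System.currentTimeMillis() calls per record are not free.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;
    import org.apache.flink.util.Collector;

    public class TimedFlatMap extends RichFlatMapFunction<String, String> {

        private static final double ALPHA = 0.1; // smoothing factor, illustrative
        private transient volatile double emaMillis;

        @Override
        public void open(Configuration parameters) {
            // Register a gauge that simply reports the current smoothed value.
            getRuntimeContext()
                    .getMetricGroup()
                    .gauge("flatMapEmaMillis", (Gauge<Double>) () -> emaMillis);
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            long start = System.currentTimeMillis();
            out.collect(value); // actual business logic goes here
            long elapsed = System.currentTimeMillis() - start;
            // Exponential moving average: recent records dominate, old ones decay.
            emaMillis = ALPHA * elapsed + (1 - ALPHA) * emaMillis;
        }
    }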

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Manish G
OK, got it. So I would need to accumulate the time value over the calls as well as number of times it is called...and then calculate average(accumulated time/ number of times called) and then set calculated value into gauge as above. On Tue, Jan 12, 2021 at 4:12 PM Chesnay Schepler wrote: > A ga

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Chesnay Schepler
A gauge just returns a value, and Flink exposes it as is. As such you need to calculate the average over time yourself, taking 2 time measurements (before and after the processing of each). On 1/12/2021 11:31 AM, Manish G wrote: startTime is set at start of function: long startTime = System.c

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Manish G
startTime is set at start of function: long startTime = System.currentTimeMillis(); On Tue, Jan 12, 2021 at 3:59 PM Manish G wrote: > My code is: > > public class SimpleGauge implements Gauge { > > private T mValue; > > @Override > public T getValue() { > return mValue; >

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Manish G
My code is: public class SimpleGauge implements Gauge { private T mValue; @Override public T getValue() { return mValue; } public void setValue(T value){ mValue = value; } } And in flatmap function: float endTime = (System.currentTimeMillis() - startTim

Re: Metrics for average time taken by flatMap function

2021-01-12 Thread Chesnay Schepler
Sure, that might work. Be aware though that time measurements are, compared to the logic within a function, usually rather expensive and may impact performance. On 1/12/2021 10:57 AM, Manish G wrote: Hi All, I have implemented a flatmap function and I want to collect metrics for average time

Re: state reset(lost) on TM recovery

2021-01-12 Thread Chesnay Schepler
Are the hashes of these object equal as well? On 1/12/2021 3:59 AM, Alexey Trenikhun wrote: Hello, Yes, I'm aware, and I used elements with same key, and logged getCurrentKey() to ensure that key is same, but you are right in terms that it is scope related, the key is protobuf object and I sp

Re: Flink to get historical data from kafka between timespan t1 & t2

2021-01-12 Thread Aljoscha Krettek
On 2021/01/11 14:12, vinay.raic...@t-systems.com wrote: a) As mentioned by you "KafkaSource" was introduced in Flink 1.12 so, I suppose we have to upgrade to this version of Flink. Can you share the link of the stable Flink image (containerized version) to be used in our set-up keeping in mind
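For context, a rough sketch of the KafkaSource-based approach for reading only the records between two timestamps: starting offsets initialized at t1 and a bounded stop condition at t2, which makes the source finish instead of running UNBOUNDED. Broker address, topic, and timestamps are placeholders, and the exact builder method names should be double-checked against the Flink release in use (the snippet follows recent documentation).

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BoundedKafkaRead {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            long t1 = 1610409600000L; // start timestamp in ms, illustrative
            long t2 = 1610496000000L; // end timestamp in ms, illustrative

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")            // placeholder
                    .setTopics("events")                          // placeholder
                    .setStartingOffsets(OffsetsInitializer.timestamp(t1))
                    .setBounded(OffsetsInitializer.timestamp(t2)) // source ends at t2
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            DataStream<String> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-t1-t2");
            stream.print();
            env.execute("historical read between t1 and t2");
        }
    }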

Re: How to gracefully avoid "Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type"?

2021-01-12 Thread Dongwon Kim
Hi Arvid, Thanks for the very detailed explanation and tips! inferring the type information of Java classes is quite messy. At first, it > seems like that should work out the box as you are only using as the > type of the list, right? However, there is no way of knowing if you didn't > use a sub

Re: Log length

2021-01-12 Thread Chesnay Schepler
A normal FileAppender that does not do any rolling or limiting of the log file. On 1/12/2021 3:09 AM, Rex Fenley wrote: Thanks, I'll check them out. What's the default in 1.11.2? On Mon, Jan 11, 2021 at 4:26 PM Chesnay Schepler > wrote: Have a look at RollingFi

FlinkSQL Filter Error With Float Column on flink-1.12.0

2021-01-12 Thread jy l
Hi: Flink SQL filter data throw an exception, code: def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val source = env.fromElements( (1.0f, 11.0f, 12.0f), (2.0f, 21.0f, 22.0f), (3.0f, 31.0f, 32.0f), (4.0f, 41.0f, 42.

Re: How should I process a cumulative counter?

2021-01-12 Thread Aljoscha Krettek
Hi Larry, By now, it seems to me that the windowing API might not be the right solution for your use case. The fact that sensors can shut down arbitrarily makes it hard to calculate what window an event should fall into. Have you tried looking into `ProcessFunction`? With this you can keep
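To make the ProcessFunction direction a bit more concrete, a sketch of a KeyedProcessFunction that keeps a per-sensor running total in state and uses an event-time timer to flush and reset it after a period of inactivity. The types, the one-hour gap, and the emit-and-reset behaviour are illustrative assumptions, not Larry's actual requirements; event-time timestamps are assumed to be assigned upstream.

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class CumulativeCounter
            extends KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {

        private static final long INACTIVITY_GAP_MS = 60 * 60 * 1000; // illustrative

        private transient ValueState<Long> runningTotal; // per-sensor running value
        private transient ValueState<Long> pendingTimer; // last registered timer

        @Override
        public void open(Configuration parameters) {
            runningTotal = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("runningTotal", Long.class));
            pendingTimer = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("pendingTimer", Long.class));
        }

        @Override
        public void processElement(Tuple2<String, Long> reading, Context ctx,
                                   Collector<Tuple2<String, Long>> out) throws Exception {
            Long total = runningTotal.value();
            runningTotal.update((total == null ? 0L : total) + reading.f1);

            // Re-arm the inactivity timer: delete the previous one, register a new one.
            Long previous = pendingTimer.value();
            if (previous != null) {
                ctx.timerService().deleteEventTimeTimer(previous);
            }
            long next = ctx.timestamp() + INACTIVITY_GAP_MS;
            ctx.timerService().registerEventTimeTimer(next);
            pendingTimer.update(next);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            // Sensor was silent for the whole gap: emit the current total and reset.
            Long total = runningTotal.value();
            if (total != null) {
                out.collect(Tuple2.of(ctx.getCurrentKey(), total));
            }
            runningTotal.clear();
            pendingTimer.clear();
        }
    }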

Metrics for average time taken by flatMap function

2021-01-12 Thread Manish G
Hi All, I have implemented a flatmap function and I want to collect metrics for average time for this function which I plan to monitor via prometheus. What would be a good approach for it? I have added a gauge to the method (extending Gauge interface from flink API). Would it work for my needs?

Re: How does at least once checkpointing work

2021-01-12 Thread Yuan Mei
> > > It sounds like any state which does not have some form of uniqueness could > end up being incorrect. > > at least once usually works if the use case can tolerate a certain level of duplication or the computation is idempotent. > Specifically in my case, all rows passing through the executio

Re: Main class logs in Yarn Mode

2021-01-12 Thread Yangze Guo
I think you can try the application mode[1]. [1] https://ci.apache.org/projects/flink/flink-docs-master/deployment/#application-mode Best, Yangze Guo On Tue, Jan 12, 2021 at 5:23 PM bat man wrote: > > Thanks Yangze Gua. > Is there a way these can be redirected to a yarn logs. > > On Tue, 12 Ja

Re: Timestamp Issue with OutputTags

2021-01-12 Thread Till Rohrmann
Hi Priyanka, Could you tell us which Flink version you are using? Moreover, seeing the complete Flink job could be helpful. The only explanation I have at the moment is that you might have set FlinkKafkaProducer011.setWriteTimestampToKafka(true). If this is true then you have to set the TimeCharac

AW: Statefun with RabbitMQ consumes message but does not run statefun

2021-01-12 Thread Stephan Pelikan
> currently targeting mid February for the next major StateFun release Thank you Gordon for sharing this information. As I can now see in the Flink Blog, you bring out new versions frequently. I’m a newbie to Flink and I assumed the StateFun docs and the Flink docs would match versions. But of course it is no

Re: Main class logs in Yarn Mode

2021-01-12 Thread bat man
Thanks Yangze Guo. Is there a way these can be redirected to the yarn logs? On Tue, 12 Jan 2021 at 2:35 PM, Yangze Guo wrote: > The main function of your WordCountExample is executed in your local > environment. So, the logs you are looking for ("Entering > application.") are be located in your co

Re: Statefun with RabbitMQ consumes message but does not run statefun

2021-01-12 Thread Tzu-Li (Gordon) Tai
Hi, There is no lock-step of releasing a new StateFun release when a new Flink release goes out. StateFun and Flink have individual releasing schemes and schedules. Usually, for new major StateFun version releases, we will upgrade its Flink dependency to the latest available version. We are curre

Re: Main class logs in Yarn Mode

2021-01-12 Thread Yangze Guo
The main function of your WordCountExample is executed in your local environment. So, the logs you are looking for ("Entering application.") are located in your console output and the "log/" directory of your Flink distribution. Best, Yangze Guo On Tue, Jan 12, 2021 at 4:50 PM bat man wrote:

Main class logs in Yarn Mode

2021-01-12 Thread bat man
Hi, I am running a sample job as below - public class WordCountExample { static Logger logger = LoggerFactory.getLogger(WordCountExample.class); public static void main(String[] args) throws Exception { final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); logger.inf