Re: Flink AutoScaling EMR

2020-11-12 Thread Robert Metzger
Hi, it seems that YARN has a feature for targeting specific hardware: https://hadoop.apache.org/docs/r3.1.0/hadoop-yarn/hadoop-yarn-site/PlacementConstraints.html In any case, you'll need enough spare resources for some time to be able to run your job twice for this kind of "zero downtime handover"
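
A related knob, assuming the new nodes carry a YARN node label (a different YARN feature than placement constraints): Flink's yarn.application.node-label option can pin the whole Flink application to labeled nodes. A minimal sketch, with a made-up label name:

    # flink-conf.yaml: pin the Flink YARN application to nodes carrying this label
    yarn.application.node-label: new-hardware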

Re: Adding keyed state to test harness before calling process function.

2020-11-12 Thread Marco Villalobos
Thank you. I looked into that, but it does not initialize any values in keyed state; instead, it uses keyed state, and lines 407-412 show that it does not set keyed state values in advance, but rather handles null values when the state has not been set in advance. public void processElement(String value, Context ctx, Co

Re: Adding keyed state to test harness before calling process function.

2020-11-12 Thread Guowei Ma
Hi, Marco I think you could look at testScalingUp() at flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java Best, Guowei On Fri, Nov 13, 2020 at 10:36 AM Marco Villalobos wrote: > Hi, > > I would like

Adding keyed state to test harness before calling process function.

2020-11-12 Thread Marco Villalobos
Hi, I would like to add keyed state to the test harness before calling the process function. I am using the OneInputStreamOperatorTestHarness. I can't find any examples online on how to do that, and I am struggling to figure this out. Can somebody please provide guidance? My test case has keyed s
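
One way to seed keyed state before processElement is called is to go through the operator's keyed state backend once the harness has been opened. Below is a minimal sketch, assuming a KeyedProcessFunction keyed by String and a ValueState named "count"; MyKeyedProcessFunction is a placeholder:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.runtime.state.VoidNamespace;
    import org.apache.flink.runtime.state.VoidNamespaceSerializer;
    import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;

    KeyedProcessOperator<String, String, String> operator =
        new KeyedProcessOperator<>(new MyKeyedProcessFunction());
    KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
        new KeyedOneInputStreamOperatorTestHarness<>(operator, value -> value, Types.STRING);
    harness.open();

    // set the current key on the backend, then write the state value directly
    operator.getKeyedStateBackend().setCurrentKey("key-1");
    ValueState<Long> count = operator.getKeyedStateBackend().getPartitionedState(
        VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE,
        new ValueStateDescriptor<>("count", Long.class));
    count.update(42L);

    // the function now sees the pre-populated state for that key
    harness.processElement("key-1", 100L);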

Re: Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-12 Thread Averell
I have some updates. Some weird behaviours were found; please refer to the attached photo. All requests were sent via the REST API. The status of the savepoint triggered by that stop request (ID 11018) is "COMPLETED [Savepoint]"; however, no checkpoint data has been persisted (in S3). The folder /`sav

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-12 Thread fuyao . li
The test workflow attachment was not added in the previous email, sorry for the confusion; please refer to the described text workflow. Thanks. On 11/12/20 16:17, fuyao...@oracle.com wrote: Hi All, Just to add a little more context to the problem. I have a full outer join operation before th

Re: BoundedOutOfOrderness Watermark Generator is NOT making the event time to advance

2020-11-12 Thread fuyao . li
Hi All, Just to add a little more context to the problem. I have a full outer join operation before this stage. The source data stream for the full outer join is a Kafka source. I also added timestamps and watermarks to the FlinkKafkaConsumer. After that, it makes no difference to the result, stil
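
A side note: one frequent cause of event time not advancing is an idle Kafka partition holding the watermark back. A minimal sketch of attaching a bounded-out-of-orderness strategy with idleness handling directly to the consumer; Event, its timestamp getter, deserializer, and props are placeholders:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    FlinkKafkaConsumer<Event> consumer = new FlinkKafkaConsumer<>("topic", deserializer, props);
    consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, recordTimestamp) -> event.getEventTimeMillis())
            // declare partitions idle after one minute so they don't hold back the watermark
            .withIdleness(Duration.ofMinutes(1)));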

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
I'm now trying with a MATCH_RECOGNIZE: SELECT * FROM customers MATCH_RECOGNIZE ( PARTITION BY client_number ORDER BY proctime() MEASURES LAST(B.client_number) as client_number, LAST(B.address) as address PATTERN (A* B) DEFINE B AS LAST(A.address, 1) is NULL OR B.address <> LAST(A.address, 1) ) as

Re: Logs of JobExecutionListener

2020-11-12 Thread Flavio Pompermaier
Actually what I'm experiencing is that the JobListener is executed successfully if I run my main class from the IDE, while the job listener is not fired at all if I submit the JobGraph of the application to a cluster using the RestClusterClient. Am I doing something wrong? My main class ends with

Re: DataStream.connect semantics for Flink SQL / Table API

2020-11-12 Thread Yuval Itzchakov
Hi Aljoscha, You're right, I had a misunderstanding about how unions without window operations work. Thanks! On Thu, Nov 12, 2020, 18:37 Aljoscha Krettek wrote: > Hi, > > I think if you don't do any operations that are sensitive to event-time > then just using a UNION/UNION ALL should work be

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-12 Thread Simone Cavallarin
Hi Aljoscha, Yes, correct: I would like to have more windows when there are more events for a given time frame, that is, when the events are more dense in time. I can calculate the time difference between each event and create a parameter that can create windows of different sizes dynamically base

Re: Flink AutoScaling EMR

2020-11-12 Thread Rex Fenley
Awesome, thanks! Is there a way to make the new yarn job only on the new hardware? Or would the two jobs have to run on intersecting hardware and then would be switched on/off, which means we'll need a buffer of resources for our orchestration? Also, good point on recovery. I'll spend some time lo

Re: DataStream.connect semantics for Flink SQL / Table API

2020-11-12 Thread Aljoscha Krettek
Hi, I think if you don't do any operations that are sensitive to event-time then just using a UNION/UNION ALL should work because then there won't be any buffering by event time which could delay your output. Have you tried this and have you seen an actual delay in your output? Best, Aljosch
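
A minimal sketch of that suggestion in SQL; tableEnv stands in for a StreamTableEnvironment, and the table and column names are made up:

    import org.apache.flink.table.api.Table;

    Table unified = tableEnv.sqlQuery(
        "SELECT id, payload, event_time FROM source_a " +
        "UNION ALL " +
        "SELECT id, payload, event_time FROM source_b");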

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-12 Thread Aljoscha Krettek
Hi, I'm not sure that what you want is possible. You say you want more windows when there are more events for a given time frame? That is when the events are more dense in time? Also, using the event timestamp as the gap doesn't look correct. The gap basically specifies the timeout for a ses

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi Arvid, Thank you so much for your detailed reply. I think I will go with one schema per topic using GenericRecordAvroTypeInfo for GenericRecords and not do any custom magic. The approach of sending records as a byte array also seems quite interesting. Right now I am deserializing avro records so
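
For reference, a minimal sketch of pinning the Avro type information on the stream so the generic records don't fall back to Kryo; schemaJson and kafkaConsumer are placeholders:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;

    Schema schema = new Schema.Parser().parse(schemaJson); // the writer schema for this topic
    DataStream<GenericRecord> records = env
        .addSource(kafkaConsumer)
        .returns(new GenericRecordAvroTypeInfo(schema));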

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
I see what my mistake was: I was using a field in my ORDER BY, while it only supports proctime() for now. That allows me to create an append only stream, thanks a lot! However, it still does not allow me to do what I need: *If I use both my primary key and changing column in PARTITION BY, then i

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
You need to differentiate two serialization abstractions (which I guess you already know). One is coming from reading the source, where the DeserializationSchema is used, and it translates the bytes of Kafka into something that Flink can handle. The second serialization occurs within Flink through

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
So in this case, Flink will fall back to the default Kryo serialiser, right? -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
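
One way to check is to disable generic types, which makes the job fail fast at graph construction wherever a type would otherwise go through Kryo. A minimal sketch:

    // throws an exception during job construction for any type that would use Kryo
    env.getConfig().disableGenericTypes();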

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-12 Thread Arvid Heise
Hi Tim, afaik we are confusing two things here: there is a transaction timeout = how long the transaction lasts until it is aborted, while what you see here is some timeout while creating the transaction in the first place. A quick google search turned up [1], from which I'd infer that you need to set TR
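
For reference, a minimal sketch of keeping the producer-side timeout within the broker limit; the timeout value is only an example, and Event and serializationSchema (a KafkaSerializationSchema) are placeholders:

    import java.util.Properties;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker:9092");
    // must not exceed transaction.max.timeout.ms on the brokers (15 minutes by default)
    props.setProperty("transaction.timeout.ms", "900000");

    FlinkKafkaProducer<Event> producer = new FlinkKafkaProducer<>(
        "output-topic", serializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);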

How to use EventTimeSessionWindows.withDynamicGap()

2020-11-12 Thread Simone Cavallarin
Hi All, I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I have understood that the gap is computed dynamically by a function on each element. What I should be able to obtain is a Flink application that can automatically manage the windows based on the frequency of th
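
A minimal sketch of a dynamic gap extractor, assuming the gap can be derived from a field on the element; events, Event, and the field name are placeholders:

    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
    import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

    events
        .keyBy(e -> e.getKey())
        .window(EventTimeSessionWindows.withDynamicGap(
            (SessionWindowTimeGapExtractor<Event>) element ->
                // smaller gap when events are dense; the gap must be strictly positive
                Math.max(1L, element.getExpectedQuietPeriodMillis())))
        .reduce((a, b) -> b);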

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-12 Thread Tim Josefsson
To further add to this problem, I've now got our ops team to set transaction.max.timeout.ms on our Kafka brokers to 1 hour (as suggested by the Flink docs). However the problem persists and I'm still getting the same error message. I've confirmed that this config setting is actually set on the Kafk

Data loss exception using hash join in batch mode

2020-11-12 Thread
Data loss exception using hash join in batch mode

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-12 Thread Tim Josefsson
Also realized I had a typo in the config dump I did in the previous email (the one from the 10th). If I don't do Properties producerProps = new Properties(); producerProps.setProperty("transaction.timeout.ms", "900000"); Then the value reported from the ProducerConfig is 3600000 and not 60000 as I

Logs of JobExecutionListener

2020-11-12 Thread Flavio Pompermaier
Hello everybody, I'm trying to use the JobListener to track when a job finishes (with Flink 1.11.0). It works great, but I have the problem that logs inside onJobExecuted are not logged anywhere.. is that normal? Best, Flavio
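
For context, a minimal sketch of how the listener is registered; LOG stands in for an slf4j logger:

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.core.execution.JobListener;

    env.registerJobListener(new JobListener() {
        @Override
        public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
            LOG.info("Job submitted: {}", jobClient == null ? null : jobClient.getJobID());
        }

        @Override
        public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
            LOG.info("Job executed: {}", result);
        }
    });

Note that the callbacks run in the client process that calls execute(), so their output follows the client's logging configuration rather than the cluster logs.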

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
If you follow the best practices, then topics should never have different schemas as you can't enforce schema compatibility. You also have very limited processing capabilities and clumsy workflows attached to it. If you want to encode different kinds of events, then the common approach is to use so

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi Arvid, Thanks a lot for your reply. And yes, we do use the Confluent schema registry extensively. But the `ConfluentRegistryAvroDeserializationSchema` expects the reader schema to be provided. That means it reads the message using the writer schema and converts it to the reader schema. But this is not what I want

Re: Flink-Kafka exactly once errors even with transaction.timeout.ms set

2020-11-12 Thread Tim Josefsson
Sure, I've attached it to this email. The process seems to restart once the TimeoutException happens so it's repeated a couple of times. Thanks for looking at it! /Tim On Wed, 11 Nov 2020 at 10:37, Aljoscha Krettek wrote: > Hmm, could you please post the full stack trace that leads to the > Ti

Job is still in ACTIVE state after /jobs/:jobid/stop

2020-11-12 Thread Averell
Hi, I'm on 1.11.0, with a streaming job running on a YARN session, reading from Kinesis. I tried to stop the job using REST, with "drain=false". After that POST request, I got back a request_id (not sure what I should use that for). Checked the job in the GUI, I could see that a savepoint has been com

DataStream.connect semantics for Flink SQL / Table API

2020-11-12 Thread Yuval Itzchakov
Hi, I want to create an abstraction over N source tables (streams), and unify them into 1 table. I know UNION and UNION ALL exist, but I'm looking for DataStream.connect like semantics in regards to watermark control. I don't want to take the minimum watermark over all N streams, as I know for sur

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Felipe Gutierrez
I see, now it has different query plans. It was documented on another page, so I got confused. Thanks! -- -- Felipe Gutierrez -- skype: felipe.o.gutierrez -- https://felipeogutierrez.blogspot.com On Thu, Nov 12, 2020 at 12:41 PM Jark Wu wrote: > > Hi Felipe, > > The default value of `table.optimiz

Re: PyFlink Table API and UDF Limitations

2020-11-12 Thread Dian Fu
Hi Niklas, Python DataStream API will also be supported in coming release of 1.12.0 [1]. However, the functionalities are still limited for the time being compared to the Java DataStream API, e.g. it will only support the stateless operations, such as map, flat_map, etc. [1] https://ci.apache

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Jark Wu
Hi Laurent, 1. Currently, it's impossible to convert deduplication with last row into an append-only stream. 2. Yes, I think the Ververica platform doesn't support the 'changelog-json' format natively. However, regarding your case, I think you can keep the first row on the client_number+address key. SELECT *
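
The keep-first-row variant looks roughly like this; the table and column names follow the earlier messages, tableEnv stands in for a StreamTableEnvironment, and proctime is assumed to be a processing-time attribute on the table:

    import org.apache.flink.table.api.Table;

    Table dedup = tableEnv.sqlQuery(
        "SELECT client_number, address " +
        "FROM ( " +
        "  SELECT *, " +
        "         ROW_NUMBER() OVER ( " +
        "           PARTITION BY client_number, address " +
        "           ORDER BY proctime ASC) AS row_num " +
        "  FROM customers) " +
        "WHERE row_num = 1");

Keeping the first row per key never retracts an emitted result, which is why this form can feed an append-only sink.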

Re: PyFlink Table API and UDF Limitations

2020-11-12 Thread Niklas Wilcke
Hi Dian, thank you very much for this valuable response. I already read about the UDAF, but I wasn't aware of the fact that it is possible to return and UNNEST an array. I will definitely have a try and hopefully this will solve my issue. Another question that came to my mind is whether PyFl

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Jark Wu
Hi Felipe, The default value of `table.optimizer.agg-phase-strategy` is AUTO: if mini-batch is enabled, it will use TWO_PHASE, otherwise ONE_PHASE. https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy On Thu, 12 Nov 2020 at 17:52, Felipe
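
A minimal sketch of setting this up through the TableConfig; the values are examples only, and the mini-batch options must be enabled for TWO_PHASE to take effect:

    import org.apache.flink.configuration.Configuration;

    Configuration conf = tableEnv.getConfig().getConfiguration();
    conf.setString("table.exec.mini-batch.enabled", "true");      // prerequisite for two-phase
    conf.setString("table.exec.mini-batch.allow-latency", "1 s");
    conf.setString("table.exec.mini-batch.size", "1000");
    conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");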

Re: Password usage in ssl configuration

2020-11-12 Thread Nico Kruber
Hi Suchithra, I'm not sure you can actually pass passwords in any other way. I'm also not sure this is needed if these are job-/cluster-specific because then, an attacker would have to have access to that first in order to get these credentials. And if the attacker has access to the job/cluster,

Re: Long blocking call in UserFunction triggers HA leader lost?

2020-11-12 Thread Maxim Parkachov
Hi Theo, We had a very similar problem with one of our Spark streaming jobs. The best solution was to create a custom source holding all external records in a cache, periodically reading the external data and comparing it to the cache. All changed records were then broadcast to the task managers. We tried to imple
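
A minimal sketch of the broadcast half of that pattern; ReferenceRecord, referenceSource, mainStream, and MyEnrichmentFunction are placeholders:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;

    MapStateDescriptor<String, ReferenceRecord> descriptor = new MapStateDescriptor<>(
        "reference-data", Types.STRING, Types.POJO(ReferenceRecord.class));

    // changed records from the polling source are broadcast to every task manager
    BroadcastStream<ReferenceRecord> refBroadcast = referenceSource.broadcast(descriptor);

    mainStream
        .connect(refBroadcast)
        .process(new MyEnrichmentFunction()); // extends BroadcastProcessFunction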

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Arvid Heise
The common solution is to use a schema registry, like the Confluent schema registry [1]. All records have a small 5-byte prefix that identifies the schema, and that gets fetched by the deserializer [2]. Here are some resources on how to properly secure communication if needed [3]. [1] https://docs.confluen
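
A minimal sketch of wiring the registry-aware deserializer into the consumer; the URL, topic, schema string, and consumerProps are placeholders:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    Schema readerSchema = new Schema.Parser().parse(readerSchemaJson);
    FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
        "topic",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
            readerSchema, "http://schema-registry:8081"),
        consumerProps);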

Re: Flink 1.8.3 GC issues

2020-11-12 Thread Aljoscha Krettek
Created an issue for this: https://issues.apache.org/jira/browse/BEAM-11251 On 11.11.20 19:09, Aljoscha Krettek wrote: Hi, nice work on debugging this! We need the synchronized block in the source because the call to reader.advance() (via the invoker) and reader.getCurrent() (via emitElement

Re: Stream aggregation using Flink Table API (Blink plan)

2020-11-12 Thread Felipe Gutierrez
Hi Jark, I don't get the difference between "MiniBatch Aggregation" and "Local-Global Aggregation". On the web page [1] it says that I have to enable the TWO_PHASE parameter. So I compared the query plan from both, with and without the TWO_PHASE parameter, and they are the same.

Re: FlinkSQL kafka->dedup->kafka

2020-11-12 Thread Laurent Exsteens
Hi Jark, thanks again for your quick response! I tried multiple variants of my query by: - specifying only the primary key in the PARTITION BY clause - changing the order to DESC to keep the last row --> I unfortunately always get the same error message. If I try to make a simple select on the r

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread ashwinkonale
Hi, Thanks a lot for the reply. And you both are right: serializing GenericRecord without specifying the schema was indeed a HUGE bottleneck in my app. I got to know it through JFR analysis and then read the blog post you mentioned. Now I am able to pump in a lot more data per second. (In my test setup a

Re: Help needed to increase throughput of simple flink app

2020-11-12 Thread Robert Metzger
Hi, from my experience serialization contributes a lot to the maximum achievable throughput. I can strongly recommend checking out this blog post, which has a lot of details on the topic: https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html On Tue, Nov 10, 2020 at 9:46 AM

Re: Job crash in job cluster mode

2020-11-12 Thread Robert Metzger
Hey Tim, what is your Flink job doing? Is it restarting from time to time? Is the JobManager crashing, or the TaskManager? On Tue, Nov 10, 2020 at 6:01 PM Matthias Pohl wrote: > Hi Tim, > I'm not aware of any memory-related issues being related to the deployment > mode used. Have you checked th

Re: why not flink delete the checkpoint directory recursively?

2020-11-12 Thread Robert Metzger
Hey Josh, As far as I understand the code CompletedCheckpoint.discard(), Flink is removing all the files in StateUtil.bestEffortDiscardAllStateObjects, then deleting the directory. Which files are left over in your case? Do you see any exceptions on the TaskManagers? Best, Robert On Wed, Nov 11

Re: SSL setup for YARN deployment when hostnames are unknown.

2020-11-12 Thread Robert Metzger
Hi Jiahui, using the yarn.container-start-command-template is indeed a good idea. I was also wondering whether the Flink YARN client that submits the Flink cluster to YARN has knowledge of the host where the ApplicationMaster gets deployed to. But that doesn't seem to be the case. On Wed, Nov 11