Re: Best practice for packaging and deploying Flink jobs on K8S

2021-04-29 Thread Till Rohrmann
Hi Sumeet, Is there a problem with the documented approaches on how to submit the Python program (not working) or are you asking in general? Given the documentation, I would assume that you can configure the requirements.txt via `set_python_requirements`. I am also pulling in Dian who might be ab

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Till Rohrmann
Hi Lars, The KafkaSourceBuilder constructs the new KafkaSource, which has not been fully hardened in 1.12.2. In fact, it should not be documented yet. I think you are running into an instability/bug in it. The new Kafka source should be hardened a lot more in the 1.13.0 release. Could you tell us exa

Re: Best practice for packaging and deploying Flink jobs on K8S

2021-04-29 Thread Sumeet Malhotra
Hi Till, There’s no problem with the documented approach. I was looking if there were any standardized ways of organizing, packaging and deploying Python code on a Flink cluster. Thanks, Sumeet On Thu, Apr 29, 2021 at 12:37 PM Till Rohrmann wrote: > Hi Sumeet, > > Is there a problem with the

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Till Rohrmann
Hi Yegor, If you want to use Flink's keyed windowing logic, then you need to insert a keyBy/shuffle operation because Flink currently cannot simply use the partitioning of the Kinesis shards. The reason is that Flink needs to group the keys into the correct key groups in order to support rescaling
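To make the key-group argument concrete, here is a minimal pure-Python sketch of how a key could be mapped to a key group and then to an operator subtask. It is only a model: `zlib.crc32` is a stand-in assumption for Flink's own hashing of the key, the function names are illustrative, and 128 is just an example max parallelism.

```python
import zlib


def assign_to_key_group(key: str, max_parallelism: int) -> int:
    """Map a key to a key group.

    crc32 is a stand-in (assumption); Flink applies its own hash to the key.
    """
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def operator_index_for_key_group(key_group: int, parallelism: int,
                                 max_parallelism: int) -> int:
    """Map a key group to a subtask; each subtask owns a contiguous range."""
    return key_group * parallelism // max_parallelism


def route(key: str, parallelism: int, max_parallelism: int = 128):
    """Return (key_group, subtask_index) for a key at a given parallelism."""
    key_group = assign_to_key_group(key, max_parallelism)
    subtask = operator_index_for_key_group(key_group, parallelism, max_parallelism)
    return key_group, subtask


# The key group depends only on the key and the max parallelism, so it stays
# stable when the job is rescaled; only the owning subtask may change.
kg_before, subtask_before = route("shard-0001", parallelism=4)
kg_after, subtask_after = route("shard-0001", parallelism=8)
```

Because state is stored per key group, rescaling only has to move whole key groups between subtasks. A source partition or shard carries no such mapping, which is why a keyBy is needed before keyed windows.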

Re: Flink Resuming From Checkpoint With "-s" FAILURE

2021-04-29 Thread Till Rohrmann
Hi Zachary, How did you configure the Kafka connector to commit the offsets (periodically, on checkpoint)? One explanation for the graphs you showed is that you enabled periodic committing of the offsets. If this automatic commit happens between two checkpoints and you later fall back to the earli
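A toy model of the scenario described above may help. It assumes that a restore resumes from the offsets stored in the checkpoint and ignores whatever was committed to the broker in the meantime. The class and method names are hypothetical and not part of any Flink or Kafka API.

```python
from dataclasses import dataclass, field


@dataclass
class SourceModel:
    """Toy model of a source with periodic offset commits and checkpoints."""
    position: int = 0                 # next offset to read
    committed: int = 0                # offset last committed to the broker
    checkpoints: dict = field(default_factory=dict)

    def read(self, n: int) -> None:
        self.position += n

    def periodic_commit(self) -> None:
        # Auto-commit fires on a timer, independent of checkpoints.
        self.committed = self.position

    def checkpoint(self, checkpoint_id: int) -> None:
        self.checkpoints[checkpoint_id] = self.position

    def restore(self, checkpoint_id: int) -> None:
        # Assumption: restore uses the checkpointed offset, not the committed one.
        self.position = self.checkpoints[checkpoint_id]


source = SourceModel()
source.read(100)
source.checkpoint(1)        # checkpoint 1 records offset 100
source.read(50)
source.periodic_commit()    # broker now shows offset 150
source.restore(1)           # failure: fall back to checkpoint 1
```

After the restore, the job reads again from offset 100 while the broker still reports 150 as committed, so externally observed offsets or lag can appear to jump.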

Re: How to implement a window that emits events at regular intervals and on specific events

2021-04-29 Thread Till Rohrmann
Hi Tim, I think you could use Flink's trigger API [1] to implement a trigger which fires when it sees a certain event or after some time. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#triggers Cheers, Till On Wed, Apr 28, 2021 at 5:25 PM Tim Josefs
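As a rough illustration of the decision logic such a trigger would need, here is a pure-Python sketch. It is not Flink's Trigger API: a real trigger registers timers through its context and has separate callbacks for event time and processing time. The class name, the `is_special` predicate and the explicit `now_ms` argument are assumptions made for the example.

```python
from enum import Enum


class TriggerResult(Enum):
    CONTINUE = "continue"
    FIRE = "fire"


class EventOrTimeoutTrigger:
    """Fires on a special element, or once the timeout since the first element expires."""

    def __init__(self, is_special, timeout_ms: int):
        self.is_special = is_special
        self.timeout_ms = timeout_ms
        self.deadline = None  # set when the first element of a pane arrives

    def on_element(self, element, now_ms: int) -> TriggerResult:
        if self.deadline is None:
            self.deadline = now_ms + self.timeout_ms
        if self.is_special(element):
            self.deadline = None  # start a fresh pane after firing
            return TriggerResult.FIRE
        return TriggerResult.CONTINUE

    def on_timer(self, now_ms: int) -> TriggerResult:
        if self.deadline is not None and now_ms >= self.deadline:
            self.deadline = None
            return TriggerResult.FIRE
        return TriggerResult.CONTINUE


trigger = EventOrTimeoutTrigger(lambda e: e == "flush", timeout_ms=1000)
results = [
    trigger.on_element("a", now_ms=0),
    trigger.on_timer(now_ms=500),
    trigger.on_timer(now_ms=1000),
    trigger.on_element("flush", now_ms=2100),
]
```

In Flink the `deadline` would have to live in the trigger's partitioned state rather than in a field, so that it is checkpointed together with the window contents.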

Re: Queryable State unavailable after Kubernetes HA State cleanup

2021-04-29 Thread Till Rohrmann
Hi Sandeep, I don't fully understand the problematic scenario yet. What exactly is the HA state maintained by Kubernetes in S3? Queryable state works by asking for the current state of an operator. If you use asQueryableState, then you create a reducing state which appends all stream elements. Th

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Till Rohrmann
Hi Lars, I think this is a duplicate message. Let's continue the discussion on your original message. Cheers, Till On Wed, Apr 28, 2021 at 8:50 PM Lars Skjærven wrote: > Hello, > I ran into an issue when using the new KafkaSourceBuilder (running Flink > 1.12.2, scala 2.12.13, on ververica plat

Re: Taskmanager killed often after migrating to flink 1.12

2021-04-29 Thread Till Rohrmann
Great, thanks for the update. On Wed, Apr 28, 2021 at 7:08 PM Sambaran wrote: > Hi Till, > > Thank you for the response, we are currently running flink with an > increased memory usage, so far the taskmanager is working fine, we will > check if there is any further issue and will update you. > >

Re: Best practice for packaging and deploying Flink jobs on K8S

2021-04-29 Thread Till Rohrmann
Alright. Then let's see what Dian recommends to do. Cheers, Till On Thu, Apr 29, 2021 at 9:25 AM Sumeet Malhotra wrote: > Hi Till, > > There’s no problem with the documented approach. I was looking if there > were any standardized ways of organizing, packaging and deploying Python > code on a F

Re: Best practice for packaging and deploying Flink jobs on K8S

2021-04-29 Thread Dian Fu
Hi Sumeet, There are multiple ways to specify Python dependencies, and you can use any of them. For requirements.txt, there are 3 ways, and you can use any one of them: - API inside the code: set_python_requirements - command line opt

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Raghavendar T S
Hi Yegor The trigger implementation in Flink does not support trigger by event count and duration together. You can update the existing CountTrigger implementation to support your functionality. You can use the CustomTrigger.java (minor enhancement of CountTrigger) as such which I have attached i

Blocking in an Async function

2021-04-29 Thread Olivier Nouguier
Hi, We have a rather classical situation where we use an AsyncRich function to read from Cassandra (C*) and enrich some domain elements. It works great, but sometimes C* is not reachable. How could we handle this in an async function? 0/ Let it crash && restart. 1/ Thread.sleep() and retry see

Re: Blocking in an Async function

2021-04-29 Thread Arian Rohani
Is this your own implementation of the AsyncRich function? There is already an API available for Flink for async I/O for external data access that solves some of the challenges that you seem to be encountering. https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.htm

Re: How to implement a window that emits events at regular intervals and on specific events

2021-04-29 Thread Tim Josefsson
Thanks for the suggestions! I'll see if I can implement something that works! A follow-up question, more related to state: if I implement either the custom trigger or the process function, how will they handle crashes and such? So if I for instance have a checkpointing interval of 10s will the

Re: Blocking in an Async function

2021-04-29 Thread Arvid Heise
Hi Olivier, if you are truly running async in asyncIO (remember you need an external thread pool provided by an async lib or by yourself), then option 1) sounds best to me. Could you elaborate why you'd think it's terrible? The only downside is that you'd need to disable timeout in asyncIO and hand
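The "sleep and retry without blocking the caller" idea can be sketched in a language-neutral way with Python's asyncio. This is only an analogue, not Flink's async I/O operator: in a Java job the waiting would happen on the external thread pool mentioned above. `with_retry`, `FlakyStore` and the backoff values are hypothetical names and numbers chosen for the example.

```python
import asyncio


async def with_retry(call, attempts: int = 5, base_delay: float = 0.01):
    """Await call(); on ConnectionError wait with exponential backoff and retry."""
    for attempt in range(1, attempts + 1):
        try:
            return await call()
        except ConnectionError:
            if attempt == attempts:
                raise  # give up and let the caller decide (e.g. fail the job)
            # asyncio.sleep yields to the event loop instead of blocking a thread.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


class FlakyStore:
    """Stand-in for an external store that is unreachable for the first N calls."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def lookup(self) -> str:
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("store unreachable")
        return "enriched"


store = FlakyStore(failures=2)
result = asyncio.run(with_retry(store.lookup))
```

Bounding the number of attempts keeps the "let it crash and restart" option available as a fallback when the store stays down.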

Re: Stateful Functions, Kinesis, and ConsumerConfigConstants

2021-04-29 Thread Tzu-Li (Gordon) Tai
Hi Ammon, Unfortunately you're right. I think the Flink Kinesis Consumer specific configs, e.g. keys in the ConsumerConfigConstants class, were overlooked in the initial design. One way to workaround this is to use the `SourceFunctionSpec` [1]. Using that spec, you can use any Flink SourceFunctio

Flink Checkpoint for Stateless Operators

2021-04-29 Thread Raghavendar T S
Hi Team, Assume that we have a job (checkpointing enabled) with a Kafka source and a stateless operator which consumes events from the Kafka source. We have a stream of records 1, 2, 3, 4, 5, ... Let's say the event 1 reaches the Flat Map operator and is being processed. Then the Kafka source has made a succe

Re: Flink missing Kafka records

2021-04-29 Thread Arvid Heise
Hi Dan, could you check which records are missing? I'm suspecting it could be records that are emitted right before roll over of the bucket strategy from an otherwise idling partition. If so it could be indeed connected to idleness. Idleness tells Flink to not wait on the particular partition to

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Lars Skjærven
Thanks Till. Here is how we created the KafkaSource: val sensorSource = KafkaSource.builder[SensorInput]() .setBootstrapServers(myConfig.kafkaBrokers) .setGroupId(myConfig.kafkaGroupId) .setTopics(myConfig.kafkaTopicIn) .setDeserializer(new SensorInputPBDeserialization

Re: [Stateful Functions] Help for calling remote stateful function (written in Python)

2021-04-29 Thread Igal Shilman
Hi Bonino, What you are experiencing is "expected": the finite streaming job is completing too fast. StateFun is designed to run continuously, and fault tolerance and correctness are achieved by checkpointing its internal state into durable storage. You can verify this by sim

Re: Flink Event specific window

2021-04-29 Thread Arvid Heise
Hi Swagat, 1. Where the data primarily resides depends on the chosen state backend [1]. In most cases, it's written to some file with a memory cache. It's possible to query the state [2] but not with SQL. In fact, it's so basic that we decided to drop the feature in the future to make room for a m

Re: Flink Checkpoint for Stateless Operators

2021-04-29 Thread Roman Khachatryan
Hi Raghavendar, In Flink, checkpoints are global, meaning that a checkpoint is successful only if all operators acknowledge it. So the offset will be stored in state and then committed to Kafka [1] only after all the tasks acknowledge that checkpoint. At that moment, the element must be either emi

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Yegor Roganov
Hi Till, thank you for your reply. > What you can do, though, is to create a custom operator or use a flatMap to build your own windowing operator. Since my stream wouldn't be keyed, does this mean that I would need to use "Managed Operator State" (aka raw state)? On Thu, Apr 29, 2021 at 10:34 AM

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Yegor Roganov
Hi Raghavendar, thank you for your reply. > stream.timeWindow(Time.seconds(10)).trigger(CustomTrigger.of(3)).apply(new TestWindow()); What would this stream be keyed on? On Thu, Apr 29, 2021 at 11:58 AM Raghavendar T S wrote: > Hi Yegor > > The trigger implementation in Flink does not support

Re: Flink Checkpoint for Stateless Operators

2021-04-29 Thread Raghavendar T S
Hi Roman, In general, how does Flink track the events from the source to downstream operators? We usually emit existing events from an operator or create a new instance of a class and emit it. How does Flink or the Flink source know which snapshot an event belongs to? > So you don't need to re-process it m

[ANNOUNCE] Apache Flink 1.12.3 released

2021-04-29 Thread Arvid Heise
Dear all, The Apache Flink community is very happy to announce the release of Apache Flink 1.12.3, which is the third bugfix release for the Apache Flink 1.12 series. Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data

Re: Flink Checkpoint for Stateless Operators

2021-04-29 Thread Roman Khachatryan
Flink uses checkpoint barriers that are sent along the same channels as data. Events are included in the checkpoint if they precede the corresponding barrier (or the RPC call for sources). [1] is the algorithm description and [2] is about integration with Kafka. > In my example, I have o
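The barrier rule can be illustrated with a small pure-Python model of a single channel. It ignores barrier alignment across multiple inputs and everything else a real runtime does. The tuple encoding of barriers and the function name are assumptions made for the example.

```python
def records_per_checkpoint(channel):
    """Return {checkpoint_id: records that precede that barrier in the channel}."""
    seen = []
    snapshots = {}
    for item in channel:
        if isinstance(item, tuple) and item[0] == "barrier":
            # Everything seen so far belongs to this checkpoint.
            snapshots[item[1]] = list(seen)
        else:
            seen.append(item)
    return snapshots


# Records 1..5 with barriers for checkpoints 1 and 2 flowing in the same channel.
channel = [1, 2, ("barrier", 1), 3, 4, ("barrier", 2), 5]
snapshots = records_per_checkpoint(channel)
```

Here record 5 follows the barrier of checkpoint 2, so it belongs to neither checkpoint and would be replayed from the source after a restore from checkpoint 2.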

Re: How to verify what maxParallelism is set to?

2021-04-29 Thread Bob Tiernay
I agree that a way to introspect the effective current value would be a great observability tool for sanity checking. Fabian, do you know if a ticket was ever created?

Re: Flink Checkpoint for Stateless Operators

2021-04-29 Thread Raghavendar T S
Hi Roman, I am just doing write operations from the flat map. Does it matter if I use a flat map or a sink for this purpose? Thank you

TypeSerializer Example

2021-04-29 Thread Sandeep khanzode
Hello, Is there a working example of a TypeSerializer for a Java type stored in the State? My requirement is that I should be able to store the Java POJO entity in the MapState. The state is backed by RocksDBBackend. If I update the entity with a new member variable, I am unable to deserialise

Backpressure configuration

2021-04-29 Thread Kurtis Walker
Hello, I’m building a POC for a Flink app that loads data from Kafka into a Greenplum data warehouse. I’ve built a custom sink operation that will bulk load a number of rows. I’m using a global window, triggering on number of records, to collect the rows for each sink. I’m finding that whi

Re: [Schema Evolution] Cannot restore from savepoint after deleting field from POJO

2021-04-29 Thread Alexis Sarda-Espinosa
Hello, I see that new Jira bots are now active. If no one has time to look at this, could documentation at least be updated to reflect the fact that removing fields from POJOs will break state restoration? Regards, Alexis. From: Roman Khachatryan Sent: Friday,

Re: Flink Checkpoint for Stateless Operators

2021-04-29 Thread Roman Khachatryan
Hi Raghavendar, It sounds like you don't actually have flatMap logic, in which case you should use a sink instead of a flatMap. And probably one of the existing ones, as some of them already provide exactly-once guarantee [1]. [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connect

Re: How to implement a window that emits events at regular intervals and on specific events

2021-04-29 Thread Till Rohrmann
If you use the Trigger API, then you don't have to do anything special for fault tolerance. When using the ProcessFunction, then you should use Flink's state primitives to store your state (e.g. ValueState). This will automatically checkpoint the state. In case of a failure Flink will always resume

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Till Rohrmann
Thanks for the additional information Lars. Could you maybe also share the full stack traces of the errors you see when the checkpoint fails? @Becket Qin is it a known issue with the new Kafka sources trying to checkpoint negative offsets? Cheers, Till On Thu, Apr 29, 2021 at 1:06 PM Lars Skjær

Re: Key by Kafka partition / Kinesis shard

2021-04-29 Thread Till Rohrmann
Yes, you would have to use the operator state for this. This would have the limitation that rescaling would probably not work properly. Also, if the assignment of shards to operators changes upon failure recovery, it can generate some incorrect results (some elements from shard 1 might

Re: [ANNOUNCE] Apache Flink 1.12.3 released

2021-04-29 Thread Till Rohrmann
Great to hear. Thanks a lot for being our release manager Arvid and to everyone who has contributed to this release! Cheers, Till On Thu, Apr 29, 2021 at 4:11 PM Arvid Heise wrote: > Dear all, > > The Apache Flink community is very happy to announce the release of Apache > Flink 1.12.3, which i

How to create a java unit test for Flink SQL with nested field and watermark?

2021-04-29 Thread Svend
I'm trying to write a Java unit test for a Flink SQL application using the Flink mini cluster, but I have not managed to create an input table with nested fields and time characteristics. I had a look at the documentation and examples below, although I'm still struggling: https://ci.apache.org/projects/

Re: Flink missing Kafka records

2021-04-29 Thread Dan Hill
Hey Arvid, I'll try to repro sometime in the next few weeks. I need to make some larger changes to get a full diff to see what is being dropped. On Thu, Apr 29, 2021 at 4:03 AM Arvid Heise wrote: > Hi Dan, > > could you check which records are missing? I'm suspecting it could be > records that

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

2021-04-29 Thread Lars Skjærven
Unfortunately, I only have the truncated stack trace available (from the flink UI). L 2021-04-27 16:32:02 java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9. at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamT

Re: How to create a java unit test for Flink SQL with nested field and watermark?

2021-04-29 Thread Svend
I found an answer to my own question! For future reference, the snippet below creates a SQL table with a nested field and a watermark, filled with hard-coded values, which is all I need in order to test SQL expressions. It's quite a mouthful though, is there a more succinct way to express

Re: Backpressure configuration

2021-04-29 Thread Roman Khachatryan
Hello Kurt, Assuming that your sink is blocking, I would first make sure that it is not chained with the preceding operators. Otherwise, the same thread will output data and perform windowing/triggering. You can add disableChaining after addSink to prevent this [1]. Besides that, you probably cou

Re: Backpressure configuration

2021-04-29 Thread Kurtis Walker
Thanks Roman. I had already tried disableChaining, but it didn’t have any effect. The built-in JDBC sink is really slow compared to a bulk load (close to 100x), but I had tested that and saw the same issue. When a given message triggers the JDBC sink to write a batch, everything else waits for it.

Re: Stateful Functions, Kinesis, and ConsumerConfigConstants

2021-04-29 Thread Ammon Diether
That does allow me to set up the ConsumerConfigConstants. This does have one downside: the SourceFunctionSpec has a different TYPE than KinesisFunctionSpec, so the hashed operator ID does not match. Thus I had to use allowNonRestoredState. But it is worth it. Thank you. On Thu, Apr 29, 2021 at 4

Re: Flink Event specific window

2021-04-29 Thread Swagat Mishra
Thanks Arvid. very helpful. On Thu, Apr 29, 2021 at 5:46 PM Arvid Heise wrote: > Hi Swagat, > > 1. Where the data primarily resides depends on the chosen state backend > [1]. In most cases, it's written to some file with a memory cache. It's > possible to query the state [2] but not with SQL. I