Re: How to select an event that has max attribute

2021-12-08 Thread Jing Zhang
Hi Guoqin, I understand the problem you are suffering from. I'm sorry I could not find a perfect solution on Flink 1.13. Maybe you could try to use TopN [1] instead of Window TopN by normalizing time into 5-minute units and adding that unit as one of the partition keys. But the result is an update stream
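A sketch of this workaround, assuming the `events(deviceId, locationId, gauge, readtime)` table from the original question. The bucketing expression is an assumption (any expression that maps event time onto 5-minute units works), and as noted above the output is an update stream, not append-only:

```sql
-- Plain Top-N (no window TVF), usable on Flink 1.13: partition by a derived
-- 5-minute bucket of the event time.
SELECT deviceId, locationId, gauge, readtime
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY time_bucket
           ORDER BY gauge DESC) AS rownum
  FROM (
    SELECT *,
           -- 300 seconds = 5 minutes; conversion via UNIX_TIMESTAMP is an
           -- assumed helper, adjust to your readtime type.
           FLOOR(UNIX_TIMESTAMP(CAST(readtime AS STRING)) / 300) AS time_bucket
    FROM events
  )
)
WHERE rownum = 1;
```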

Re: Customize Kafka client (module.yaml)

2021-12-08 Thread Jérémy Albrecht
Hi Robert, Thanks for your answer. Indeed, you were right. The properties attribute has to be specified, and then it is the non-nested variant. In fact, it is documented for the Egress but not for the Ingress, but the same behaviour applies. Have a great day, Jérémy

Re: How to select an event that has max attribute

2021-12-08 Thread Guoqin Zheng
Hi Jing, Just verified that it worked with Flink 1.14. But as you said, Flink 1.13 does not yet support it. Other than waiting for KDA to upgrade the Flink version, is there any workaround for Flink 1.13? Thanks, -Guoqin

Re: How to select an event that has max attribute

2021-12-08 Thread Guoqin Zheng
Hi Jing, Thanks for chiming in. This sounds great. Any chance this will work for Flink 1.13 as well, as I am using AWS KDA? Thanks, -Guoqin

RE: PyFlink convert data from ds to table

2021-12-08 Thread Королькевич Михаил
I found the problem. In the data stream I had an empty list, not None (null).
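The distinction behind this fix can be shown in plain Python: a null (None) field and a field holding an empty list are different values, and serializers treat them differently. A minimal illustration (the helper name is hypothetical):

```python
# A nullable field set to None is "null"; an empty list is a present,
# non-null value. Truthiness checks (`if value:`) conflate the two.
def classify(value):
    if value is None:
        return "null"
    if value == []:
        return "empty list"
    return "non-empty"

print(classify(None))   # null
print(classify([]))     # empty list
print(classify([1]))    # non-empty
```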

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-12-08 Thread Natu Lauchande
Hey Ingo, Thanks for the suggestion. It's definitely an issue with the Parquet connector; when we try the CSV or Blackhole connector it's all fine. I will try this approach and report back. Thanks, Natu

Re: How to select an event that has max attribute

2021-12-08 Thread Jing Zhang
Hi Guoqin, I guess you have misunderstood Martijn's response. Martijn suggests you use Window TopN. Besides, Window TopN does not need to follow a Window Aggregate; it can follow a Window TVF directly since Flink 1.14. Please see the document [1] attached. You could try the following SQL to get the result.
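The SQL itself was cut off in this digest; following the documented Window Top-N pattern, a sketch against the `events` table from the original question might look like this (the 5-minute interval is an assumption):

```sql
-- Window Top-N directly over a tumbling-window TVF (Flink 1.14+).
SELECT deviceId, locationId, gauge, readtime
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY window_start, window_end
           ORDER BY gauge DESC) AS rownum
  FROM TABLE(
    TUMBLE(TABLE events, DESCRIPTOR(readtime), INTERVAL '5' MINUTES))
)
WHERE rownum = 1;
```

Because the Top-N sits directly on the window TVF, non-grouped fields like `deviceId` and `locationId` survive into the result, which is what the plain GROUP BY approach could not do.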

Re: How to select an event that has max attribute

2021-12-08 Thread Guoqin Zheng
Hi Martijn, Thanks for your quick response. I tried it, but it does not seem to work. The problem is that I want to select fields that are not in the `GROUP BY`. So in my example, I can have a tumble window on `readtime` and select max(gauge), but I also want both `deviceId` and `locationId` of the event with the max gauge.

Re: Running a flink job with Yarn per-job mode from application code.

2021-12-08 Thread Caizhi Weng
Hi! Please make sure to always reply to the user mailing list so that everyone can see the discussion. You can't get the execution environment for an already running job, but if you want to operate on that job you can try to get its JobClient instead. However, this is somewhat complicated to get

Re: Stateful functions - egress question

2021-12-08 Thread Igal Shilman
Hello, Glad to hear that you've successfully deployed a remote function with StateFun :-) > It's not clear to me if the only way to deploy a custom egress is to use an embedded module, because documentation states: Indeed, currently the only way to define a custom egress is by writing an embedded module.

Re: GCS/Object Storage Rate Limiting

2021-12-08 Thread Seth Wiesman
Not sure if you've seen this, but Flink's file systems do support connection limiting. https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/common/#connection-limiting Seth
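The linked page configures limits per filesystem scheme in flink-conf.yaml. For GCS (`gs://`) a sketch could look like this; the numeric values are illustrative, not recommendations:

```yaml
# Limit concurrent streams opened against the gs:// filesystem
# (keys follow the documented fs.<scheme>.limit.* pattern).
fs.gs.limit.total: 40              # total concurrent input + output streams
fs.gs.limit.input: 20              # concurrent input streams
fs.gs.limit.output: 20             # concurrent output streams
fs.gs.limit.timeout: 60000         # ms to wait for a free slot before failing
fs.gs.limit.stream-timeout: 60000  # ms of inactivity before a stream is reclaimed
```

Capping concurrent opens this way can smooth out the thundering-herd of connections during recovery, which is when the rate limiting was observed.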

Re: How to select an event that has max attribute

2021-12-08 Thread Martijn Visser
Hi Guoqin, I think you could use Window Top-N. There's a recipe in the Flink SQL Cookbook [1]. The example uses a SUM, which you should change to MAX, and of course you change the rownum to 1 instead of 3. Best regards, Martijn [1] https://github.com/ververica/flink-sql-cookbook/blob/main/agg

How to select an event that has max attribute

2021-12-08 Thread Guoqin Zheng
Hi Flink Community, I am curious what the recommended way is to select the event with a max attribute value with the SQL API. For example, I have an event stream like: { deviceId, locationId, gauge, readtime <-- eventTime } I want to figure out which device and location has the max gauge

Re: GCS/Object Storage Rate Limiting

2021-12-08 Thread Kevin Lam
Hey David, Thanks for the response. The retry eventually succeeds, but I was wondering if there was anything that people in the community have done to avoid GCS/S3 rate-limiting issues. The retries do result in it taking longer for all the task managers to recover and register.

Re: Issue with Flink jobs after upgrading to Flink 1.13.1/Ververica 2.5 - java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

2021-12-08 Thread Ingo Bürk
Hi Natu, Something you could try is removing the packaged parquet format and defining a custom format[1]. For this custom format you can then fix the dependencies by packaging all of the following into the format: * flink-sql-parquet * flink-shaded-hadoop-2-uber * hadoop-aws * aws-java-sdk-bundle
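A sketch of the dependency list for such a self-contained custom-format jar, as a Maven fragment. The artifact versions below are placeholders and must be aligned with your Flink/Hadoop setup; the point is that all four jars are bundled together so their classes resolve consistently:

```xml
<!-- Hypothetical pom.xml fragment for a fat "custom format" jar bundling the
     Parquet format with matching Hadoop/AWS dependencies (versions are examples). -->
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-parquet_2.12</artifactId>
    <version>1.13.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop-2-uber</artifactId>
    <version>2.8.3-10.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>2.8.3</version>
  </dependency>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-bundle</artifactId>
    <version>1.11.271</version>
  </dependency>
</dependencies>
```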

Re: [DISCUSS] Strong read-after-write consistency of Flink FileSystems

2021-12-08 Thread David Morávek
Hi Martijn, I simply wasn't aware of that one :) It seems to provide the guarantees that we need [1]. > Of course, Azure Storage is built on a platform grounded in strong consistency, guaranteeing that writes are made durable before acknowledging success to the client. This is critically important

[DISCUSS] Deprecate MapR FS

2021-12-08 Thread Martijn Visser
Hi all, Flink supports multiple file systems [1], which includes MapR FS. MapR as a company doesn't exist anymore since 2019; the technology and intellectual property have been sold to Hewlett Packard. I don't think that there's anyone who's using MapR anymore, and therefore I think it would be good to deprecate MapR FS.

Re: FileSink in Apache Flink not generating logs in output folder

2021-12-08 Thread Fabian Paul
Hi Kajal, This looks indeed strange. Are you sure that there are records sent to the sink? You can verify it by looking at the Flink metrics of the tasks before the sink to see if they emit something. The sink should create a part file immediately when it receives a record, and the rolling policy should ensure the part file is eventually rolled.
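As a quick way to check whether the sink configuration is the problem, a minimal FileSink with an aggressive rolling policy can be sketched as below. The path and thresholds are placeholder values, `stream` is assumed to be an existing `DataStream<String>`, and the millisecond/byte `long` overloads are used as in Flink 1.13:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// Sketch: a row-format FileSink that rolls part files quickly, so output
// files become visible soon after the first record arrives.
FileSink<String> sink = FileSink
    .<String>forRowFormat(new Path("/tmp/flink-output"),
                          new SimpleStringEncoder<>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(60_000L)    // roll at least every minute
            .withInactivityInterval(10_000L)  // roll after 10s without records
            .withMaxPartSize(1024L * 1024L)   // roll after ~1 MiB
            .build())
    .build();

stream.sinkTo(sink);
```

If no part files appear even with such a policy, the records are most likely not reaching the sink at all, which the upstream task metrics should confirm.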

Re: [DISCUSS] Strong read-after-write consistency of Flink FileSystems

2021-12-08 Thread Martijn Visser
Hi David, Just to be sure, since you've already included Azure Blob Storage, but did you deliberately skip Azure Data Lake Store Gen2? That's currently supported and also used by Flink users [1]. There's also MapR FS, but I doubt if that is still used. Best regards, [1] https://nightlies.apache.

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-08 Thread Arvid Heise
Hi Ayush, DeserializationSchema.isEndOfStream was only ever supported by Kafka. For the new KafkaSource, the recommended way is to use bounded mode, like this: KafkaSource source = KafkaSource.builder() ... .setStartingOffsets(OffsetsInitializer.earliest())
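The snippet above was truncated by the digest; a fuller sketch of the bounded-mode builder might look like this (broker address, topic, and group id are placeholders):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Sketch: read from the earliest offset and stop once the offsets that were
// current at job start are reached, instead of ending the stream from the
// deserializer.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")   // placeholder
    .setTopics("input-topic")                // placeholder
    .setGroupId("my-group")                  // placeholder
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setBounded(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
```

With `setBounded`, the source runs in batch-style bounded mode and the job finishes on its own once the end offsets are consumed.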

Re: Stateful functions module configurations (module.yaml) per deployment environment

2021-12-08 Thread Fabian Paul
Hi Deniz, Great to hear from someone using Ververica Platform with StateFun. When deploying your job you can specify `additionalConfigurations`[1] that are also pulled and put into the classpath. Hopefully, that is suitable for your scenario. Best, Fabian [1] https://docs.ververica.com/user_gu

PyFlink convert data from ds to table

2021-12-08 Thread Королькевич Михаил
Hello, Flink team! 1) Is it possible to save a python list to a table from a datastream? 2) And then save the accumulated data to an Avro file? For example, my data stream has the type Types.ROW_NAMED(['id', 'items'], [Types.STRING, Types.LIST(items_row)]), where items_row = Types.ROW_NAMED(field_names=['start'

Stateful functions - egress question

2021-12-08 Thread Gigio Topo
Hi, I successfully created a Stateful Function F that transforms incoming objects and writes them to a relational database. Function F is deployed as a remote module. Everything looks fine. Now, I want to split responsibilities by introducing a custom Egress E for the database while I refactor

Re: Re: Re: Re: how to run streaming process after batch process is completed?

2021-12-08 Thread Yun Gao
Hi Joern, Many thanks for sharing the detailed scenarios! They are very inspiring. If I understand correctly, could it be summarized as follows? 1. There is a batch job to first initialize the state; the state is used in stream mode, and the stream pipeline is different from the batch job. 2.

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-08 Thread Hang Ruan
There is no way to end the Kafka stream from the deserializer. When would you want to end the stream? Could you explain why you need to end the Kafka stream without using the offset?