Re: Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Yun Tang
Hi Kevin, currently you can check the logs for when a restore starts and finishes [1] to see how much time is spent on the task side. Flink 1.13 also tries to expose the stages of task initialization [2], which may help you. state.backend.rocksdb.metrics.total-sst-files-size should be correct to

Unsubscribe

2021-03-31 Thread Chouchou Mei
Unsubscribe

Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Dawid Wysakowicz
Hi all, @Kurt @Arvid I think it's fine to merge those two, as they are pretty much finished. We can wait for those two before creating the RC0. @Leonard Personally I'd be ok with 3 more days for that single PR. I find the request reasonable and I second that it's better to have a proper review ra

Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Arvid Heise
Hi Dawid and Guowei, I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We are pretty much just waiting for AZP to turn green, it's separate from other components, and it's a super useful feature for Flink users. Best, Arvid [1] https://github.com/apache/flink/pull/15054 On Thu,

Re: IO benchmarking

2021-03-31 Thread Matthias Pohl
For 2. there are also efforts to expose the state and operator initialization through the logs (see FLINK-17012 [1]). For 3. the TypeSerializer [2] might be another point of interest. It is used to serialize specific types. Other than that, the state serialization depends heavily on the used state b

Re: PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Dian Fu
Hi Sumeet, I think this is a bug, and I have created a ticket https://issues.apache.org/jira/browse/FLINK-22082 as a follow-up. Regards, Dian > On April 1, 2021, at 12:25 PM, Guowei Ma wrote: > > Hi, Sumeet > > I am not an expert on PyFlink.

Flink Taskmanager failure recovery and large state

2021-03-31 Thread Yaroslav Tkachenko
Hi everyone, I'm wondering if people have experienced issues with Taskmanager failure recovery when dealing with a lot of state. I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints and checkpoints. ~150 task managers with 4 slots each. When I run a pipeline without much state

Re: PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Guowei Ma
Hi, Sumeet I am not an expert on PyFlink. But I think @Dian Fu might give some insight into this problem. Best, Guowei On Thu, Apr 1, 2021 at 12:12 AM Sumeet Malhotra wrote: > Cross-posting from StackOverflow here: > > > https://stackoverflow.com/questions/66888486/pyflink-extract-nested

Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Kurt Young
Hi Guowei and Dawid, I would like to request permission to merge this feature [1]. It's a useful improvement to the SQL client and won't affect other components much. We were planning to merge it yesterday but ran into a tricky multi-process issue that is very likely to hang the tests. It to

Re: Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Guowei Ma
Hi, Kevin If you use RocksDB and want to know how much data is on disk, I think that is the right metric. But the SST files might include some expired data, and some data in memory is not included in the SST files yet. In general I think it could reflect the state size of your application. I think tha

Re: How to specific key serializer

2021-03-31 Thread 陳昌倬
On Wed, Mar 31, 2021 at 05:33:19PM +0800, Tzu-Li (Gordon) Tai wrote: > You can try using TypeInfo annotations to specify a TypeInformationFactory > for your key class [1]. > This allows you to "plug-in" the TypeInformation extracted by Flink for a > given class. In that custom TypeInformation, you

Re: s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-03-31 Thread Guowei Ma
Hi, Robert I think you could try changing "s3://argo-artifacts/" to "s3a://argo-artifacts/". This is because `StreamingFileSink` currently supports only Hadoop-based S3, not Presto-based S3. [1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_

Re: IO benchmarking

2021-03-31 Thread deepthi Sridharan
Thanks, Matthias. This is very helpful. Regarding the checkpoint documentation, I was mostly looking for information on how states from various tasks get serialized into one (or more?) files on persistent storage. I'll check out the code pointers! On Wed, Mar 31, 2021 at 7:07 AM Matthias Pohl wr

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2021-03-31 Thread Lu Niu
Hi, Colletta Thanks for sharing! Would you mind sharing a stack trace for that error as well? Thanks! Best Lu On Sat, Mar 27, 2021 at 5:36 AM Colletta, Edward wrote: > > > FYI, we experience a similar error again, lost leadership but not due to > timeout but a disconnect from zookeeper. This time

ARM support

2021-03-31 Thread Rex Fenley
Hello, We would like to run Flink on ARM but haven't found any resources indicating that this is possible yet. We are wondering what the timeline is for Flink supporting ARM. Given that all MacBooks are moving to ARM and that AWS is excitedly supporting ARM, it seems important that Flink also sup

s3 FileSystem Error "s3 file system implementation does not support recoverable writers"

2021-03-31 Thread Robert Cullen
I’m using a local instance of MinIO on my Kubernetes cluster for checkpoint/savepoint storage. I’m using this StreamingFileSink configuration: final StreamingFileSink> sink = StreamingFileSink.forRowFormat( new Path("s3://argo-artifacts/"),

Measuring the Size of State, Savepoint Size vs. Restore time

2021-03-31 Thread Kevin Lam
Hi all, We're interested in doing some analysis on how the size of our savepoints and state affects the time it takes to restore from a savepoint. We're running Flink 1.12 and using RocksDB as a state backend, on Kubernetes. What is the best way to measure the size of a Flink Application's state?

PyFlink: Extract nested fields from JSON array

2021-03-31 Thread Sumeet Malhotra
Cross-posting from StackOverflow here: https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array Any pointers are appreciated! Thanks, Sumeet

Re: clear() in a ProcessWindowFunction

2021-03-31 Thread Vishal Santoshi
I have a question. Say I have a single key with 2 live sessions (A and B) with a configured lateness. Do these invariants hold? * The state is scoped to the key (created per key in the ProcessWindowFunction with a TTL) * The state will remain alive irrespective of whether the Window is closed or n

Re: Restoring from Flink Savepoint in Kubernetes not working

2021-03-31 Thread Claude M
Thanks for your reply. I'm using the Flink Docker image flink:1.12.2-scala_2.11-java8. Yes, the folder was created in S3. I took a look at the UI and it showed the following: *Latest Restore ID: 49; Restore Time: 2021-03-31 09:37:43; Type: Checkpoint; Path: s3:fcc82deebb4565f31a7f63989939c463/chk

Re: IO benchmarking

2021-03-31 Thread Matthias Pohl
Hi Deepthi, 1. Have you had a look at flink-benchmarks [1]? I haven't used it but it might be helpful. 2. Unfortunately, Flink doesn't provide metrics like that. But you might want to follow FLINK-21736 [2] for future developments. 3. Is there anything specific you are looking for? Unfortunately, I

Re: Restoring from Flink Savepoint in Kubernetes not working

2021-03-31 Thread Matthias Pohl
Hi Claude, thanks for reaching out to the Flink community. Could you provide the Flink logs for this run to get a better understanding of what's going on? Additionally, what exact Flink 1.12 version are you using? Did you also verify that the snapshot was created by checking the actual folder? Bes

Re: DataStream from kafka topic

2021-03-31 Thread Arian Rohani
The issue at hand is that the record contains an unmodifiable collection which the kryo serialiser attempts to modify by first initialising the object and then adding items to the collection (iirc). Caused by: java.lang.UnsupportedOperationException > at > java.util.Collections$Unmodifiabl
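The failure mode described in this reply can be reproduced without Kryo or Flink. The following is a minimal sketch using only the JDK; the class and method names (`UnmodifiableCollectionDemo`, `canRefill`, `toMutable`) are hypothetical and only imitate the "create the collection, then add items to it" step that the reply attributes to the serializer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class: plain JDK only, no Kryo or Flink involved.
final class UnmodifiableCollectionDemo {

    // Imitates the step described above: take an already constructed
    // collection and add the decoded elements to it. Returns false when
    // the collection rejects the modification.
    static boolean canRefill(List<String> target, List<String> decoded) {
        try {
            target.addAll(decoded);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    // One possible workaround (an assumption, not taken from the thread):
    // copy the data into a mutable list before it is put into the record.
    static List<String> toMutable(List<String> source) {
        return new ArrayList<>(source);
    }

    public static void main(String[] args) {
        List<String> decoded = Arrays.asList("a", "b");
        List<String> readOnly = Collections.unmodifiableList(new ArrayList<String>());

        System.out.println("unmodifiable list refilled: " + canRefill(readOnly, decoded));
        System.out.println("mutable copy refilled: " + canRefill(toMutable(readOnly), decoded));
    }
}
```

If the record type is under your control, holding a mutable copy in the field is one way to sidestep the exception; whether that fits depends on how the record is produced.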

Re: Proper way to get DataStream

2021-03-31 Thread Matthias Pohl
Hi Maminspapin again, have you checked whether your topic actually contains data that matches your schema specified through cep.model.User? Best, Matthias On Tue, Mar 30, 2021 at 3:39 PM Maminspapin wrote: > Hi, > > I'm trying to solve a task with getting data from topic. This topic keeps > avr

Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Ok, it looks like you've found that solution already based on your question in [1]. [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html On Wed, Mar 31, 2021 at 1:26 PM Matthias Pohl wrote: > Hi Maminspapin, > I ha

Re: DataStream from kafka topic

2021-03-31 Thread Matthias Pohl
Hi Maminspapin, I haven't worked with Kafka/Flink, yet. But have you had a look at the docs about the DeserializationSchema [1]? It mentions ConfluentRegistryAvroDeserializationSchema. Is this something you're looking for? Best, Matthias [1] https://ci.apache.org/projects/flink/flink-docs-stable/

Re: Flink Table to DataStream: how to access column name?

2021-03-31 Thread Yik San Chan
Thank you, Till! Actually I find I can access this via `Table.getSchema.getFieldNames` in version 1.12.0 Best, Yik San On Wed, Mar 31, 2021 at 4:26 PM Till Rohrmann wrote: > You are right Yik San. This feature has only been introduced in the > upcoming 1.13 release [1]. Sorry for causing confu

Re: [DISCUSS] Feature freeze date for 1.13

2021-03-31 Thread Guowei Ma
Hi, community: Friendly reminder that today (3.31) is the last day of feature development. Under normal circumstances, you will not be able to submit new features from tomorrow (4.1). Tomorrow we will create 1.13.0-rc0 for testing; everyone is welcome to help test it. After the test is relatively stable

Re:Re: How does Flink SQL read Avro union?

2021-03-31 Thread Vincent Dong
Hi Arvid, I cannot change the schema of the Kafka source topic since others also consume this topic. I use Flink DataStream to consume the topic and then transform it into a schema without a union field, to avoid the Flink SQL issue. Cheers, Vincent At 2021-03-22 22:04:53, "Arvid Heise" wro

Re: How to specific key serializer

2021-03-31 Thread Tzu-Li (Gordon) Tai
Hi CZ, The issue here is that the Scala DataStream API uses Scala macros to decide the serializer to be used. Since that recognizes Scala case classes, the CaseClassSerializer will be used. However, in the State Processor API, those Scala macros do not come into play, and therefore it directly goe

Re: Fw:A question about flink watermark illustration in official documents

2021-03-31 Thread Matthias Pohl
Hi 罗昊, the 2nd picture is meant to visualize the issue of out-of-orderness in general. I'd say it's not referring to a specific strategy. But one way to interpret the image is using the BoundedOutOfOrderness strategy for watermark generation [1]: You can define an upper bound B for the out-of-orde
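The bounded out-of-orderness idea mentioned in this reply can be sketched in plain Java. This is a simplified illustration, not Flink's own watermark generator: the class `BoundedOutOfOrdernessSketch` and its methods are hypothetical, and it assumes the watermark is simply the largest timestamp seen so far minus the bound B.

```java
// Hypothetical sketch: tracks the largest event timestamp seen so far and
// derives a watermark that trails it by a fixed bound B (in milliseconds).
final class BoundedOutOfOrdernessSketch {

    private final long boundMillis;           // the upper bound B
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long boundMillis) {
        if (boundMillis < 0) {
            throw new IllegalArgumentException("bound must be non-negative");
        }
        this.boundMillis = boundMillis;
    }

    // Record an event timestamp; out-of-order events never move the maximum back.
    void onEvent(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    // Simplified assumption: watermark = max timestamp seen so far - B.
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - boundMillis;
    }

    // In this sketch an event counts as late if it is older than the current watermark.
    boolean isLate(long timestampMillis) {
        return timestampMillis < currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(4);
        long[] timestamps = {10, 14, 12, 20, 15};
        for (long ts : timestamps) {
            boolean late = sketch.isLate(ts);
            sketch.onEvent(ts);
            System.out.println("ts=" + ts + " late=" + late + " watermark=" + sketch.currentWatermark());
        }
    }
}
```

A larger B tolerates more disorder at the cost of a watermark that lags further behind the newest event, so results are emitted later.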

Re: Flink Table to DataStream: how to access column name?

2021-03-31 Thread Till Rohrmann
You are right Yik San. This feature has only been introduced in the upcoming 1.13 release [1]. Sorry for causing confusion here. I fear that you have to either use 1.13-SNAPSHOT or wait for the 1.13 release which should happen in a couple of weeks if you really need this feature. [1] https://issue

Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-31 Thread Sumeet Malhotra
Thanks Dawid. This looks like what I needed :-) On Tue, Mar 30, 2021 at 12:28 PM Dawid Wysakowicz wrote: > Hey, > > I am not sure which format you use, but if you work with JSON maybe this > option[1] could help you. > > Best, > > Dawid > > [1] > https://ci.apache.org/projects/flink/flink-docs-r