RE: A question about restoring state with an additional variable with kryo

2022-09-15 Thread Schwalbe Matthias
Hi Vishal, Good news and bad news 😊: * Bad: Kryo serializer cannot be used for schema evolution, see [1] * Good: not all is lost here, * If you happen to have state that you cannot afford to lose, you can transcode it by means of the savepoint API [2], * However, this take

Metrics for ElasticsearchSink and JdbcTableSource

2022-09-15 Thread Satish Saley
Hi folks, I use the following to interact with databases and elasticsearch. org.apache.flink.connector.jdbc.table.JdbcTableSource org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink Do these connectors provide any metrics out of the box? Metrics such as - QPS to database, Elast
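The JDBC and Elasticsearch connectors expose Flink's generic sink metrics (records out, bytes out), but a per-operation rate such as QPS is typically something you measure yourself, e.g. in a RichFunction that registers a counter via getRuntimeContext().getMetricGroup(). Below is a plain-Java sketch of the core of such a meter (the class name is illustrative, not a Flink API):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: counts operations and derives a rate, the same
// arithmetic a custom "QPS" gauge inside a Flink RichFunction would do.
public class SimpleRateMeter {
    private final AtomicLong count = new AtomicLong();
    private volatile long windowStartNanos = System.nanoTime();

    /** Record one operation (e.g. one JDBC query or one ES bulk request). */
    public void mark() {
        count.incrementAndGet();
    }

    /** Operations per second since the window started; resets the window. */
    public double rateAndReset() {
        long now = System.nanoTime();
        double seconds = (now - windowStartNanos) / 1_000_000_000.0;
        long n = count.getAndSet(0);
        windowStartNanos = now;
        return seconds > 0 ? n / seconds : 0.0;
    }

    public long currentCount() {
        return count.get();
    }
}
```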

Re: A question about restoring state with an additional variable with kryo

2022-09-15 Thread Vishal Santoshi
The exception thrown is as follows. I realize that it is trying to read the long value. How do I signal to kryo that it is OK and that the object can have a default value? Caused by: java.io.EOFException: No more bytes left. at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(

Re: A question about restoring state with an additional variable with kryo

2022-09-15 Thread Vishal Santoshi
<< How do I make sure that when reconstituting the state, kryo does not complain? It tries to map the previous state to the new definition of Class A and complains that it cannot read the value for `String b`. >> How do I make sure that when reconstituting the state, kryo does not complain? It tri

A question about restoring state with an additional variable with kryo

2022-09-15 Thread Vishal Santoshi
I have state in rocksDB that represents say class A { String a } I now change my class and add another variable Class A { String a; long b = 0; } How do I make sure that when reconstituting the state, kryo does not complain? It tries to map the previous state to the new definition of Cla
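The schema-evolution limitation is specific to Kryo; Flink's built-in POJO serializer does support adding fields across savepoints, and a field absent from old state is restored to its Java default. A minimal sketch of the state class shaped to satisfy Flink's POJO rules (this assumes the class can be migrated to POJO serialization rather than Kryo):

```java
// Sketch: a state class that Flink's POJO serializer (not Kryo) can
// handle. POJO-serialized state tolerates added/removed fields across
// savepoints; a field missing from old state gets its Java default
// (0 for long) on restore.
public class A {
    public String a;   // existing field, present in old savepoints
    public long b = 0; // newly added field; old state restores it as 0

    // Flink's POJO serializer requires a public no-arg constructor
    public A() {}

    public A(String a) {
        this.a = a;
    }
}
```

Note that state already written with Kryo will not silently switch serializers on restore; migrating existing Kryo-serialized state generally requires transcoding it, e.g. with the State Processor API mentioned in the reply above.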

SQL Engine Type inference when extending AsyncTableFunction class twice.

2022-09-15 Thread Jonathan Weaver
I am having an issue with automatic type inference in the SQL engine for an AsyncTableFunction class. I am extending AsyncTableFunction in a BaseClass (common code), then extending again for some specific implementations: FinalClass extends BaseClass. If I use BaseClass it correctly infers the
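A common cause of this pattern failing is generic type erasure: the concrete type argument is only visible at the level of the hierarchy where it was bound, so a subclass one level further down no longer carries it in its direct superclass signature. A plain-Java sketch (class names are illustrative, not from the original post) showing where the binding lives:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Illustrative hierarchy: Base plays the role of the generic superclass
// (like AsyncTableFunction<T>), Middle binds T, Leaf adds nothing generic.
class Base<T> {}
class Middle extends Base<String> {}
class Leaf extends Middle {}

public class ErasureDemo {
    public static void main(String[] args) {
        // Middle's direct superclass retains the binding Base<String>:
        Type fromMiddle = Middle.class.getGenericSuperclass();
        // Leaf's direct superclass is just the raw class Middle -- the
        // String binding sits one level up, so single-level inspection
        // of Leaf misses it:
        Type fromLeaf = Leaf.class.getGenericSuperclass();
        System.out.println(
            (fromMiddle instanceof ParameterizedType) + " "
            + (fromLeaf instanceof ParameterizedType)); // prints "true false"
    }
}
```

If this is indeed the cause, supplying the types explicitly (in recent Flink versions, by overriding getTypeInference on the function) sidesteps the extraction entirely.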

INSERT INTO will work faster in Flink than in regular database?

2022-09-15 Thread podunk
What's the most effective way (performance-wise) to update a big number of rows? This will probably be something like "INSERT INTO table (column1) SELECT column1 FROM ...". Anyway, I do not see any "UPDATE" in Flink? But sometimes SQL is not enough. Suppose I have code: TableResult tableResult1 = tEnv.executeSql("SE
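Whichever engine executes the statement, updating many rows one at a time is usually dominated by per-statement round-trip cost, and batching is the standard lever (this is what JDBC's addBatch/executeBatch exploits). A plain-Java sketch of the batching pattern itself (hypothetical helper, independent of Flink):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: split work into fixed-size consecutive batches,
// so N updates become N/batchSize round trips instead of N.
public class Batcher {
    public static <T> List<List<T>> batches(List<T> items, int batchSize) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            out.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return out;
    }
}
```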

Re: Query regarding akka.framesize in Flink

2022-09-15 Thread Danny Cranmer
Hey, the Akka frame maximum size is 2GB, which is limited by the maximum Java byte[] size. Not sure why your config is being rejected. If your Akka frames are getting large you can consider reducing state.storage.fs.memory-threshold [1]. If you are using RocksDB with incremental checkpoints, triggering co

Query regarding akka.framesize in Flink

2022-09-15 Thread Mahima Agarwal
Hi Team, I am trying to set the value of akka.framesize in flink-conf.yaml to 2 gb, but I am getting the below exception while starting Flink: "Caused by: java.lang.IllegalArgumentException: requirement failed: Setting 'maximum-frame-size' must be at least 32000 bytes" Flink is getti
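The error suggests the value parsed to fewer bytes than intended, which can happen when the unit suffix is missing or malformed. A sketch of the setting in flink-conf.yaml, assuming the byte-suffix format Flink uses for its default value (10485760b, i.e. 10 MB):

```yaml
# flink-conf.yaml -- akka.framesize takes a byte count with a "b" suffix.
# For roughly 2 GB, stay at or below the signed 32-bit ceiling:
akka.framesize: 2147483000b
```

Note that before raising the frame size this far, it is usually worth finding out why the frames are so large in the first place, as the reply above suggests.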