Re: Delete Keyed State outside of StateTTL

2021-08-30 Thread narasimha
Thanks, JING. But I have a question here: what will happen to the keyed stream in that case? Will it be removed automatically, or will it remain present with empty state? In the latter case, what is the implication for memory usage? On Tue, Aug 31, 2021 at 8:14 AM JING ZHANG wrote: > Hi, > All

Re: k8S HA mode

2021-08-30 Thread Yang Wang
Could you please share the full JobManager logs? AFAIK, the exceptions you attached are normal logs when the JobManager is trying to acquire the ConfigMap lock. Best, Yang houssem wrote on Tue, Aug 31, 2021 at 4:36 AM: > Hello, thanks for the response > > I am using kubernetes standalone application mode not the

Re: Flink performance with multiple operators reshuffling data

2021-08-30 Thread JING ZHANG
Hi Jason, A job that reshuffles data multiple times can be scalable under normal circumstances. But we should carefully avoid data skew, because if the input stream is skewed, adding more resources would not help. Besides that, if we could adjust the order of the functions, we could put the keyed process f

Re: Deploying Flink on Kubernetes with fractional CPU and different limits and requests

2021-08-30 Thread Yang Wang
Hi all, I think it is a good improvement to support different resource requests and limits. It is especially useful for the CPU resource, since it heavily depends on the upstream workloads. Actually, we (Alibaba) have introduced some internal config options to support this feature. WDYT?

Re: Flink performance with multiple operators reshuffling data

2021-08-30 Thread Caizhi Weng
Hi! Key-by operations can scale with parallelism. Flink will shuffle your records to different sub-tasks according to the hash value of the key modulo the parallelism, so the more parallelism you have, the faster Flink can process data, unless there is data skew. Jason Liu, on Tue, Aug 31, 2021, AM
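To make the "hash of the key modulo parallelism" description concrete, here is a simplified sketch in plain Java. It is not Flink's API: the class and method names (`KeyPartitionSketch`, `subtaskFor`, `countPerSubtask`) are hypothetical, and Flink's actual assignment goes through key groups, which this sketch does not reproduce. It only illustrates why one hot key stays on a single sub-task no matter how much parallelism is added.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model only: real Flink routes keys via key groups, not a bare modulo.
class KeyPartitionSketch {

    // Pick a sub-task index in [0, parallelism) from the key's hash.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Count how many records each sub-task would receive for the given keys.
    static Map<Integer, Integer> countPerSubtask(List<String> keys, int parallelism) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(subtaskFor(key, parallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical skewed input: four of six records share the key "hot".
        List<String> skewed = List.of("hot", "hot", "hot", "hot", "a", "b");
        System.out.println(countPerSubtask(skewed, 2));
        System.out.println(countPerSubtask(skewed, 8));
    }
}
```

Whatever the parallelism, all records with the key "hot" land on the same sub-task, which is the data-skew case where adding resources does not help.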

Re: Delete Keyed State outside of StateTTL

2021-08-30 Thread JING ZHANG
Hi, All types of state also have a method clear() that clears the state for the currently active key, i.e. the key of the input element. Could we call the `clear()` method directly to remove the state under the specified key? Best, JING ZHANG narasimha wrote on Tue, Aug 31, 2021 at 9:44 AM: > Hi, > > I have
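The per-key scoping of `clear()` described above can be pictured with a toy model in plain Java. This is not Flink's state API: `KeyedStateSketch` and its methods are hypothetical, and the sketch says nothing about how a real state backend frees memory. It only shows the idea that state is looked up under the currently active key, so clearing it affects that key alone.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed state: one value per key, accessed via the "current" key.
class KeyedStateSketch<K, V> {
    private final Map<K, V> entries = new HashMap<>();
    private K currentKey;

    // In a real runtime the active key comes from the input element.
    void setCurrentKey(K key) { currentKey = key; }

    // Read the value for the active key (null if nothing is stored).
    V value() { return entries.get(currentKey); }

    // Store a value under the active key.
    void update(V newValue) { entries.put(currentKey, newValue); }

    // Remove the entry for the active key only; other keys are untouched.
    void clear() { entries.remove(currentKey); }

    // Number of keys that currently hold state.
    int storedKeys() { return entries.size(); }

    public static void main(String[] args) {
        KeyedStateSketch<String, Integer> state = new KeyedStateSketch<>();
        state.setCurrentKey("rule-a");
        state.update(1);
        state.setCurrentKey("rule-b");
        state.update(2);

        // A "delete" action arriving for rule-a clears only that key's state.
        state.setCurrentKey("rule-a");
        state.clear();
        System.out.println(state.value());
        System.out.println(state.storedKeys());
    }
}
```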

Delete Keyed State outside of StateTTL

2021-08-30 Thread narasimha
Hi, I have a use case where the keyed state is managed (create, reset) by dynamically changing rules. A new action, "delete", has to be added. Delete should completely remove the keyed state, the same way StateTTL does after the expiration time. Use StateTTL? I initially used StateTTL, but it ended up in inc

Flink performance with multiple operators reshuffling data

2021-08-30 Thread Jason Liu
Hi there, We have a use case where we need multiple keyBy operators, each with its own MapState and all with different keys, in a single Flink app. This obviously means we'll be reshuffling our data a lot. Our TPS is around 1-2k, with ~2kb per event and we use Kinesis Data Analytics as

Re: k8S HA mode

2021-08-30 Thread houssem
Hello, thanks for the response. I am using Kubernetes standalone application mode, not the native one, and this error happens randomly at some point while running the job. Also, I am using just one replica of the JobManager. Here are some other logs: {"@timestamp":"2021-08-30T15:43:44.970+02:00"

Re: Flink issues with Avro GenericRecord serialization

2021-08-30 Thread tarun joshi
An update on this: I see that `IndexedRecord` is part of the Avro library. Please correct me if I am wrong in assuming that "POJOs generated by the Avro POJO generator must implement the IndexedRecord interface". It seems either - I should be parsing Stringified Json from AWS Kinesis directly

Flink issues with Avro GenericRecord serialization

2021-08-30 Thread tarun joshi
Hey all, I am trying to write a simple pipeline to read stringified JSON from Kinesis -> parsed to POJO -> converted to Avro -> for the purpose of writing Parquet files to AWS S3. 1) This is my SimpleMapper public class SimpleMapper extends RichMapFunction { private static final GsonBui

Table API demo problem

2021-08-30 Thread Tatla, Manraj
Hello everyone, I am learning Flink because at work we need stateful real time computations in a bot detection system. This weekend, I have had much difficulty in getting the real time reporting API tutorial working. https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/try-flink/

Re: Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

2021-08-30 Thread Samir Vasani
Hi, Accessing the file is not the problem. If I put the file in place before starting the job, then the job reads it correctly, but if I add any file at runtime, then it does not read the newly added files. Let me know if you need more information. Thanks & Regards, Samir Vasani On Mon, Aug 30, 2021 at

Re: Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

2021-08-30 Thread Roman Khachatryan
Hi, If I understand correctly, the problem is accessing local files from Flink running in docker. Have you tried mounting the local directory into the container, for example as a bind mount [1]? [1] https://docs.docker.com/storage/bind-mounts/ Regards, Roman On Mon, Aug 30, 2021 at 3:33 PM Sami

Re: checkpoints/.../shared cleanup

2021-08-30 Thread Khachatryan Roman
Hi, I think the documentation is correct. Once the job is stopped with savepoint, any of its "regular" checkpoints are discarded, and as a result any shared state gets unreferenced and is also discarded. Savepoints currently do not have shared state. Furthermore, the new job should have a new ID

Unable to read a (text)file in FileProcessing.PROCESS_CONTINUOS mode

2021-08-30 Thread Samir Vasani
I have a requirement to read files continuously from a specific path. That is, the Flink job should continuously poll the specified location and read each file that arrives at this location at certain intervals. Example: my location on a Windows machine is C:/inputfiles, which gets a file file_1.txt at 2:00PM, file_

Re: Flink sql CLI parsing avro format data error (failed to parse Avro data)

2021-08-30 Thread Timo Walther
Thanks for letting us know. I hope we can improve the Avro support in the 1.15 release. Maybe the `"name" : "KafkaAvroMessage", "namespace" : "xxx"` causes the exception then? Otherwise the schema looks identical. Regards, Timo On 30.08.21 11:00, Wayne wrote: I use the way of writing code t

Re: KafkaFetcher [] - Committing offsets to Kafka failed.

2021-08-30 Thread Roman Khachatryan
Hi, I think the preceding message that the consumer is not a member of the group suggests that there is some connectivity issue. Perhaps heartbeats are timing out, in which case you might want to increase session.timeout.ms [1] and heartbeat.interval.ms. [1] https://docs.confluent.io/platform/cur
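As a minimal sketch of the suggestion above, the two settings can be collected in a `java.util.Properties` object of the kind typically handed to a Kafka consumer. The class name `ConsumerTimeoutSketch`, the helper `consumerProps`, and the values 30000 and 10000 are illustrative assumptions, not recommendations from the thread. The check reflects Kafka's documented constraint that `heartbeat.interval.ms` must be lower than `session.timeout.ms` (commonly no more than a third of it).

```java
import java.util.Properties;

// Hypothetical helper: builds only the two timeout-related consumer settings.
class ConsumerTimeoutSketch {

    static Properties consumerProps(int sessionTimeoutMs, int heartbeatIntervalMs) {
        // Kafka expects the heartbeat interval to be lower than the session timeout.
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            throw new IllegalArgumentException(
                "heartbeat.interval.ms must be lower than session.timeout.ms");
        }
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.setProperty("heartbeat.interval.ms", Integer.toString(heartbeatIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values only; tune them for the actual cluster.
        Properties props = consumerProps(30000, 10000);
        props.forEach((name, value) -> System.out.println(name + "=" + value));
    }
}
```

The resulting properties would be merged with the rest of the consumer configuration (bootstrap servers, group id, and so on), which this sketch leaves out.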

Re: k8S HA mode

2021-08-30 Thread Roman Khachatryan
Hello, Do I understand correctly that you are using native Kubernetes deployment in application mode; and the issue *only* happens if you set kubernetes-jobmanager-replicas [1] to a value greater than 1? Does it happen during deployment or at some point while running the job? Could you share Fli

Re: Flink sql CLI parsing avro format data error (failed to parse Avro data)

2021-08-30 Thread Timo Walther
Hi, could it be that there is some corrupt record in your Kafka topic? Maybe you can read from a different offset to verify that. In general, I cannot spot an obvious mistake in your schema. Regards, Timo On 28.08.21 14:32, Wayne wrote: I have an Apache Avro schema. My Avro schema is as follows |{ "ty