Re: Poor performance with large keys using RocksDB and MapState

2020-09-24 Thread Yun Tang
Hi, if you want to improve the performance of point lookups, you could try to use an additional hash index. This feature requires passing a prefix extractor; however, that interface is not exposed directly in the Java API. You could try to call columnFamilyOptions.optimizeForPointLookup(blockCache…
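A minimal sketch of how that call could be wired into the RocksDB state backend through a RocksDBOptionsFactory, assuming Flink 1.10+; the 64 MB block-cache size is an illustrative placeholder, not a value from the thread:

    import java.util.Collection;
    import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;

    public class PointLookupOptionsFactory implements RocksDBOptionsFactory {
        @Override
        public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
            return currentOptions; // keep the default DB-level options
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
            // Tunes RocksDB for point lookups; the argument is the block cache size in MB.
            return currentOptions.optimizeForPointLookup(64);
        }
    }

The factory would then be registered on the backend, e.g. rocksDBStateBackend.setRocksDBOptions(new PointLookupOptionsFactory()).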

Re: How can I drop events which are late by more than X hours/days?

2020-09-24 Thread Arvid Heise
Hi Ori, if you use windows, Flink already has a built-in solution for this: allowed lateness [1]. By default, Flink filters out all late records (records whose timestamp < current watermark). You can add the time X and still allow these elements to be processed. If you end up treating all late events as normal…
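A minimal sketch of the allowed-lateness approach, assuming a keyed event-time window; the Event type, key accessor, window size, and one-hour bound are all illustrative:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    DataStream<Event> events = ...; // assumed: timestamps and watermarks already assigned

    events
        .keyBy(Event::getKey)                                   // hypothetical key accessor
        .window(TumblingEventTimeWindows.of(Time.minutes(10)))
        .allowedLateness(Time.hours(1))                         // records up to 1h late still update the window
        .reduce((a, b) -> b);                                   // placeholder aggregation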

Best way to resolve bottlenecks with Flink?

2020-09-24 Thread Dan Hill
My job has very slow throughput. What are the best signals that would indicate performance issues? Is there an overall health summary that would point to the most likely issues impacting performance? I found a variety of pages and metrics. I resolved some of the backpressure in my job…

Live updating Serialization Schemas in Flink

2020-09-24 Thread Hunter Herman
Hi flink-users! I need advice about how to tackle a programming problem I’m facing. I have a bunch of jobs that look something like this sketch:

    Source kafkaSource;
    kafkaSource
        .map(function that takes generic record)
        .map( ... )
        ...
        .sink(kafka sink that takes in generic records)

T…
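For reference, a minimal runnable version of that job shape, assuming Flink 1.11's FlinkKafkaConsumer/FlinkKafkaProducer and a plain string payload standing in for the generic records; topic names and the bootstrap address are placeholders:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class SketchJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder

            env.addSource(new FlinkKafkaConsumer<>("in-topic", new SimpleStringSchema(), props))
               .map(record -> record) // stand-in for the record transformations
               .addSink(new FlinkKafkaProducer<>("out-topic", new SimpleStringSchema(), props));

            env.execute("sketch");
        }
    }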

Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Alexey Trenikhun
You could forward tombstone records to a “graveyard” topic; this way they will not confuse anyone reading from the regular topic.
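A minimal sketch of such routing inside a KafkaSerializationSchema, which can pick the target topic per record; the topic names and the tryEncode helper are hypothetical:

    import java.nio.charset.StandardCharsets;
    import javax.annotation.Nullable;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.flink.types.Row;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sends rows that fail to serialize to a side ("graveyard") topic as tombstones,
    // so readers of the main topic never see them.
    public class GraveyardRoutingSchema implements KafkaSerializationSchema<Row> {
        private static final String MAIN_TOPIC = "events";                // placeholder
        private static final String GRAVEYARD_TOPIC = "events-graveyard"; // placeholder

        @Override
        public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long timestamp) {
            byte[] key = String.valueOf(row.getField(0)).getBytes(StandardCharsets.UTF_8);
            byte[] value = tryEncode(row);
            if (value == null) {
                // invalid row: emit a tombstone (null payload) to the graveyard topic instead
                return new ProducerRecord<>(GRAVEYARD_TOPIC, key, null);
            }
            return new ProducerRecord<>(MAIN_TOPIC, key, value);
        }

        @Nullable
        private byte[] tryEncode(Row row) {
            try {
                return row.toString().getBytes(StandardCharsets.UTF_8); // hypothetical encoder
            } catch (RuntimeException e) {
                return null;
            }
        }
    }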

Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Yuval Itzchakov
Hi Arvid, thanks for the response. The topic is not log-compacted, and these invalid values are not actually tombstones; I wouldn't want anyone to misinterpret them as such. Regarding filtering the rows in a separate flatMap, that's a great idea. The only problem is that the rows are opaque from the…

Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Arvid Heise
Hi Yuval, here are some workarounds. One option is to use a tombstone record (0-byte payload) and filter it downstream. If the topic is log-compacted, Kafka would filter them on compaction. A second option is to translate the Row to a byte[] array in a separate flatMap (returning 0 records on err…
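A minimal sketch of the second workaround, pre-serializing in a flatMap so the sink only ever sees valid byte arrays; the encode helper is hypothetical:

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    // Serializes rows ahead of the sink; rows that fail to encode are dropped
    // (0 records emitted) instead of failing the job.
    public class SafeEncoder implements FlatMapFunction<Row, byte[]> {
        @Override
        public void flatMap(Row row, Collector<byte[]> out) {
            try {
                out.collect(encode(row));
            } catch (Exception e) {
                // invalid row: emit nothing here, or log / route it to a side output
            }
        }

        private byte[] encode(Row row) {
            // hypothetical encoder; replace with the real serialization logic
            return row.toString().getBytes(StandardCharsets.UTF_8);
        }
    }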

Re: Ignoring invalid values in KafkaSerializationSchema

2020-09-24 Thread Matthias Pohl
Hi Yuval, thanks for bringing this issue up. You're right: there is currently no error handling implemented for SerializationSchema. FLIP-124 [1] addressed this for the DeserializationSchema, though. I created FLINK-19397 [2] to cover this feature. In the meantime, I cannot think of any other solu…

Re: global state and single stream

2020-09-24 Thread Matthias Pohl
Hi Adam, sorry for the late reply. Introducing a global state is something that should be avoided, as it introduces bottlenecks and/or concurrency and ordering issues. Broadcasting the state between different subtasks will also cost performance, since each state change has to be shared with every…

Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-24 Thread Xintong Song
I'm not entirely sure how many instances of ChildFirstClassLoader should be expected. I would say 3~5 sounds fine (1 per slot, 1 for the file system plugin, 1 for the metrics reporter plugin, and probably a few more that I'm not aware of). How many task managers and jobs exist in the cluster shoul…

Re: metaspace out-of-memory & error while retrieving the leader gateway

2020-09-24 Thread Claude M
I have 35 task managers with 1 slot each. I'm running a total of 7 jobs in the cluster, and all the slots are occupied. When you say that 33 instances of the ChildFirstClassLoader do not sound right, what should I be expecting? Could the number of jobs running in the cluster contribute to the out…

Re: How can I drop events which are late by more than X hours/days?

2020-09-24 Thread Matthias Pohl
Hi Ori, one way to do it is to implement a basic ProcessFunction. ProcessFunction.processElement(I value, Context ctx, Collector out) offers access to the context, through which you can access the current watermark timestamp using ctx.timerService().currentWatermark(). You can use that to filter out…
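A minimal sketch of that filter, assuming a maximum allowed lag in milliseconds; the class and parameter names are illustrative:

    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;

    // Drops events whose timestamp lags the current watermark by more than maxLagMs.
    public class DropTooLate<T> extends ProcessFunction<T, T> {
        private final long maxLagMs;

        public DropTooLate(long maxLagMs) {
            this.maxLagMs = maxLagMs;
        }

        @Override
        public void processElement(T value, Context ctx, Collector<T> out) {
            long watermark = ctx.timerService().currentWatermark();
            Long timestamp = ctx.timestamp(); // null if no timestamp was assigned
            if (timestamp != null && watermark - timestamp > maxLagMs) {
                return; // too late: drop the element
            }
            out.collect(value);
        }
    }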

Re: Error on deploying Flink docker image with Kubernetes (minikube) and automatically launch a stream WordCount job.

2020-09-24 Thread Yang Wang
Glad to hear that. And the repository you shared is very helpful for users who are trying to deploy Flink clusters on K8s. Best, Yang. Felipe Gutierrez wrote on Thu, Sep 24, 2020 at 2:04 PM: > thanks Yang, > I got it to work in the way that you said. > https://github.com/felipegutierrez/explore-flin…

How can I drop events which are late by more than X hours/days?

2020-09-24 Thread Ori Popowski
I need to drop elements which are delayed by more than a certain amount of time relative to the current watermark. I wanted to create a FilterFunction where I get the current watermark, and if the difference between the watermark and my element's timestamp is greater than X, drop the element. However,…

Re: Efficiently processing sparse events in a time windows

2020-09-24 Thread David Anderson
Steven, I'm pretty sure this is a scenario that doesn't have an obvious good solution. As you have discovered, the window API isn't much help; using a process function does make sense. The challenge is finding a data structure to use in keyed state that can be efficiently accessed and updated. On…