RE: Apache Flink Serialization Question

2025-05-15 Thread Schwalbe Matthias
Hi Richard, Same problem, 12 Flink versions later, I created my own TypeInformation/Serializer/Snapshot for UUID (Scala in that case), along: class UUIDTypeInformation extends TypeInformation[UUID] … class UUIDSerializer extends TupleSerializerBase[UUID]( … class UUIDSerializerSnapshot(serializ

RE: [External] Re: Is there a way to get consolidated metrics at task level

2025-01-06 Thread Schwalbe Matthias
/operators/#aggregation-operators [2] https://prometheus.io/docs/prometheus/latest/querying/examples/#using-functions-operators-etc Thias From: Sachin Mittal Sent: Monday, January 6, 2025 3:23 PM To: Schwalbe Matthias Cc: user Subject: [External] Re: Is there a way to get consolidated metrics at task

RE: [External] Is there a way to get consolidated metrics at task level

2025-01-06 Thread Schwalbe Matthias
Hi Sachin, Happy new year … 😊 On YARN we set up a Prometheus push gateway on one machine and then have all task managers export their monitoring to this push gateway. Then Prometheus collects all metrics from the push gateway. On YARN, the main problem is missing isolation of workloads and thus you

RE: [External] Joining Streams: "One operator with N inputs" vs "N-1 co-processors"

2024-12-04 Thread Schwalbe Matthias
Good morning Salva, The situation is much better than you apparently are aware of 😊 For quite some time there has been an implementation for keyed operators with as many inputs as you like: * MultipleInputStreamOperator/KeyedMultipleInputTransformation I originally used your proposed sum types wi

RE: [External] Re: Flink table materialization

2024-11-07 Thread Schwalbe Matthias
Hi Jacob, It’s a little bit of guesswork … The disappearing records remind me a bit of a peculiarity of Oracle, that each (e.g. INSERT) statement is in an implicit transaction and hence needs to be committed. In Flink, committing transactions happens together with the checkpoint cycle, i.e. this

RE: [External] Terrible bug related to serialization of Pojos in keystate.

2024-10-16 Thread Schwalbe Matthias
Hi Ammon Diether, This is actually not a bug; for logical (and documented) reasons keys cannot be schema-migrated: * When storing state / hash-distributing events, the target key group (one out of max parallelism) is calculated from the key hash. * If you change the key, the hash chang
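The key-group argument in the reply above can be illustrated with a small sketch. It is a simplified model, not Flink's code: the bit-mixing step is a stand-in chosen for this example (Flink uses its own hash function internally), and the key values and the `keyGroupFor` name are hypothetical.

```java
import java.util.Objects;

final class KeyGroupSketch {

    // Simplified stand-in for key-group assignment: scramble the key's hashCode
    // and reduce it modulo maxParallelism. The mixing below is illustrative only
    // and is NOT the hash function Flink uses internally.
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // Hypothetical key before and after a change that adds one field.
        int oldKeyHash = Objects.hash("customer-42");
        int newKeyHash = Objects.hash("customer-42", "EU");
        System.out.println("old key -> key group " + keyGroupFor(oldKeyHash, maxParallelism));
        System.out.println("new key -> key group " + keyGroupFor(newKeyHash, maxParallelism));
    }
}
```

Because the key group is a pure function of the key hash, state written under the old hash would have to be found under the new hash after a key change, and in general it will not be in the same group.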

RE: Kafka connector exception restarting Flink 1.19 pipeline

2024-09-03 Thread Schwalbe Matthias
Hi Matthias, Thank you for your reply! There should not be a dependency for 3.0.x in my docker image, I only add 3.2.0

RE: Kafka connector exception restarting Flink 1.19 pipeline

2024-09-03 Thread Schwalbe Matthias
Hi Dominik, No clue why this happens, but it looks like when restarting from the savepoint it uses the flink-connector-kafka version from your docker image (3.0.x ?) instead of the newer one you configured. How did you integrate the newer version? Thias From: dominik.buen...@swisscom.co

RE: Can we share states across tasks/operators

2024-08-16 Thread Schwalbe Matthias
elps. Flink-Greetings Thias From: Christian Lorenz Sent: Friday, August 9, 2024 2:07 PM To: Schwalbe Matthias ; Sachin Mittal ; user@flink.apache.org Subject: AW: Can we share states across tasks/operators Hi Matthias, I am facing a simila

RE: Can we share states across tasks/operators

2024-08-07 Thread Schwalbe Matthias
Hi Sachin, Just as an idea, while you cannot easily share state across operators, you can do so within the same operator: * For two such input streams you could connect() the two streams into a ConnectedStreams and then process() by means of a KeyedCoProcessFunction * For more than two

RE: In what cases Flink op is backpressured but the downstream op is not busy or backpressured?

2024-08-06 Thread Schwalbe Matthias
Hi Brian, Not a direct answer to your question; * Increasing/configuring buffers might not help (Flink self-organizes/optimizes buffers), but maybe increasing parallelism * Something in the numbers you related does not quite fit. With parallelism 1, not having configured slot groups you

RE: Using state processor for a custom windowed aggregate function

2024-08-05 Thread Schwalbe Matthias
* you can aggregate it into the new state * (cardinalities could change) Thias From: Alexis Sarda-Espinosa Sent: Friday, August 2, 2024 7:47 PM To: Schwalbe Matthias Cc: user Subject: Re: Using state processor for a custom windowed aggregate function Hi Matthias, Thank you for looking

RE: Using state processor for a custom windowed aggregate function

2024-08-02 Thread Schwalbe Matthias
ation outputType) throws IOException { Cheers Thias PS: will you come to the FlinkForward conference in October in Berlin (to socialize)? From: Alexis Sarda-Espinosa Sent: Wednesday, July 31, 2024 3:46 PM To: Schwalbe Matthias Cc: user Subject: Re: Using state processor for a custom wi

RE: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Schwalbe Matthias
Hi Alexis, Just a couple of points to double-check: * Does your code compile? (the second argument of withOperator(..) should derive StateBootstrapTransformation instead of SingleOutputStreamOperator) * From the documentation of savepoint API you’ll find examples for each type of state

RE: need flink support framework for dependency injection

2024-03-27 Thread Schwalbe Matthias
ode](ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH case _ => false } } } From: Ruibin Xing Sent: Wednesday, March 27, 2024 10:41 AM To: Schwalbe Matthias Cc: Marco Villalobos ; Ganesh Walse ; user@flink.apache.org Subject: Re: need flink support framework for dependency inject

RE: need flink support framework for dependency injection

2024-03-27 Thread Schwalbe Matthias
Hi Ganesh, I tend to agree with Marco. However, your 'feature request' is very loose and leaves much room for misunderstanding. There are at least two scenarios for DI integration: - DI for job setup: - we use Spring for job setup, which - lets us use the same job structure for (at leas

RE: Not all the task slots are used. Are we missing a setting somewhere?

2024-02-23 Thread Schwalbe Matthias
Thought it would be something like that 😊 Jean-Marc, in future, please ‘reply all’ for your answer so that the community can see it as well 😊 Welcome anyway to the community Thias From: Jean-Marc Paulin Sent: Friday, February 23, 2024 1:14 PM To: Schwalbe Matthias Subject: Re: Not all

RE: Not all the task slots are used. Are we missing a setting somewhere?

2024-02-23 Thread Schwalbe Matthias
Hi Jean-Marc, In the absence of more context, did you adjust the parallelism of your job accordingly? Thias From: Jean-Marc Paulin Sent: Friday, February 23, 2024 11:06 AM To: user@flink.apache.org Subject: Q: Not all the task slots are used. Are we missing a setting somewhere? Hi, We used to run

RE: Preparing keyed state before snapshot

2024-02-21 Thread Schwalbe Matthias
te" of each key and putting it in the state with state.update(...) . This must happen per key, But snapshotState() has no visibility of the keys. And I have no way of selectively accessing the state of a specific key to update it. Unless I am missing something Thanks Lorenzo On Fri, 16 Feb 20

RE: Preparing keyed state before snapshot

2024-02-15 Thread Schwalbe Matthias
Good morning Lorenzo, You may want to implement org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in your KeyedProcessFunction. Btw. By the time initializeState(…) is called, the state backend is fully initialized and can be read and written to (which is not the case for

RE: Idleness not working if watermark alignment is used

2024-02-06 Thread Schwalbe Matthias
source is not configured with .withIdleness(…) and becomes factually idle, all window aggregations or stateful stream joins stall until that source becomes active again (= added latency) Thias From: Alexis Sarda-Espinosa Sent: Tuesday, February 6, 2024 9:48 AM To: Schwalbe Matthias Cc: user

RE: Idleness not working if watermark alignment is used

2024-02-05 Thread Schwalbe Matthias
Good morning Alexis, withIdleness(…) is easily misunderstood, it actually means that the thus configured stream is exempt from watermark processing after 5 seconds (in your case). Hence also watermark alignment is turned off for the stream until a new event arrives. .withIdleness(…) is good fo
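The effect of idleness described in the reply above can be modelled in a few lines. This is a minimal sketch of the general rule (a multi-input operator advances to the minimum watermark of its active inputs and skips inputs marked idle); the class and method names are hypothetical and this is not Flink's implementation.

```java
import java.util.OptionalLong;

final class IdleAwareWatermark {

    // Combined watermark of a multi-input operator: the minimum over all inputs
    // that are currently active. Inputs flagged idle are skipped, so they no
    // longer hold back event-time progress.
    static OptionalLong combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // With every input idle there is no meaningful combined watermark.
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] watermarks = {1_000L, 200L};
        // Both inputs active: the slow input (200) holds the operator back.
        System.out.println(combine(watermarks, new boolean[] {false, false}));
        // Slow input marked idle: only the active input counts.
        System.out.println(combine(watermarks, new boolean[] {false, true}));
    }
}
```

In this model an input that has stopped sending events but is never marked idle keeps its last watermark in the minimum, which is why downstream windows and joins stall.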

RE: Issue with Flink Job when Reading Data from Kafka and Executing SQL Query (q77 TPC-DS)

2024-01-03 Thread Schwalbe Matthias
Hi Vladimir, I might be mistaken, here are my observations: * List res = CollectionUtil.iteratorToList(result.execute().collect()); will block until the job is finished * However, we have an unbounded streaming job which will not finish until you cancel it * If you just want to print r

RE: Updating existing state with state processor API

2023-10-31 Thread Schwalbe Matthias
, java.lang.String, org.apache.flink.runtime.state.StateBackend) From: Alexis Sarda-Espinosa Sent: Friday, October 27, 2023 4:29 PM To: Schwalbe Matthias Cc: user Subject: Re: Updating existing state with state processor API Hi Matthias

RE: Updating existing state with state processor API

2023-10-26 Thread Schwalbe Matthias
Good morning Alexis, Something like this we do all the time. Read an existing savepoint, copy some of the not-to-be-changed operator states (keyed/non-keyed) over, and process/patch the remaining ones by transforming and bootstrapping to new state. I could spare more details for more specific

RE: Window aggregation on two joined table

2023-09-21 Thread Schwalbe Matthias
which case it is rendered as ‘timeless’ and does not prevent time progress in your join operator * Again, not sure how to configure this Kind regards again Thias From: Eugenio Marotti Sent: Thursday, September 21, 2023 2:35 PM To: Schwalbe Matthias Cc: user@flink.apache.org Subject: Re

RE: Window aggregation on two joined table

2023-09-21 Thread Schwalbe Matthias
Ciao Eugenio, I might be mistaken, but did you specify the event time for the second table like you did for the first table (watermark(….))? I am not so acquainted with the Table API (doing more straight DataStream API work), but I assume this join and windowing should be by event time. What do you

RE: Checkpoint jitter?

2023-09-13 Thread Schwalbe Matthias
Hi Mátyás, Checkpoints are meant to be atomic in nature, i.e. everything is checkpointed at more or less the same time. What you can do in newer Flink versions is to enable the Change Log Feature (see [1]) which spreads the actual I/O for writing checkpoint files to a longer period and to keep

RE: kafka duplicate messages

2023-09-07 Thread Schwalbe Matthias
Hi Nick, Short (and somewhat superficial) answer: * (assuming your producer supports exactly once mode (e.g. Kafka)) * Duplicates should only ever appear when your job restarts after a hiccup * However if your job is properly configured (checkpointing/Kafka transactions) everything sh

RE: using CheckpointedFunction on a keyed state

2023-09-07 Thread Schwalbe Matthias
Hi Krzysztof again, Just for clarity … your sample code [1] tries to count the number of events per key. Assuming this is your intention? Anyway, your previous implementation initialized the keyed state keyCounterState in the open function, which is the right place to do this, you just wouldn’t wa

RE: updating keyed state in open method.

2023-09-07 Thread Schwalbe Matthias
Hi Krzysztof, You cannot access keyed state in open(). Keyed state has a value per key. In theory you would have to initialize per possible key, which is quite impractical. However, you don’t need to initialize state, the initial state per key defaults to the default value of the type (null for ob

RE: Rate Limit / Throttle Data to Send

2023-08-30 Thread Schwalbe Matthias
Hi Patricia, What you are trying to implement can be achieved out-of-the-box by windowing. I assume these packets of 100 events are not by key but global. In that case use non-keyed windowing [1] with a count trigger (100) [3] and maybe add a processing time trigger if it takes too long to collect all
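The "count trigger plus processing-time trigger" idea from the reply above can be sketched independently of Flink. The class below is a hypothetical stand-alone model, not the Flink trigger API: it releases a batch when the count is reached, or when a timer call finds that the oldest buffered element has waited at least the timeout.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class CountOrTimeoutBatcher<T> {
    private final int maxCount;
    private final long timeoutMillis;
    private final List<T> buffer = new ArrayList<>();
    private long firstArrivalMillis;

    CountOrTimeoutBatcher(int maxCount, long timeoutMillis) {
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
    }

    // Count trigger: returns the full batch once maxCount elements are buffered,
    // otherwise an empty list.
    List<T> add(T element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstArrivalMillis = nowMillis;
        }
        buffer.add(element);
        return buffer.size() >= maxCount ? flush() : Collections.emptyList();
    }

    // Time trigger: returns a partial batch if the oldest buffered element has
    // waited at least timeoutMillis, otherwise an empty list.
    List<T> onTimer(long nowMillis) {
        boolean expired = !buffer.isEmpty() && nowMillis - firstArrivalMillis >= timeoutMillis;
        return expired ? flush() : Collections.emptyList();
    }

    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        return batch;
    }

    public static void main(String[] args) {
        CountOrTimeoutBatcher<String> batcher = new CountOrTimeoutBatcher<>(100, 5_000L);
        batcher.add("event-1", 0L);
        // Only one element arrived; the timer releases it after the timeout.
        System.out.println(batcher.onTimer(5_000L));
    }
}
```

Time is passed in explicitly here to keep the sketch deterministic; in a Flink job the corresponding roles would be played by the window trigger and processing-time timers.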

RE: Checkpoint/savepoint _metadata

2023-08-29 Thread Schwalbe Matthias
Hi Frederic, I’ve once (upon a time 😊) had a similar situation when we changed from Flink 1.8 to Flink 1.13 … It took me a long time to figure out. Some hints where to start to look: * _metadata file is used for * Job manager state * Smallish keyed state (in order to avoid too

RE: [E] RE: Recommendations on using multithreading in flink map functions in java

2023-08-18 Thread Schwalbe Matthias
From: Vignesh Kumar Kathiresan Sent: Thursday, August 17, 2023 10:27 PM To: Schwalbe Matthias Cc: liu ron ; dominik.buen...@swisscom.com Subject: Re: [E] RE: Recommendations on using multithreading in flink map functi

RE: Recommendations on using multithreading in flink map functions in java

2023-08-15 Thread Schwalbe Matthias
Hi Vignesh, In addition to what Ron has said, there are a number of options to consider, depending on the nature of your calculations: Given that your main focus seems to be latency: * As Ron has said, Flink manages parallelism in a coarse grained way that is optimized for spending as lit

RE: Join two streams

2023-06-29 Thread Schwalbe Matthias
Hi Ivan, The source of your problem is quite simple: - If you do windowing by event time, all the sources need to emit watermarks. - watermarks are the logical clock used for event-time timing - you could use either processing time windows, or adjust the watermark strategy of your sources accordin

RE: Identifying a flink dashboard

2023-06-28 Thread Schwalbe Matthias
ry: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: stateless From: Mike Phillips Sent: Thursday, June 29, 2023 7:42 AM To:

RE: Identifying a flink dashboard

2023-06-27 Thread Schwalbe Matthias
Good Morning Mike, As a quick fix, sort of, you could use an Ingress on nginx-ingress (instead of the port-forward) and Add a sub_filter rule to patch the HTML response. I use this to add a tag to the header and for the Flink-Dashboard I experience no glitches. As to point 3. … you don’t need

RE: Using pre-registered schemas with avro-confluent-registry format is not possible

2023-05-31 Thread Schwalbe Matthias
Hello Jannik, Some things to consider (I had a similar problem a couple of years ago): * The schemaRegistryClient actually caches schema ids, so it will hit the schema registry only once, * The schema registered in schema registry needs to be byte-equal, otherwise schema registry co

RE: Bootstrapping multiple state within same operator

2023-03-24 Thread Schwalbe Matthias
Hi David, … coming in late into this discussion We had a very similar problem and I found a simple way to implement priming savepoints with mixed keyed/operator state. The trick is this: * In your KeyedStateBootstrapFunction also implement CheckpointedFunction * In initializeState() you

RE: Is it possible to preserve chaining for multi-input operators?

2023-03-24 Thread Schwalbe Matthias
gards, Viacheslav From: Schwalbe Matthias Sent: 28 February 2023 15:50 To: Viacheslav Chernyshev; user@flink.apache.org

RE: Avoiding data shuffling when reading pre-partitioned data from Kafka

2023-03-07 Thread Schwalbe Matthias
Hi Tommy, While not coming up with a sure solution, I’ve got a number of ideas on how to continue and shed light on the matter: * With respect to diagnostics, have you enabled flame graph (cluster-config.rest.flamegraph.enabled), * It allows you to see the call tree of each task an

RE: Is it possible to preserve chaining for multi-input operators?

2023-02-28 Thread Schwalbe Matthias
have never used it before. Would you be able to share the details for how to force the chaining with e.g. two input streams? Kind regards, Viacheslav From: Schwalbe Matthias Sent: 28 February 2023 14:12 To: Viacheslav

RE: Is it possible to preserve chaining for multi-input operators?

2023-02-28 Thread Schwalbe Matthias
Hi Viacheslav, These are two very interesting questions... You have found out about the restriction that only single-input operators can be chained; it also does not help to union() multiple streams into a single input, they still count as multiple inputs. * The harder way to go would b

RE: Fast and slow stream sources for Interval Join

2023-02-28 Thread Schwalbe Matthias
Hi All, Another option to consider (and this is more a question 😊) is to * Implement org.apache.flink.streaming.api.operators.InputSelectable in the join operator * And manually control backpressure on the inputs running ahead of watermark time I’m not sure where actually to implement

RE: DI in flink

2023-02-14 Thread Schwalbe Matthias
Hi Yashoda, I use Spring-Boot to set up my job networks and DI-compose streaming components like operators/functions etc. The important part is that all components need to be serializable in order for this to work. Specific task implementations are a little more difficult (little experience) to set

RE: Reducing Checkpoint Count for Chain Operator

2023-02-02 Thread Schwalbe Matthias
single file? Sincere greetings Thias From: Talat Uyarer Sent: Thursday, February 2, 2023 5:57 PM To: Schwalbe Matthias Cc: Kishore Pola ; weijie guo ; user@flink.apache.org Subject: Re: Reducing Checkpoint Count for Chain Operator Hi

RE: Reducing Checkpoint Count for Chain Operator

2023-02-01 Thread Schwalbe Matthias
Hi Kishore, Having followed this thread for a while, I still see quite a bit of confusion of concepts, and in order to help resolve your original problem we would need to know, * what makes your observation a problem to be solved? * You write, you have no shuffling, does that mean you don’t use a

RE: Processing watermarks in a broadcast connected stream

2023-01-31 Thread Schwalbe Matthias
Good Morning Sajjad, I’ve once had a similar problem. As you’ve found out, directly using KeyedBroadcastProcessFunction is a little tricky. What I ended up with instead is to use the rather new @PublicEvolving MultipleInputStreamOperator. It allows you to connect and process any (reasonable) num

RE: Understanding pipelined regions

2022-12-20 Thread Schwalbe Matthias
Hi Sunny, Welcome to Flink 😊. The next thing for you to consider is to set up checkpointing [1] which allows a failing job to pick up from where it stopped. Sincere greetings from the supposed close-by Zurich 😊 Thias [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastre

RE: Concatenating a bounded and unbounded stream

2022-10-27 Thread Schwalbe Matthias
Sorry, I’ve got things really mixed up, I meant to reply to this other thread … ☹ Thias From: Schwalbe Matthias Sent: Thursday, October 27, 2022 9:14 AM To: 'Tzu-Li (Gordon) Tai' ; Filip Karnicki Cc: user Subject: RE: State Processor API - VoidNamespaceSerializer must be compatible

RE: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

2022-10-27 Thread Schwalbe Matthias
[1] org.apache.flink.streaming.api.graph.StreamGraphGeneratorBatchExecutionTest.InputSelectableMultipleInputOperator [2] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_ From: Tzu-Li (Gordon) Tai Sent: Wednesday, October 26, 2022 6:59 PM To: Filip Karnicki Cc: Schwalbe Matthias ; user Subject:

RE: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

2022-10-25 Thread Schwalbe Matthias
Hi Filip, It looks like your state primitive is used in the context of Windows: Keyed state works like this: * It uses a cascade of key types to store and retrieve values: * The key (set by .keyBy) * A namespace (usually a VoidNamespace), unless it is used in context of a spec

RE: Re:Question about Flink Broadcast State event ordering

2022-10-10 Thread Schwalbe Matthias
Hi Qing again, Another point to consider: broadcast streams are subject to watermarking, i.e. * You can wait to process the broadcast records only after the watermark has passed, then * order those records by time * keep all broadcast records where the watermark has not yet passed in some e
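The buffering scheme outlined in the reply above (hold broadcast records until the watermark has passed them, then release them in timestamp order) can be sketched as follows. This is a stand-alone model with hypothetical names; in an actual job the pending records would live in Flink state rather than in a plain in-memory queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class WatermarkOrderedBuffer {

    // A buffered record with its event timestamp.
    static final class Timed {
        final long timestamp;
        final String payload;

        Timed(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Records whose timestamp the watermark has not yet passed, smallest timestamp first.
    private final PriorityQueue<Timed> pending =
            new PriorityQueue<>(Comparator.comparingLong((Timed t) -> t.timestamp));

    void add(long timestamp, String payload) {
        pending.add(new Timed(timestamp, payload));
    }

    // Releases, in timestamp order, every record with timestamp <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            ready.add(pending.poll().payload);
        }
        return ready;
    }

    public static void main(String[] args) {
        WatermarkOrderedBuffer buffer = new WatermarkOrderedBuffer();
        buffer.add(5L, "rule-b");
        buffer.add(3L, "rule-a");
        buffer.add(9L, "rule-c");
        // Records arrived out of order; the watermark releases them sorted by time.
        System.out.println(buffer.onWatermark(6L));
    }
}
```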

RE: Loading broadcast state on BroadcastProcessFunction instantiation or open method

2022-09-27 Thread Schwalbe Matthias
Hi Alfredo, Did you consider implementing org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in your broadcast function … the initializeState(…) function should give you access to the state backend. Kind regards Thias From: David Anderson Sent: Tuesday, September 27,

KeyedMultipleInputTransformation: StreamingRuntimeContext#keyedStateStore is not properly initialized

2022-09-21 Thread Schwalbe Matthias
Hi all, When trying to adopt the new (@Experimental) KeyedMultipleInputTransformation I came across following problem: * In the open(…) function of my operator, derived from MultipleInputStreamOperator with AbstractStreamOperatorV2, I can not initialize keyed state primitives, because *

RE: Is it possible to connect multiple streams

2022-09-21 Thread Schwalbe Matthias
Hi Deepak, Coming back to your original question, you’ve got a number of options (some of them already mentioned): * You can connect/join 2 streams of different types at a time by means of s1.connect(s2). * (your example does not work directly as written (3 streams)) * You can conn

RE: A question about restoring state with an additional variable with kryo

2022-09-15 Thread Schwalbe Matthias
Hi Vishal, Good news and bad news 😊: * Bad: Kryo serializer cannot be used for schema evolution, see [1] * Good: not all is lost here, * If you happen to have state that you cannot afford to lose, you can transcode it by means of the savepoint API [2], * However, this take

RE: how to connect to the flink-state store and use it as cache to serve APIs.

2022-07-06 Thread Schwalbe Matthias
Hi Laxmi, Did you consider Apache Flink Table Store [1], which was introduced a short time ago? Yours sounds like a case for early integration … Sincere greetings Thias [1] https://nightlies.apache.org/flink/flink-table-store-docs-release-0.1/ From: laxmi narayan Sent: Wednesday, July 6, 2022 6

RE: Synchronizing streams in coprocessfunction

2022-06-27 Thread Schwalbe Matthias
Hi Gopi, Your use case is a little under-specified to give a specific answer, especially to the nature of the two input streams and the way events of both streams are correlated (joined): * Is your fast-stream keyed? * If yes: keyed state and timers can be used, otherwise only operat

RE: Recover watermark from savepoint

2022-06-09 Thread Schwalbe Matthias
Hi Sweta, It is actually a sound idea to implement a dedicated process function for this purpose, as David suggests. Especially if you are in a situation where waiting for a valid natural watermark after a restore from savepoint is not sufficient. We had a situation with input streams of differ

RE: Checkpoint directories not cleared as TaskManagers run

2022-05-19 Thread Schwalbe Matthias
ckpoint. It will not cause my program to fail correct? * Imho there would be no reason to set up checkpointing in the first place, if you cannot restart a job from such a checkpoint * This is only important, of course, if you need reliability, or exactly once semantics … Thias From: Jam

RE: Checkpoint directories not cleared as TaskManagers run

2022-05-17 Thread Schwalbe Matthias
Hi James, From reading the thread … I assume, your file:/tmp/Flink/State folder is not shared across all machines, right? In this case it cannot work: - checkpoints and savepoints need to go to a path that can be commonly accessed by jobmanager and all taskmanagers in order to work - as your jo

RE: Practical guidance with Scala and Flink >= 1.15

2022-05-10 Thread Schwalbe Matthias
… just for my understanding From the announcements I only got that scala remains only a dependency in the JARs that relate to the Scala API. I never read about plans to drop the Scala API altogether … is that the case?? That would be very unfortunate … What is the state of the affair? Best rega

RE: Notify on 0 events in a Tumbling Event Time Window

2022-05-10 Thread Schwalbe Matthias
Hi Shilpa, There is no need to have artificial messages in the input kafka topic (and I don’t see where Andrew suggests this 😊 ) However your use case is not 100% clear as to for which keys you want to emit 0-count window results , either: * A) For all keys your job has ever seen (that’s e

RE: Migrating Flink apps across cloud with state

2022-05-04 Thread Schwalbe Matthias
Hello Hemanga, MirrorMaker can cause havoc in many respects, for one, it does not have strict exactly-once semantics … The way I would tackle this problem (and have done in similar situations): * For the source topics that need to have exactly-once semantics and that are not intrinsica

RE: Restore Job from CheckPoint in IntelliJ IDE - MiniCluster

2022-04-22 Thread Schwalbe Matthias
Happy to hear that (back-posted to user list) Thias -Original Message- From: Κωνσταντίνος Αγαπίδης Sent: Friday, April 22, 2022 3:50 PM To: Schwalbe Matthias Subject: Re: Restore Job from CheckPoint in IntelliJ IDE - MiniCluster

RE: Kubernetes killing TaskManager - Flink ignoring taskmanager.memory.process.size

2022-04-21 Thread Schwalbe Matthias
Hi Dan, Assuming from previous mails that you are using RocksDb … this could have to do with the glibc bug [1][2] … I’m never sure in which setting this has already been taken care of … However your situation is very typical with glibc as allocator underneath RocksDb and giving more memory won’t

RE: Restore Job from CheckPoint in IntelliJ IDE - MiniCluster

2022-04-21 Thread Schwalbe Matthias
Hi Kostas, Did you give setting execution.savepoint.path a try? You can set the property on the local environment by means of env.configure(...). This works for me ... (didn't try yet on Flink 1.15) Thias [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#exec

RE: Watermarks event time vs processing time

2022-03-29 Thread Schwalbe Matthias
manner. Hope this helps Thias From: HG Sent: Tuesday, March 29, 2022 1:07 PM To: Schwalbe Matthias Cc: user Subject: Re: Watermarks event time vs processing time Hello Matthias, When I remove all the watermark strategies it does not

RE: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Schwalbe Matthias
Oops mistyped your name, Dan From: Schwalbe Matthias Sent: Friday, March 18, 2022 09:02 To: 'Dan Hill' ; Dongwon Kim Cc: user Subject: RE: Weird Flink Kafka source watermark behavior Hi San, Dongwon, I share the opinion that when per-partition watermarking is enabled, you shoul

RE: Weird Flink Kafka source watermark behavior

2022-03-18 Thread Schwalbe Matthias
Hi San, Dongwon, I share the opinion that when per-partition watermarking is enabled, you should observe correct behavior … would be interesting to see why it does not work for you. I’d like to clear one tiny misconception here when you write: >> - The same issue happens even if I use an idle

RE: Watermarks event time vs processing time

2022-03-16 Thread Schwalbe Matthias
ing of the watermarks on single operators / per subtask useful: Look for subtasks that don’t have watermarks, or too low watermarks for a specific session window to trigger. Thias From: HG Sent: Wednesday, March 16, 2022 16:41 To: Schwalbe Matthias Cc: user Subject: Re: Watermarks event time

RE: Watermarks event time vs processing time

2022-03-16 Thread Schwalbe Matthias
Hi Hanspeter, Let me relate some hints that might help you get the concepts clearer. From your description I make the following assumptions where you are not specific enough (please confirm or correct in your answer): 1. You store incoming events in state per transaction_id to be sorted/aggreg

RE: Savepoint API challenged with large savepoints

2022-03-10 Thread Schwalbe Matthias
From: Chesnay Schepler Sent: Thursday, March 10, 2022 10:47 To: Schwalbe Matthias ; user@flink.apache.org Subject: Re: Savepoint API challenged with large savepoints That all sounds very interesting; I'd go ahead with creating tickets. On 08/03/2022 13:43, Schwalbe Matthias wrote: Dear Flink T

RE: Could not stop job with a savepoint

2022-03-09 Thread Schwalbe Matthias
-exceeded [2] https://issues.apache.org/jira/browse/FLINK-19125 From: Vinicius Peracini Sent: Wednesday, March 9, 2022 17:56 To: Schwalbe Matthias Cc: Dawid Wysakowicz ; user@flink.apache.org Subject: Re: Could not stop job with a savepoint So apparently the YARN container for Task Manager is

RE: Incremental checkpointing & RocksDB Serialization

2022-03-08 Thread Schwalbe Matthias
Hi Vidya, As to the choice of serializer: * Flink provides two implementations that support state migration, AVRO serializer, and Pojo serializer * Pojo serializer happens to be one of the fastest available serializers (faster than AVRO) * If your record sticks to Pojo coding rules

Savepoint API challenged with large savepoints

2022-03-08 Thread Schwalbe Matthias
Dear Flink Team, In the last weeks I was faced with a large savepoint (around 40GiB) that contained lots of obsolete data points and overwhelmed our infrastructure (i.e. failed to load/restart). We could not afford to lose the state, hence I spent the time to transcode the savepoint into someth

RE: Could not stop job with a savepoint

2022-03-07 Thread Schwalbe Matthias
Good morning Vinicius, Can you still find (and post) the exception stack from your jobmanager log? The Flink client log does not reveal enough information. Your situation reminds me of something similar I had. In the log you might find something like this or similar: 2022-03-07 02:15:41,347 INFO org

RE: MapState.entries()

2022-03-07 Thread Schwalbe Matthias
Hi Alexey, To the best of my knowledge it's lazy with RocksDBStateBackend, using the Java iterator you could even modify the map (e.g. remove()). Cheers Thias From: Alexey Trenikhun Sent: Tuesday, March 8, 2022 06:11 To: Flink User Mail List Subject: MapState.entries() Hello, We are using RocksDB

RE: processwindowfunction output Iterator

2022-03-01 Thread Schwalbe Matthias
Good morning Hans, You can call out.collect(…) multiple times, i.e. for each forwarded event … how about this 😊 Thias From: HG Sent: Monday, February 28, 2022 16:25 To: user Subject: processwindowfunction output Iterator Hi, Can processwindowfunction output an Iterator? I need to sort a

RE: Trouble sinking to Kafka

2022-02-23 Thread Schwalbe Matthias
Good morning Marco, Your fix is pretty plausible: * Kafka transactions get started at the beginning of a checkpoint period and contain all events collected through this period, * At the end of the checkpoint period the associated transaction is committed and concurrently the transactio

RE: Basic questions about resuming stateful Flink jobs

2022-02-17 Thread Schwalbe Matthias
Hi James, Coming back to your original question on how to restart jobs from savepoints/checkpoints on LocalStreamEnvironment (the one used in a debugger): Out of the box LocalStreamEnvironment does not allow setting a snapshot path to resume the job from. The trick for me to do it anyway was to

RE: Memory issues with Rocksdb ColumnFamilyOptions

2022-02-04 Thread Schwalbe Matthias
Hi Natalie, I happen to currently work on a similar problem: I’ve got a savepoint of about 40 GB just for one of the operator states, 70 million keys. With ExistingSavepoint there are currently a number of problems: * When reading from a local copy of the savepoint, non-buffered I/O is used a

RE: Regarding Queryable state in Flink

2022-01-25 Thread Schwalbe Matthias
Hi Jessy, Have you considered using the State Processor API [1] for offline analysis of checkpoints and savepoints? [1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/libs/state_processor_api/ Sincere greetings Thias From: Jessy Ping Sent: Monday, 24 January 2022 16:47 To:

RE: unaligned checkpoint for job with large start delay

2022-01-11 Thread Schwalbe Matthias
Hi Mason, Since you are using RocksDB, you could enable this metric [1] state-backend-rocksdb-metrics-estimate-num-keys which gives (afaik) a good indication of the number of active windows. I’ve never seen (despite the warning) a negative effect on the runtime. Hope this helps … Thias [1] htt

RE: unexpected result of interval join when using sql

2021-12-15 Thread Schwalbe Matthias
Probably an oversight ... did you actually mean to publish your password? Better change it as soon as possible ... Thias From: cy Sent: Thursday, 16 December 2021 06:55 To: user@flink.apache.org Subject: unexpected result of interval join when using sql Hi Flink 1.14.0 Scala 2.12 I'm usin

RE: CoGroupedStreams and disableAutoGeneratedUIDs

2021-12-12 Thread Schwalbe Matthias
Hi Dan, When I run into such a problem I consider using the not so @public API levels: * First of all, uids are especially needed for operators that hold state and are not so important for operators that don’t hold state primitives; not sure of the implications created by disableAutoGeneratedUID

RE: Any way to require .uid(...) calls?

2021-12-05 Thread Schwalbe Matthias
Hi Dan, In case you also want to keep automatic UID assignment, we do something like this (scala): override def run(args: ApplicationArguments): Unit = { require(jobName != null, "a specific jobName needs to be configured, if hosted in Spring Boot, configure 'flink.job.name' in application.ya

RE: Windows and data loss.

2021-12-01 Thread Schwalbe Matthias
. Thias From: John Smith Sent: Friday, 26 November 2021 17:17 To: Schwalbe Matthias Cc: Caizhi Weng ; user Subject: Re: Windows and data loss. Or as an example we have a 5-minute window and lateness of 5 minutes. We have the following events in the logs 10:00:01 PM > Already pushed

RE: Windows and data loss.

2021-11-26 Thread Schwalbe Matthias
] https://www.oreilly.com/radar/the-world-beyond-batch-streaming-102/ … happy to discuss further 😊 Thias From: John Smith Sent: Friday, 26 November 2021 14:09 To: Schwalbe Matthias Cc: Caizhi Weng ; user Subject: Re: Windows and data loss. But if we use event time, if a failure happens

RE: Windows and data loss.

2021-11-25 Thread Schwalbe Matthias
:55 To: Schwalbe Matthias Cc: Caizhi Weng ; user Subject: Re: Windows and data loss. Well what I'm thinking for 100% accuracy no data loss just to base the count on processing time. So whatever arrives in that window is counted. If I get some events of the "current" window late a

RE: Windows and data loss.

2021-11-25 Thread Schwalbe Matthias
Hi John, … just a short hint: With the DataStream API you can * hand-craft a trigger that decides when and how often to emit intermediate, punctual and late window results, and when to evict the window and stop processing late events * in order to process late events you also need to specify for

RE: Custom partitioning of keys with keyBy

2021-11-04 Thread Schwalbe Matthias
Hi Yuval, … I had to do some guesswork with regard to your use case … still not exactly clear what you want to achieve, however I remember having done something similar in that area 2 years ago. Unfortunately I cannot find the implementation anymore ☹ * If you tried a combination of .parti

RE: Custom partitioning of keys with keyBy

2021-11-03 Thread Schwalbe Matthias
Hi Yuval, Just a couple of comments: * Assuming that all your 4 different keys are evenly distributed, and you send them to (only) 3 buckets, you would expect at least one bucket to cover 2 of your keys, hence the 50% * With low entropy keys avoiding data skew is quite difficult *

RE: FlinkKafkaConsumer -> KafkaSource State Migration

2021-11-01 Thread Schwalbe Matthias
Thanks Fabian, That was the information I was missing. (Late reply ... same here, FlinkForward 😊 ) Thias -Original Message- From: Fabian Paul Sent: Thursday, 28 October 2021 08:38 To: Schwalbe Matthias Cc: Mason Chen ; user Subject: Re: FlinkKafkaConsumer -> KafkaSou

RE: FlinkKafkaConsumer -> KafkaSource State Migration

2021-10-27 Thread Schwalbe Matthias
I would also be interested in instructions/discussion on how to state-migrate from pre-unified sources/sinks to unified ones (Kafka) 😊 Thias From: Mason Chen Sent: Wednesday, 27 October 2021 01:52 To: user Subject: FlinkKafkaConsumer -> KafkaSource State Migration Hi all, I read these instru

RE: Duplicate Calls to Cep Filter

2021-10-27 Thread Schwalbe Matthias
Hi Puneet, … not able to answer your question, but I would be curious to also print out the value with your diagnostic message. … assuming we’ll see an ‘a’ and a ‘b’ for both filters resp. … simple explanation would be that the filters are applied to all input, regardless of the pattern matchi

RE: Huge backpressure when using AggregateFunction with Session Window

2021-10-26 Thread Schwalbe Matthias
onnerstag, 21. Oktober 2021 15:32 To: Schwalbe Matthias Cc: user Subject: Re: Huge backpressure when using AggregateFunction with Session Window Thanks for taking the time to answer this. * You're correct that the SimpleAggregator is not used in the job setup. I didn't copy th

RE: Huge backpressure when using AggregateFunction with Session Window

2021-10-20 Thread Schwalbe Matthias
Hi Ori, Just a couple of comments (some code is missing for a concise explanation): * SimpleAggregator is not used in the job setup below (assuming another job setup) * SimpleAggregator is called for each event that goes into a specific session window, however * The scala vectors
