Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-03 Thread Guowei Ma
Hi Yan, On second thought, I think you are right: the downstream operator should preserve the order of records with the same key coming from the same upstream. So feel free to open a JIRA. Best, Guowei On Wed, Nov 3, 2021 at 7:30 PM Yan Shen wrote: > Hi, > > It will complicate things a lot if we cannot assume input

Re: Custom partitioning of keys with keyBy

2021-11-03 Thread naitong Xiao
I think I had a similar scenario several months ago; here is my related code: val MAX_PARALLELISM = 16 val KEY_RAND_SALT = "73b46" logSource.keyBy { value => val keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(value.deviceUdid, MAX_PARALLELISM) s"$KEY_RAND_SALT$keyGroup" } The keyGroup is

Re: Statefun remote functions - accessing Kafka headers from a remote function

2021-11-03 Thread Igal Shilman
Hi Fil, The default Kafka ingress that ships with StateFun indeed doesn't bundle the headers with the incoming message, so there is no way of getting them at the moment, without doing some work :( I'd be also happy to kick off the discussion (I guess JIRA would be the right place) about supporting

How to tune memory settings for batch job using sort-merge?

2021-11-03 Thread Joern Kottmann
Hello! I often use batch mode to validate that my pipeline can produce the expected results over some fixed input data. That usually works very well and definitely helps to find bugs in my user code. I have one job that reads many TBs of data from S3 and then writes reduced outputs back to S3. T

Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Ali Bahadir Zeybek
Hello Qihua, This will require you to implement and maintain your own database insertion logic using any of the clients that your database and programming language supports. Bear in mind that you will be losing all the optimizations Flink's connector provides for you and this will add complexity t

Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Qihua Yang
Many thanks guys! Hi Ali, for approach 2, what is the better way to do the database inserts for this case? Currently we simply use JDBC SQL connector to sink to database. Thanks, Qihua On Wed, Nov 3, 2021 at 8:13 AM Ali Bahadir Zeybek wrote: > Hello Qihua, > > If you do not care with the events

Re: Custom partitioning of keys with keyBy

2021-11-03 Thread David Anderson
Another possibility, if you know in advance the values of the keys, is to find a mapping that transforms the original keys into new keys that will, in fact, end up in disjoint key groups that will, in turn, be assigned to different slots (given a specific parallelism). This is ugly, but feasible.
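David's suggestion can be sketched as a small search: for each known key, try salted variants until one lands on a subtask that no other key uses yet. This is a minimal plain-Scala sketch under stated assumptions. The key-group-to-subtask formula `keyGroup * parallelism / maxParallelism` is the one Flink's `KeyGroupRangeAssignment` uses, but `keyGroupOf` below is a hypothetical stand-in hash (Flink applies a murmur hash to `key.hashCode` first), and the `key#i` salting scheme, `remap`, and `maxTries` are illustrative names, not Flink API. In a real job you would call `KeyGroupRangeAssignment.assignToKeyGroup` instead of the stand-in.

```scala
object KeyRemapSketch {
  // Key group -> subtask index, the same integer formula Flink uses in
  // KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup.
  def operatorIndex(keyGroup: Int, parallelism: Int, maxParallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  // HYPOTHETICAL stand-in for Flink's key-group assignment. Flink applies a murmur
  // hash to key.hashCode before the modulo, so real key groups will differ.
  def keyGroupOf(key: String, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  def subtaskOf(key: String, parallelism: Int, maxParallelism: Int): Int =
    operatorIndex(keyGroupOf(key, maxParallelism), parallelism, maxParallelism)

  // For each original key, try salted candidates "key#0", "key#1", ... and keep the
  // first one whose subtask is not taken yet. Returns None if some key cannot be
  // placed, for example when there are more keys than subtasks.
  def remap(keys: Seq[String], parallelism: Int, maxParallelism: Int,
            maxTries: Int = 1000): Option[Map[String, String]] =
    keys.foldLeft(Option((Map.empty[String, String], Set.empty[Int]))) {
      case (None, _) => None
      case (Some((mapping, used)), key) =>
        (0 until maxTries).iterator
          .map(i => s"$key#$i")
          .map(c => (c, subtaskOf(c, parallelism, maxParallelism)))
          .find { case (_, sub) => !used.contains(sub) }
          .map { case (c, sub) => (mapping + (key -> c), used + sub) }
    }.map(_._1)
}
```

The resulting mapping only holds for the specific parallelism and max parallelism it was computed with, which is part of why David calls the approach ugly: rescaling the job invalidates it.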

Re: What is Could not retrieve file from transient blob store?

2021-11-03 Thread John Smith
OK, I missed the log below. I guess this happened when the task manager was stopped. I attached the full sequence. But I guess it's OK and not a big issue??? 2021-11-02 23:20:22,682 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler - Failed to transfer file from Ta

GenericWriteAheadSink, declined checkpoint for a finished source

2021-11-03 Thread James Sandys-Lumsdaine
Hello, I have a Flink workflow where I need to upload the output data into a legacy SQL Server database and so I have read the section in the Flink book about data sinks and utilizing the GenericWriteAheadSink base class. I am currently using Flink 1.12.3 although we plan to upgrade to 1.14 sho

Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Ali Bahadir Zeybek
Hello Qihua, If you do not care about the events that are not committed to the DB, you can use Async I/O [1] and implement logic that - does the database inserts - completes only the original events that the DB accepted. You can then sink this new DataStream to Kafka. If you are also inter
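The "insert, then forward only what the DB accepted" step can be sketched outside Flink with plain Scala futures. This is a minimal sketch assuming a hypothetical async database call `insertIntoDb` (here an empty payload stands in for a rejected row); `Event`, `acceptedEvents`, and `acceptedBlocking` are illustrative names. Inside a Flink job the same logic would sit in an `AsyncFunction` applied through `AsyncDataStream`, completing the `ResultFuture` with the event when the insert succeeds and with an empty collection when it does not, so only accepted events reach the Kafka sink.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object InsertThenForwardSketch {
  final case class Event(id: Long, payload: String)

  // HYPOTHETICAL async DB insert. A real job would call an async database client;
  // here an empty payload stands in for a row the database rejects.
  def insertIntoDb(e: Event)(implicit ec: ExecutionContext): Future[Boolean] =
    Future(e.payload.nonEmpty)

  // Insert every event and keep only those the database accepted, in input order.
  // The surviving events are the ones that would then be written to Kafka.
  def acceptedEvents(events: Seq[Event])(implicit ec: ExecutionContext): Future[Seq[Event]] =
    Future
      .traverse(events)(e => insertIntoDb(e).map(ok => if (ok) Some(e) else None))
      .map(_.flatten)

  // Convenience wrapper for trying the sketch outside an async context.
  def acceptedBlocking(events: Seq[Event]): Seq[Event] =
    Await.result(acceptedEvents(events)(ExecutionContext.global), 5.seconds)
}
```

As Ali notes in the same thread, this means maintaining your own insertion logic instead of relying on the JDBC connector's batching and retries.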

RE: Custom partitioning of keys with keyBy

2021-11-03 Thread Schwalbe Matthias
Hi Yuval, Just a couple of comments: * Assuming that all your 4 different keys are evenly distributed, and you send them to (only) 3 buckets, you would expect at least one bucket to cover 2 of your keys, hence the 50% * With low entropy keys avoiding data skew is quite difficult *

Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Francesco Guardiani
An alternative is to use a CDC tool like Debezium to stream your table changes, and then ingest that stream using Flink to push data later to Kafka. On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma wrote: > Hi, Qihua > > AFAIK there is no way to do it. Maybe you need to implement a "new" sink > to archi

Re: [Statefun] Questions on recovery

2021-11-03 Thread Igal Shilman
Hello Hady, Glad to see that you are testing StateFun! Regarding that exception, I think that this is not the root cause. The root cause is as you wrote that the StateFun job failed because it wasn't able to deliver a message to a remote function in the given time frame. If you look at the logs yo

Re: Statefun embedded functions - parallel per partition, sequential per key

2021-11-03 Thread Igal Shilman
Glad to hear it worked out for you :-) Cheers, Igal On Tue, Nov 2, 2021 at 1:57 PM Filip Karnicki wrote: > Hi All > > Just an update for future reference, it turned out that the machine we > were using for this test didn't have enough memory for what we were asking > it to do. It was that simpl

Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-03 Thread Yan Shen
Hi, It will complicate things a lot if we cannot assume input order of any operator after a keyBy. So far I only have the problem with countWindow which I seem to be able to avoid by writing my own stateful KeyedProcess. Are there other operators which might cause the same problem? The other alte
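Yan's workaround, a hand-written stateful keyed process function instead of `countWindow`, boils down to buffering elements per key and emitting a window every N elements. The plain-Scala simulation below assumes a single input sequence in arrival order; in a real job the mutable map would be keyed state (for example `ListState`) inside a `KeyedProcessFunction`. `countWindows` is an illustrative name and this is not how Flink's `countWindow` is implemented.

```scala
object PerKeyCountWindowSketch {
  // Plain-Scala simulation of a keyed process function holding per-key list state:
  // buffer each key's elements and emit a window every `size` elements.
  // Elements of the same key are emitted in the order they arrived; buffers that
  // have not reached `size` yet are not emitted.
  def countWindows[K, V](input: Seq[(K, V)], size: Int): Seq[(K, Seq[V])] = {
    require(size > 0, "window size must be positive")
    val buffers = scala.collection.mutable.Map.empty[K, Vector[V]]
    val out = Vector.newBuilder[(K, Seq[V])]
    for ((key, value) <- input) {
      val buf = buffers.getOrElse(key, Vector.empty[V]) :+ value
      if (buf.size == size) {
        out += (key -> buf)
        buffers.remove(key)
      } else {
        buffers.update(key, buf)
      }
    }
    out.result()
  }
}
```

For example, `countWindows(Seq("a" -> 1, "b" -> 10, "a" -> 2, "a" -> 3, "b" -> 20, "a" -> 4), 2)` yields windows `a: [1, 2]`, `b: [10, 20]`, `a: [3, 4]`, with each key's elements kept in arrival order.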

Re: Possibility of supporting Reactive mode for native Kubernetes application mode

2021-11-03 Thread Nicolaus Weidner
Hi Fuyao, I just wanted to say that the performance loss that you rightly suspected when using savepoints (as opposed to checkpoints) may disappear with Flink 1.15. There should be no loss of functionality as far as checkpoints are concerned. I don't think the savepoint performance improvement goa

Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread Yuxin Tan
Thanks Daisy and Kevin! The I/O scheduling idea for sequential reading and the benchmark results look really great! Looking forward to your next work. Best, Yuxin On Wed, Nov 3, 2021 at 5:24 PM weijie guo wrote: > It's really an amazing job to fill in the defects of flink in batch > shuffle. I really appre

[Statefun] Questions on recovery

2021-11-03 Thread Hady Januar Willi
Hi everyone, When testing Flink statefun, the job eventually throws the following exception after failing to reach the endpoint or if the endpoint fails after the exponentially increasing delay. java.util.concurrent.RejectedExecutionException: org.apache.flink.streaming.runtime.tasks.mailbox.Task

Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread weijie guo
It's really amazing work to fill the gaps in Flink's batch shuffle. I really appreciate the work done on I/O scheduling: sequential reading in the shuffle reader can greatly improve disk I/O performance and stability. Sort-based shuffle realizes this feature in a concise and efficien
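The reason sort-based shuffle enables sequential reads can be shown with a deliberately simplified model: records for all subpartitions are written into one region grouped by target subpartition, together with an index of where each subpartition's data starts. This plain-Scala sketch assumes in-memory vectors in place of files, and `Region`, `spill`, and `read` are illustrative names; it does not model Flink's sort buffers, spilling, compression, or the I/O scheduling across readers that the blog post describes.

```scala
object SortShuffleSketch {
  // A spilled data region: records grouped by target subpartition plus an index of
  // (offset, count) per subpartition, so each reader can do one sequential read.
  final case class Region[T](data: Vector[T], index: Map[Int, (Int, Int)])

  def spill[T](records: Seq[(Int, T)], numSubpartitions: Int): Region[T] = {
    require(records.forall { case (p, _) => p >= 0 && p < numSubpartitions },
      "subpartition index out of range")
    // sortBy is stable, so records keep their arrival order within a subpartition.
    val sorted = records.sortBy(_._1)
    val counts = sorted.groupBy(_._1).map { case (p, rs) => (p, rs.size) }
    val offsets = (0 until numSubpartitions).scanLeft(0)((off, p) => off + counts.getOrElse(p, 0))
    val index = (0 until numSubpartitions).map(p => (p, (offsets(p), counts.getOrElse(p, 0)))).toMap
    Region(sorted.map(_._2).toVector, index)
  }

  // A reader for one subpartition reads a single contiguous slice of the region.
  def read[T](region: Region[T], subpartition: Int): Vector[T] = {
    val (offset, count) = region.index(subpartition)
    region.data.slice(offset, offset + count)
  }
}
```

Because each subpartition's data is contiguous, a downstream reader touches one slice instead of seeking across records interleaved with other subpartitions.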

Proactively Push Metrics After Tasks Finished

2021-11-03 Thread kanata163
Hi all, As discussed in FLIP-147 [1], checkpoints are supported after tasks have finished, but metrics are not. As we know, metric reporters report periodically, every 10 seconds by default. If sources are bounded, the final metrics may not be reported to a metric system like Pushgateway when the task finishes.

Re: Reactive mode in 1.13

2021-11-03 Thread Till Rohrmann
Hi Ravi, I think you can pass the arguments to the job via `./bin/standalone-job.sh start -Dscheduler-mode=reactive -Dexecution.checkpointing.interval="3000s" lib/tornado.jar myArguments`. Cheers, Till On Wed, Nov 3, 2021 at 5:20 AM Ravi Sankar Reddy Sangana wrote: > Thanks a lot working fine

Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-03 Thread Guowei Ma
Hi, Yan I do not think it is a bug. Maybe we cannot simply assume the input order of an operator. Best, Guowei On Wed, Nov 3, 2021 at 3:10 PM Yan Shen wrote: > Yes, it does not happen in streaming mode. Is this considered a bug or is > it by design? > > Thanks! > > On Wed, Nov 3, 2021 at 1

Re: Question on BoundedOutOfOrderness

2021-11-03 Thread Guowei Ma
Hi Oliver, I think Alexey is right that you cannot assume the records will be output in event-time order. One small addition: looking at your output, there are actually multiple parallel instances (probably 11 subtasks). You also can't expect these parallel instances to be ordered accordi
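The point that a bounded-out-of-orderness strategy does not sort records can be seen in a small simulation. Flink's bounded-out-of-orderness generator tracks the maximum timestamp seen and derives the watermark as `maxTimestamp - outOfOrdernessMillis - 1`; records themselves pass through untouched. This plain-Scala sketch assumes a single subtask and, unlike Flink, which emits watermarks periodically, updates the watermark after every record. `Step` and `simulate` are illustrative names.

```scala
object BoundedOutOfOrdernessSketch {
  final case class Step(timestamp: Long, watermark: Long, late: Boolean)

  // Simplified bounded-out-of-orderness watermark generator for one subtask:
  // watermark = maxTimestamp - outOfOrdernessMillis - 1, recomputed after each record.
  def simulate(timestamps: Seq[Long], outOfOrdernessMillis: Long): Seq[Step] = {
    var maxTimestamp = Long.MinValue
    var watermark = Long.MinValue
    timestamps.toVector.map { ts =>
      // A record is late if its timestamp is not newer than the current watermark.
      val late = ts <= watermark
      maxTimestamp = math.max(maxTimestamp, ts)
      watermark = maxTimestamp - outOfOrdernessMillis - 1
      // Records are passed on in arrival order; the watermark never reorders them.
      Step(ts, watermark, late)
    }
  }
}
```

With timestamps `1000, 3000, 2000, 500` and a 1000 ms bound, the records come out in exactly that order, the watermark goes `-1, 1999, 1999, 1999`, and only the record at `500` is late. With several subtasks, each one produces its own such sequence, so the combined output is not globally ordered either.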

Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-03 Thread Lijie Wang
Thanks Daisy and Kevin for this blog post; it is very helpful for understanding the principles of sort-based shuffle. Best, Lijie On Wed, Nov 3, 2021 at 2:57 PM Guowei Ma wrote: > > Thanks so much, Daisy & Kevin, for your introduction to the improvement of TM > blocking shuffle; credit-based + I/O scheduling is indeed a

Re: Why do the count windows in Flink Table APIs require processing time for sorting whereas in Flink Datastream APIs they do not

2021-11-03 Thread Guowei Ma
Hi Long, From the API point of view, this processing time can be omitted. It is there mainly for unification across event-time and processing-time scenarios, and for alignment with other window APIs. Thanks to Jark Wu for telling me this offline. Best, Guowei On Wed, Nov 3, 2021 at 11:55 AM Long Nguyễn wrote: >

Re: Is there a way to update checkpoint configuration for a job "in-place"?

2021-11-03 Thread Guowei Ma
Hi Kevin, If you want to change this configuration (execution.checkpointing.timeout) without restarting the job, as far as I know, there may not be such a method. But could you consider increasing this value by default? Best, Guowei On Wed, Nov 3, 2021 at 5:15 AM Kevin Lam wrote: > Hi all, > > W

Re: How to refresh topics to ingest with KafkaSource?

2021-11-03 Thread Martijn Visser
Hi Mason, I've assigned it to you. Best regards, Martijn On Tue, 2 Nov 2021 at 23:28, Mason Chen wrote: > Hi Arvid, > > I have some bandwidth to contribute to this task and am familiar with the > code. Could you or another committer assign me this ticket? > > Thanks, > Mason > > On Oct 30, 20

Re: Data Stream countWindow followed by keyBy does not preserve time order

2021-11-03 Thread Yan Shen
Yes, it does not happen in streaming mode. Is this considered a bug or is it by design? Thanks! On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma wrote: > Hi > > I did not run your program directly, but I see that you are now using the > Batch execution mode. I suspect it is related to this, because in