Re: Flink job manager conditional start of flink jobs

2023-04-11 Thread Gen Luo
Hi, Is the job you want to start running or already finished? If the job is running, this is simply a failover or a JM failover case. If the job has finished, however, there's no feature that restarts it automatically, AFAIK. The job has to be submitted again. On Wed, Apr 12, 2023 at 1

Re: Fast and slow stream sources for Interval Join

2023-03-02 Thread Gen Luo
Hi all, A hacky but simple way I have tried is to chain a map function to the fast source. The function checks the timestamp of each input message: if the current time minus the timestamp is less than a certain value, it sleeps until the message is late enough, then passes it through. Further messages sh
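A minimal plain-Java model of the delaying map function described above. In a real job this logic would sit inside a MapFunction chained to the fast source; the class name `DelayingPassThrough` and the `minLagMs` parameter are hypothetical. Because the sleep blocks the task thread, the fast source is effectively slowed down through backpressure, which is what makes the trick work (and also what makes it hacky).

```java
/**
 * Hypothetical model of a pass-through step that holds each record until it
 * is at least minLagMs old (by its event timestamp), then forwards it unchanged.
 */
class DelayingPassThrough {
    private final long minLagMs;

    DelayingPassThrough(long minLagMs) {
        this.minLagMs = minLagMs;
    }

    /** How long to wait so that (now - eventTs) reaches at least minLagMs. */
    long requiredDelayMs(long eventTs, long nowMs) {
        long age = nowMs - eventTs;
        return Math.max(0L, minLagMs - age);
    }

    /** Blocks the calling (task) thread until the record is "late enough". */
    <T> T pass(T record, long eventTs) throws InterruptedException {
        long delay = requiredDelayMs(eventTs, System.currentTimeMillis());
        if (delay > 0) {
            Thread.sleep(delay);
        }
        return record;
    }
}
```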

Re: RE: Re: Re: Should we always mark ValueState as "transient" for RichFunctions

2023-02-26 Thread Gen Luo
for your explanation. > > > > > > Back to this code snippet, since they are not marked with "transient" now, I suppose Flink will use avro to serialize them (null values). Is there any benchmark to show the performance test between null valu

Re: Should we always mark ValueState as "transient" for RichFunctions

2023-02-23 Thread Gen Luo
Hi, ValueState is a handle rather than an actual value, so it should never be serialized. In fact, ValueState itself is not Serializable. It should be ok to always mark it as transient. In this case, I suppose it works because the ValueState is not set (that happens at runtime) when th
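A plain-Java sketch of why the unset handle "works" and why `transient` is the safer choice. It relies only on standard Java serialization, which is how user functions are shipped to the cluster; `FakeStateHandle` and `MyFunction` are hypothetical stand-ins for ValueState and a RichFunction. A null non-transient field serializes fine, but the same field fails with NotSerializableException once it holds a non-Serializable handle, while a transient field is simply skipped.

```java
import java.io.*;

/** Stand-in for a state handle such as ValueState: deliberately NOT Serializable. */
class FakeStateHandle {
    int value;
}

/** Stand-in for a RichFunction that is shipped via Java serialization. */
class MyFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    FakeStateHandle plainHandle;              // serialized if non-null -> fails
    transient FakeStateHandle transientHandle; // always skipped; null after deserialization
}

/** Small helper around ObjectOutputStream / ObjectInputStream. */
final class JavaSerialization {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }
}
```

In a real function the handle would be obtained in `open()`, i.e. after deserialization, so it never needs to travel with the function.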

Re: Non-temporal watermarks

2023-02-02 Thread Gen Luo
Hi, This is an interesting topic. I suppose the watermark is defined based on event time since it's mainly used, or designed, for event time processing. Flink provides the event time processing mechanism because it's widely needed. Every event has its event time and we usually need to grou
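To make the event-time nature of a watermark concrete, here is a minimal plain-Java sketch of a bounded-out-of-orderness watermark: it tracks the largest event timestamp seen and asserts that nothing older than that, minus a fixed bound, is still expected. The class and parameter names are hypothetical; it illustrates the idea behind Flink's bounded-out-of-orderness strategy rather than reproducing Flink's code.

```java
/** Tracks an event-time watermark assuming events arrive at most maxOutOfOrdernessMs late. */
class BoundedOutOfOrdernessTracker {
    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessTracker(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** The watermark only depends on event timestamps, never on wall-clock time. */
    void onEvent(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
    }

    /** Watermark W means: no more events with timestamp <= W are expected. */
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxSeenTimestamp - maxOutOfOrdernessMs - 1;
    }
}
```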

Re: Understanding pipelined regions

2022-12-23 Thread Gen Luo
lined-region-sheduling.html#intermediate-results > > On Tue, Dec 20, 2022 at 3:26 PM Gen Luo wrote: > >> Hi Raihan, >> >> As the description of PipelinedRegion says, a pipelined region is a set >> of vertices connected via pipelined data exchanges. For example in a j

Re: Understanding pipelined regions

2022-12-20 Thread Gen Luo
Hi Raihan, As the description of PipelinedRegion says, a pipelined region is a set of vertices connected via pipelined data exchanges. For example, consider a job with the DAG A->B, where both tasks have two subtasks. If the edge between A and B is a forward edge, there are two pipelined regions: (A1
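Since a pipelined region is a set of subtasks connected via pipelined exchanges, regions can be computed as connected components over the pipelined edges only. The sketch below does this with union-find; `PipelinedRegions` and its methods are hypothetical, and it is a simplified model that ignores any additional rules Flink's scheduler may apply when building regions.

```java
import java.util.*;

/** Simplified model: regions = connected components over pipelined edges. */
class PipelinedRegions {
    private final Map<String, String> parent = new HashMap<>();

    private String find(String v) {
        parent.putIfAbsent(v, v);
        String p = parent.get(v);
        if (!p.equals(v)) {
            p = find(p);
            parent.put(v, p); // path compression
        }
        return p;
    }

    /** Blocking edges still register both vertices but do not merge regions. */
    void addEdge(String from, String to, boolean pipelined) {
        String rootFrom = find(from);
        String rootTo = find(to);
        if (pipelined) {
            parent.put(rootFrom, rootTo);
        }
    }

    int regionCount() {
        Set<String> roots = new HashSet<>();
        for (String v : new ArrayList<>(parent.keySet())) {
            roots.add(find(v));
        }
        return roots.size();
    }
}
```

For the A->B example with a pipelined forward edge, only A1-B1 and A2-B2 are connected, so this model yields two regions; a pipelined all-to-all edge connects every A subtask to every B subtask and collapses everything into one region.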

Re: (Co)ProcessFunction vs Keyed(Co)ProcessFunction

2022-12-02 Thread Gen Luo
nction());` > stream.keyBy(...).process(new MyKeyedCoProcessFunction());` > ``` > > will just work well with keyed state & timers if I'm not mistaken since in > both cases you are on a keyed stream. > > Salva > > > > On Fri, Dec 2, 2022 at 3:27 AM Gen Luo

Re: (Co)ProcessFunction vs Keyed(Co)ProcessFunction

2022-12-01 Thread Gen Luo
Hi Salva, I suppose what you are missing is that the timers are stored in the keyed state, so you may only register timers when using KeyedCoProcessFunction. If you try to register a timer in the CoProcessFunction, you'll get an UnsupportedOperationException with the message "Setting timers is on

Re: Difference between DataStream.broadcast() vs DataStream.broadcast(MapStateDescriptor)

2022-12-01 Thread Gen Luo
DataStream.broadcast() only determines the distribution behavior: all elements from the stream will be broadcast to all the downstream tasks. Its downstream can be a single-input processing operator, or a co-processing operator if it's connected to another stream. DataStream.broadcast(MapStateDescripto
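To illustrate "only determines the distribution behavior", here is a plain-Java model of channel selection: with broadcast every record goes to every downstream channel, whereas a round-robin scheme (in the spirit of rebalance()) sends each record to exactly one. `ChannelSelection` is hypothetical and is not Flink's partitioner code; for simplicity the round-robin here starts at channel 0.

```java
import java.util.stream.IntStream;

/** Hypothetical model of how a partitioner picks downstream channels per record. */
class ChannelSelection {
    private int nextChannel = 0;

    /** Broadcast: the record is copied to every downstream channel. */
    static int[] broadcastTargets(int numChannels) {
        return IntStream.range(0, numChannels).toArray();
    }

    /** Round-robin: each record goes to exactly one channel, cycling through them. */
    int roundRobinTarget(int numChannels) {
        int target = nextChannel % numChannels;
        nextChannel = target + 1;
        return target;
    }
}
```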

Re: Any caveats about processing abstract classes ?

2022-11-14 Thread Gen Luo
don't have to use it. On Sat, Nov 12, 2022 at 8:24 AM Davide Bolcioni via user < user@flink.apache.org> wrote: > Thanks, > On Thu, Nov 10, 2022 at 6:21 PM Gen Luo wrote > >> >> I suppose it would be fine. The only difference I can figure out that may >> matter

Re: Any caveats about processing abstract classes ?

2022-11-10 Thread Gen Luo
Hi Davide, I suppose it would be fine. The only difference I can figure out that may matter is the serialization. Flink uses KryoSerializer as the fallback serializer if the TypeInformation of the records is not provided, and KryoSerializer can properly process abstract classes. This works well in most cases.

Re: [DISCUSS] Replace Attempt column with Attempt Number on the subtask list page of the Web UI

2022-07-20 Thread Gen Luo
id that is > different from the corresponding attempt number in REST, metrics and logs. > It adds burden to users to do the mapping in troubleshooting. Mis-mapping > can be easy to happen and result in a waste of efforts and wrong > conclusion. > > Therefore, +1 for this proposal.

Re: Flatmap operator in an Asynchronous call

2022-03-07 Thread Gen Luo
Hi Diwakar, An asynchronous flatmap function without the support of the framework can be problematic. You should not call collector.collect outside the main thread of the task, i.e. outside the flatMap method. I'd suggest using a customized Source instead to process the files, which uses a SplitE
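The rule above (only emit from the task's own thread) can be sketched in plain Java: background threads do the slow work and park results in a thread-safe queue, and only the caller's thread drains the queue into the collector. `MainThreadEmitter` and its methods are hypothetical; this only illustrates the threading rule, it is not the custom Source approach suggested in the reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: workers compute, but only the caller's thread emits. */
class MainThreadEmitter<T> {
    private final ExecutorService workers = Executors.newFixedThreadPool(2);
    private final LinkedBlockingQueue<T> ready = new LinkedBlockingQueue<>();

    /** Runs slow work (e.g. reading a file) on a worker; never touches the collector. */
    void submit(Supplier<T> work) {
        // If work.get() throws, the result is simply lost in this sketch.
        workers.execute(() -> ready.add(work.get()));
    }

    /** Call only from the task thread: this is the single place results are emitted. */
    int emitReady(Consumer<T> collector) {
        List<T> batch = new ArrayList<>();
        ready.drainTo(batch);
        batch.forEach(collector);
        return batch.size();
    }

    /** Stops accepting work and waits for in-flight work to finish. */
    boolean close(long timeoutMs) throws InterruptedException {
        workers.shutdown();
        return workers.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```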

Re: New blog post published - Sort-Based Blocking Shuffle Implementation in Flink

2021-11-02 Thread Gen Luo
Thanks Daisy and Kevin! The benchmark results look really exciting! On Tue, Nov 2, 2021 at 4:38 PM David Morávek wrote: > Thanks Daisy and Kevin for a great write up! ;) Especially the 2nd part > was really interesting, I really like the idea of the single spill file > with a custom scheduling o

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-22 Thread Gen Luo
> Hbase). If most of those calls are very fast, sometimes when the system is under heavy load they may block more than a few seconds, and having our app killed because of a short timeout is not an option.

Re: [DISCUSS] FLIP-185: Shorter heartbeat timeout and interval default values

2021-07-21 Thread Gen Luo
Hi, Thanks for driving this @Till Rohrmann. I would give +1 to reducing the heartbeat timeout and interval, though I'm not sure if 15s and 3s would be enough either. IMO, except for the standalone cluster, where Flink relies entirely on its own heartbeat mechanism, reducing the heartbeat can also
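A minimal plain-Java model of timeout-based failure detection, to show what the two numbers trade off: a peer is only declared dead once no heartbeat has arrived for longer than the timeout, so the timeout is roughly the worst-case detection delay, while timeout / interval is how many consecutive heartbeats may be missed. `HeartbeatMonitor` is hypothetical; it is not Flink's HeartbeatManager.

```java
/** Hypothetical model of a heartbeat-based liveness check. */
class HeartbeatMonitor {
    private final long timeoutMs;
    private long lastHeartbeatMs;

    HeartbeatMonitor(long timeoutMs, long startMs) {
        this.timeoutMs = timeoutMs;
        this.lastHeartbeatMs = startMs;
    }

    void onHeartbeat(long nowMs) {
        lastHeartbeatMs = nowMs;
    }

    /** The peer counts as lost only after a full timeout without any heartbeat. */
    boolean isTimedOut(long nowMs) {
        return nowMs - lastHeartbeatMs > timeoutMs;
    }

    /** How many heartbeat intervals fit into one timeout. */
    static long intervalsPerTimeout(long intervalMs, long timeoutMs) {
        return timeoutMs / intervalMs;
    }
}
```

With a 3s interval and a 15s timeout, about five heartbeats in a row must be missed before a peer is considered lost, which is the tolerance the quoted concern about long blocking calls is about.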

Re: Job Recovery Time on TM Lost

2021-07-08 Thread Gen Luo
In our inner flink version, we optimize it by task's report and jobmaster's probe. When a task fails because of the connection, it reports to the jobmaster. The jobmaster will try to confirm the liveness of the unconnected taskmanager for certain times by con
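A minimal sketch of the report-and-probe idea described above, assuming the JobMaster simply retries a liveness probe a fixed number of times after a task reports a connection failure. `LivenessProber`, `maxAttempts` and the probe callback are hypothetical; the snippet does not say how the probe in that internal version actually works.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical: confirm a reported TaskManager loss by probing it a few times. */
class LivenessProber {
    private final int maxAttempts;

    LivenessProber(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Returns true (TM considered lost) only if every probe attempt fails. */
    boolean confirmLost(BooleanSupplier probe) {
        for (int i = 0; i < maxAttempts; i++) {
            if (probe.getAsBoolean()) {
                return false; // TM answered: treat it as a transient network issue
            }
        }
        return true;
    }
}
```

The point of the design is that a confirmed loss can be acted on after a few quick probes instead of waiting for the full heartbeat timeout.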

Re: Job Recovery Time on TM Lost

2021-07-06 Thread Gen Luo
one could say that one can disable reacting to a failed heartbeat RPC as it is currently the case. > > We currently have a discussion about this on this PR [1]. Maybe you wanna join the discussion there and share your insights. > > [1] https://github.com/apache/flink/pull/16357

Re: Job Recovery Time on TM Lost

2021-07-05 Thread Gen Luo
ailability of the remote system (e.g. a couple of lost heartbeat messages). > > Cheers, > Till > > On Mon, Jul 5, 2021 at 10:00 AM Gen Luo wrote: > >> As far as I know, a TM will report connection failure once its connected TM is lost. I suppose JM can believe the

Re: Job Recovery Time on TM Lost

2021-07-05 Thread Gen Luo
TaskExecutors as fast as the heartbeat interval. > > [1] https://issues.apache.org/jira/browse/FLINK-23209 > > Cheers, > Till > > On Fri, Jul 2, 2021 at 9:33 AM Gen Luo wrote: > >> Thanks for sharing, Till and Yang. >> >> @Lu >> Sorry but I don't know

Re: Job Recovery Time on TM Lost

2021-07-02 Thread Gen Luo
least `heartbeat.timeout` time before the system recovers. Even if the cancellation happens fast (e.g. by having configured a low akka.ask.timeout), then Flink will still try to deploy tasks onto the

Re: Job Recovery Time on TM Lost

2021-06-30 Thread Gen Luo
I'm also wondering about this. In my opinion, it's because the JM cannot confirm whether the TM is lost or whether it's a temporary network problem that will recover soon: I can see in the log that akka has got a Connection refused, but the JM still sends heartbeat requests to the lost TM until it reaches hea

Re: Job Recovery Time on TM Lost

2021-06-29 Thread Gen Luo
Hi Lu, We found almost the same thing when we were testing failover in a large-scale job. The akka.ask.timeout and heartbeat.timeout were set to 10min for the test, and we found that the job would take 10min to recover from a lost TM. We reached the conclusion that the behavior is expected in the Fl