[ANNOUNCE] Flink Jira Bot fully live (& Useful Filters to Work with the Bot)

2021-04-22 Thread Konstantin Knauf
Hi everyone,

all of the Jira Bot rules are live now. Particularly in the beginning the
Jira Bot will be very active, because the rules apply to a lot of old,
stale tickets. So, if you get a huge amount of emails from the Flink Jira
Bot right now, this will get better. In any case, the Flink Jira Bot (or
the rules that it implements) demand some changes to how we work with Jira.

Here are a few points to make this transition easier for us:

*1) Retrospective*

In 3-4 weeks I would like to collect feedback. What is working well? What
is not working well or getting in your way? Is the bot moving us closer to
the goals mentioned in the initial email? Specifically, the
initial parameterization [1] of the bot was kind of an educated guess. I
will open a [DISCUSS]ion thread to collect feedback and proposals for
changes around that time.

*2) Use Sub-Tasks*

The bot will ask you for an update on assigned tickets after quite a short
time by Flink standards. If you are working on a ticket that takes longer,
consider splitting it up into Sub-Tasks. Activity on Sub-Tasks counts as
activity for the parent ticket, too. So, as long as any Sub-Task is moving
along, you won't be nagged by the bot.


*3) Useful Filters*

You've probably received a lot of emails already, in particular if you are
watching many tickets. Here are a few JIRA filters to find the tickets that
are probably most important to you and that have been updated by the bot:

Tickets that *you are assigned to*, which are "stale-assigned"

https://issues.apache.org/jira/issues/?filter=12350499

Tickets that *you reported*, which are stale in any way:

https://issues.apache.org/jira/issues/?filter=12350500

If you are a maintainer of some components, you might find the following
filters useful (replace with your own components):

*All tickets that are about to be closed*
project = FLINK AND component in ("Build System", "BuildSystem / Shaded",
"Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
flink-docker, "Release System", "Runtime / Coordination", "Runtime /
Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
resolution = Unresolved AND labels in (stale-minor)

*Bugs that are about to be deprioritized or closed*
project = FLINK AND type = BUG AND component in ("Build System",
"BuildSystem / Shaded", "Deployment / Kubernetes", "Deployment / Mesos",
"Deployment / YARN", flink-docker, "Release System", "Runtime /
Coordination", "Runtime / Metrics", "Runtime / Queryable State", "Runtime /
REST", Travis) AND resolution = Unresolved AND labels in (stale-major,
stale-blocker, stale-critical, stale-minor)


*Tickets that are stale-assigned, but already have a PR available*
project = FLINK AND component in ("Build System", "BuildSystem / Shaded", "Deployment
/ Kubernetes", "Deployment / Mesos", "Deployment / YARN", flink-docker,
"Release System", "Runtime / Coordination", "Runtime / Metrics", "Runtime /
Queryable State", "Runtime / REST", Travis) AND resolution = Unresolved AND
labels in (stale-assigned) AND labels in (pull-request-available)

Cheers,

Konstantin

[1] https://github.com/apache/flink-jira-bot/blob/master/config.yaml


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread Matthias Pohl
Hi,
I have a few questions about your case:
* What is the option you're referring to for the bounded shuffle? That
might help to understand what streaming mode solution you're looking for.
* What does the job graph look like? Are you assuming that it's due to a
shuffling operation? Could you provide the logs to get a better
understanding of your case?
* Do you observe the same memory increase for other TaskManager nodes?
* Are you expecting to reach the memory limits considering that you
mentioned a "big state size"? Would increasing the memory limit be an
option or do you fear that it's caused by some memory leak?

Best,
Matthias

On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:

> The Flink version we used is 1.12.0.
>
> 马阳阳
> ma_yang_y...@163.com
>
> 
> Signature customized by 网易邮箱大师 (NetEase Mail Master)
>
> On 04/16/2021 16:07,马阳阳 
> wrote:
>
> Hi, community,
> When running a Flink streaming job with big state size, one task manager
> process was killed by the yarn node manager. The following log is from the
> yarn node manager:
>
> 2021-04-16 11:51:23,013 WARN
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> Container
> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
> Killing container.
>
> When searching for a solution to this problem, I found that there is an option
> for this that worked for bounded shuffle. So is there a way to get rid of
> this in streaming mode?
>
> PS:
> memory related options:
> taskmanager.memory.process.size:12288m
> taskmanager.memory.managed.fraction:0.7
>
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
I saw
https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
and this seems to suggest a straight-up filter, but I am not sure how that
filter works, as in, would it factor in the lateness when filtering?

On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi 
wrote:

> Well it was not a solution after all. We now have a session window that is
> stuck with the same issue albeit  after the additional lateness. I had
> increased the lateness to 2 days and that masked the issue which again
> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
> ) before. This is very disconcerting.
>
> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
> an event-time window cannot become earlier than the current watermark by
> merging. Current watermark: 1619053742129 window: TimeWindow{start=
> 161883663, end=1618879580402}
>
>
>
> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi 
> wrote:
>
>> Hey folks,
>>I had a pipe with sessionization restarts and then fail
>> after retries with this exception. The only thing I had done was to
>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>> increasing the lateness created this and the way I solved this was to
>> increase the lateness further. Could this be if there are TMs in the
>> cluster whose time is off ( as in not synchronized )  ?
>>
>> 2021-04-21 11:27:58
>> java.lang.UnsupportedOperationException: The end timestamp of an
>> event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>> 1618878336107, end=1618880140466}
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator$2.merge(WindowOperator.java:339)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator$2.merge(WindowOperator.java:321)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> MergingWindowSet.addWindow(MergingWindowSet.java:209)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator.processElement(WindowOperator.java:319)
>> at org.apache.flink.streaming.runtime.tasks.
>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>> .java:191)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .processElement(StreamTaskNetworkInput.java:204)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>> .emitNext(StreamTaskNetworkInput.java:174)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>> .processInput(StreamOneInputProcessor.java:65)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>> StreamTask.java:396)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>> .runMailboxLoop(MailboxProcessor.java:191)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .runMailboxLoop(StreamTask.java:617)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:581)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> at java.lang.Thread.run(Thread.java:748)
>>
>>
>>


Re: Flink Hadoop config on docker-compose

2021-04-22 Thread Flavio Pompermaier
Great! Thanks for the support

On Thu, Apr 22, 2021 at 2:57 PM Matthias Pohl 
wrote:

> I think you're right, Flavio. I created FLINK-22414 to cover this. Thanks
> for bringing it up.
>
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-22414
>
> On Fri, Apr 16, 2021 at 9:32 AM Flavio Pompermaier 
> wrote:
>
>> Hi Yang,
>> isn't this something to fix? If I look at the documentation at  [1], in
>> the "Passing configuration via environment variables" section, there is:
>>
>> "The environment variable FLINK_PROPERTIES should contain a list of Flink
>> cluster configuration options separated by new line,
>> the same way as in the flink-conf.yaml. FLINK_PROPERTIES takes precedence
>> over configurations in flink-conf.yaml."
>>
>> To me this means that if I specify "env.hadoop.conf.dir" it should be
>> handled as well. Am I wrong?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 16, 2021 at 4:52 AM Yang Wang  wrote:
>>
>>> It seems that we do not export HADOOP_CONF_DIR as environment variables
>>> in current implementation, even though we have set the env.xxx flink config
>>> options. It is only used to construct the classpath for the JM/TM process.
>>> However, in "HadoopUtils"[2] we do not support getting the hadoop
>>> configuration from classpath.
>>>
>>>
>>> [1].
>>> https://github.com/apache/flink/blob/release-1.11/flink-dist/src/main/flink-bin/bin/config.sh#L256
>>> [2].
>>> https://github.com/apache/flink/blob/release-1.11/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java#L64
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Best,
>>> Yang
>>>
Flavio Pompermaier  wrote on Friday, April 16, 2021 at 3:55 AM:
>>>
 Hi Robert,
 indeed my docker-compose does work only if I add also Hadoop and yarn
 home while I was expecting that those two variables were generated
 automatically just setting env.xxx variables in FLINK_PROPERTIES variable..

 I just want to understand what to expect, if I really need to specify
 Hadoop and yarn home as env variables or not

 Il gio 15 apr 2021, 20:39 Robert Metzger  ha
 scritto:

> Hi,
>
> I'm not aware of any known issues with Hadoop and Flink on Docker.
>
> I also tried what you are doing locally, and it seems to work:
>
> flink-jobmanager| 2021-04-15 18:37:48,300 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - 
> Starting
> StandaloneSessionClusterEntrypoint.
> flink-jobmanager| 2021-04-15 18:37:48,338 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
> default filesystem.
> flink-jobmanager| 2021-04-15 18:37:48,375 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
> security context.
> flink-jobmanager| 2021-04-15 18:37:48,404 INFO
>  org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
> user set to flink (auth:SIMPLE)
> flink-jobmanager| 2021-04-15 18:37:48,408 INFO
>  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
> file will be created as /tmp/jaas-811306162058602256.conf.
> flink-jobmanager| 2021-04-15 18:37:48,415 INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Initializing cluster services.
>
> Here's my code:
>
> https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39
>
> Hope this helps!
>
> On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi everybody,
>> I'm trying to set up reading from HDFS using docker-compose and Flink
>> 1.11.3.
>> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
>> using FLINK_PROPERTIES (under environment section of the docker-compose
>> service) I see in the logs the following line:
>>
>> "Could not find Hadoop configuration via any of the supported method"
>>
>> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
>> generated by the run scripts.
>> Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
>> environment section of the docker-compose service) I don't see that line.
>>
>> Is this the expected behavior?
>>
>> Below the relevant docker-compose service I use (I've removed the
>> content of HADOOP_CLASSPATH content because is too long and I didn't 
>> report
>> the taskmanager that is similar):
>>
>> flink-jobmanager:
>> container_name: flink-jobmanager
>> build:
>>   context: .
>>   dockerfile: Dockerfile.flink
>>   args:
>> FLINK_VERSION: 1.11.3-scala_2.12-java11
>> image: 'flink-test:1.11.3-scala_2.12-java11'
>> ports:
>>   - "8091:8081"
>>   - "8092

Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
Well, it was not a solution after all. We now have a session window that is
stuck with the same issue, albeit after the additional lateness. I had
increased the lateness to 2 days, and that masked the issue, which again
reared its head after the 2 days' lateness was over (instead of the 1 day
before). This is very disconcerting.

Caused by: java.lang.UnsupportedOperationException: The end timestamp of an
event-time window cannot become earlier than the current watermark by
merging. Current watermark: 1619053742129 window: TimeWindow{start=
161883663, end=1618879580402}



On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi 
wrote:

> Hey folks,
>I had a pipe with sessionization restarts and then fail
> after retries with this exception. The only thing I had done was to
> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
> from SP and it ran for 12 hours plus without issue. I cannot imagine that
> increasing the lateness created this and the way I solved this was to
> increase the lateness further. Could this be if there are TMs in the
> cluster whose time is off ( as in not synchronized )  ?
>
> 2021-04-21 11:27:58
> java.lang.UnsupportedOperationException: The end timestamp of an
> event-time window cannot become earlier than the current watermark by
> merging. Current watermark: 1618966593999 window: TimeWindow{start=
> 1618878336107, end=1618880140466}
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator$2.merge(WindowOperator.java:339)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator$2.merge(WindowOperator.java:321)
> at org.apache.flink.streaming.runtime.operators.windowing.
> MergingWindowSet.addWindow(MergingWindowSet.java:209)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:319)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
> .java:191)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:204)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:174)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
> .processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
> StreamTask.java:396)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
> .runMailboxLoop(MailboxProcessor.java:191)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(
> StreamTask.java:617)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:581)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> at java.lang.Thread.run(Thread.java:748)
>
>
>


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread dhanesh arole
Hi,

Questions that @matth...@ververica.com  asked are
very valid and might provide more leads. But if you haven't already, then
it's worth trying to use jemalloc / tcmalloc. We had similar problems with
slow growth in TM memory resulting in pods getting OOMed by k8s. After
switching to jemalloc, the memory footprint improved dramatically.


-
Dhanesh Arole ( Sent from mobile device. Pardon me for typos )



On Thu, Apr 22, 2021 at 1:39 PM Matthias Pohl 
wrote:

> Hi,
> I have a few questions about your case:
> * What is the option you're referring to for the bounded shuffle? That
> might help to understand what streaming mode solution you're looking for.
> * What does the job graph look like? Are you assuming that it's due to a
> shuffling operation? Could you provide the logs to get a better
> understanding of your case?
> * Do you observe the same memory increase for other TaskManager nodes?
> * Are you expecting to reach the memory limits considering that you
> mentioned a "big state size"? Would increasing the memory limit be an
> option or do you fear that it's caused by some memory leak?
>
> Bet,
> Matthias
>
> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>
>> The Flink version we used is 1.12.0.
>>
>> 马阳阳
>> ma_yang_y...@163.com
>>
>> 
>> Signature customized by 网易邮箱大师 (NetEase Mail Master)
>>
>> On 04/16/2021 16:07,马阳阳 
>> wrote:
>>
>> Hi, community,
>> When running a Flink streaming job with big state size, one task manager
>> process was killed by the yarn node manager. The following log is from the
>> yarn node manager:
>>
>> 2021-04-16 11:51:23,013 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>> Killing container.
>>
>> When searching solution for this problem, I found that there is a option
>> for this that worked for bounded shuffle. So is there a way to get rid of
>> this in streaming mode?
>>
>> PS:
>> memory related options:
>> taskmanager.memory.process.size:12288m
>> taskmanager.memory.managed.fraction:0.7
>>
>>


Re: Flink Statefun Python Batch

2021-04-22 Thread Igal Shilman
Hi Tim,

I've created a tiny PoC; let me know if this helps.
I can't guarantee, though, that this is how we'll eventually approach this,
but it should be somewhere along these lines.

https://github.com/igalshilman/flink-statefun/tree/tim

Thanks,
Igal.


On Thu, Apr 22, 2021 at 6:53 AM Timothy Bess  wrote:

> Hi Igal and Konstantin,
>
> Wow! I appreciate the offer of creating a branch to test with, but for now
> we were able to get it working by tuning a few configs and moving other
> blocking IO out of statefun, so no rush there. That said if you do add
> that, I'd definitely switch over.
>
> That's great! I'll try to think up some suggestions to put into those
> tickets. Yeah I'd be up for a call on Thursday or Friday If you're free
> then, just let me know (my timezone is EDT).
>
> Thanks,
>
> Tim
>
> On Wed, Apr 21, 2021, 4:18 AM Konstantin Knauf 
> wrote:
>
>> Hi Igal, Hi Timothy,
>>
>> this sounds very interesting. Both state introspection as well as
>> OpenTracing support have been requested by multiple users before, so
>> certainly something we are willing to invest into. Timothy, would you have
>> time for a 30min call in the next days to understand your use case and
>> requirements better? In the meantime, let's document these feature requests
>> in Jira.
>>
>> * Exposing Batches to SDKs:
>> https://issues.apache.org/jira/browse/FLINK-22389
>> * Support for OpenTracing:
>> https://issues.apache.org/jira/browse/FLINK-22390
>> * Support for State Introspection:
>> https://issues.apache.org/jira/browse/FLINK-22391
>>
>> Please feel free to edit, comment on these issues directly, too.
>>
>> Cheers,
>>
>> Konstantin
>>
>>
>>
>> Am Mi., 21. Apr. 2021 um 09:15 Uhr schrieb Igal Shilman <
>> i...@ververica.com>:
>>
>>> Hi Tim,
>>>
>>> Yes, I think that this feature can be implemented relatively fast.
>>> If this blocks you at the moment, I can prepare a branch for you to
>>> experiment with, in the following days.
>>>
>>> Regarding to open tracing integration, I think the community can benefit
>>> a lot out of this,
>>> and definitely contributions are welcome!
>>>
>>> @Konstantin Knauf  would you like to understand more
>>> in depth, Tim's use case with opentracing?
>>>
>>> Thanks,
>>> Igal.
>>>
>>>
>>>
>>> On Tue, Apr 20, 2021 at 8:10 PM Timothy Bess  wrote:
>>>
 Hi Igal,

 Yes! that's exactly what I was thinking. The batching will naturally
 happen as the model applies backpressure. We're using pandas and it's
 pretty costly to create a dataframe and everything to process a single
 event. Internally the SDK has access to the batch and is calling my
 function, which creates a dataframe for each individual event. This causes
 a ton of overhead since we basically get destroyed by the constant factors
 around creating and operating on dataframes.

 Knowing how the SDK works, it seems like it'd be easy to do something
 like your example and maybe have a different decorator for "batch
 functions" where the SDK just passes in everything at once.

 Also just out of curiosity are there plans to build out more
 introspection into statefun's flink state? I was thinking it would be super
 useful to add either Queryable state or have some control topic that
 statefun listens to that allows me to send events to introspect or modify
 flink state.

 For example like:

 // control topic request
 {"type": "FunctionIdsReq", "namespace": "foo", "type": "bar"}
 // response
 {"type": "FunctionIdsResp", "ids": [ "1", "2", "3", ... ] }

 Or

 {"type": "SetState", "namespace": "foo", "type": "bar", "id": "1",
 value: "base64bytes"}
 {"type": "DeleteState", "namespace": "foo", "type": "bar", "id": "1"}

 Also having opentracing integration where Statefun passes b3 headers
 with each request so we can trace a message's route through statefun would
 be _super_ useful. We'd literally be able to see the entire path of an
 event from ingress to egress and time spent in each function. Not sure if
 there are any plans around that, but since we're live with a statefun
 project now, it's possible we could contribute some if you guys are open to
 it.

 Thanks,

 Tim

 On Tue, Apr 20, 2021 at 9:25 AM Igal Shilman 
 wrote:

> Hi Tim!
>
> Indeed the StateFun SDK / StateFun runtime, has an internal concept of
> batching, that kicks in the presence of a slow
> /congested remote function. Keep in mind that under normal
> circumstances batching does not happen (effectively a batch of size 1 will
> be sent). [1]
> This batch is not currently exposed via the SDKs (both Java and
> Python) as it is an implementation detail (see [2]).
>
> The way I understand your message (please correct me if I'm wrong): is
> that evaluation of the ML model is costly, and it would benefit from some
> sort of batching (like panda

Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Matthias Pohl
Hi Vishal,
based on the error message and the behavior you described, introducing a
filter for late events is the way to go - just as described in the SO
thread you mentioned. Usually, you would collect late events in some kind
of side output [1].
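
For illustration, a minimal sketch of that pattern (the event type, key
selector, gap, and aggregate function below are placeholders, not your
actual pipeline):

final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<Result> sessions = events
    .keyBy(Event::getKey)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .allowedLateness(Time.days(1))
    .sideOutputLateData(lateTag) // late records go to the side output instead of being silently dropped
    .aggregate(new MyAggregateFunction());

DataStream<Event> lateEvents = sessions.getSideOutput(lateTag);
// imports: org.apache.flink.util.OutputTag,
// org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows,
// org.apache.flink.streaming.api.windowing.time.Time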

I hope that helps.
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi 
wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> and this seems to suggest a straight up filter, but I am not sure how does
> that filter works as in would it factor is the lateness when filtering ?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi 
> wrote:
>
>> Well it was not a solution after all. We now have a session window that
>> is stuck with the same issue albeit  after the additional lateness. I had
>> increased the lateness to 2 days and that masked the issue which again
>> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
>> ) before. This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>> an event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1619053742129 window: TimeWindow{start=
>> 161883663, end=1618879580402}
>>
>>
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hey folks,
>>>I had a pipe with sessionization restarts and then fail
>>> after retries with this exception. The only thing I had done was to
>>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>>> increasing the lateness created this and the way I solved this was to
>>> increase the lateness further. Could this be if there are TMs in the
>>> cluster whose time is off ( as in not synchronized )  ?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>> event-time window cannot become earlier than the current watermark by
>>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>>> 1618878336107, end=1618880140466}
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator$2.merge(WindowOperator.java:339)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator$2.merge(WindowOperator.java:321)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator.processElement(WindowOperator.java:319)
>>> at org.apache.flink.streaming.runtime.tasks.
>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>>> .java:191)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .processElement(StreamTaskNetworkInput.java:204)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:174)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:65)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:396)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:191)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:617)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:581)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>


Re: Flink Hadoop config on docker-compose

2021-04-22 Thread Matthias Pohl
I think you're right, Flavio. I created FLINK-22414 to cover this. Thanks
for bringing it up.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-22414

On Fri, Apr 16, 2021 at 9:32 AM Flavio Pompermaier 
wrote:

> Hi Yang,
> isn't this something to fix? If I look at the documentation at  [1], in
> the "Passing configuration via environment variables" section, there is:
>
> "The environment variable FLINK_PROPERTIES should contain a list of Flink
> cluster configuration options separated by new line,
> the same way as in the flink-conf.yaml. FLINK_PROPERTIES takes precedence
> over configurations in flink-conf.yaml."
>
> To me this means that if I specify "env.hadoop.conf.dir" it should be
> handled as well. Am I wrong?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/standalone/docker.html
>
> Best,
> Flavio
>
> On Fri, Apr 16, 2021 at 4:52 AM Yang Wang  wrote:
>
>> It seems that we do not export HADOOP_CONF_DIR as environment variables
>> in current implementation, even though we have set the env.xxx flink config
>> options. It is only used to construct the classpath for the JM/TM process.
>> However, in "HadoopUtils"[2] we do not support getting the hadoop
>> configuration from classpath.
>>
>>
>> [1].
>> https://github.com/apache/flink/blob/release-1.11/flink-dist/src/main/flink-bin/bin/config.sh#L256
>> [2].
>> https://github.com/apache/flink/blob/release-1.11/flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java#L64
>>
>>
>> Best,
>> Yang
>>
>> Best,
>> Yang
>>
Flavio Pompermaier  wrote on Friday, April 16, 2021 at 3:55 AM:
>>
>>> Hi Robert,
>>> indeed my docker-compose does work only if I add also Hadoop and yarn
>>> home while I was expecting that those two variables were generated
>>> automatically just setting env.xxx variables in FLINK_PROPERTIES variable..
>>>
>>> I just want to understand what to expect, if I really need to specify
>>> Hadoop and yarn home as env variables or not
>>>
>>> Il gio 15 apr 2021, 20:39 Robert Metzger  ha
>>> scritto:
>>>
 Hi,

 I'm not aware of any known issues with Hadoop and Flink on Docker.

 I also tried what you are doing locally, and it seems to work:

 flink-jobmanager| 2021-04-15 18:37:48,300 INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Starting
 StandaloneSessionClusterEntrypoint.
 flink-jobmanager| 2021-04-15 18:37:48,338 INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
 default filesystem.
 flink-jobmanager| 2021-04-15 18:37:48,375 INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Install
 security context.
 flink-jobmanager| 2021-04-15 18:37:48,404 INFO
  org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
 user set to flink (auth:SIMPLE)
 flink-jobmanager| 2021-04-15 18:37:48,408 INFO
  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas
 file will be created as /tmp/jaas-811306162058602256.conf.
 flink-jobmanager| 2021-04-15 18:37:48,415 INFO
  org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
 Initializing cluster services.

 Here's my code:

 https://gist.github.com/rmetzger/0cf4ba081d685d26478525bf69c7bd39

 Hope this helps!

 On Wed, Apr 14, 2021 at 5:37 PM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> Hi everybody,
> I'm trying to set up reading from HDFS using docker-compose and Flink
> 1.11.3.
> If I pass 'env.hadoop.conf.dir' and 'env.yarn.conf.dir'
> using FLINK_PROPERTIES (under environment section of the docker-compose
> service) I see in the logs the following line:
>
> "Could not find Hadoop configuration via any of the supported method"
>
> If I'm not wrong, this means that the HADOOP_CONF_DIR is actually not
> generated by the run scripts.
> Indeed, If I add HADOOP_CONF_DIR and YARN_CONF_DIR (always under
> environment section of the docker-compose service) I don't see that line.
>
> Is this the expected behavior?
>
> Below the relevant docker-compose service I use (I've removed the
> content of HADOOP_CLASSPATH content because is too long and I didn't 
> report
> the taskmanager that is similar):
>
> flink-jobmanager:
> container_name: flink-jobmanager
> build:
>   context: .
>   dockerfile: Dockerfile.flink
>   args:
> FLINK_VERSION: 1.11.3-scala_2.12-java11
> image: 'flink-test:1.11.3-scala_2.12-java11'
> ports:
>   - "8091:8081"
>   - "8092:8082"
> command: jobmanager
> environment:
>   - |
> FLINK_PROPERTIES=
> jobmanager.rpc.address: flink-jobmanager
> rest.port: 8081
> historyserver.web.po

Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
I can do that, but I am not certain this is the right filter. Can you
please validate? That aside, I already have the lateness configured for the
session window (the normal withLateNess()), and this looks like a session
window that was not collected and is still alive for some reason (a Flink
bug?):

if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
    out.collect(value);
}


On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
wrote:

> Hi Vishal,
> based on the error message and the behavior you described, introducing a
> filter for late events is the way to go - just as described in the SO
> thread you mentioned. Usually, you would collect late events in some kind
> of side output [1].
>
> I hope that helps.
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi 
> wrote:
>
>> I saw
>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>> and this seems to suggest a straight up filter, but I am not sure how does
>> that filter works as in would it factor is the lateness when filtering ?
>>
>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Well it was not a solution after all. We now have a session window that
>>> is stuck with the same issue albeit  after the additional lateness. I had
>>> increased the lateness to 2 days and that masked the issue which again
>>> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
>>> ) before. This is very disconcerting.
>>>
>>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>>> an event-time window cannot become earlier than the current watermark by
>>> merging. Current watermark: 1619053742129 window: TimeWindow{start=
>>> 161883663, end=1618879580402}
>>>
>>>
>>>
>>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Hey folks,
I had a pipe with sessionization restarts and then fail
 after retries with this exception. The only thing I had done was to
 increase the lateness by 12 hours ( to  a day )  in this pipe and restart
 from SP and it ran for 12 hours plus without issue. I cannot imagine that
 increasing the lateness created this and the way I solved this was to
 increase the lateness further. Could this be if there are TMs in the
 cluster whose time is off ( as in not synchronized )  ?

 2021-04-21 11:27:58
 java.lang.UnsupportedOperationException: The end timestamp of an
 event-time window cannot become earlier than the current watermark by
 merging. Current watermark: 1618966593999 window: TimeWindow{start=
 1618878336107, end=1618880140466}
 at org.apache.flink.streaming.runtime.operators.windowing.
 WindowOperator$2.merge(WindowOperator.java:339)
 at org.apache.flink.streaming.runtime.operators.windowing.
 WindowOperator$2.merge(WindowOperator.java:321)
 at org.apache.flink.streaming.runtime.operators.windowing.
 MergingWindowSet.addWindow(MergingWindowSet.java:209)
 at org.apache.flink.streaming.runtime.operators.windowing.
 WindowOperator.processElement(WindowOperator.java:319)
 at org.apache.flink.streaming.runtime.tasks.
 OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(
 OneInputStreamTask.java:191)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
 .processElement(StreamTaskNetworkInput.java:204)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
 .emitNext(StreamTaskNetworkInput.java:174)
 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
 .processInput(StreamOneInputProcessor.java:65)
 at org.apache.flink.streaming.runtime.tasks.StreamTask
 .processInput(StreamTask.java:396)
 at org.apache.flink.streaming.runtime.tasks.mailbox.
 MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
 at org.apache.flink.streaming.runtime.tasks.StreamTask
 .runMailboxLoop(StreamTask.java:617)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
 StreamTask.java:581)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
 at java.lang.Thread.run(Thread.java:748)



>


Re: MemoryStateBackend Issue

2021-04-22 Thread Matthias Pohl
Hi Milind,
I bet someone else might have a faster answer. But could you provide the
logs and config to get a better understanding of what your issue is?
In general, the state is maintained even in cases where a TaskManager fails.
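
As a side note, a minimal sketch (assuming Flink 1.12 APIs; the interval and
checkpoint path are placeholders) of explicitly enabling checkpointing and
writing checkpoint data to a filesystem instead of the JobManager heap:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L); // checkpoint every 60 seconds
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints")); // checkpoint data kept on durable storage
// imports: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment,
// org.apache.flink.runtime.state.filesystem.FsStateBackend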

Best,
Matthias

On Thu, Apr 22, 2021 at 5:11 AM Milind Vaidya  wrote:

> Hi
>
> I see MemoryStateBackend being used in TM Log
>
> org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend
> has been configured, using default (Memory / JobManager)
> MemoryStateBackend (data in heap memory / checkpoints to JobManager)
> (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE,
> maxStateSize: 5242880)
>
>
>
> I am logging checkpointed value which is just message count
>
> Snapshot the state 500
> Snapshot the state 1000
>
>
> When I restart the job i.e. new TM but the job manager is same I see
>
> Snapshot the state 500
>
> In the JM logs I see following entries
>
> Triggering checkpoint 1
> Triggering checkpoint 2
>
> After restarting job hence new TM
>
> Triggering checkpoint 1
>
> As per my understanding the JM should hold the checkpointed
> state across TMs? Am I correct?
>
> I have not configured anything special and using default. Do I need to add
> any setting to make it work ?
> I want to maintain message count across the TMs.
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
The only thing I can think of is to add the configured lateness to the
filter, as in here: the time on the element + lateness should always be
greater than the current WM. The current issue is

Mon Apr 19 20:46:20 EDT 2021  (window end)

Wed Apr 21 21:09:02 EDT 2021  (WM)

An event forced this merged window, and it is likely that it has the time
of Mon Apr 19 20:46:20 EDT 2021. We are filtering this event out to not hit
https://github.com/aljoscha/flink/blob/2836eccc8498de7a1cad083e6102944471bbd350/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L125

Either way the solution is yucky, and I am not sure how it happened in the
first place.


public class LateEventFilter extends ProcessFunction<KeyedTimedValue, KeyedTimedValue> {
    private static final long serialVersionUID = 1L;

    private final long allowedLateness;

    public LateEventFilter(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue value, Context ctx,
            Collector<KeyedTimedValue> out) throws Exception {
        // keep only records that are still within the allowed lateness of the current watermark
        if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}
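
For reference, a hypothetical sketch of the LateEventSideOutput counterpart
referenced later in this thread (the generic types mirror the filter above
and are assumptions), which emits only the records that are already late:

public class LateEventSideOutput extends ProcessFunction<KeyedTimedValue, KeyedTimedValue> {
    private static final long serialVersionUID = 1L;

    private final long allowedLateness;

    public LateEventSideOutput(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    @Override
    public void processElement(KeyedTimedValue value, Context ctx,
            Collector<KeyedTimedValue> out) throws Exception {
        // inverse of LateEventFilter: emit only records that are already too late for the window
        if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
            out.collect(value);
        }
    }
}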

On Thu, Apr 22, 2021 at 8:52 AM Vishal Santoshi 
wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> and this seems to suggest a straight up filter, but I am not sure how does
> that filter works as in would it factor is the lateness when filtering ?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi 
> wrote:
>
>> Well it was not a solution after all. We now have a session window that
>> is stuck with the same issue albeit  after the additional lateness. I had
>> increased the lateness to 2 days and that masked the issue which again
>> reared it's head after the 2 days ;lateness was over ( instead of the 1 day
>> ) before. This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>> an event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1619053742129 window: TimeWindow{start=
>> 161883663, end=1618879580402}
>>
>>
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hey folks,
>>>I had a pipe with sessionization restarts and then fail
>>> after retries with this exception. The only thing I had done was to
>>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>>> increasing the lateness created this and the way I solved this was to
>>> increase the lateness further. Could this be if there are TMs in the
>>> cluster whose time is off ( as in not synchronized )  ?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>> event-time window cannot become earlier than the current watermark by
>>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>>> 1618878336107, end=1618880140466}
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator$2.merge(WindowOperator.java:339)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator$2.merge(WindowOperator.java:321)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>> at org.apache.flink.streaming.runtime.operators.windowing.
>>> WindowOperator.processElement(WindowOperator.java:319)
>>> at org.apache.flink.streaming.runtime.tasks.
>>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask
>>> .java:191)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .processElement(StreamTaskNetworkInput.java:204)
>>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
>>> .emitNext(StreamTaskNetworkInput.java:174)
>>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor
>>> .processInput(StreamOneInputProcessor.java:65)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(
>>> StreamTask.java:396)
>>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor
>>> .runMailboxLoop(MailboxProcessor.java:191)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .runMailboxLoop(StreamTask.java:617)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:581)
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>


Re: Long to Timestamp(3) Conversion

2021-04-22 Thread Matthias Pohl
Hi Aeden,
there are some improvements to time conversions coming up in Flink 1.13.
For now, the best solution to overcome this is to provide a user-defined
function.
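
A rough sketch of such a UDF (class and function names are made up; assumes
the Flink 1.12 Table API), mapping an epoch-millis BIGINT to TIMESTAMP(3):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

public static class MillisToTimestamp extends ScalarFunction {
    public @DataTypeHint("TIMESTAMP(3)") java.time.LocalDateTime eval(Long epochMillis) {
        // interpret the long as epoch milliseconds in UTC
        return java.time.LocalDateTime.ofInstant(
                java.time.Instant.ofEpochMilli(epochMillis), java.time.ZoneOffset.UTC);
    }
}

// registration and use (names are placeholders):
// tEnv.createTemporarySystemFunction("TO_TS3", MillisToTimestamp.class);
// tEnv.sqlQuery("SELECT TO_TS3(myLongTS) AS myTs FROM myTable");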

Hope that helps.
Best,
Matthias

On Wed, Apr 21, 2021 at 9:36 PM Aeden Jameson 
wrote:

> I've probably overlooked something simple, but when converting a
> datastream to a table how does one convert a long to timestamp(3) that
> will not be your event or proc time.
>
> I've tried
>
> tEnv.createTemporaryView(
> "myTable"
> ,myDatastream
> ,
> ,$("myLongTS").toTimestamp()
> )
> which produces the exception,
>
> org.apache.flink.table.api.ValidationException: Field reference
> expression or alias on field expression expected.
>
> I've also tried,
>
> $("myLongTS").toTimestamp().as("myLongTS")
> $("myLongTS as myLongTS").toTimestamp()
>
> Haven't found Googling to be of much help on this one.
>
> --
> Thank You,
> Aeden


Re: Kubernetes Setup - JM as job vs JM as deployment

2021-04-22 Thread Matthias Pohl
Hi Gil,
I'm not sure whether I understand you correctly. What do you mean by
deploying the job manager as "job" or "deployment"? Are you referring to
the different deployment modes, Flink offers [1]? These would be
independent of Kubernetes. Or do you wonder what the differences are
between Flink on Kubernetes (native) [2] and Flink on Kubernetes
(standalone using YAML files) [3]?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/#deployment-modes
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html

On Wed, Apr 21, 2021 at 11:19 PM Gil Amsalem 
wrote:

> Hi,
>
> I found that there are 2 different approaches to setup Flink over
> kubernetes.
> 1. Deploy job manager as Job.
> 2. Deploy job manager as Deployment.
>
> What is the recommended way? What are the benefits of each?
>
> Thanks,
> Gil Amsalem
>


Question about snapshot file

2021-04-22 Thread Abdullah bin Omar
Hi,

(1) What does the snapshot metadata file (binary) contain? Is it possible
to read the snapshot metadata file using Flink deserialization?

(2) Is there any function that can be used to see the previous states at
the time of an operation?

Thank you


Re: Debezium CDC | OOM

2021-04-22 Thread Matthias Pohl
Hi Ayush,
Which state backend have you configured [1]? Have you considered trying out
RocksDB [2]? RocksDB might help with persisting at least keyed state.
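
A minimal sketch of switching to RocksDB (assumes flink-statebackend-rocksdb
is on the classpath; the checkpoint URI is a placeholder, and the constructor
declares IOException, so call it from a method that throws it):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true)); // true = incremental checkpoints
// the same can be done in flink-conf.yaml via state.backend: rocksdb and state.checkpoints.dir
// import: org.apache.flink.contrib.streaming.state.RocksDBStateBackend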

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#choose-the-right-state-backend
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend

On Thu, Apr 22, 2021 at 7:52 AM Ayush Chauhan 
wrote:

> Hi,
> I am using flink cdc to stream CDC changes in an iceberg table. When I
> first run the flink job for a topic which has all the data for a table, it
> get out of heap memory as flink try to load all the data during my 15mins
> checkpointing interval. Right now, only solution I have is to pass *-ytm
> 8192 -yjm 2048m* for a table with 10M rows and then reduce it after flink
> has consumed all the data. Is there a way to tell flink cdc code to trigger
> checkpoint or throttle the consumption speed(I think backpressure should
> have handled this)?
>
> --
>  Ayush Chauhan
>  Software Engineer | Data Platform
>  +91 9990747111
>
>
> This email is intended only for the person or the entity to whom it is
> addressed. If you are not the intended recipient, please delete this email
> and contact the sender.
>


Re: Question about snapshot file

2021-04-22 Thread Matthias Pohl
Hi Abdullah,
the metadata file contains handles to the operator states of the checkpoint
[1]. You might want to have a look into the State Processor API [2].
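As a rough sketch (the checkpoint path, operator uid, state name, and type
below are assumptions for illustration), the State Processor API lets you
load the metadata and read operator state, e.g.:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint =
    Savepoint.load(env, "hdfs:///flink/checkpoints/chk-42", new MemoryStateBackend());
DataSet<String> state =
    savepoint.readListState("my-operator-uid", "my-state-name", Types.STRING); // reads operator list state
state.print();
// imports: org.apache.flink.api.java.ExecutionEnvironment, org.apache.flink.api.java.DataSet,
// org.apache.flink.api.common.typeinfo.Types, org.apache.flink.runtime.state.memory.MemoryStateBackend,
// org.apache.flink.state.api.Savepoint, org.apache.flink.state.api.ExistingSavepoint
// (load/read/print declare checked exceptions, so run this from a method that throws Exception)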

Best,
Matthias

[1]
https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> (1) what 's the snapshot metadata file (binary) contains ? is it possible
> to read the snapshot metadata file by using Flink Deserialization?
>
> (2) is there any function that can be used to see the previous states on
> time of operation?
>
> Thank you
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
As in, this is essentially doing what lateness *should* have done. And I
think that is a bug. My code is now as below. Please look at the
allowedLateness on the session window.

SingleOutputStreamOperator<KeyedTimedValue> filteredKeyedValue = keyedValue
    .process(new LateEventFilter(this.lateNessInMinutes * 60 * 1000L))
    .name("late_filter").uid("late_filter");

SingleOutputStreamOperator<KeyedTimedValue> lateKeyedValue = keyedValue
    .process(new LateEventSideOutput(this.lateNessInMinutes * 60 * 1000L))
    .name("late_data").uid("late_data");

SingleOutputStreamOperator<KeyedTimedValue> aggregate = filteredKeyedValue
    .filter((f) -> f.key != null && f.timedValue.getEventTime() != null)
    .keyBy(value -> value.getKey())
    .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
    .allowedLateness(Time.minutes(lateNessInMinutes))
    .sideOutputLateData(lateOutputTag)
    .trigger(PurgingTrigger.of(CountTrigger.of(1)))
    .aggregate(new SortAggregate(),
        new SessionIdProcessWindowFunction(this.gapInMinutes, this.lateNessInMinutes))
    .name("session_aggregate").uid("session_aggregate");

On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi 
wrote:

> I can do that, but I am not certain this is the right filter.  Can you
> please validate. That aside I already have the lateness configured for the
> session window ( the normal withLateNess() )  and this looks like a session
> window was not collected and still is alive for some reason ( a flink bug ?
> )
>
> if (ctx.timestamp() + allowedLateness > ctx.timerService().
> currentWatermark()) {
> out.collect(value);
> }
>
>
> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
> wrote:
>
>> Hi Vishal,
>> based on the error message and the behavior you described, introducing a
>> filter for late events is the way to go - just as described in the SO
>> thread you mentioned. Usually, you would collect late events in some kind
>> of side output [1].
>>
>> I hope that helps.
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I saw
>>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>>> and this seems to suggest a straight up filter, but I am not sure how does
>>> that filter works as in would it factor is the lateness when filtering ?
>>>
>>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 Well it was not a solution after all. We now have a session window that
 is stuck with the same issue albeit  after the additional lateness. I had
 increased the lateness to 2 days and that masked the issue which again
 reared it's head after the 2 days ;lateness was over ( instead of the 1 day
 ) before. This is very disconcerting.

 Caused by: java.lang.UnsupportedOperationException: The end timestamp
 of an event-time window cannot become earlier than the current
 watermark by merging. Current watermark: 1619053742129 window:
 TimeWindow{start=161883663, end=1618879580402}



 On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Hey folks,
>I had a pipe with sessionization restarts and then fail
> after retries with this exception. The only thing I had done was to
> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
> from SP and it ran for 12 hours plus without issue. I cannot imagine that
> increasing the lateness created this and the way I solved this was to
> increase the lateness further. Could this be if there are TMs in the
> cluster whose time is off ( as in not synchronized )  ?
>
> 2021-04-21 11:27:58
> java.lang.UnsupportedOperationException: The end timestamp of an
> event-time window cannot become earlier than the current watermark by
> merging. Current watermark: 1618966593999 window: TimeWindow{start=
> 1618878336107, end=1618880140466}
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator$2.merge(WindowOperator.java:339)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator$2.merge(WindowOperator.java:321)
> at org.apache.flink.streaming.runtime.operators.windowing.
> MergingWindowSet.addWindow(MergingWindowSet.java:209)
> at org.apache.flink.streaming.runtime.operators.windowing.
> WindowOperator.processElement(WindowOperator.java:319)
> at org.apache.flink.streaming.runtime.tasks.
> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(
> OneInputStreamTask.java:191)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .processElement(StreamTaskNetworkInput.java:204)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput
> .emitNext(StreamTaskNetworkInput.java:174)
> at org.ap

Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Matthias Pohl
You're saying that you used `allowedLateness`/`sideOutputLateData` as
described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
being added to your pipeline when running into the
UnsupportedOperationException issue previously?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi 
wrote:

> As in this is essentially doing what lateness *should* have done  And I
> think that is a bug. My code now is . Please look at the allowedLateness on
> the session window.
>
> SingleOutputStreamOperator> filteredKeyedValue
> = keyedValue
> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
> "late_filter").uid("late_filter");
> SingleOutputStreamOperator> lateKeyedValue =
> keyedValue
> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name(
> "late_data").uid("late_data");
> SingleOutputStreamOperator>
> aggregate = filteredKeyedValue
> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).keyBy
> (value -> value.getKey())
> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
> lateOutputTag)
> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
> .aggregate(new SortAggregate(),
> new SessionIdProcessWindowFunction(this.gapInMinutes, this.
> lateNessInMinutes))
> .name("session_aggregate").uid("session_aggregate");
>
> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi 
> wrote:
>
>> I can do that, but I am not certain this is the right filter.  Can you
>> please validate. That aside I already have the lateness configured for the
>> session window ( the normal withLateNess() )  and this looks like a session
>> window was not collected and still is alive for some reason ( a flink bug ?
>> )
>>
>> if (ctx.timestamp() + allowedLateness > ctx.timerService().
>> currentWatermark()) {
>> out.collect(value);
>> }
>>
>>
>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Vishal,
>>> based on the error message and the behavior you described, introducing a
>>> filter for late events is the way to go - just as described in the SO
>>> thread you mentioned. Usually, you would collect late events in some kind
>>> of side output [1].
>>>
>>> I hope that helps.
>>> Matthias
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 I saw
 https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
 and this seems to suggest a straight up filter, but I am not sure how does
 that filter works as in would it factor is the lateness when filtering ?

 On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Well it was not a solution after all. We now have a session window
> that is stuck with the same issue albeit  after the additional lateness. I
> had increased the lateness to 2 days and that masked the issue which again
> reared it's head after the 2 days ;lateness was over ( instead of the 1 
> day
> ) before. This is very disconcerting.
>
> Caused by: java.lang.UnsupportedOperationException: The end timestamp
> of an event-time window cannot become earlier than the current
> watermark by merging. Current watermark: 1619053742129 window:
> TimeWindow{start=161883663, end=1618879580402}
>
>
>
> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Hey folks,
>>I had a pipe with sessionization restarts and then
>> fail after retries with this exception. The only thing I had done was to
>> increase the lateness by 12 hours ( to  a day )  in this pipe and restart
>> from SP and it ran for 12 hours plus without issue. I cannot imagine that
>> increasing the lateness created this and the way I solved this was to
>> increase the lateness further. Could this be if there are TMs in the
>> cluster whose time is off ( as in not synchronized )  ?
>>
>> 2021-04-21 11:27:58
>> java.lang.UnsupportedOperationException: The end timestamp of an
>> event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1618966593999 window: TimeWindow{start=
>> 1618878336107, end=1618880140466}
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator$2.merge(WindowOperator.java:339)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> WindowOperator$2.merge(WindowOperator.java:321)
>> at org.apache.flink.streaming.runtime.operators.windowing.
>> MergingWindowSet.addWindow(MergingWin

Re: [ANNOUNCE] Flink Jira Bot fully live (& Useful Filters to Work with the Bot)

2021-04-22 Thread Matthias Pohl
Thanks for setting this up, Konstantin. +1

On Thu, Apr 22, 2021 at 11:16 AM Konstantin Knauf  wrote:

> Hi everyone,
>
> all of the Jira Bot rules are live now. Particularly in the beginning the
> Jira Bot will be very active, because the rules apply to a lot of old,
> stale tickets. So, if you get a huge amount of emails from the Flink Jira
> Bot right now, this will get better. In any case, the Flink Jira Bot (or
> the rules that it implements) demand some changes to how we work with Jira.
>
> Here are a few points to make this transition easier for us:
>
> *1) Retrospective*
>
> In 3-4 weeks I would like to collect feedback. What is working well? What
> is not working well or getting in your way? Is the bot moving us closer to
> the goals mentioned in the initial email? Specifically, the
> initial parameterization [1] of the bot was kind of an educated guess. I
> will open a [DISCUSS]ion thread to collect feedback and proposals for
> changes around that time.
>
> *2) Use Sub-Tasks*
>
> The bot will ask you for an update on assigned tickets after quite a short
> time for Flink standards. If you are working on a ticket that takes longer,
> consider splitting it up in Sub-Tasks. Activity on Sub-Tasks counts as
> activity for the parent ticket, too. So, as long as any subtask is moving
> along you won't be nagged by the bot.
>
>
> *3) Useful Filters*
>
> You've probably received a lot of emails already, in particular if you are
> watching many tickets. Here are a few JIRA filters to find the tickets,
> that are probably most important to you and have been updated by the bot:
>
> Tickets that *you are assigned to*, which are "stale-assigned"
>
> https://issues.apache.org/jira/issues/?filter=12350499
>
> Tickets that *you reported*, which are stale in anyway:
>
> https://issues.apache.org/jira/issues/?filter=12350500
>
> If you are a maintainer of some components, you might find the following
> filters useful (replace with your own components):
>
> *All tickets that are about to be closed*
> project = FLINK AND component in ("Build System", "BuildSystem / Shaded",
> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
> resolution = Unresolved AND labels in (stale-minor)
>
> *Bugs that are about to be deprioritized or closed*
> project = FLINK AND type = BUG AND component in ("Build System",
> "BuildSystem / Shaded", "Deployment / Kubernetes", "Deployment / Mesos",
> "Deployment / YARN", flink-docker, "Release System", "Runtime /
> Coordination", "Runtime / Metrics", "Runtime / Queryable State", "Runtime /
> REST", Travis) AND resolution = Unresolved AND labels in (stale-major,
> stale-blocker, stale-critical, stale-minor)
>
>
> *Tickets that are stale-assigned, but already have a PR available*project
> = FLINK AND component in ("Build System", "BuildSystem / Shaded",
> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
> resolution = Unresolved AND labels in (stale-assigned) AND labels in
> (pull-request-available)
>
> Cheers,
>
> Konstantin
>
> [1] https://github.com/apache/flink-jira-bot/blob/master/config.yaml
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


Multiple jobs in the same Flink project

2021-04-22 Thread Oğuzhan Mangır
We have a Flink project with multiple jobs, which means we can submit
multiple jobs from the same jar. But I think there is a limitation here.
Let's assume:

I create a Flink project with 3 jobs and build a single jar, then upload it
to the Flink cluster (all of these steps run in a CI/CD pipeline, and the
jar name is assigned automatically, for example my-jar-v1, my-jar-v2, ...).
Then I submit the 3 jobs using the same jar.

Later, I change job2, build a new jar with a new version, e.g. my-jar-v2,
and re-deploy job2 with the new jar. But in this case, when I look at the
submit page in the UI, I can't tell which job was submitted from which jar.

my-jar-v1 => job1, job2, job3 deployed
my-jar-v2 => job2 (re-deployed) => in this case I know job2 was deployed
with this jar, but others won't know it, because the UI does not show this
information

Also, if any problem occurs in job2 when I deploy it with my-jar-v2, I can
fall back to the previous jar (my-jar-v1). But if there are a lot of jars,
this can become very hard to track.

Is there any best practice for this?


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
Yes sir. The allowedLateNess and side output always existed.

On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
wrote:

> You're saying that you used `allowedLateness`/`sideOutputLateData` as
> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
> being added to your pipeline when running into the
> UnsupportedOperationException issue previously?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi 
> wrote:
>
>> As in this is essentially doing what lateness *should* have done  And I
>> think that is a bug. My code now is . Please look at the allowedLateness on
>> the session window.
>>
>> SingleOutputStreamOperator>
>> filteredKeyedValue = keyedValue
>> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
>> "late_filter").uid("late_filter");
>> SingleOutputStreamOperator> lateKeyedValue =
>> keyedValue
>> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name(
>> "late_data").uid("late_data");
>> SingleOutputStreamOperator>
>> aggregate = filteredKeyedValue
>> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
>> keyBy(value -> value.getKey())
>> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
>> lateOutputTag)
>> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>> .aggregate(new SortAggregate(),
>> new SessionIdProcessWindowFunction(this.gapInMinutes, this.
>> lateNessInMinutes))
>> .name("session_aggregate").uid("session_aggregate");
>>
>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I can do that, but I am not certain this is the right filter.  Can you
>>> please validate. That aside I already have the lateness configured for the
>>> session window ( the normal withLateNess() )  and this looks like a session
>>> window was not collected and still is alive for some reason ( a flink bug ?
>>> )
>>>
>>> if (ctx.timestamp() + allowedLateness > ctx.timerService().
>>> currentWatermark()) {
>>> out.collect(value);
>>> }
>>>
>>>
>>> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
>>> wrote:
>>>
 Hi Vishal,
 based on the error message and the behavior you described, introducing
 a filter for late events is the way to go - just as described in the SO
 thread you mentioned. Usually, you would collect late events in some kind
 of side output [1].

 I hope that helps.
 Matthias

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

 On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> and this seems to suggest a straight up filter, but I am not sure how does
> that filter works as in would it factor is the lateness when filtering ?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Well it was not a solution after all. We now have a session window
>> that is stuck with the same issue albeit  after the additional lateness. 
>> I
>> had increased the lateness to 2 days and that masked the issue which 
>> again
>> reared it's head after the 2 days ;lateness was over ( instead of the 1 
>> day
>> ) before. This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end
>> timestamp of an event-time window cannot become earlier than the
>> current watermark by merging. Current watermark: 1619053742129
>> window: TimeWindow{start=161883663, end=1618879580402}
>>
>>
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hey folks,
>>>I had a pipe with sessionization restarts and then
>>> fail after retries with this exception. The only thing I had done was to
>>> increase the lateness by 12 hours ( to  a day )  in this pipe and 
>>> restart
>>> from SP and it ran for 12 hours plus without issue. I cannot imagine 
>>> that
>>> increasing the lateness created this and the way I solved this was to
>>> increase the lateness further. Could this be if there are TMs in the
>>> cluster whose time is off ( as in not synchronized )  ?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an
>>> event-time window cannot become earlier than the current watermark
>>> by merging. Current watermark: 1618966593999 window: TimeWindow
>>> {start=1618878336107, end=1618880140466}
>>> at org.apache.flink.streaming.runtime.operators.windowin

Re: Question about snapshot file

2021-04-22 Thread Abdullah bin Omar
Hi,

I have a savepoint or checkpointed file from my task. However, the file is
binary. I want to see what the file contains.

How is it possible to see what information the file contains (or how can it
be made human-readable)?

Thank you

On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
wrote:

> Hi Abdullah,
> the metadata file contains handles to the operator states of the
> checkpoint [1]. You might want to have a look into the State Processor API
> [2].
>
> Best,
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
> abdullahbinoma...@gmail.com> wrote:
>
>> Hi,
>>
>> (1) what 's the snapshot metadata file (binary) contains ? is it possible
>> to read the snapshot metadata file by using Flink Deserialization?
>>
>> (2) is there any function that can be used to see the previous states on
>> time of operation?
>>
>> Thank you
>>
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
And when I added the filter, the exception was not thrown. So the sequence
of events:

* Increased lateness from 12 hours (what the pipe was initially running
with) to 24 hours.
* The pipe ran as desired before it blew up with the exception.
* Masked the issue by increasing the lateness to 48 hours.
* It blew up again, but only after the added lateness, so essentially the
same issue; the added lateness just let the pipe run for another few hours.
* Added the filter upfront as below; the pipe has no issues. Also, metrics
show that no data is being pushed through the side output and that data is
not pulled from the simulated side output (below).


public class LateEventFilter<KEY, VALUE> extends
    ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {

  private static final long serialVersionUID = 1L;

  long allowedLateness;

  public LateEventFilter(long allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
      Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
    // keep only records that are still within the allowed lateness
    if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
      out.collect(value);
    }
  }
}


public class LateEventSideOutput<KEY, VALUE> extends
    ProcessFunction<KeyedTimedValue<KEY, VALUE>, KeyedTimedValue<KEY, VALUE>> {

  private static final long serialVersionUID = 1L;

  long allowedLateness;

  public LateEventSideOutput(long allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public void processElement(KeyedTimedValue<KEY, VALUE> value, Context ctx,
      Collector<KeyedTimedValue<KEY, VALUE>> out) throws Exception {
    // reverse predicate: emit only records that are already too late
    if (ctx.timestamp() + allowedLateness <= ctx.timerService().currentWatermark()) {
      out.collect(value);
    }
  }
}



 I am using RocksDB as a backend if that helps.
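
A minimal sketch of the same predicate expressed as a single ProcessFunction with a
real Flink side output (OutputTag) instead of two separate functions; the
Tuple2<String, Long> element type and the "late-data" tag name are illustrative
assumptions, not taken from the job above:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class LateEventRouter extends ProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

  // anonymous subclass so Flink can extract the element type at runtime
  public static final OutputTag<Tuple2<String, Long>> LATE_TAG =
      new OutputTag<Tuple2<String, Long>>("late-data") {};

  private final long allowedLateness;

  public LateEventRouter(long allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public void processElement(Tuple2<String, Long> value, Context ctx,
      Collector<Tuple2<String, Long>> out) {
    if (ctx.timestamp() + allowedLateness > ctx.timerService().currentWatermark()) {
      out.collect(value);          // still within allowed lateness -> main output
    } else {
      ctx.output(LATE_TAG, value); // already late -> side output
    }
  }
}

// wiring sketch:
// SingleOutputStreamOperator<Tuple2<String, Long>> onTime = stream.process(new LateEventRouter(latenessMillis));
// DataStream<Tuple2<String, Long>> late = onTime.getSideOutput(LateEventRouter.LATE_TAG);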

On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi 
wrote:

> Yes sir. The allowedLateNess and side output always existed.
>
> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
> wrote:
>
>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>> being added to your pipeline when running into the
>> UnsupportedOperationException issue previously?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>
>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> As in this is essentially doing what lateness *should* have done  And I
>>> think that is a bug. My code now is . Please look at the allowedLateness on
>>> the session window.
>>>
>>> SingleOutputStreamOperator>
>>> filteredKeyedValue = keyedValue
>>> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
>>> "late_filter").uid("late_filter");
>>> SingleOutputStreamOperator> lateKeyedValue
>>> = keyedValue
>>> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name(
>>> "late_data").uid("late_data");
>>> SingleOutputStreamOperator>
>>> aggregate = filteredKeyedValue
>>> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
>>> keyBy(value -> value.getKey())
>>> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
>>> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
>>> lateOutputTag)
>>> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
>>> .aggregate(new SortAggregate(),
>>> new SessionIdProcessWindowFunction(this.gapInMinutes, this.
>>> lateNessInMinutes))
>>> .name("session_aggregate").uid("session_aggregate");
>>>
>>> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 I can do that, but I am not certain this is the right filter.  Can you
 please validate. That aside I already have the lateness configured for the
 session window ( the normal withLateNess() )  and this looks like a session
 window was not collected and still is alive for some reason ( a flink bug ?
 )

 if (ctx.timestamp() + allowedLateness > ctx.timerService().
 currentWatermark()) {
 out.collect(value);
 }


 On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
 wrote:

> Hi Vishal,
> based on the error message and the behavior you described, introducing
> a filter for late events is the way to go - just as described in the SO
> thread you mentioned. Usually, you would collect late events in some kind
> of side output [1].
>
> I hope that helps.
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>
> On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> I saw
>> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
>> and this seems to suggest a straight up filter, but I am not sure how 
>> does
>> that filter works as in would it factor is the lateness when filtering ?
>>
>> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <
>> vishal.san

Re: Multiple jobs in the same Flink project

2021-04-22 Thread Arvid Heise
Hi Oğuzhan,

I think you know the answer already: it's easiest to have 1 jar per
application. And in most cases, it's easiest to also have 1 repo per
application. You can use the same template for all 3 and all future
applications without any special cases.

My rule of thumb is the following: if the life-cycles of applications are
tightly coupled, they can reside in the same repository. So if an
update/restart of app1 also means that app2 needs to be updated/restarted,
then use the same CI/CD process.

If (like in your case) the life cycles are independent, treat them as
separate entities. You can keep shared code in a fourth repo or include one
repo in the others.

I would not optimize for the number of repos but for the simplicity of each
particular repo. Ultimately, I like to have all repos structured exactly the
same, using the same Gradle plugins or build templates (since I don't enjoy
doing DevOps stuff over and over again). If you use GitLab (and I guess
similar tools), it's very easy to manage a large number of repos.

On Thu, Apr 22, 2021 at 7:42 PM Oğuzhan Mangır <
sosyalmedya.oguz...@gmail.com> wrote:

> We have a flink project with multiple jobs. That means we can submit
> multiple job with the same jar. But there is a limitation here i think.
> Because, let's assume;
>
> I create a flink project with 3 jobs, and create a single jar then put it
> to the flink cluster (all of these steps are working on a ci/cd pipeline,
> and the jar name will be assigned automatically. for example my-jar-v1,
> my-jar-v2 .. ). Then I submit 3 jobs using the same jar.
>
> Later, I changed the job2, then created a new jar with the new version
> e.g. my-jar-v2,  then re-deploy the job2 again with the new jar. But in
> this case, when I look at the submit page in the UI, i don't know which job
> was submitted from the specified jar.
>
> my-jar-v1 => job1, job2, jo3 deployed
> my-jar-v2 => job2 (re-deployed) =>> in this case, i know job2 deployed
> with this jar, but others will not know it because ui does not show this
> information
>
> And also, if any problem occurs in job2 when i deploy it using the
> my-jar-2, i can use the previous jar(my-jar-v1). But if there are a lot of
> jars, it can be very difficult.|
>
> Is there any best practice for that?
>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Vishal Santoshi
 <<  Added the filter upfront as below; the pipe has no issues. Also metrics
show that no data is being pushed through the side output and that data is
not pulled from the simulated side output (below)

>> Added the filter upfront as below; the pipe has no issues. Also metrics
show that no data is being pushed through the side output and that data *is
now* pulled from the simulated side output, essentially the ProcessFunction
with a reverse predicate to the filter ProcessFunction.


On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi 
wrote:

> And when I added the filter the Exception was not thrown. So the sequence
> of events
>
> * Increased lateness from 12 ( that was what it was initially running with
> )  to 24 hours
> * the pipe ran as desired before it blew up with the Exception
> * masked the issue by increasing the lateness to 48 hours.
> * It blew up again but now after the added lateness, so essentially the
> same issue but added lateness let the pipe run for another few hours.
> * Added the Fliter upfront  as below, the pipe has no issues. Also metrics
> show that no data is being pushed through the sideoutput and that data in
> not pulled from the a simulated sideout ( below )
>
>
> public class LateEventFilter extends ProcessFunction VALUE>, KeyedTimedValue> {
> private static final long serialVersionUID = 1L;
>
> long allowedLateness;
> public LateEventFilter(long allowedLateness){
> this.allowedLateness = allowedLateness;
> }
> @Override
> public void processElement(KeyedTimedValue value, Context ctx,
> Collector> out) throws Exception {
> if (ctx.timestamp() + allowedLateness > ctx.timerService().
> currentWatermark()) {
> out.collect(value);
> }
> }
> }
>
>
> public class LateEventSideOutput extends ProcessFunction KEY, VALUE>, KeyedTimedValue> {
> private static final long serialVersionUID = 1L;
>
> long allowedLateness;
> public LateEventSideOutput(long allowedLateness){
> this.allowedLateness = allowedLateness;
> }
> @Override
> public void processElement(KeyedTimedValue value, Context ctx,
> Collector> out) throws Exception {
> if (ctx.timestamp() + allowedLateness <= ctx.timerService().
> currentWatermark()) {
> out.collect(value);
> }
> }
> }
>
>
>
>  I am using RocksDB as a backend if that helps.
>
> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi 
> wrote:
>
>> Yes sir. The allowedLateNess and side output always existed.
>>
>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
>> wrote:
>>
>>> You're saying that you used `allowedLateness`/`sideOutputLateData` as
>>> described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
>>> being added to your pipeline when running into the
>>> UnsupportedOperationException issue previously?
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output
>>>
>>> On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 As in this is essentially doing what lateness *should* have done  And
 I think that is a bug. My code now is . Please look at the allowedLateness
 on the session window.

 SingleOutputStreamOperator>
 filteredKeyedValue = keyedValue
 .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
 "late_filter").uid("late_filter");
 SingleOutputStreamOperator> lateKeyedValue
 = keyedValue
 .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).name
 ("late_data").uid("late_data");
 SingleOutputStreamOperator>
 aggregate = filteredKeyedValue
 .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
 keyBy(value -> value.getKey())
 .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
 .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
 lateOutputTag)
 .trigger(PurgingTrigger.of(CountTrigger.of(1)))
 .aggregate(new SortAggregate(),
 new SessionIdProcessWindowFunction(this.gapInMinutes, this.
 lateNessInMinutes))
 .name("session_aggregate").uid("session_aggregate");

 On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> I can do that, but I am not certain this is the right filter.  Can you
> please validate. That aside I already have the lateness configured for the
> session window ( the normal withLateNess() )  and this looks like a 
> session
> window was not collected and still is alive for some reason ( a flink bug 
> ?
> )
>
> if (ctx.timestamp() + allowedLateness > ctx.timerService().
> currentWatermark()) {
> out.collect(value);
> }
>
>
> On Thu, Apr 22, 2021 at 9:46 AM Matthias Pohl 
> wrote:
>
>> Hi Vishal,
>> based on the error message and the behavior you described,
>> introducing a filter for late events is the way to go - just as described
>> in the SO thread you mentioned. Usually, you would collect late ev

Official flink java client

2021-04-22 Thread gaurav kulkarni
Hi, 
Is there any official Flink client in Java that's available? I came across 
RestClusterClient, but I am not sure if it's official. I can create my own 
client, but I just wanted to check if there is anything official available 
already that I can leverage. 
Thanks,
Gaurav

[Link preview] Run already deployed job on Flink Cluster using RestClusterClient: "I am trying to run already deployed job on Flink Cluster using Rest request. I had success using a simple rest ..."





Re: [ANNOUNCE] Flink Jira Bot fully live (& Useful Filters to Work with the Bot)

2021-04-22 Thread Xintong Song
Thanks for driving this, Konstantin.
Great job~!

Thank you~

Xintong Song



On Thu, Apr 22, 2021 at 11:57 PM Matthias Pohl 
wrote:

> Thanks for setting this up, Konstantin. +1
>
> On Thu, Apr 22, 2021 at 11:16 AM Konstantin Knauf 
> wrote:
>
>> Hi everyone,
>>
>> all of the Jira Bot rules are live now. Particularly in the beginning the
>> Jira Bot will be very active, because the rules apply to a lot of old,
>> stale tickets. So, if you get a huge amount of emails from the Flink Jira
>> Bot right now, this will get better. In any case, the Flink Jira Bot (or
>> the rules that it implements) demand some changes to how we work with Jira.
>>
>> Here are a few points to make this transition easier for us:
>>
>> *1) Retrospective*
>>
>> In 3-4 weeks I would like to collect feedback. What is working well? What
>> is not working well or getting in your way? Is the bot moving us closer to
>> the goals mentioned in the initial email? Specifically, the
>> initial parameterization [1] of the bot was kind of an educated guess. I
>> will open a [DISCUSS]ion thread to collect feedback and proposals for
>> changes around that time.
>>
>> *2) Use Sub-Tasks*
>>
>> The bot will ask you for an update on assigned tickets after quite a
>> short time for Flink standards. If you are working on a ticket that takes
>> longer, consider splitting it up in Sub-Tasks. Activity on Sub-Tasks counts
>> as activity for the parent ticket, too. So, as long as any subtask is
>> moving along you won't be nagged by the bot.
>>
>>
>> *3) Useful Filters*
>>
>> You've probably received a lot of emails already, in particular if you
>> are watching many tickets. Here are a few JIRA filters to find the tickets,
>> that are probably most important to you and have been updated by the bot:
>>
>> Tickets that *you are assigned to*, which are "stale-assigned"
>>
>> https://issues.apache.org/jira/issues/?filter=12350499
>>
>> Tickets that *you reported*, which are stale in anyway:
>>
>> https://issues.apache.org/jira/issues/?filter=12350500
>>
>> If you are a maintainer of some components, you might find the following
>> filters useful (replace with your own components):
>>
>> *All tickets that are about to be closed*
>> project = FLINK AND component in ("Build System", "BuildSystem / Shaded",
>> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
>> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
>> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
>> resolution = Unresolved AND labels in (stale-minor)
>>
>> *Bugs that are about to be deprioritized or closed*
>> project = FLINK AND type = BUG AND component in ("Build System",
>> "BuildSystem / Shaded", "Deployment / Kubernetes", "Deployment / Mesos",
>> "Deployment / YARN", flink-docker, "Release System", "Runtime /
>> Coordination", "Runtime / Metrics", "Runtime / Queryable State", "Runtime /
>> REST", Travis) AND resolution = Unresolved AND labels in (stale-major,
>> stale-blocker, stale-critical, stale-minor)
>>
>>
>> *Tickets that are stale-assigned, but already have a PR available*project
>> = FLINK AND component in ("Build System", "BuildSystem / Shaded",
>> "Deployment / Kubernetes", "Deployment / Mesos", "Deployment / YARN",
>> flink-docker, "Release System", "Runtime / Coordination", "Runtime /
>> Metrics", "Runtime / Queryable State", "Runtime / REST", Travis) AND
>> resolution = Unresolved AND labels in (stale-assigned) AND labels in
>> (pull-request-available)
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1] https://github.com/apache/flink-jira-bot/blob/master/config.yaml
>>
>>
>> --
>>
>> Konstantin Knauf
>>
>> https://twitter.com/snntrable
>>
>> https://github.com/knaufk
>>
>


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread 马阳阳
Hi Matthias,
We have “solved” the problem by tuning the join, but I will still try to answer 
the questions, hoping this helps.


* What is the option you're referring to for the bounded shuffle? That might 
help to understand what streaming mode solution you're looking for.

taskmanager.network.blocking-shuffle.type (default: "file", type: String): The blocking 
shuffle type, either "mmap" or "file". The "auto" means selecting the property type 
automatically based on system memory architecture (64 bit for mmap and 32 bit for file). 
Note that the memory usage of mmap is not accounted by configured memory limits, but some 
resource frameworks like yarn would track this memory usage and kill the container once 
memory exceeding some threshold. Also note that this option is experimental and might be 
changed future.
* What does the job graph look like? Are you assuming that it's due to a 
shuffling operation? Could you provide the logs to get a better understanding 
of your case?
   The graph is a join of three streams, and we use RocksDB as the state backend. 
I think the crash is due to RocksDB. I could not get the logs (because of a 
misconfiguration, the logs are empty). 
* Do you observe the same memory increase for other TaskManager nodes?

   After one TM was killed, the job failed, so I didn’t see exactly the same 
memory increase for the other TMs. But I think the other TMs would show similar 
behavior because the data sizes they processed are almost the same.
* Are you expecting to reach the memory limits considering that you mentioned a 
"big state size"? Would increasing the memory limit be an option or do you fear 
that it's caused by some memory leak?
  Changing the TM process memory to 18GB instead of 12GB didn’t help.


Based on the answers above, I think we should figure out why RocksDB overuses 
virtual memory and causes YARN to kill the container.
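
For anyone hitting the same container kill with RocksDB on YARN, the knobs that
usually matter are the managed-memory budget (which RocksDB is bound to by default
since Flink 1.10) and the JVM-overhead headroom. A rough flink-conf.yaml starting
point; the concrete values are illustrative assumptions, not tuned for this job:

taskmanager.memory.process.size: 12288m
taskmanager.memory.managed.fraction: 0.6        # smaller managed share leaves more native headroom
taskmanager.memory.jvm-overhead.max: 2g         # extra overhead budget counted against the YARN limit
state.backend.rocksdb.memory.managed: true      # keep RocksDB inside the managed-memory budget (default)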


On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:

The Flink version we used is 1.12.0.


马阳阳
ma_yang_y...@163.com
(signature customized with NetEase Mail Master)


On 04/16/2021 16:07,马阳阳 wrote:
Hi, community,
When running a Flink streaming job with big state size, one task manager 
process was killed by the yarn node manager. The following log is from the yarn 
node manager:


2021-04-16 11:51:23,013 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container 
[pid=521232,containerID=container_e157_1618223445363_16943_01_10] is 
running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0 GB of 
12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used. Killing 
container.


When searching solution for this problem, I found that there is a option for 
this that worked for bounded shuffle. So is there a way to get rid of this in 
streaming mode?


PS:
memory related options:
taskmanager.memory.process.size:12288m
taskmanager.memory.managed.fraction:0.7



Re: Official flink java client

2021-04-22 Thread Yun Gao
Hi Gaurav,

Logically the Flink client is embedded inside the StreamExecutionEnvironment, and 
users could use the StreamExecutionEnvironment to execute their jobs. Could you 
share more about why you want to use the client directly?

Best,
Yun
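
For reference, RestClusterClient does live in flink-clients, but it is generally
treated as an internal API rather than a supported public client, so it may change
between versions. A minimal sketch of listing jobs with it, assuming a standalone
cluster whose REST endpoint is reachable at localhost:8081:

import java.util.Collection;

import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;

public class ListJobs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setString(RestOptions.ADDRESS, "localhost"); // JobManager REST host (assumption)
    conf.setInteger(RestOptions.PORT, 8081);          // REST port (assumption)

    // the generic parameter is the cluster id type; a plain String is enough for standalone setups
    try (RestClusterClient<String> client = new RestClusterClient<>(conf, "standalone")) {
      Collection<JobStatusMessage> jobs = client.listJobs().get();
      for (JobStatusMessage job : jobs) {
        System.out.println(job.getJobId() + " " + job.getJobName() + " " + job.getJobState());
      }
    }
  }
}

If the goal is only to submit a job, env.executeAsync() on the StreamExecutionEnvironment
returns a JobClient that covers much of the same ground without touching internal classes.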



 --Original Mail --
Sender:gaurav kulkarni 
Send Date:Fri Apr 23 10:14:08 2021
Recipients:User 
Subject:Official flink java client

Hi, 

Is there any official flink client in java that's available? I came across 
RestClusterClient, but I am not sure if its official. I can create my own 
client, but just wanted to check if there is anything official available 
already that I can leverage. 

Thanks,
Gaurav




Re: Debezium CDC | OOM

2021-04-22 Thread Ayush Chauhan
Hi Matthias,

I am using RocksDB as a state backend. I think the iceberg sink is not able
to propagate back pressure to the source which is resulting in OOM for my
CDC pipeline.
Please refer to this - https://github.com/apache/iceberg/issues/2504
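
As a side note on the checkpoint-interval angle from earlier in the thread: shortening
the interval and using incremental RocksDB checkpoints does not fix the missing back
pressure, but it bounds how much work sits between checkpoints. A sketch using the
Flink 1.12-era API; the checkpoint path and the 1-minute interval are assumptions:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // incremental checkpoints keep per-checkpoint uploads small for large state
    env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

    // a shorter interval bounds how much un-checkpointed input piles up between checkpoints
    env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

    // ... add the CDC source, transformations and the Iceberg sink here, then env.execute("cdc-to-iceberg")
  }
}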



On Thu, Apr 22, 2021 at 8:44 PM Matthias Pohl 
wrote:

> Hi Ayush,
> Which state backend have you configured [1]? Have you considered trying
> out RocksDB [2]? RocksDB might help with persisting at least keyed state.
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#choose-the-right-state-backend
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>
> On Thu, Apr 22, 2021 at 7:52 AM Ayush Chauhan 
> wrote:
>
>> Hi,
>> I am using flink cdc to stream CDC changes in an iceberg table. When I
>> first run the flink job for a topic which has all the data for a table, it
>> get out of heap memory as flink try to load all the data during my 15mins
>> checkpointing interval. Right now, only solution I have is to pass *-ytm
>> 8192 -yjm 2048m* for a table with 10M rows and then reduce it after
>> flink has consumed all the data. Is there a way to tell flink cdc code to
>> trigger checkpoint or throttle the consumption speed(I think backpressure
>> should have handled this)?
>>
>> --
>>  Ayush Chauhan
>>  Software Engineer | Data Platform
>>  [image: mobile-icon]  +91 9990747111
>>
>>
>> This email is intended only for the person or the entity to whom it is
>> addressed. If you are not the intended recipient, please delete this email
>> and contact the sender.
>>
>

-- 
 Ayush Chauhan
 Software Engineer | Data Platform
 +91 9990747111

-- 
This email is intended only for the person or the entity to whom it is addressed. 
If you are not the intended recipient, please delete this email and contact the sender.



Re: Question about snapshot file

2021-04-22 Thread Matthias Pohl
What is it you're trying to achieve in general? The JavaDoc of
MetadataV2V3SerializerBase [1] provides a description of the file's format.
Theoretically, you could write custom code against the Flink sources to parse
the content of the file. But maybe there's another way to accomplish what
you're trying to do.

Matthias

[1]
https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
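
To make the snapshot human-readable without hand-parsing the metadata, the State
Processor API mentioned above can decode keyed state back into a DataSet. A minimal
sketch; the savepoint path, the operator uid "my-uid", the key/state types and the
"counter" descriptor name are all assumptions that must match the original job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReadSavepoint {

  /** Reads one ValueState per key; descriptor name and type must match the original job. */
  public static class MyStateReader extends KeyedStateReaderFunction<String, String> {
    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
      counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
      out.collect(key + " -> " + counter.value());
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // use the state backend the snapshot was written with (e.g. RocksDB) if MemoryStateBackend does not fit
    ExistingSavepoint savepoint =
        Savepoint.load(env, "hdfs:///path/to/savepoint", new MemoryStateBackend());

    // prints decoded state rows instead of the raw binary files
    savepoint.readKeyedState("my-uid", new MyStateReader()).print();
  }
}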

On Thu, Apr 22, 2021 at 7:53 PM Abdullah bin Omar <
abdullahbinoma...@gmail.com> wrote:

> Hi,
>
> I have a savepoint or checkpointed file from my task. However, the file is
> binary. I want to see what the file contains.
>
> How is it possible to see what information the file has (or how it is
> possible to make it human readable?)
>
> Thank you
>
> On Thu, Apr 22, 2021 at 10:19 AM Matthias Pohl 
> wrote:
>
>> Hi Abdullah,
>> the metadata file contains handles to the operator states of the
>> checkpoint [1]. You might want to have a look into the State Processor API
>> [2].
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://github.com/apache/flink/blob/adaaed426c2e637b8e5ffa3f0d051326038d30aa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L83
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Thu, Apr 22, 2021 at 4:57 PM Abdullah bin Omar <
>> abdullahbinoma...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> (1) what 's the snapshot metadata file (binary) contains ? is it
>>> possible to read the snapshot metadata file by using Flink Deserialization?
>>>
>>> (2) is there any function that can be used to see the previous states on
>>> time of operation?
>>>
>>> Thank you
>>>
>>


Re: event-time window cannot become earlier than the current watermark by merging

2021-04-22 Thread Matthias Pohl
To me, it sounds strange. I would have expected it to work with
`allowedLateness` and `sideOutput` being defined. I'm pulling in David to have
a look at it. Maybe he has some more insights. I haven't worked that much with
lateness yet.

Matthias

On Thu, Apr 22, 2021 at 10:57 PM Vishal Santoshi 
wrote:

>  <<  Added the Fliter upfront  as below, the pipe has no issues. Also
> metrics show that no data is being pushed through the sideoutput and that
> data in not pulled from the a simulated sideout ( below )
>
> >> Added the Fliter upfront  as below, the pipe has no issues. Also
> metrics show that no data is being pushed through the sideoutput and that
> data in *now* pulled from the simulated sideout , essentially the Process
> Function with a reverse predicate to the Filter Process Function.
>
>
> On Thu, Apr 22, 2021 at 1:56 PM Vishal Santoshi 
> wrote:
>
>> And when I added the filter the Exception was not thrown. So the sequence
>> of events
>>
>> * Increased lateness from 12 ( that was what it was initially running
>> with )  to 24 hours
>> * the pipe ran as desired before it blew up with the Exception
>> * masked the issue by increasing the lateness to 48 hours.
>> * It blew up again but now after the added lateness, so essentially the
>> same issue but added lateness let the pipe run for another few hours.
>> * Added the Fliter upfront  as below, the pipe has no issues. Also
>> metrics show that no data is being pushed through the sideoutput and that
>> data in not pulled from the a simulated sideout ( below )
>>
>>
>> public class LateEventFilter extends ProcessFunction> VALUE>, KeyedTimedValue> {
>> private static final long serialVersionUID = 1L;
>>
>> long allowedLateness;
>> public LateEventFilter(long allowedLateness){
>> this.allowedLateness = allowedLateness;
>> }
>> @Override
>> public void processElement(KeyedTimedValue value, Context ctx
>> ,
>> Collector> out) throws Exception {
>> if (ctx.timestamp() + allowedLateness > ctx.timerService().
>> currentWatermark()) {
>> out.collect(value);
>> }
>> }
>> }
>>
>>
>> public class LateEventSideOutput extends ProcessFunction> KEY, VALUE>, KeyedTimedValue> {
>> private static final long serialVersionUID = 1L;
>>
>> long allowedLateness;
>> public LateEventSideOutput(long allowedLateness){
>> this.allowedLateness = allowedLateness;
>> }
>> @Override
>> public void processElement(KeyedTimedValue value, Context ctx
>> ,
>> Collector> out) throws Exception {
>> if (ctx.timestamp() + allowedLateness <= ctx.timerService().
>> currentWatermark()) {
>> out.collect(value);
>> }
>> }
>> }
>>
>>
>>
>>  I am using RocksDB as a backend if that helps.
>>
>> On Thu, Apr 22, 2021 at 1:50 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Yes sir. The allowedLateNess and side output always existed.
>>>
>>> On Thu, Apr 22, 2021 at 11:47 AM Matthias Pohl 
>>> wrote:
>>>
 You're saying that you used `allowedLateness`/`sideOutputLateData` as
 described in [1] but without the `LateEventFilter`/`LateEventSideOutput`
 being added to your pipeline when running into the
 UnsupportedOperationException issue previously?

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

 On Thu, Apr 22, 2021 at 5:32 PM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> As in this is essentially doing what lateness *should* have done  And
> I think that is a bug. My code now is . Please look at the allowedLateness
> on the session window.
>
> SingleOutputStreamOperator>
> filteredKeyedValue = keyedValue
> .process(new LateEventFilter(this.lateNessInMinutes*60*1000l)).name(
> "late_filter").uid("late_filter");
> SingleOutputStreamOperator> lateKeyedValue
> = keyedValue
> .process(new LateEventSideOutput(this.lateNessInMinutes*60*1000l)).
> name("late_data").uid("late_data");
> SingleOutputStreamOperator>
> aggregate = filteredKeyedValue
> .filter((f) -> f.key != null && f.timedValue.getEventTime() != null).
> keyBy(value -> value.getKey())
> .window(EventTimeSessionWindows.withGap(Time.minutes(gapInMinutes)))
> .allowedLateness(Time.minutes(lateNessInMinutes)).sideOutputLateData(
> lateOutputTag)
> .trigger(PurgingTrigger.of(CountTrigger.of(1)))
> .aggregate(new SortAggregate(),
> new SessionIdProcessWindowFunction(this.gapInMinutes, this
> .lateNessInMinutes))
> .name("session_aggregate").uid("session_aggregate");
>
> On Thu, Apr 22, 2021 at 9:59 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> I can do that, but I am not certain this is the right filter.  Can
>> you please validate. That aside I already have the lateness configured 
>> for
>> the session window ( the normal withLateNess() )  and this looks like a
>> session window was not collected and still is alive for some reason ( a
>> flink bug

Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread Matthias Pohl
Thanks for sharing these details. Looking into FLINK-14952 [1] (which
introduced this option) and the related mailing list thread [2], it feels
like your issue is quite similar to what is described there, even though that
issue seems to be mostly tied to bounded jobs. But I'm not sure what is
happening under the hood. I guess you tried the option already? Have you had
the chance to profile memory? I'm pulling in Piotr and Zhijiang. Maybe they
have more insights on that matter.

Matthias

[1] https://issues.apache.org/jira/browse/FLINK-14952
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoGroup-SortMerger-performance-degradation-from-1-6-4-1-9-1-tp31082p31389.html

On Fri, Apr 23, 2021 at 4:53 AM 马阳阳  wrote:

> Hi Matthias,
> We have “solved” the problem by tuning the join. But I still try to answer
> the questions, hoping this will help.
>
> * What is the option you're referring to for the bounded shuffle? That
> might help to understand what streaming mode solution you're looking for.
>
> taskmanager.network.blocking-shuffle.type "file" String The blocking
> shuffle type, either "mmap" or "file". The "auto" means selecting the
> property type automatically based on system memory architecture (64 bit for
> mmap and 32 bit for file). Note that the memory usage of mmap is not
> accounted by configured memory limits, but some resource frameworks like
> yarn would track this memory usage and kill the container once memory
> exceeding some threshold. Also note that this option is experimental and
> might be changed future.
> * What does the job graph look like? Are you assuming that it's due to a
> shuffling operation? Could you provide the logs to get a better
> understanding of your case?
>The graph is join of three streams. And we use rocksdb as the
> statebackend. I think the crash is due to rocksdb. And I could not get the
> logs (because some misconfiguration, which caused the logs are empty).
> * Do you observe the same memory increase for other TaskManager nodes?
>After one tm is killed, the job failed. So I didn’t see the exactly
> same memory increase for other tms. But I think other tms would have
> similiar behavior because the data sizes they processed are almost the same.
> * Are you expecting to reach the memory limits considering that you
> mentioned a "big state size"? Would increasing the memory limit be an
> option or do you fear that it's caused by some memory leak?
>   By change the tm process memory to 18GB instead of 12GB, it didn’t help.
>
> By the answers I provided, I think maybe we should figure out why rocksdb
> overused virtual memory, and caused yarn to kill the container.
>
> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>
>> The Flink version we used is 1.12.0.
>>
>> 马阳阳
>> ma_yang_y...@163.com
>>
>> 
>> 签名由 网易邮箱大师  定制
>>
>> On 04/16/2021 16:07,马阳阳 
>> wrote:
>>
>> Hi, community,
>> When running a Flink streaming job with big state size, one task manager
>> process was killed by the yarn node manager. The following log is from the
>> yarn node manager:
>>
>> 2021-04-16 11:51:23,013 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>> Killing container.
>>
>> When searching solution for this problem, I found that there is a option
>> for this that worked for bounded shuffle. So is there a way to get rid of
>> this in streaming mode?
>>
>> PS:
>> memory related options:
>> taskmanager.memory.process.size:12288m
>> taskmanager.memory.managed.fraction:0.7
>>
>>


Re: Flink streaming job's taskmanager process killed by yarn nodemanager because of exceeding 'PHYSICAL' memory limit

2021-04-22 Thread Matthias Pohl
A few more questions: Have you had the chance to monitor/profile the memory
usage? Which section of the memory was used excessively? Additionally, could
@dhanesh arole's proposal solve your issue?

Matthias

On Fri, Apr 23, 2021 at 8:41 AM Matthias Pohl 
wrote:

> Thanks for sharing these details. Looking into FLINK-14952 [1] (which
> introduced this option) and the related mailing list thread [2], it feels
> like your issue is quite similar to what is described in there even though
> it sounds like this issue is mostly tied to bounded jobs. But I'm not sure
> what is happening under the hood. I guess you tried the option already?
> Have you had the chance to profile memory. I'm pulling in Piotr and
> Zhijiang. Maybe, they have more insights on that matter.
>
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-14952
> [2]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/CoGroup-SortMerger-performance-degradation-from-1-6-4-1-9-1-tp31082p31389.html
>
> On Fri, Apr 23, 2021 at 4:53 AM 马阳阳  wrote:
>
>> Hi Matthias,
>> We have “solved” the problem by tuning the join. But I still try to
>> answer the questions, hoping this will help.
>>
>> * What is the option you're referring to for the bounded shuffle? That
>> might help to understand what streaming mode solution you're looking for.
>>
>> taskmanager.network.blocking-shuffle.type "file" String The blocking
>> shuffle type, either "mmap" or "file". The "auto" means selecting the
>> property type automatically based on system memory architecture (64 bit for
>> mmap and 32 bit for file). Note that the memory usage of mmap is not
>> accounted by configured memory limits, but some resource frameworks like
>> yarn would track this memory usage and kill the container once memory
>> exceeding some threshold. Also note that this option is experimental and
>> might be changed future.
>> * What does the job graph look like? Are you assuming that it's due to a
>> shuffling operation? Could you provide the logs to get a better
>> understanding of your case?
>>The graph is join of three streams. And we use rocksdb as the
>> statebackend. I think the crash is due to rocksdb. And I could not get the
>> logs (because some misconfiguration, which caused the logs are empty).
>> * Do you observe the same memory increase for other TaskManager nodes?
>>After one tm is killed, the job failed. So I didn’t see the exactly
>> same memory increase for other tms. But I think other tms would have
>> similiar behavior because the data sizes they processed are almost the same.
>> * Are you expecting to reach the memory limits considering that you
>> mentioned a "big state size"? Would increasing the memory limit be an
>> option or do you fear that it's caused by some memory leak?
>>   By change the tm process memory to 18GB instead of 12GB, it didn’t help.
>>
>> By the answers I provided, I think maybe we should figure out why rocksdb
>> overused virtual memory, and caused yarn to kill the container.
>>
>> On Fri, Apr 16, 2021 at 10:24 AM 马阳阳  wrote:
>>
>>> The Flink version we used is 1.12.0.
>>>
>>> 马阳阳
>>> ma_yang_y...@163.com
>>>
>>> 
>>> 签名由 网易邮箱大师  定制
>>>
>>> On 04/16/2021 16:07,马阳阳 
>>> wrote:
>>>
>>> Hi, community,
>>> When running a Flink streaming job with big state size, one task manager
>>> process was killed by the yarn node manager. The following log is from the
>>> yarn node manager:
>>>
>>> 2021-04-16 11:51:23,013 WARN
>>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>>> Container
>>> [pid=521232,containerID=container_e157_1618223445363_16943_01_10] is
>>> running 19562496B beyond the 'PHYSICAL' memory limit. Current usage: 12.0
>>> GB of 12 GB physical memory used; 15.2 GB of 25.2 GB virtual memory used.
>>> Killing container.
>>>
>>> When searching solution for this problem, I found that there is a option
>>> for this that worked for bounded shuffle. So is there a way to get rid of
>>> this in streaming mode?
>>>
>>> PS:
>>> memory related options:
>>> taskmanager.memory.process.size:12288m
>>> taskmanager.memory.managed.fraction:0.7
>>>
>>>