Re: Flink not outputting windows before all data is seen

2020-08-30 Thread Teodor Spæren

Hey David!

I tried what you said, but it did not solve the problem. The job still 
has to wait until the very end before outputting anything.


I mentioned in my original email that I had set the parallelism to 1 job 
wide, but when I reran the task, I added your line. Are there any 
circumstances where despite having the global level set to 1, you still 
need to set the level on individual operators?


PS: I sent this to you directly; I'm sorry about that.

Best regards,
Teodor

On Sat, Aug 29, 2020 at 08:37:48PM +0200, David Anderson wrote:

Teodor,

This is happening because of the way that readTextFile works when it is
executing in parallel, which is to divide the input file into a bunch of
splits, which are consumed in parallel. This is making it so that the
watermark isn't able to move forward until much or perhaps all of the file
has been read. If you change the parallelism of the source to 1, like this

   final DataStream<String> linesIn =
           env.readTextFile(fileNameInput).setParallelism(1);

then you should see the job make steady forward progress with windows
closing on a regular basis.

Regards,
David

On Sat, Aug 29, 2020 at 4:59 PM Teodor Spæren 
wrote:


Hey!

Second time posting to a mailing list, let's hope I'm doing this
correctly :)

My use case is to take data from the mediawiki dumps and stream it into
Flink via the `readTextFile` method. The dumps are TSV files with an
event per line; each event has a timestamp and a type. I want to use
event-time processing and simply print out how many of each event type
there are per hour. The data can be out of order, so I have a 1-hour
tolerance.

What I expect to happen here is that as it goes through a month of data,
it will print out the hours as the watermark passes 1 hour. So I'll get
output continuously until the end.

What really happens is that the program outputs nothing until it is done,
and then it outputs everything. The timestamp is also stuck at
9223372036854776000 in the web UI. If I switch to using
count windows instead of time windows, it outputs continuously like I
would expect it to, so it seems to be watermark related.

I'm running Flink version 1.11.1 on JVM version:

OpenJDK 64-Bit Server VM - GraalVM Community - 11/11.0.7+10-jvmci-20.1-b02

The parallelism setting is 1 and it's running on my laptop.


I don't know how much code I'm allowed to attach here, so I've created a
GitHub repo with the complete, self-contained example [1]. To get the data
used, run the following commands:

$ wget
https://dumps.wikimedia.org/other/mediawiki_history/2020-07/enwiki/2020-07.enwiki.2016-04.tsv.bz2
$ pv -cN source < 2020-07.enwiki.2016-04.tsv.bz2 | bzcat  | pv -cN bzcat
|  sort -k4 > 2020-07.enwiki.2016-04.sorted.tsv

If you don't have pv installed, just remove that part, I just like to
have an overview.


The main code part is this:

package org.example.prow;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.example.prow.wikimedia.Event;

import java.time.Duration;

public class App {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        final String fileNameInput = "file:///home/rhermes/madsci/thesis/data/mediawiki_history/2020-07.enwiki.2016-04.sorted.tsv";
        final DataStream<String> linesIn = env.readTextFile(fileNameInput);

        final SingleOutputStreamOperator<Event> jj = linesIn.map(value -> new Event(value));

        final WatermarkStrategy<Event> mew = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofHours(1))
                .withTimestampAssigner((element, recordTimestamp) -> element.eventTimestamp.toEpochSecond() * 1000);

        final DataStream<Event> props = jj.assignTimestampsAndWatermarks(mew);

        final KeyedStream<Event, String> praps = props.keyBy(e -> e.eventEntity.toString());

        praps.window(TumblingEventTimeWindows.of(Time.hours(1))).sum("something").print("JAJ!");

        env.execute("FlinkWikipediaHistoryTopEditors");
    }
}

If you see any errors here, please tell me; this is sort of driving me
mad >_<.

Best regards,
Teodor Spæren

[1] https://github.com/rHermes/flink-question-001



Re: Flink not outputting windows before all data is seen

2020-08-30 Thread Teodor Spæren

Hey again David!

I tried your proposed change of setting the parallelism higher. This 
worked, but why does it fix the behavior? I don't understand why it 
would. The only thing that changes in the query plan is that a 
"remapping" node is added.


Thanks for the fix, and for any additional answer :)

Best regards,
Teodor

On Sun, Aug 30, 2020 at 12:29:31PM +0200, Teodor Spæren wrote:

Hey David!

I tried what you said, but it did not solve the problem. The job still 
has to wait until the very end before outputting anything.


I mentioned in my original email that I had set the parallelism to 1 
job wide, but when I reran the task, I added your line. Are there any 
circumstances where despite having the global level set to 1, you 
still need to set the level on individual operators?


PS: I sent this to you directly; I'm sorry about that.

Best regards,
Teodor

On Sat, Aug 29, 2020 at 08:37:48PM +0200, David Anderson wrote:

Teodor,

This is happening because of the way that readTextFile works when it is
executing in parallel, which is to divide the input file into a bunch of
splits, which are consumed in parallel. This is making it so that the
watermark isn't able to move forward until much or perhaps all of the file
has been read. If you change the parallelism of the source to 1, like this

  final DataStream<String> linesIn =
          env.readTextFile(fileNameInput).setParallelism(1);

then you should see the job make steady forward progress with windows
closing on a regular basis.

Regards,
David


Re: Resource leak in DataSourceNode?

2020-08-30 Thread Mark Davis
Hi Robert,

Thank you for confirming that there is an issue.
I do not have a solution for it and would like to hear the committers' insight 
into what is wrong there.

I think there are actually two issues - the first one is that the HBase InputFormat 
does not close its connection in close().
The other is that DataSourceNode never calls the close() method.

Cheers,
Mark

‐‐‐ Original Message ‐‐‐
On Thursday, August 27, 2020 3:30 PM, Robert Metzger  
wrote:

> Hi Mark,
>
> Thanks a lot for your message and the good investigation! I believe you've 
> found a bug in Flink. I filed an issue for the problem: 
> https://issues.apache.org/jira/browse/FLINK-19064.
>
> Would you be interested in opening a pull request to fix this?
> Otherwise, I'm sure a committer will pick up the issue soon.
>
> I'm not aware of a simple workaround for the problem.
>
> Best,
> Robert
>
> On Wed, Aug 26, 2020 at 4:05 PM Mark Davis  wrote:
>
>> Hi,
>>
>> I am trying to investigate a problem with non-released resources in my 
>> application.
>>
>> I have a stateful application which submits Flink DataSet jobs using code 
>> very similar to the code in CliFrontend.
>> I noticed that I am getting a lot of non-closed connections to my data store 
>> (HBase in my case). The connections are held by the application, not the jobs 
>> themselves.
>>
>> I am using HBaseRowDataInputFormat and it seems that HBase connections 
>> opened in the configure() method during job graph creation (before the 
>> job is executed) are not closed. My search led me to the method 
>> DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics) where 
>> I see that a format is not closed after being configured.
>>
>> Is that correct? How can I overcome this issue?
>>
>> My application is long-running, which is probably why I observe the resource 
>> leak. If I spawned a new JVM to run the jobs, this problem would not be 
>> noticeable.
>>
>> Thank you!
>>
>> Cheers,
>> Marc

Re: Flink OnCheckpointRollingPolicy streamingfilesink

2020-08-30 Thread Vijayendra Yadav
Thank You Andrey.

Regards,
Vijay


> On Aug 29, 2020, at 3:38 AM, Andrey Zagrebin  wrote:
> 
> 
> Hi Vijay,
> 
> I would apply the same judgement. It is latency vs throughput vs spent 
> resources vs practical need.
> 
> The more concurrent checkpoints your system is capable of handling, the 
> better end-to-end result latency you will observe, and the more frequently 
> you will see computation results.
> On the other hand, your system needs to provide more resources (maybe higher 
> parallelism) to process more concurrent checkpoints.
> 
> Again, the fewer the checkpoints, the more records are batched together and the 
> better the throughput.
> 
> It usually does not make sense to have a big number of concurrent checkpoints 
> which process only a handful of records in between if you do not observe any 
> practical decrease in latency.
> The system will just waste resources processing the checkpoints.
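>
> For illustration only (the numbers are made up and are not recommendations),
> a conservative setup along these lines could look like:
>
>     env.enableCheckpointing(60_000);                            // e.g. one checkpoint per minute
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);   // no overlapping checkpoints
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);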
> 
> Best,
> Andrey
> 
>> On Fri, Aug 28, 2020 at 9:52 PM Vijayendra Yadav  
>> wrote:
>> Hi Andrey,
>> 
>> Thanks, 
>> what is recommendation for :  
>> env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt) ?
>> 
>> 1 or higher based on what factor.
>> 
>> 
>> Regards,
>> Vijay
>> 
>> 
>>> On Tue, Aug 25, 2020 at 8:55 AM Andrey Zagrebin  
>>> wrote:
>>> Hi Vijay,
>>> 
>>> I think it depends on your job requirements, in particular how many records 
>>> are processed per second and how much resources you have to process them.
>>> 
>>> If the checkpointing interval is short, then the checkpointing overhead can 
>>> be too high and you need more resources to efficiently keep up with the 
>>> incoming stream.
>>> 
>>> If the checkpointing interval is long, more records are batched together 
>>> and the throughput is better.
>>> On the other hand, the results are observed with more latency, because the 
>>> batched results get flushed into the files and become visible in the external 
>>> system only when a checkpoint occurs (to provide the exactly-once guarantee).
>>> 
>>> Best,
>>> Andrey
>>> 
 On Mon, Aug 24, 2020 at 6:18 PM Vijayendra Yadav  
 wrote:
 Hi Team,
 
 Bulk Formats can only have `OnCheckpointRollingPolicy`, which rolls (ONLY) 
 on every checkpoint.  
 
 .withRollingPolicy(OnCheckpointRollingPolicy.build())
 
 Question: What are the recommended values related to checkpointing to fsstate? 
 Should checkpoints be more frequent, or should the intervals be longer? How many 
 concurrent checkpoints should be allowed, and what is an ideal pause between 
 checkpoints?
 Is there a way to control batch size here other than time? Any 
 recommendations for all the parameters listed below? 
 Note: I am trying to improve sink throughput. 
 
 
 env.enableCheckpointing(chckptintervalmilli)  
 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.valueOf(ChckptMode))
 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(chckptintervalmilligap)
 env.getCheckpointConfig.setCheckpointTimeout(chckptduration)
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(concurrentchckpt)
 env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.valueOf(CheckpointCleanup))
  
 env.getCheckpointConfig.setPreferCheckpointForRecovery(CheckpointForCleanup)
 
 Thanks,
 Vijay


Re: Resource leak in DataSourceNode?

2020-08-30 Thread Robert Metzger
Hi Mark,

from the discussion in the JIRA ticket, it seems that we've found somebody
in the community who's going to fix this.

I don't think calling close() is necessary in the DataSourceNode. The
problem is that the connection should not be established in configure() but
in open().
Thanks again for the bug report. The fix will be in the next release of the
1.11 line.
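
To illustrate the intended lifecycle (a hypothetical sketch only, not the
actual fix for FLINK-19064; the format and its "connection" are stand-ins
for the real HBase classes):

import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

import java.io.IOException;

public class LazyConnectionInputFormat extends RichInputFormat<String, GenericInputSplit> {

    // stand-in for an expensive client connection (e.g. an HBase Connection)
    private transient AutoCloseable connection;

    @Override
    public void configure(Configuration parameters) {
        // configure() also runs during job graph creation (DataSourceNode),
        // so it must not allocate external resources
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        // allocate the connection only when the format actually starts reading
        connection = () -> { /* release the real client here */ };
    }

    @Override
    public boolean reachedEnd() {
        return true; // this sketch produces no records
    }

    @Override
    public String nextRecord(String reuse) {
        return null;
    }

    @Override
    public void close() throws IOException {
        // always release the connection, even after failures
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics; // no statistics in this sketch
    }

    @Override
    public GenericInputSplit[] createInputSplits(int minNumSplits) {
        return new GenericInputSplit[] { new GenericInputSplit(0, 1) };
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
        return new DefaultInputSplitAssigner(splits);
    }
}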


On Sun, Aug 30, 2020 at 5:34 PM Mark Davis  wrote:

> Hi Robert,
>
> Thank you for confirming that there is an issue.
> I do not have a solution for it and would like to hear the committers'
> insight into what is wrong there.
>
> I think there are actually two issues - the first one is that the HBase
> InputFormat does not close its connection in close().
> The other is that DataSourceNode never calls the close() method.
>
> Cheers,
>   Mark
>
> ‐‐‐ Original Message ‐‐‐
> On Thursday, August 27, 2020 3:30 PM, Robert Metzger 
> wrote:
>
> Hi Mark,
>
> Thanks a lot for your message and the good investigation! I believe you've
> found a bug in Flink. I filed an issue for the problem:
> https://issues.apache.org/jira/browse/FLINK-19064.
>
> Would you be interested in opening a pull request to fix this?
> Otherwise, I'm sure a committer will pick up the issue soon.
>
> I'm not aware of a simple workaround for the problem.
>
> Best,
> Robert
>
>
> On Wed, Aug 26, 2020 at 4:05 PM Mark Davis  wrote:
>
>> Hi,
>>
>> I am trying to investigate a problem with non-released resources in my
>> application.
>>
>> I have a stateful application which submits Flink DataSet jobs using
>> code very similar to the code in CliFrontend.
>> I noticed that I am getting a lot of non-closed connections to my data
>> store (HBase in my case). The connections are held by the application, not
>> the jobs themselves.
>>
>> I am using HBaseRowDataInputFormat and it seems that HBase connections
>> opened in the configure() method during job graph creation (before the
>> job is executed) are not closed. My search led me to the method
>> DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics)
>> where I see that a format is not closed after being configured.
>>
>> Is that correct? How can I overcome this issue?
>>
>> My application is long-running, which is probably why I observe the
>> resource leak. If I spawned a new JVM to run the jobs, this problem would
>> not be noticeable.
>>
>> Thank you!
>>
>> Cheers,
>>   Marc
>>
>
>


How to use Flink IDE

2020-08-30 Thread Piper Piper
Hi,

Till now, I have only been using Flink binaries. How do I setup Flink in my
IntelliJ IDE so that while running/debugging my Flink application program I
can also step into the Flink source code?

Do I first need to import Flink's source repository into my IDE and build
it?

Thanks,

Piper


Re: How to use Flink IDE

2020-08-30 Thread Ardhani Narasimha Swamy
Hi Piper,

Welcome to Flink Community.

Import the Flink project like any other project into the IDE; the only difference
when running is that you have to click on "Include dependencies with 'Provided' scope"
in the main class run configurations. This bundles the Flink dependencies
into the artifact, making it a fat JAR that you can deploy.


Steps:

1. Open main class run/debug configurations
2. Click on Include dependencies with Provided scope.
3. Apply


Thanks,
Narasimha



On Sun, Aug 30, 2020 at 11:40 PM Piper Piper  wrote:

> Hi,
>
> Till now, I have only been using Flink binaries. How do I setup Flink in
> my IntelliJ IDE so that while running/debugging my Flink application
> program I can also step into the Flink source code?
>
> Do I first need to import Flink's source repository into my IDE and build
> it?
>
> Thanks,
>
> Piper
>




Implementation of setBufferTimeout(timeoutMillis)

2020-08-30 Thread Pankaj Chand
Hello,

The documentation gives the following two sample lines for setting the
buffer timeout for the streaming environment or transformation.



env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

I have been trying to find where (file and method) in the Flink source code
are the buffers being flushed by iteratively referring to the value of
timeoutMillis (or the default value), but have been unsuccessful. Please
help.

Thanks,

Pankaj


Re: How to use Flink IDE

2020-08-30 Thread Arvid Heise
Hi Piper,

to step into Flink source code, you don't need to import Flink sources
manually or build Flink at all. It's enough to tell IntelliJ to also
download sources for Maven dependencies. [1]

Flink automatically uploads the source code for each build. For example,
see the 1.11.1 artifacts of flink-runtime. [2]

[1]
https://intellij-support.jetbrains.com/hc/en-us/community/posts/206834305-Automatically-download-sources-documentation-from-maven-working-great
[2]
https://repo1.maven.org/maven2/org/apache/flink/flink-runtime_2.11/1.11.1/

On Sun, Aug 30, 2020 at 8:19 PM Ardhani Narasimha Swamy <
ardhani.narasi...@razorpay.com> wrote:

> Hi Piper,
>
> Welcome to Flink Community.
>
> Import the Flink project like any other project into the IDE; the only difference
> when running is that you have to click on "Include dependencies with 'Provided'
> scope" in the main class run configurations. This bundles the Flink
> dependencies into the artifact, making it a fat JAR that you can deploy.
>
>
> Steps:
>
> 1. Open main class run/debug configurations
> 2. Click on Include dependencies with Provided scope.
> 3. Apply
>
>
> Thanks,
> Narasimha
>
>
>
> On Sun, Aug 30, 2020 at 11:40 PM Piper Piper  wrote:
>
>> Hi,
>>
>> Till now, I have only been using Flink binaries. How do I setup Flink in
>> my IntelliJ IDE so that while running/debugging my Flink application
>> program I can also step into the Flink source code?
>>
>> Do I first need to import Flink's source repository into my IDE and build
>> it?
>>
>> Thanks,
>>
>> Piper
>>
>
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: How to use Flink IDE

2020-08-30 Thread Piper Piper
Thank you, Narasimha and Arvid!

On Sun, Aug 30, 2020 at 3:09 PM Arvid Heise  wrote:

> Hi Piper,
>
> to step into Flink source code, you don't need to import Flink sources
> manually or build Flink at all. It's enough to tell IntelliJ to also
> download sources for Maven dependencies. [1]
>
> Flink automatically uploads the source code for each build. For example,
> see the 1.11.1 artifacts of flink-runtime. [2]
>
> [1]
> https://intellij-support.jetbrains.com/hc/en-us/community/posts/206834305-Automatically-download-sources-documentation-from-maven-working-great
> [2]
> https://repo1.maven.org/maven2/org/apache/flink/flink-runtime_2.11/1.11.1/
>
> On Sun, Aug 30, 2020 at 8:19 PM Ardhani Narasimha Swamy <
> ardhani.narasi...@razorpay.com> wrote:
>
>> Hi Piper,
>>
>> Welcome to Flink Community.
>>
>> Import the Flink project like any other project into the IDE; the only difference
>> when running is that you have to click on "Include dependencies with
>> 'Provided' scope" in the main class run configurations. This bundles the
>> Flink dependencies into the artifact, making it a fat JAR that you can deploy.
>>
>>
>> Steps:
>>
>> 1. Open main class run/debug configurations
>> 2. Click on Include dependencies with Provided scope.
>> 3. Apply
>>
>>
>> Thanks,
>> Narasimha
>>
>>
>>
>> On Sun, Aug 30, 2020 at 11:40 PM Piper Piper 
>> wrote:
>>
>>> Hi,
>>>
>>> Till now, I have only been using Flink binaries. How do I setup Flink in
>>> my IntelliJ IDE so that while running/debugging my Flink application
>>> program I can also step into the Flink source code?
>>>
>>> Do I first need to import Flink's source repository into my IDE and
>>> build it?
>>>
>>> Thanks,
>>>
>>> Piper
>>>
>>
>>
>
>


user@flink.apache.org

2020-08-30 Thread Danny Chan
Thanks for the share ~

The query you gave is actually an interval join [1]; a windowed join is two 
windowed streams joined together, see [2].

Theoretically, for an interval join, the state is cleaned up periodically based 
on the watermark and the allowed lateness, once the time range of the RHS is 
considered “late”.

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/joining.html
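
As a self-contained illustration only (toy data and types, not your actual
job, and note that the DataStream interval join below is an inner join,
unlike the LEFT JOIN in your SQL), the same one-hour bound looks like this
in the DataStream API:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinSketch {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // toy elements: (foo, rowtime in epoch millis)
        DataStream<Tuple2<String, Long>> a = env
                .fromElements(Tuple2.of("k", 1_000L), Tuple2.of("k", 5_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((t, ts) -> t.f1));
        DataStream<Tuple2<String, Long>> b = env
                .fromElements(Tuple2.of("k", 2_000L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((t, ts) -> t.f1));

        // key both sides by "foo" (here: field f0)
        KeySelector<Tuple2<String, Long>, String> byFoo = new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> t) {
                return t.f0;
            }
        };

        // A.rowtime BETWEEN B.rowtime - 1 HOUR AND B.rowtime
        // <=> B.rowtime lies in [A.rowtime, A.rowtime + 1 hour]
        a.keyBy(byFoo)
                .intervalJoin(b.keyBy(byFoo))
                .between(Time.milliseconds(0), Time.hours(1))
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right,
                                               Context ctx, Collector<String> out) {
                        out.collect(left.f0 + ": " + left.f1 + " joined " + right.f1);
                    }
                })
                .print();

        env.execute("interval-join-sketch");
    }
}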

Best,
Danny Chan
On Sat, Aug 29, 2020 at 12:59 AM (+0800), Sofya T. Irwin wrote:
> Hi Danny,
>
> Thank you for your response.
> I'm trying to join two streams that are both fairly high volume. My join 
> looks like this:
>
>   SELECT
>     A.rowtime as rowtime,
>     A.foo,
>     B.bar
>   FROM A
>   LEFT JOIN B
>     ON A.foo = B.foo
>     AND A.rowtime BETWEEN B.rowtime - INTERVAL  '1' HOUR AND B.rowtime
>
> When I run this SQL, the state size metric looks like a sawtooth that 
> gradually keeps growing.
> Currently I disabled this query because of a concern it could impact other 
> jobs.
>
> Based on your statement above, the SQL timed-window join is not supported?
> Is there another way I can make sure that the state only holds recent data?
>
> Thank you,
> Sofya
>
> > On Thu, Aug 27, 2020 at 10:49 PM Danny Chan  wrote:
> > > Hi, Sofya T. Irwin ~
> > >
> > > Can you share your use case and why you need a timed-window join there?
> > >
> > > The SQL timed-window join is not supported yet, and I want to hear 
> > > your thoughts on whether it is necessary to support it in SQL.
> > >
> > >
> > > > On Thu, Jul 30, 2020 at 10:44 PM, Sofya T. Irwin wrote:
> > > > > Hi,
> > > > > I'm trying to investigate a SQL job using a time-windowed join that 
> > > > > is exhibiting a large, growing state. The join syntax is most similar 
> > > > > to the interval join 
> > > > > (https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html).
> > > > >
> > > > > A few questions:
> > > > > 1. Am I correct in understanding that State TTL is generally not 
> > > > > applicable for TableAPI&SQL? So we cannot use State TTL to limit 
> > > > > state size for a join?
> > > > >
> > > > > 2. It seems that Flink should be able to expire state even without 
> > > > > explicit settings based on this: "In TableAPI&SQL and DataStream, the 
> > > > > window aggregation and time-windowed join will clear expired state 
> > > > > using Timers which is triggered by watermark."  
> > > > > (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)
> > > > >
> > > > > To clarify: Does the above mean that Flink is expected to detect 
> > > > > expired state and clear it without explicit configuration to allow it 
> > > > > to do so?
> > > > >
> > > > > 3. I've looked into setting the idle state retention time. From what 
> > > > > I can understand, this particular setting is appropriate for my use 
> > > > > case.  "TableConfig#setIdleStateRetentionTime in TableAPI&SQL is a 
> > > > > job level configuration which will enable state ttl for all 
> > > > > non-time-based operator states." 
> > > > > (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/join-state-TTL-td34760.html)
> > > > >
> > > > > To clarify: Would enabling this setting control state growth? Is this 
> > > > > only available for blink planner? Currently we are using the 
> > > > > StreamPlanner. Is there any way to ensure that idle state has limited 
> > > > > retention for applications using the StreamPlanner?
> > > > >
> > > > > Thanks ahead,
> > > > > Sofya


Re: flink watermark strategy

2020-08-30 Thread Danny Chan
Watermarks mainly serve windows and the handling of late-arriving data; they 
actually reduce your performance rather than improve it.

Best,
Danny Chan
On Sat, Aug 29, 2020 at 3:09 AM (+0800), Vijayendra Yadav wrote:
> Hi Team,
>
> For a regular unbounded streaming application reading from Kafka, which 
> does not use any event-time or window operations, does setting a watermark 
> strategy for the Kafka consumer (connector) help us in any way, e.g. performance?
>
> Regards,
> Vijay


Re: Why consecutive calls of orderBy are forbidden?

2020-08-30 Thread hongfanxo
Hi,
Thanks for your reply.
I'll look into the Blink planner later.

For the workaround you mentioned: in the actual usage, the second orderBy is
wrapped in a function, so I have no idea what has already been done to the
input Table.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


runtime memory management

2020-08-30 Thread lec ssmi
Hi,
  Generally speaking, when we submit a Flink program, the number of
taskmanagers and the memory of each one are specified. And the smallest
real execution unit of Flink is the operator.
   Since the calculation logic corresponding to each operator is different,
some need to save state and some don't. Therefore, the memory size
required by each operator is different. How does the Flink program
allocate taskmanager memory to the operators by default?
  In our production practice, as traffic increases, some operators
(mainly stateful ones such as join and groupBy) often have insufficient memory,
resulting in slower calculations. The usual approach is to increase the
entire taskmanager memory. But will part of this increased memory be
allocated to the map-like operators, or is memory fetched on demand within
the same taskmanager, so that whoever needs memory takes it until it is used
up - in other words, is there no preset memory allocation ratio? For a
complex streaming job, is there any way to tilt the memory towards stateful
operators?

 Thanks.


Packaging multiple Flink jobs from a single IntelliJ project

2020-08-30 Thread Manas Kale
Hi,
I have an IntelliJ project that has multiple classes with main() functions.
I want to package this project as a JAR that I can submit to the Flink
cluster and specify the entry class when I start the job. Here are my
questions:

   - I am not really familiar with Maven and would appreciate some
   pointers/examples. From what I understand, I will need to use some sort of
   transformer in the Maven shade plugin to merge all of the classes. *If
   this is correct, can I see a small example? *
   - Also, I can't get a single main class working:


http://maven.apache.org/POM/4.0.0";
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
   4.0.0

   flink_summarization
   flink_summarization
   0.1
   jar

   Flink Quickstart Job
   http://www.myorganization.org

   
  UTF-8
  1.10.1
  1.8
  2.11
  ${java.version}
  ${java.version}
   

   
  
 apache.snapshots
 Apache Development Snapshot Repository
 
https://repository.apache.org/content/repositories/snapshots/
 
false
 
 
true
 
  
   

   
  
  
  
  
 org.apache.flink
 flink-java
 ${flink.version}
 provided
  
  
 org.apache.flink
 flink-streaming-java_${scala.binary.version}
 ${flink.version}
 provided
  

  
 org.apache.flink
 flink-connector-kafka_2.11
 ${flink.version}
  

  
 org.apache.flink
 flink-state-processor-api_2.11
 ${flink.version}
 provided
  

  
 org.apache.flink
 flink-connector-jdbc_2.11
 1.11.0
  

  
  
  
 org.slf4j
 slf4j-log4j12
 1.7.7
 runtime
  
  
 log4j
 log4j
 1.2.17
 runtime
  

  
  
 org.apache.flink
 flink-test-utils_${scala.binary.version}
 ${flink.version}
 test
  
  
 org.apache.flink
 flink-runtime_2.11
 ${flink.version}
 test
 tests
  
  
 org.apache.flink
 flink-streaming-java_2.11
 ${flink.version}
 test
 tests
  
  
 org.assertj
 assertj-core
 
 3.16.1
 test
  


   

   
  

 
 
org.apache.maven.plugins
maven-compiler-plugin
3.1

   ${java.version}
   ${java.version}

 

 
 
 
org.apache.maven.plugins
maven-shade-plugin
3.0.0


   false


   
   
  package
  
 shade
  
  
 

   org.apache.flink:force-shading
   com.google.code.findbugs:jsr305
   org.slf4j:*
   log4j:*

 
 

   
   *:*
   
  META-INF/*.SF
  META-INF/*.DSA
  META-INF/*.RSA
   

 
 


iu.feature_summarization.basic_features.pre.BasicPreProcessJob


 
  
   

 
  

  
 



   org.eclipse.m2e
   lifecycle-mapping
   1.0.0
   
  
 

   
  org.apache.maven.plugins
  maven-shade-plugin
  [3.0.0,)
  
 shade
  
   
   
  
   


   
  org.apache.maven.plugins
  maven-compiler-plugin
  [3.1,)
  
 testCompile
 compile
  
   
   
  
   

 

Re: Implementation of setBufferTimeout(timeoutMillis)

2020-08-30 Thread Yun Gao
Hi Pankaj,

I think it should be in 
org.apache.flink.runtime.io.network.api.writer.RecordWriter$OutputFlusher.
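
In case it helps, here is a much-simplified sketch of the idea (this is NOT
Flink's actual code, just an illustration of how a periodic flusher thread
can use the configured timeout):

final class OutputFlusherSketch extends Thread {
    private final long timeoutMillis;   // the value passed to setBufferTimeout(...)
    private final Runnable flushAction; // e.g. "flush all partially filled buffers"
    private volatile boolean running = true;

    OutputFlusherSketch(long timeoutMillis, Runnable flushAction) {
        super("OutputFlusher");
        setDaemon(true);
        this.timeoutMillis = timeoutMillis;
        this.flushAction = flushAction;
    }

    void terminate() {
        running = false;
        interrupt();
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(timeoutMillis);
            } catch (InterruptedException e) {
                if (!running) {
                    return; // shut down cleanly
                }
            }
            flushAction.run();
        }
    }
}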

Best,
 Yun



--
Sender:Pankaj Chand
Date:2020/08/31 02:40:15
Recipient:user
Theme:Implementation of setBufferTimeout(timeoutMillis)

Hello,

The documentation gives the following two sample lines for setting the buffer 
timeout for the streaming environment or transformation.

env.setBufferTimeout(timeoutMillis);
env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

I have been trying to find where (file and method) in the Flink source code are 
the buffers being flushed by iteratively referring to the value of 
timeoutMillis (or the default value), but have been unsuccessful. Please help.

Thanks,

Pankaj 


Re: Exception on s3 committer

2020-08-30 Thread Yun Gao

Hi Ivan,

   I think there might be some points to check:

   1. Is the job restored from the latest successful checkpoint after restart ? 
   2. Have you ever changed the timeout settings for uncompleted multipart 
upload ?
   3. Does cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804 
exist or not ?

Best,
 Yun


 --Original Mail --
Sender:Ivan Yang 
Send Date:Sat Aug 29 12:43:28 2020
Recipients:user 
Subject:Exception on s3 committer
Hi all,

We got this exception after a job restart. Does anyone know what may lead to 
this situation, and how to get past this checkpoint issue? Prior to this, the 
job failed due to “Checkpoint expired before completing.” We are s3 heavy, 
writing out 10K files to s3 every 10 minutes using StreamingFileSink/BulkFormat 
to various s3 prefixes. Thanks in advance. -Ivan

2020-08-28 15:17:58
java.io.IOException: Recovering commit failed for object 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804. Object does 
not exist and MultiPart Upload 
3OnIJwYXCxm8fkHpphQOiCdjgfy3jTBqBcg8SbscYJFg0Etl4GoDpPiBms9HUfF_3f7AwL5CyQF4Ne.KDIOKk4aXecP2QRkTTlbbTT8_SnS3Dky.SF7zvDuuMZP9YWlFwtT79rWErOB9K4YPIzUnc4GhUQv4AQIPDF4Nav0ppiw-
 is not valid.
at 
org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:102)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:179)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.(Bucket.java:122)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:379)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:63)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:176)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:164)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:148)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.(StreamingFileSinkHelper.java:74)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:399)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: Completing multipart commit on 
cbd/late-landing/event_date=2020-08-28/event_hour=16/part-5-264804: 
com.amazonaws.services.s3.model.AmazonS3Exception: The specified upload does 
not exist. The upload ID may be invalid, or the upload may have been aborted or 
completed. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchUpload; 
Request ID: 9A99AFAD80A8F202; S3 Extended Request ID: 
fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=), 
S3 Extended Request ID: 
fjORHBv8K4a5nJ3yULudLjEVgc8vTVro04rYuXC26CQzWs3KMGhoKp/R33g9v4Qi6qN/DsVjENw=:NoSuchUpload
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:225)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
at 
org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUplo

Re: Packaging multiple Flink jobs from a single IntelliJ project

2020-08-30 Thread Manas Kale
Hi,
I solved my second issue - I was not following Maven's convention for
placing source code (I had not placed my source in src/main/java).
However, I still would like some help with my first question - what is the
recommended way to set up a project with multiple main() classes? In the end,
I would like to be able to run each main() class as a separate job. Should
I create a single JAR and specify a different entry-point class each time, or
should I create separate JARs for each main() class?

On Mon, Aug 31, 2020 at 11:13 AM Manas Kale  wrote:

> Hi,
> I have an IntelliJ project that has multiple classes with main()
> functions. I want to package this project as a JAR that I can submit to the
> Flink cluster and specify the entry class when I start the job. Here are my
> questions:
>
>- I am not really familiar with Maven and would appreciate some
>pointers/examples. From what I understand, I will need to use some sort of
>transformer in the Maven shade plugin to merge all of the classes. *If
>this is correct, can I see a small example? *
>- Also, I can't get a single main class working:
>
>
> [pom.xml quoted in full in the original message above; snipped]
Re: runtime memory management

2020-08-30 Thread Xintong Song
Hi,

> For a complex streaming job, is there any way to tilt the memory towards
> stateful operators?

If it is streaming jobs you are interested in, the quick answer is no. Memory
is fetched on demand for all operators.

Currently, only the managed memory of batch jobs is pre-planned for each
operator.

Thank you~

Xintong Song



On Mon, Aug 31, 2020 at 1:33 PM lec ssmi  wrote:

> Hi,
>   Generally speaking, when we submit a Flink program, the number of
> taskmanagers and the memory of each one are specified. And the smallest
> real execution unit of Flink is the operator.
>    Since the calculation logic corresponding to each operator is different,
> some need to save state and some don't. Therefore, the memory size
> required by each operator is different. How does the Flink program
> allocate taskmanager memory to the operators by default?
>   In our production practice, as traffic increases, some operators
> (mainly stateful ones such as join and groupBy) often have insufficient memory,
> resulting in slower calculations. The usual approach is to increase the
> entire taskmanager memory. But will part of this increased memory be
> allocated to the map-like operators, or is memory fetched on demand within
> the same taskmanager, so that whoever needs memory takes it until it is used
> up - in other words, is there no preset memory allocation ratio? For a
> complex streaming job, is there any way to tilt the memory towards stateful
> operators?
>
>
>
>