Is there a way to find the age of an element in a Global window?

2019-01-18 Thread Kumar Bolar, Harshith
Hi all,

I'm using Global Windows for my application with a custom trigger and custom 
evictor based on some conditions. Now, I also want to evict those elements from 
the window that have stayed there for too long, let's say 30 mins. How would I 
go about doing this? Is there a utility that Flink provides that lets me know 
what the age of an element in a window is?

Thanks,
Harshith



Should the entire cluster be restarted if a single Task Manager crashes?

2019-01-18 Thread Kumar Bolar, Harshith
Hi all,

We're running a standalone Flink cluster with 2 Job Managers and 3 Task 
Managers. Whenever a TM crashes, we simply restart that particular TM and 
proceed with the processing.

But reading the comments on this question makes it look like we need to restart all the 5 nodes that form a 
cluster to deal with the failure of a single TM. Am I reading this right? What 
would be the consequences if we restart just the crashed TM and let the healthy 
ones run as is?

Thanks,
Harshith



[DISCUSS] Towards a leaner flink-dist

2019-01-18 Thread Chesnay Schepler

Hello,

The binary distribution that we release currently contains quite a lot of 
optional components, including various filesystems, metric reporters and 
libraries. Most users will only use a fraction of these, so the rest pretty 
much only increases the size of flink-dist.


With Flink growing more and more in scope I don't believe it to be 
feasible to ship everything we have with every distribution, and instead 
suggest more of a "pick-what-you-need" model, where flink-dist is rather 
lean and additional components are downloaded separately and added by 
the user.


This would primarily affect the /opt directory, but could also be 
extended to cover flink-dist. For example, the yarn and mesos code could 
be spliced out into separate jars that could be added to lib manually.


Let me know what you think.

Regards,

Chesnay



Re: NPE when using spring bean in custom input format

2019-01-18 Thread madan
Suggestions please.

Thinking of options
1. Initializing the Spring application context in the 'open' method. Instead of
loading the entire context, move the service-related beans to one or more
packages and scan only those packages. Requires code refactoring (see the
sketch below).
2. Direct database query - a direct query cannot be used, since business logic
is applied while fetching the records.
3. Write the data to CSV first and run the transformation on the CSV. Last
resort.
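
For reference, here is a minimal sketch of option 1. It is only a sketch under
assumptions: the class name, the "com.example.services" package, the
Map<String, Object> element type and the getRecords() accessor are placeholders,
and ServiceX stands for the service bean from the snippet quoted below. The idea
is to bootstrap a reduced Spring context inside open(), so the bean exists on the
remote TaskManager instead of only in the client JVM.

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class SpringAwareInputFormat extends GenericInputFormat<Map<String, Object>> {

    // Both fields are created per task on the TaskManager, hence transient.
    private transient AnnotationConfigApplicationContext context;
    private transient Iterator<Map<String, Object>> recordsIterator;

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        // Scan only the service-related packages (placeholder package name),
        // so startup stays cheap compared to the full application context.
        context = new AnnotationConfigApplicationContext("com.example.services");
        ServiceX serviceX = context.getBean(ServiceX.class); // bean from the original mail
        recordsIterator = serviceX.getRecords();             // assumed accessor, arguments omitted
    }

    @Override
    public boolean reachedEnd() {
        return !recordsIterator.hasNext();
    }

    @Override
    public Map<String, Object> nextRecord(Map<String, Object> reuse) {
        return recordsIterator.next();
    }

    @Override
    public void close() throws IOException {
        if (context != null) {
            context.close(); // release the per-task Spring context
        }
        super.close();
    }
}

Since the context now lives entirely inside the task lifecycle, a job submitted
through a RemoteEnvironment no longer depends on a context that only exists on
the client side.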

Please share your thoughts.

Thank you.

On Wed, Jan 16, 2019 at 2:50 PM madan  wrote:

> Hi,
>
> Need help in the below scenario,
>
> I have CustomInputFormat which loads the records using a bean,
>
> public class CustomInputFormat extends GenericInputFormat {
>
>   private Iterator> recordsIterator;
>
>   @Override
>
> public void open(GenericInputSplit split) throws IOException {
>
>ServiceX serviceX = SpringBeanFinder.getBean(ServiceX.class);
>
> recordsIterator = serviceX.getRecords(..);
>
>  }
>
> }
>
> The above input format works fine when using Flink's LocalEnvironment in a
> Spring application. The problem is when running Flink in cluster mode and
> connecting to it using a RemoteEnvironment. Since the Spring application
> context will not be initialized there, an NPE is thrown. Please suggest what
> could be the solution in this scenario.
>
>
>
> --
> Thank you,
> Madan.
>


-- 
Thank you,
Madan.


Re: Should the entire cluster be restarted if a single Task Manager crashes?

2019-01-18 Thread Fabian Hueske
Hi Harshith,

No, you don't need to restart the whole cluster. Flink only needs enough
processing slots to recover the job.
If you have a standby TM, the job should restart immediately (according to
its restart policy). Otherwise, you have to start a new TM to provide more
slots. Once the slots are registered, the job recovers.

Best,
Fabian
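
As a concrete illustration of the restart policy mentioned above, here is a
minimal sketch of setting one explicitly on the job (the attempt count and delay
are example values, not recommendations):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Retry the job up to 10 times, waiting 30 seconds between attempts,
        // which covers the window in which a crashed TM is being restarted.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                10,                 // number of restart attempts
                Time.seconds(30))); // delay between attempts

        // ... define sources, transformations and sinks as usual ...

        env.execute("restart-strategy-example");
    }
}

The same behaviour can also be configured cluster-wide via the restart-strategy
options in flink-conf.yaml.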

Am Fr., 18. Jan. 2019 um 10:53 Uhr schrieb Kumar Bolar, Harshith <
hk...@arity.com>:

> Hi all,
>
>
>
> We're running a standalone Flink cluster with 2 Job Managers and 3 Task
> Managers. Whenever a TM crashes, we simply restart that particular TM and
> proceed with the processing.
>
>
>
> But reading the comments on this question
> makes it look like we need to restart all the 5 nodes that form a cluster
> to deal with the failure of a single TM. Am I reading this right? What
> would be the consequences if we restart just the crashed TM and let the
> healthy ones run as is?
>
>
>
> Thanks,
>
> Harshith
>
>
>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-18 Thread Fabian Hueske
Hi Chesnay,

Thank you for the proposal.
I think this is a good idea.
We follow a similar approach already for Hadoop dependencies and connectors
(although in application space).

+1

Fabian

Am Fr., 18. Jan. 2019 um 10:59 Uhr schrieb Chesnay Schepler <
ches...@apache.org>:

> Hello,
>
> the binary distribution that we release by now contains quite a lot of
> optional components, including various filesystems, metric reporters and
> libraries. Most users will only use a fraction of these, and as such
> pretty much only increase the size of flink-dist.
>
> With Flink growing more and more in scope I don't believe it to be
> feasible to ship everything we have with every distribution, and instead
> suggest more of a "pick-what-you-need" model, where flink-dist is rather
> lean and additional components are downloaded separately and added by
> the user.
>
> This would primarily affect the /opt directory, but could also be
> extended to cover flink-dist. For example, the yarn and mesos code could
> be spliced out into separate jars that could be added to lib manually.
>
> Let me know what you think.
>
> Regards,
>
> Chesnay
>
>


Re: Re: Should the entire cluster be restarted if a single Task Manager crashes?

2019-01-18 Thread Kumar Bolar, Harshith
Thanks a lot for clarifying :-)

- Harshith

From: Fabian Hueske 
Date: Friday, 18 January 2019 at 4:31 PM
To: Harshith Kumar Bolar 
Cc: "user@flink.apache.org" 
Subject: [External] Re: Should the entire cluster be restarted if a single Task 
Manager crashes?

Hi Harshith,

No, you don't need to restart the whole cluster. Flink only needs enough 
processing slots to recover the job.
If you have a standby TM, the job should restart immediately (according to its 
restart policy). Otherwise, you have to start a new TM to provide more slots. 
Once the slots are registered, the job recovers.

Best,
Fabian

Am Fr., 18. Jan. 2019 um 10:53 Uhr schrieb Kumar Bolar, Harshith 
mailto:hk...@arity.com>>:
Hi all,

We're running a standalone Flink cluster with 2 Job Managers and 3 Task 
Managers. Whenever a TM crashes, we simply restart that particular TM and 
proceed with the processing.

But reading the comments on this question makes it look like we need to restart all the 5 nodes that form a 
cluster to deal with the failure of a single TM. Am I reading this right? What 
would be the consequences if we restart just the crashed TM and let the healthy 
ones run as is?

Thanks,
Harshith



Flink 1.7.1 job is stuck in running state

2019-01-18 Thread Piotr Szczepanek
Hello,
we have a scenario where we run data processing jobs that generate export
files on demand. Our first approach was using the ClusterClient, but recently
we switched to the REST API for job submission. In the meantime we also moved to
Flink 1.7.1, and that started to cause problems.
Some of our jobs are stuck, not processing any data. The Task Managers log
that the chain is switching to RUNNING, and then nothing happens.
In the TM's stdout logs we can see that for some reason the log is cut off, e.g.:

Jan 10, 2019 4:28:33 PM INFO:
org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
initialized will read a total of 615 records.
Jan 10, 2019 4:28:33 PM INFO:
org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
next block
Jan 10, 2019 4:28:33 PM INFO:
org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
in 63 ms. row count = 615
Jan 10, 2019 4:28:33 PM WARNING:
org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
due to context is not a instance of TaskInputOutputContext, but is
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
Jan 10, 2019 4:28:33 PM INFO:
org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
initialized will read a total of 140 records.
Jan 10, 2019 4:28:33 PM INFO:
org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
next block
Jan 10, 2019 4:28:33 PM INFO:
org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
in 2 ms. row count = 140
Jan 10, 2019 4:28:33 PM WARNING:
org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
due to context is not a instance of TaskInputOutputContext, but is
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
Jan 10, 2019 4:28:33 PM INFO: or

As you can see, the last line is cut off in the middle, and nothing happens
after that.
None of the counters (records/bytes sent/read) are increasing.
We switched DEBUG logging on on both the TMs and the JM, but the only thing
they show is the heartbeats they send to each other.
Do you have any idea what the problem could be, and how we could deal with
it or at least investigate further? Is there any timeout/config that we
could try to enable?


Re: Flink 1.7.1 job is stuck in running state

2019-01-18 Thread Gary Yao
Hi Piotr,

What was the version you were using before 1.7.1?
How do you deploy your cluster, e.g., YARN, standalone?
Can you attach full TM and JM logs?

Best,
Gary

On Fri, Jan 18, 2019 at 3:05 PM Piotr Szczepanek 
wrote:

> Hello,
> we have scenario with running Data Processing jobs that generates export
> files on demand. Our first approach was using ClusterClient, but recently
> we switched to REST API for job submittion. In the meantime we switched to
> flink 1.7.1 and that started to cause a problems.
> Some of our jobs are stuck, not processing any data. Task Managers have
> info that Chain is switching to RUNNING, and then nothing happenes.
> In TM's stdout logs we can see that for some reason log is cut, e.g.:
>
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
> initialized will read a total of 615 records.
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
> next block
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
> in 63 ms. row count = 615
> Jan 10, 2019 4:28:33 PM WARNING:
> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
> due to context is not a instance of TaskInputOutputContext, but is
> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
> initialized will read a total of 140 records.
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
> next block
> Jan 10, 2019 4:28:33 PM INFO:
> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
> in 2 ms. row count = 140
> Jan 10, 2019 4:28:33 PM WARNING:
> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
> due to context is not a instance of TaskInputOutputContext, but is
> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
> Jan 10, 2019 4:28:33 PM INFO: or
>
> As you can see, last line is cut in the middle, and nothing happenes later
> on.
> None of counters ( records/bytes sent/read) are increased.
> We switched debug on on both TMs and JM but only thing they are showing up
> are sending heartbeats between each other.
> Do you have any idea what could be a problem? and how we could deal with
> them or at least try to investigate? Is there any timeout/config that we
> could try to enable?
>


Re: Flink 1.7.1 job is stuck in running state

2019-01-18 Thread Piotr Szczepanek
Hey Gary,
thanks for your reply.
Before that we were using Flink version 1.5.2.
With both versions we deploy Flink on YARN.

Regarding the logs, would you like log entries with DEBUG enabled, or would
INFO be enough?

Thanks,
Piotr

pt., 18 sty 2019 o 15:14 Gary Yao  napisał(a):

> Hi Piotr,
>
> What was the version you were using before 1.7.1?
> How do you deploy your cluster, e.g., YARN, standalone?
> Can you attach full TM and JM logs?
>
> Best,
> Gary
>
> On Fri, Jan 18, 2019 at 3:05 PM Piotr Szczepanek <
> piotr.szczepa...@gmail.com> wrote:
>
>> Hello,
>> we have scenario with running Data Processing jobs that generates export
>> files on demand. Our first approach was using ClusterClient, but recently
>> we switched to REST API for job submittion. In the meantime we switched to
>> flink 1.7.1 and that started to cause a problems.
>> Some of our jobs are stuck, not processing any data. Task Managers have
>> info that Chain is switching to RUNNING, and then nothing happenes.
>> In TM's stdout logs we can see that for some reason log is cut, e.g.:
>>
>> Jan 10, 2019 4:28:33 PM INFO:
>> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
>> initialized will read a total of 615 records.
>> Jan 10, 2019 4:28:33 PM INFO:
>> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
>> next block
>> Jan 10, 2019 4:28:33 PM INFO:
>> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
>> in 63 ms. row count = 615
>> Jan 10, 2019 4:28:33 PM WARNING:
>> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
>> due to context is not a instance of TaskInputOutputContext, but is
>> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
>> Jan 10, 2019 4:28:33 PM INFO:
>> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
>> initialized will read a total of 140 records.
>> Jan 10, 2019 4:28:33 PM INFO:
>> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
>> next block
>> Jan 10, 2019 4:28:33 PM INFO:
>> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
>> in 2 ms. row count = 140
>> Jan 10, 2019 4:28:33 PM WARNING:
>> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
>> due to context is not a instance of TaskInputOutputContext, but is
>> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
>> Jan 10, 2019 4:28:33 PM INFO: or
>>
>> As you can see, last line is cut in the middle, and nothing happenes
>> later on.
>> None of counters ( records/bytes sent/read) are increased.
>> We switched debug on on both TMs and JM but only thing they are showing
>> up are sending heartbeats between each other.
>> Do you have any idea what could be a problem? and how we could deal with
>> them or at least try to investigate? Is there any timeout/config that we
>> could try to enable?
>>
>


Re: Flink 1.7.1 job is stuck in running state

2019-01-18 Thread Gary Yao
Hi Piotr,

Ideally on DEBUG level.

Best,
Gary

On Fri, Jan 18, 2019 at 3:41 PM Piotr Szczepanek 
wrote:

> Hey Gary,
> thanks for your reply.
> Before we have been using Flink version 1.5.2.
> With both version we're using Flink deployed on Yarn.
>
> Regarding log would you like to have log entries with DEBUG enabled or
> INFO would be enough?
>
> Thanks,
> Piotr
>
> pt., 18 sty 2019 o 15:14 Gary Yao  napisał(a):
>
>> Hi Piotr,
>>
>> What was the version you were using before 1.7.1?
>> How do you deploy your cluster, e.g., YARN, standalone?
>> Can you attach full TM and JM logs?
>>
>> Best,
>> Gary
>>
>> On Fri, Jan 18, 2019 at 3:05 PM Piotr Szczepanek <
>> piotr.szczepa...@gmail.com> wrote:
>>
>>> Hello,
>>> we have scenario with running Data Processing jobs that generates export
>>> files on demand. Our first approach was using ClusterClient, but recently
>>> we switched to REST API for job submittion. In the meantime we switched to
>>> flink 1.7.1 and that started to cause a problems.
>>> Some of our jobs are stuck, not processing any data. Task Managers have
>>> info that Chain is switching to RUNNING, and then nothing happenes.
>>> In TM's stdout logs we can see that for some reason log is cut, e.g.:
>>>
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
>>> initialized will read a total of 615 records.
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
>>> next block
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
>>> in 63 ms. row count = 615
>>> Jan 10, 2019 4:28:33 PM WARNING:
>>> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
>>> due to context is not a instance of TaskInputOutputContext, but is
>>> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: RecordReader
>>> initialized will read a total of 140 records.
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: at row 0. reading
>>> next block
>>> Jan 10, 2019 4:28:33 PM INFO:
>>> org.apache.parquet.hadoop.InternalParquetRecordReader: block read in memory
>>> in 2 ms. row count = 140
>>> Jan 10, 2019 4:28:33 PM WARNING:
>>> org.apache.parquet.hadoop.ParquetRecordReader: Can not initialize counter
>>> due to context is not a instance of TaskInputOutputContext, but is
>>> org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
>>> Jan 10, 2019 4:28:33 PM INFO: or
>>>
>>> As you can see, last line is cut in the middle, and nothing happenes
>>> later on.
>>> None of counters ( records/bytes sent/read) are increased.
>>> We switched debug on on both TMs and JM but only thing they are showing
>>> up are sending heartbeats between each other.
>>> Do you have any idea what could be a problem? and how we could deal with
>>> them or at least try to investigate? Is there any timeout/config that we
>>> could try to enable?
>>>
>>


Re: Re: Is there a way to find the age of an element in a Global window?

2019-01-18 Thread Kumar Bolar, Harshith
Thanks. That makes sense :)

From: Kostas Kloudas 
Date: Friday, 18 January 2019 at 8:25 PM
To: Harshith Kumar Bolar 
Cc: "user@flink.apache.org" 
Subject: [External] Re: Is there a way to find the age of an element in a 
Global window?

Hi Harshith,

The evictor has 2 methods:
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, 
EvictorContext evictorContext);
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, 
EvictorContext evictorContext);

In the iterables, you have access to the elements and their timestamps, and the 
evictor context gives you access to the current watermark
and current processing time.

Based on this information, you can call remove on the iterator created by the 
iterable and clean up the elements that you want to remove.
If you operate on event time and you want to clean up based on processing time, 
then you can put a ProcessFunction or a map before
your window operator, put System.currentTimeMillis() in the record itself, and 
then use the evictor and the current processing time to clean up.

I hope this helps,
Kostas


On Fri, Jan 18, 2019 at 9:25 AM Kumar Bolar, Harshith 
mailto:hk...@arity.com>> wrote:
Hi all,

I'm using Global Windows for my application with a custom trigger and custom 
evictor based on some conditions. Now, I also want to evict those elements from 
the window that have stayed there for too long, let's say 30 mins. How would I 
go about doing this? Is there a utility that Flink provides that lets me know 
what the age of an element in a window is?

Thanks,
Harshith



Re: Is there a way to find the age of an element in a Global window?

2019-01-18 Thread Kostas Kloudas
Hi Harshith,

The evictor has 2 methods:
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W
window, EvictorContext evictorContext);
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window,
EvictorContext evictorContext);

In the iterables, you have access to the elements and their timestamps, and
the evictor context gives you access to the current watermark
and current processing time.

Based on this information, you can call remove on the iterator created by
the iterable and clean up the elements that you want to remove.
If you operate on event time and you want to clean up based on processing
time, then you can put a ProcessFunction or a map before
your window operator, put System.currentTimeMillis() in the record itself,
and then use the evictor and the current processing time to clean up.

I hope this helps,
Kostas
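
To make the eviction part concrete, here is a minimal sketch of such an evictor.
It assumes the records are Tuple2<Long, String> values whose f0 field was set to
System.currentTimeMillis() by an upstream map, as described above; the class name
and the 30-minute figure in the usage line are illustrative.

import java.util.Iterator;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

public class MaxAgeEvictor implements Evictor<Tuple2<Long, String>, GlobalWindow> {

    private static final long serialVersionUID = 1L;

    private final long maxAgeMillis;

    public MaxAgeEvictor(long maxAgeMillis) {
        this.maxAgeMillis = maxAgeMillis;
    }

    @Override
    public void evictBefore(Iterable<TimestampedValue<Tuple2<Long, String>>> elements,
                            int size, GlobalWindow window, EvictorContext ctx) {
        long cutoff = ctx.getCurrentProcessingTime() - maxAgeMillis;
        for (Iterator<TimestampedValue<Tuple2<Long, String>>> it = elements.iterator(); it.hasNext(); ) {
            // f0 holds the processing time stamped into the record upstream.
            if (it.next().getValue().f0 < cutoff) {
                it.remove(); // drop elements that have been in the window too long
            }
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Tuple2<Long, String>>> elements,
                           int size, GlobalWindow window, EvictorContext ctx) {
        // No additional eviction after the window function.
    }
}

It would then be wired in roughly as
.window(GlobalWindows.create()).trigger(customTrigger).evictor(new MaxAgeEvictor(30 * 60 * 1000L))
for a 30-minute cutoff.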


On Fri, Jan 18, 2019 at 9:25 AM Kumar Bolar, Harshith 
wrote:

> Hi all,
>
>
>
> I'm using Global Windows for my application with a custom trigger and
> custom evictor based on some conditions. Now, I also want to evict those
> elements from the window that have stayed there for too long, let's say 30
> mins. How would I go about doing this? Is there a utility that Flink
> provides that lets me know what the age of an element in a window is?
>
>
>
> Thanks,
>
> Harshith
>
>
>


Re: ElasticSearch RestClient throws NoSuchMethodError due to shade mechanism

2019-01-18 Thread Gary Yao
Hi Henry,

Can you share your pom.xml and the full stacktrace with us? It is expected
behavior that org.elasticsearch.client.RestClientBuilder is not shaded. That
class comes from the elasticsearch Java client, and we only shade its
transitive dependencies. Could it be that your job's pom.xml declares a
dependency on a different version of the elasticsearch client?

Best,
Gary

On Tue, Jan 15, 2019 at 11:39 AM 徐涛  wrote:

> Hi All,
> I use the following code to try to build a RestClient:
> org.elasticsearch.client.RestClient.builder(  new HttpHost(xxx,
> xxx,"http")  ).build()
> but at runtime a NoSuchMethodError is thrown. I think the
> reason is:
> There are two RestClient classes, one in the jar I include and the other
> one in flink-connector-elasticsearch5, but the argument of the builder method
> in flink-connector-elasticsearch5 is
> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost.
> So I want to know why org.elasticsearch.client.RestClientBuilder is not
> shaded, so that this runtime class conflict could be avoided?
>
> public static RestClientBuilder
> builder(org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost...
> hosts) {
> return new RestClientBuilder(hosts);
> }
>
> Best
> Henry
>


Re: [DISCUSS] Towards a leaner flink-dist

2019-01-18 Thread Jamie Grier
I'm not sure if this is required.  It's quite convenient to be able to just
grab a single tarball and you've got everything you need.

I just did this for the latest binary release and it was 273MB and took
about 25 seconds to download.  Of course I know connection speeds vary
quite a bit but I don't think 273 MB seems onerous to download and I like
the simplicity of it the way it is.



On Fri, Jan 18, 2019 at 3:34 AM Fabian Hueske  wrote:

> Hi Chesnay,
>
> Thank you for the proposal.
> I think this is a good idea.
> We follow a similar approach already for Hadoop dependencies and
> connectors (although in application space).
>
> +1
>
> Fabian
>
> Am Fr., 18. Jan. 2019 um 10:59 Uhr schrieb Chesnay Schepler <
> ches...@apache.org>:
>
>> Hello,
>>
>> the binary distribution that we release by now contains quite a lot of
>> optional components, including various filesystems, metric reporters and
>> libraries. Most users will only use a fraction of these, and as such
>> pretty much only increase the size of flink-dist.
>>
>> With Flink growing more and more in scope I don't believe it to be
>> feasible to ship everything we have with every distribution, and instead
>> suggest more of a "pick-what-you-need" model, where flink-dist is rather
>> lean and additional components are downloaded separately and added by
>> the user.
>>
>> This would primarily affect the /opt directory, but could also be
>> extended to cover flink-dist. For example, the yarn and mesos code could
>> be spliced out into separate jars that could be added to lib manually.
>>
>> Let me know what you think.
>>
>> Regards,
>>
>> Chesnay
>>
>>


Re: Any advice on how to replay an event-timed stream?

2019-01-18 Thread Jamie Grier
So, do you mean to have your application running in real-time and use the
same instance of it to also process historical data at the same time?

If that's the case then I would advise not to try to do it that way.  What
I would recommend instead is to process that historical data with another
instance of the application.

If this isn't what you're trying to accomplish please be more thorough in
your explanation..  Thanks.

-Jamie


On Thu, Jan 17, 2019 at 10:34 PM Kanstantsin Kamkou 
wrote:

> Thanks for the reply. As mentioned before, the data comes from the
> database. The timestamps are from one month ago, and I’m searching for a way
> to dump this data into a working Flink application which has already processed
> this data (the watermarks are far beyond those dates).
>
> On Fri 18. Jan 2019 at 03:22, Jamie Grier  wrote:
>
>> I don't think I understood all of your question but with regard to the
>> watermarking and keys..  You are correct that watermarking (event time
>> advancement) is not per key.  Event-time is a local property of each Task
>> in an executing Flink job.  It has nothing to do with keys.  It has only to
>> do with the input data timestamps seen by each task and the watermarking
>> function (which isn't per-key).
>>
>> I hope that helps.
>>
>> With regard to how to play historical data..  Well there are many ways to
>> approach that.  Can you narrow down your constraints?  Where does the
>> historical data live?
>>
>> -Jamie
>>
>>
>>
>>
>> On Thu, Jan 17, 2019 at 4:36 PM Kanstantsin Kamkou 
>> wrote:
>>
>>> Hi guys! As I understood (I hope I’m wrong) the current design concept
>>> of the watermarking mechanism is that it tight to the latest watermark and
>>> there is no way to separate those watermarks by key in keyed stream (I hope
>>> at some point it’l be mentioned in the documentation as it unfortunately
>>> misleading). Could you share your thoughts on how to replay historical data
>>> in event–time manner (i.e. from db to working application)? The solution
>>> with the processing time is not suitable here as the sessions windows are
>>> needed.
>>>
>>> Thank you!
>>>
>>
>
> --
> Best regards, Kanstantsin Kamkou
> email:  kkam...@gmail.com
> web: http://2ka.by/
> mobile: +49 172 5432334
> skype: kkamkou
>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Oh sorry.. Logically, since the ContinuousProcessingTimeTrigger never
PURGES but only FIRES, what I said is semantically true. The window
contents are never cleared.

What I missed is that in this case, since you're using a function that
incrementally reduces on the fly rather than processing all the data when
it's triggered, your state is always kept to one element per key. You're
correct, but in general, with non-incremental window functions, the state
would grow unbounded in this configuration.

So it looks like your approach should work just fine.

-Jamie



On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:

> Hello Jamie.
>
> Thanks for taking a look at this. So, yes, I want to write only the last
> data for each key every X minutes. In other words, I want a snapshot of the
> whole database every X minutes.
>
> >  The issue is that the window never get's PURGED so the data just
> > continues to accumulate in the window.  This will grow without bound.
>
> The window not being purged does not necessarily mean that the data will be
> accumulated indefinitely. How so? Well, Flink has two mechanisms to remove
> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>
> The reduce function has an implicit evictor that automatically removes
> events from the window pane that are no longer needed. i.e. it keeps in
> state only the element that was reduced. Here is an example:
>
> env.socketTextStream("localhost", )
>   .keyBy { it.first().toString() }
>   .window(GlobalWindows.create())
>
> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>   .reduce { left, right ->
> println("left: $left, right: $right")
> if (left.length > right.length) {
>   left
> } else {
>   right
> }
>   }
>   .printToErr()
>
> For your claim to hold true, every time the trigger fires one would expect
> to see ALL the elements by a key being printed over and over again in the
> reduce function. However, if you run a job similar to this one in your lang
> of choice, you will notice that the print statement is effectively called
> only once per event per key.
>
> In fact, not using purge is intentional. Because I want to hold every
> record
> (the last one by its primary key) of the database in state so that I can
> write a snapshot of the whole database.
>
> So for instance, let's say my table has two columns: id and time. And I
> have
> the following events:
>
> 1,January
> 2,February
> 1,March
>
> I want to write to S3 two records: "1,March", and "2,February".
>
> Now, let's say two more events come into the stream:
>
> 3,April
> 1,June
>
> Then I want to write to S3 three records: "1,June", "2,February" and
> "3,April".
>
> In other words, I can't just purge the windows, because I would lose the
> record with id 2.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Any advice on how to replay an event-timed stream?

2019-01-18 Thread Kanstantsin Kamkou
Yeah, that’s what I have so far in my pocket of solutions. Another problem is
having to spawn a huge application just to process a hundred entries... :(

If you want the whole picture: there are a number of devices with an internal
acknowledgment system to guarantee ordering. Nevertheless, sometimes the
network might be down for one particular device for days. The task, more or
less, is to replay the whole missing set or process this out-of-order data
while preserving the session window functionality.

On Fri 18. Jan 2019 at 17:04, Jamie Grier  wrote:

> So, do you mean to have your application running in real-time and use the
> same instance of it to also process historical data at the same time?
>
> If that's the case then I would advise not to try to do it that way.  What
> I would recommend instead is to process that historical data with another
> instance of the application.
>
> If this isn't what you're trying to accomplish please be more thorough in
> your explanation..  Thanks.
>
> -Jamie
>
>
> On Thu, Jan 17, 2019 at 10:34 PM Kanstantsin Kamkou 
> wrote:
>
>> Thanks for the reply. As mentioned before the data comes from the
>> database. Timestams are from one months ago. And I’m searching a way on how
>> to dump this data into a working flink application which already processed
>> this data (watermarks are far away from those dates).
>>
>> On Fri 18. Jan 2019 at 03:22, Jamie Grier  wrote:
>>
>>> I don't think I understood all of your question but with regard to the
>>> watermarking and keys..  You are correct that watermarking (event time
>>> advancement) is not per key.  Event-time is a local property of each Task
>>> in an executing Flink job.  It has nothing to do with keys.  It has only to
>>> do with the input data timestamps seen by each task and the watermarking
>>> function (which isn't per-key).
>>>
>>> I hope that helps.
>>>
>>> With regard to how to play historical data..  Well there are many ways
>>> to approach that.  Can you narrow down your constraints?  Where does the
>>> historical data live?
>>>
>>> -Jamie
>>>
>>>
>>>
>>>
>>> On Thu, Jan 17, 2019 at 4:36 PM Kanstantsin Kamkou 
>>> wrote:
>>>
 Hi guys! As I understood (I hope I’m wrong) the current design concept
 of the watermarking mechanism is that it tight to the latest watermark and
 there is no way to separate those watermarks by key in keyed stream (I hope
 at some point it’l be mentioned in the documentation as it unfortunately
 misleading). Could you share your thoughts on how to replay historical data
 in event–time manner (i.e. from db to working application)? The solution
 with the processing time is not suitable here as the sessions windows are
 needed.

 Thank you!

>>>
>>
>> --
>> Best regards, Kanstantsin Kamkou
>> email:  kkam...@gmail.com
>> web: http://2ka.by/
>> mobile: +49 172 5432334
>> skype: kkamkou
>>
> --
Best regards, Kanstantsin Kamkou
email:  kkam...@gmail.com
web: http://2ka.by/
mobile: +49 172 5432334
skype: kkamkou


Re: YARN reserved container prevents new Flink TMs

2019-01-18 Thread suraj7
Hi,

Sharing new findings.
The issue I mentioned above seems to happen only with the latest version of
EMR (emr-5.20.0, Hadoop: Amazon 2.8.5, Flink: 1.6.2), and it is
reproducible with our setup every time. I have verified that the same setup
works and scales without any issues on an older EMR version (emr-5.16.0,
Hadoop: Amazon 2.8.4, Flink: 1.5.0).

Hope the above details help in resolving the issue and help others facing
the same problem.

Regards,
Suraj



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Query on retract stream

2019-01-18 Thread Gagan Agrawal
Hi,
I have a requirement and need to understand if it can be achieved with a
Flink retract stream. Let's say we have a stream with 4 attributes (userId,
orderId, status, event_time) where orderId is unique, and hence any new row
for the same orderId updates the previous value, as below:

*Changelog* *Event Stream*

*user, order, status, event_time*
u1, o1, pending, t1
u2, o2, failed, t2
*u1, o3, pending, t3*
*u1, o3, success, t4*
u2, o4, pending, t5
u2, o4, pending, t6

*Snapshot view at time t6 (as viewed in mysql)*
u1, o1, pending, t1
u2, o2, failed, t2
u1, o3, success, t4
u2, o4, pending, t6
(Here rows at time t3 and t5 are deleted as they have been updated for
respective order ids)

What I need is to maintain a count of "Pending" orders per user and, if it
goes beyond a configured threshold, push that user and the pending count
to Kafka. There can be multiple updates to an order's status, e.g. Pending ->
Success or Pending -> Failed. Also, in some cases there may not be any
change in status but we may still get a row (maybe due to some other
attribute update which we are not concerned about). So is it possible to
have a running count in Flink as below at the respective event times? Here the
pending count is decreased from 2 to 1 for user u1 at t4, since one of its
order statuses changed from Pending to Success. Similarly for user u2, at
time t6, there is no change in the running count as there was no change in
status for order o4.

t1 -> u1 : 1, u2 : 0
t2 -> u1 : 1, u2 : 0
t3 -> u1 : 2, u2 : 0
*t4 -> u1 : 1, u2 : 0 (since o3 moved pending to success, so count is
decreased for u1)*
t5 -> u1 : 1, u2 : 1
*t6 -> u1 : 1, u2 : 1 (no increase in count of u2 as o4 update has no
change)*

As I understand it, a retract stream might achieve this, but I am not
sure how. Any samples around this would be of great help.

Gagan
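
A minimal sketch of the retract-stream mechanics follows. The table and field
names are assumptions, and for brevity it counts pending rows per user directly;
collapsing multiple rows per orderId to the latest status first (so that a
Pending -> Success row actually decrements the count) is left out and would
still be needed in the real job.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class PendingCountSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // The changelog modeled as an append stream of (userId, orderId, status).
        DataStream<Tuple3<String, String, String>> changelog = env.fromElements(
                Tuple3.of("u1", "o1", "pending"),
                Tuple3.of("u2", "o2", "failed"),
                Tuple3.of("u1", "o3", "pending"),
                Tuple3.of("u1", "o3", "success"));

        tEnv.registerDataStream("orders", changelog, "userId, orderId, status");

        // A grouped aggregate produces an updating table: every change to a
        // user's count retracts the previously emitted row and adds a new one.
        Table pendingPerUser = tEnv.sqlQuery(
                "SELECT userId, COUNT(*) AS pendingCount "
                        + "FROM orders WHERE status = 'pending' GROUP BY userId");

        // toRetractStream emits (true, row) for additions/updates and
        // (false, row) for retractions of previously emitted results.
        DataStream<Tuple2<Boolean, Row>> retractStream =
                tEnv.toRetractStream(pendingPerUser, Row.class);

        retractStream.print();
        env.execute("pending-count-sketch");
    }
}

The retraction flag is what lets a downstream operator (or a Kafka sink wrapper)
keep its own view of the per-user count consistent as results are updated.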


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Cristian C
Well, the problem is that, conceptually, the way I'm trying to approach
this is ok. But in practice, it has some edge cases.

So back to my original premise: if both the trigger and a checkpoint happen
around the same time, there is a chance that the streaming file sink rolls
the bucket BEFORE it has received all the data. In other words, it would
create incomplete snapshots of the table.

Keep in mind that every snapshot is written to a different folder. And they
are supposed to represent the state of the whole table at a point in time.

On Fri, Jan 18, 2019, 8:26 AM Jamie Grier wrote:

> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
> PURGES but only FIRES what I said is semantically true.  The window
> contents are never cleared.
>
> What I missed is that in this case since you're using a function that
> incrementally reduces on the fly rather than processing all the data when
> it's triggered your state is always kept to one element per key.  Your'e
> correct but in general with non-incremental window functions the state
> would grow unbounded in this configuration.
>
> So it looks like your approach should work just fine.
>
> -Jamie
>
>
>
> On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:
>
>> Hello Jamie.
>>
>> Thanks for taking a look at this. So, yes, I want to write only the last
>> data for each key every X minutes. In other words, I want a snapshot of
>> the
>> whole database every X minutes.
>>
>> >  The issue is that the window never get's PURGED so the data just
>> > continues to accumulate in the window.  This will grow without bound.
>>
>> The window not being purged does not necessarily mean that the data will
>> be
>> accumulated indefinitely. How so? Well, Flink has two mechanisms to remove
>> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>>
>> The reduce function has an implicit evictor that automatically removes
>> events from the window pane that are no longer needed. i.e. it keeps in
>> state only the element that was reduced. Here is an example:
>>
>> env.socketTextStream("localhost", )
>>   .keyBy { it.first().toString() }
>>   .window(GlobalWindows.create())
>>
>> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>   .reduce { left, right ->
>> println("left: $left, right: $right")
>> if (left.length > right.length) {
>>   left
>> } else {
>>   right
>> }
>>   }
>>   .printToErr()
>>
>> For your claim to hold true, every time the trigger fires one would expect
>> to see ALL the elements by a key being printed over and over again in the
>> reduce function. However, if you run a job similar to this one in your
>> lang
>> of choice, you will notice that the print statement is effectively called
>> only once per event per key.
>>
>> In fact, not using purge is intentional. Because I want to hold every
>> record
>> (the last one by its primary key) of the database in state so that I can
>> write a snapshot of the whole database.
>>
>> So for instance, let's say my table has two columns: id and time. And I
>> have
>> the following events:
>>
>> 1,January
>> 2,February
>> 1,March
>>
>> I want to write to S3 two records: "1,March", and "2,February".
>>
>> Now, let's say two more events come into the stream:
>>
>> 3,April
>> 1,June
>>
>> Then I want to write to S3 three records: "1,June", "2,February" and
>> "3,April".
>>
>> In other words, I can't just purge the windows, because I would lose the
>> record with id 2.
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Sorry my earlier comment should read: "It would just read all the files in
order and NOT worry about which data rows are in which files"

On Fri, Jan 18, 2019 at 10:00 AM Jamie Grier  wrote:

> Hmm..  I would have to look into the code for the StreamingFileSink more
> closely to understand the concern but typically you should not be concerned
> at all with *when* checkpoints happen.  They are meant to be a completely
> asynchronous background process that has absolutely no bearing on
> application semantics.  The output should be thought of as a stream rather
> than a snapshot.
>
> Can you rework the downstream consumer of the output data such that you
> don't have to worry about this?  It would just read all the files in order
> and worry about which data rows are in which files.
>
> Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
> code.  I've cc'd him directly.
>
> -Jamie
>
>
> On Fri, Jan 18, 2019 at 9:44 AM Cristian C 
> wrote:
>
>> Well, the problem is that, conceptually, the way I'm trying to approach
>> this is ok. But in practice, it has some edge cases.
>>
>> So back to my original premise: if you both, trigger and checkpoint
>> happen around the same time, there is a chance that the streaming file sink
>> rolls the bucket BEFORE it has received all the data. In other words, it
>> would create incomplete snapshots of the table.
>>
>> Keep in mind that every snapshot is written to a different folder. And
>> they are supposed to represent the state of the whole table at a point in
>> time.
>>
>> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier wrote:
>>> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
>>> PURGES but only FIRES what I said is semantically true.  The window
>>> contents are never cleared.
>>>
>>> What I missed is that in this case since you're using a function that
>>> incrementally reduces on the fly rather than processing all the data when
>>> it's triggered your state is always kept to one element per key.  Your'e
>>> correct but in general with non-incremental window functions the state
>>> would grow unbounded in this configuration.
>>>
>>> So it looks like your approach should work just fine.
>>>
>>> -Jamie
>>>
>>>
>>>
>>> On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:
>>>
 Hello Jamie.

 Thanks for taking a look at this. So, yes, I want to write only the last
 data for each key every X minutes. In other words, I want a snapshot of
 the
 whole database every X minutes.

 >  The issue is that the window never get's PURGED so the data just
 > continues to accumulate in the window.  This will grow without bound.

 The window not being purged does not necessarily mean that the data
 will be
 accumulated indefinitely. How so? Well, Flink has two mechanisms to
 remove
 data from a window: triggering a FIRE/FIRE_AND_PURGE or using an
 evictor.

 The reduce function has an implicit evictor that automatically removes
 events from the window pane that are no longer needed. i.e. it keeps in
 state only the element that was reduced. Here is an example:

 env.socketTextStream("localhost", )
   .keyBy { it.first().toString() }
   .window(GlobalWindows.create())


 .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
   .reduce { left, right ->
 println("left: $left, right: $right")
 if (left.length > right.length) {
   left
 } else {
   right
 }
   }
   .printToErr()

 For your claim to hold true, every time the trigger fires one would
 expect
 to see ALL the elements by a key being printed over and over again in
 the
 reduce function. However, if you run a job similar to this one in your
 lang
 of choice, you will notice that the print statement is effectively
 called
 only once per event per key.

 In fact, not using purge is intentional. Because I want to hold every
 record
 (the last one by its primary key) of the database in state so that I can
 write a snapshot of the whole database.

 So for instance, let's say my table has two columns: id and time. And I
 have
 the following events:

 1,January
 2,February
 1,March

 I want to write to S3 two records: "1,March", and "2,February".

 Now, let's say two more events come into the stream:

 3,April
 1,June

 Then I want to write to S3 three records: "1,June", "2,February" and
 "3,April".

 In other words, I can't just purge the windows, because I would lose the
 record with id 2.



 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Jamie Grier
Hmm..  I would have to look into the code for the StreamingFileSink more
closely to understand the concern but typically you should not be concerned
at all with *when* checkpoints happen.  They are meant to be a completely
asynchronous background process that has absolutely no bearing on
application semantics.  The output should be thought of as a stream rather
than a snapshot.

Can you rework the downstream consumer of the output data such that you
don't have to worry about this?  It would just read all the files in order
and worry about which data rows are in which files.

Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
code.  I've cc'd him directly.

-Jamie


On Fri, Jan 18, 2019 at 9:44 AM Cristian C  wrote:

> Well, the problem is that, conceptually, the way I'm trying to approach
> this is ok. But in practice, it has some edge cases.
>
> So back to my original premise: if you both, trigger and checkpoint happen
> around the same time, there is a chance that the streaming file sink rolls
> the bucket BEFORE it has received all the data. In other words, it would
> create incomplete snapshots of the table.
>
> Keep in mind that every snapshot is written to a different folder. And
> they are supposed to represent the state of the whole table at a point in
> time.
>
> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier wrote:
>> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
>> PURGES but only FIRES what I said is semantically true.  The window
>> contents are never cleared.
>>
>> What I missed is that in this case since you're using a function that
>> incrementally reduces on the fly rather than processing all the data when
>> it's triggered your state is always kept to one element per key.  Your'e
>> correct but in general with non-incremental window functions the state
>> would grow unbounded in this configuration.
>>
>> So it looks like your approach should work just fine.
>>
>> -Jamie
>>
>>
>>
>> On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:
>>
>>> Hello Jamie.
>>>
>>> Thanks for taking a look at this. So, yes, I want to write only the last
>>> data for each key every X minutes. In other words, I want a snapshot of
>>> the
>>> whole database every X minutes.
>>>
>>> >  The issue is that the window never get's PURGED so the data just
>>> > continues to accumulate in the window.  This will grow without bound.
>>>
>>> The window not being purged does not necessarily mean that the data will
>>> be
>>> accumulated indefinitely. How so? Well, Flink has two mechanisms to
>>> remove
>>> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>>>
>>> The reduce function has an implicit evictor that automatically removes
>>> events from the window pane that are no longer needed. i.e. it keeps in
>>> state only the element that was reduced. Here is an example:
>>>
>>> env.socketTextStream("localhost", )
>>>   .keyBy { it.first().toString() }
>>>   .window(GlobalWindows.create())
>>>
>>> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>>   .reduce { left, right ->
>>> println("left: $left, right: $right")
>>> if (left.length > right.length) {
>>>   left
>>> } else {
>>>   right
>>> }
>>>   }
>>>   .printToErr()
>>>
>>> For your claim to hold true, every time the trigger fires one would
>>> expect
>>> to see ALL the elements by a key being printed over and over again in the
>>> reduce function. However, if you run a job similar to this one in your
>>> lang
>>> of choice, you will notice that the print statement is effectively called
>>> only once per event per key.
>>>
>>> In fact, not using purge is intentional. Because I want to hold every
>>> record
>>> (the last one by its primary key) of the database in state so that I can
>>> write a snapshot of the whole database.
>>>
>>> So for instance, let's say my table has two columns: id and time. And I
>>> have
>>> the following events:
>>>
>>> 1,January
>>> 2,February
>>> 1,March
>>>
>>> I want to write to S3 two records: "1,March", and "2,February".
>>>
>>> Now, let's say two more events come into the stream:
>>>
>>> 3,April
>>> 1,June
>>>
>>> Then I want to write to S3 three records: "1,June", "2,February" and
>>> "3,April".
>>>
>>> In other words, I can't just purge the windows, because I would lose the
>>> record with id 2.
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Dump snapshot of big table in real time using StreamingFileSink

2019-01-18 Thread Cristian C
The output is a bunch of files in Parquet format. The thing reading them
would be Presto, so I can't really tell it to ignore some rows but not
others. Not to mention that the files would keep piling up, making SQL queries
super slow.

On Fri, Jan 18, 2019, 10:01 AM Jamie Grier wrote:

> Sorry my earlier comment should read: "It would just read all the files in
> order and NOT worry about which data rows are in which files"
>
> On Fri, Jan 18, 2019 at 10:00 AM Jamie Grier  wrote:
>
>> Hmm..  I would have to look into the code for the StreamingFileSink more
>> closely to understand the concern but typically you should not be concerned
>> at all with *when* checkpoints happen.  They are meant to be a completely
>> asynchronous background process that has absolutely no bearing on
>> application semantics.  The output should be thought of as a stream rather
>> than a snapshot.
>>
>> Can you rework the downstream consumer of the output data such that you
>> don't have to worry about this?  It would just read all the files in order
>> and worry about which data rows are in which files.
>>
>> Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
>> code.  I've cc'd him directly.
>>
>> -Jamie
>>
>>
>> On Fri, Jan 18, 2019 at 9:44 AM Cristian C 
>> wrote:
>>
>>> Well, the problem is that, conceptually, the way I'm trying to approach
>>> this is ok. But in practice, it has some edge cases.
>>>
>>> So back to my original premise: if you both, trigger and checkpoint
>>> happen around the same time, there is a chance that the streaming file sink
>>> rolls the bucket BEFORE it has received all the data. In other words, it
>>> would create incomplete snapshots of the table.
>>>
>>> Keep in mind that every snapshot is written to a different folder. And
>>> they are supposed to represent the state of the whole table at a point in
>>> time.
>>>
>>> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier wrote:
 Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
 PURGES but only FIRES what I said is semantically true.  The window
 contents are never cleared.

 What I missed is that in this case since you're using a function that
 incrementally reduces on the fly rather than processing all the data when
 it's triggered your state is always kept to one element per key.  Your'e
 correct but in general with non-incremental window functions the state
 would grow unbounded in this configuration.

 So it looks like your approach should work just fine.

 -Jamie



 On Thu, Jan 17, 2019 at 10:18 PM knur  wrote:

> Hello Jamie.
>
> Thanks for taking a look at this. So, yes, I want to write only the
> last
> data for each key every X minutes. In other words, I want a snapshot
> of the
> whole database every X minutes.
>
> >  The issue is that the window never get's PURGED so the data just
> > continues to accumulate in the window.  This will grow without bound.
>
> The window not being purged does not necessarily mean that the data
> will be
> accumulated indefinitely. How so? Well, Flink has two mechanisms to
> remove
> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an
> evictor.
>
> The reduce function has an implicit evictor that automatically removes
> events from the window pane that are no longer needed. i.e. it keeps in
> state only the element that was reduced. Here is an example:
>
> env.socketTextStream("localhost", )
>   .keyBy { it.first().toString() }
>   .window(GlobalWindows.create())
>
>
> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>   .reduce { left, right ->
> println("left: $left, right: $right")
> if (left.length > right.length) {
>   left
> } else {
>   right
> }
>   }
>   .printToErr()
>
> For your claim to hold true, every time the trigger fires one would
> expect
> to see ALL the elements by a key being printed over and over again in
> the
> reduce function. However, if you run a job similar to this one in your
> lang
> of choice, you will notice that the print statement is effectively
> called
> only once per event per key.
>
> In fact, not using purge is intentional. Because I want to hold every
> record
> (the last one by its primary key) of the database in state so that I
> can
> write a snapshot of the whole database.
>
> So for instance, let's say my table has two columns: id and time. And
> I have
> the following events:
>
> 1,January
> 2,February
> 1,March
>
> I want to write to S3 two records: "1,March", and "2,February".
>
> Now, let's say two more events come into the stream:
>
> 3,April
> 1,June
>
> Then I want to write to S3 three