Yes, the data will be keyed by shard.
This is the trigger config we used:
WindowFileNamePolicy policy =
    new WindowFileNamePolicy(prefix, options.getDataSource());
TextIO.Write textWriter = TextIO.write()
    .to(policy)
    .withTempDirectory(tempPrefix)
    .withWindowedWrites()
    .withNumShards(options.getShardCount());

// TextIO.Write returns PDone, so the result is not assigned back to batchCollection.
batchCollection
    .apply("Fixed Strategy",
        Window.<String>into(
                FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration())))
            .triggering(AfterWatermark.pastEndOfWindow())
            .withAllowedLateness(Utilities.resolveDuration(options.getWindowLateness()))
            .discardingFiredPanes())
    .apply(textWriter);
From: Kenneth Knowles <[email protected]>
Sent: Friday, May 1, 2020 4:19 PM
To: user <[email protected]>
Subject: Re: Notifying the closure of a Window Period
CAUTION: This email originated from outside of D&B. Please do not click links
or open attachments unless you recognize the sender and know the content is
safe.
Is the data keyed by shardNumber? To have a unique final pane for a filename
prefix, you will need to include the key in the prefix.
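One way to read the advice above, as a minimal plain-Java sketch rather than Beam code: if the shard number is part of the prefix, each shard gets its own prefix and therefore its own final pane. The class `ShardPrefix`, the method `prefixFor`, the `yyyyMMddHHmm` pattern, and the `-shard-N` suffix are all assumptions made for illustration; none of them come from this thread or from the Beam API.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper, not part of Beam: builds a filename prefix that
// includes the shard number, so every shard key has a distinct prefix.
final class ShardPrefix {
    // Assumed timestamp layout; the real pattern lives in Utilities and is not shown.
    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneOffset.UTC);

    static String prefixFor(String dataSource, Instant windowStart, int shardNumber) {
        String ts = FMT.format(windowStart);
        // Same layout as the policy in this thread, plus the shard number.
        return String.format(Locale.ROOT, "%s/%s/%s-%s-shard-%d",
            ts, dataSource, dataSource, ts, shardNumber);
    }
}
```

With this layout, two shards of the same window never share a prefix, so a final pane observed for one prefix belongs to exactly one shard.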
Can you also provide the triggering configuration you are working with?
Kenn
On Fri, May 1, 2020 at 6:47 AM Truebody, Kyle
<[email protected]> wrote:
Hi Kenn,
Thanks for the response…
I am not sure I understand this correctly: ‘affected by the fact that windows
are processed independently for each key’.
I put a high-level example below; I hope it clarifies what I am trying to ask.
Is there a more precise way to be informed that the final pane of a window
has been written completely?
Due to the nature of the coordination setup for downstream consumers, the
.trigger file must be delivered only on completion of the absolute last pane.
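```
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class WindowFileNamePolicy extends FileBasedSink.FilenamePolicy {
  private final ResourceId prefix;
  private final String dataSource;

  /**
   * File names are built from:
   *   - the file source name
   *   - a timestamp (processing timestamp / event timestamp), based on the
   *     current time window
   *   - optional: the shard number and the window start timestamp
   *
   * @param prefix
   */
  public WindowFileNamePolicy(ResourceId prefix, String dataSource) {
    this.prefix = prefix;
    this.dataSource = dataSource;
  }

  public String filenamePrefixForWindow(IntervalWindow window) {
    String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
    DateTimeFormatter formatter = DateTimeFormat.forPattern(Utilities.lngDateFormat);
    DateTime windowStart = formatter.parseDateTime(window.start().toString());
    DateTimeFormatter resultformat = DateTimeFormat.forPattern(Utilities.shtDateFormat);
    return String.format(
        "%s/%s/%s-%s",
        resultformat.print(windowStart), dataSource, dataSource, resultformat.print(windowStart));
  }

  @Override
  public ResourceId windowedFilename(
      int shardNumber,
      int numShards,
      BoundedWindow window,
      PaneInfo paneInfo,
      FileBasedSink.OutputFileHints outputFileHints) {
    IntervalWindow intervalWindow = (IntervalWindow) window;
    String filename =
        String.format(
            "%s-%s-%s", filenamePrefixForWindow(intervalWindow), shardNumber, numShards);

    // Writes the trigger file to the same directory as the current window.
    // This fires multiple times, depending on the number of panes for which
    // isLast() is true, or on the number of write operators (not sure exactly).
    // createTriggerFile is a helper that is not shown in this message.
    if (paneInfo.isLast()) {
      createTriggerFile(/* trigger file name */ ".trigger");
    }

    return prefix
        .getCurrentDirectory()
        .resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
  }

  // Required by FilenamePolicy; this pipeline only uses windowed writes.
  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
    throw new UnsupportedOperationException("Unsupported.");
  }
}
```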
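For illustration only, the "fire once, after the last shard" requirement can be reduced to a small counting problem. The sketch below is plain Java, not a Beam API: `WindowTriggerTracker`, `onLastPane`, and the string window id are hypothetical names, and it assumes the shard count is known up front (as with `withNumShards`) and that every final-pane notification reaches the same in-memory object. On a distributed runner that second assumption does not hold by itself, so where this logic would live in a real pipeline is left open.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, in-memory illustration of "trigger once per window".
final class WindowTriggerTracker {
    private final int numShards;
    // Shards whose final pane has been reported, per window id.
    private final Map<String, Set<Integer>> finishedShards = new HashMap<>();
    // Windows for which the trigger has already been raised.
    private final Set<String> triggered = new HashSet<>();

    WindowTriggerTracker(int numShards) {
        this.numShards = numShards;
    }

    /**
     * Records that the final pane of one shard was written.
     * Returns true exactly once per window: on the call that completes
     * the set of shards. Repeated reports for a shard are ignored.
     */
    synchronized boolean onLastPane(String windowId, int shardNumber) {
        Set<Integer> done = finishedShards.computeIfAbsent(windowId, k -> new HashSet<>());
        done.add(shardNumber);
        return done.size() == numShards && triggered.add(windowId);
    }
}
```

A caller would create the `.trigger` file only when `onLastPane` returns true, instead of on every pane whose `isLast()` is true.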
Thanks,
Kyle
From: Kenneth Knowles <[email protected]>
Sent: Friday, May 1, 2020 2:25 PM
To: user <[email protected]>
Subject: Re: Notifying the closure of a Window Period
I am guessing you will be affected by the fact that windows are processed
independently for each key. Is that what you are referring to when you mention
multiple isLast() panes?
Kenn
On Fri, May 1, 2020 at 3:36 AM Truebody, Kyle
<[email protected]> wrote:
Hi all,
We are working on a streaming pipeline that needs to be compatible with our
legacy platform while we make the move over to Beam Streaming.
Our legacy platform uses a coordination framework (Oozie). Each step in the
coordination pipeline is activated by the creation of a trigger file.
I am looking for a Beam construct or flag that will notify the context/driver
of the closure of a time window. We need to be able to create a trigger flag
only when all the files from a given window period have been emitted.
We have tried creating the trigger flag using PaneInfo.isLast() through a
custom WindowFileNamePolicy. We noticed that a window has multiple panes for
which isLast() is true.
Thanks,
Kyle