Hi,
I'm not an expert on Flink specifically, but your problem might be easier
to solve when broken down into two steps: first create a "stable" input
for downstream processing (this might include a specific watermark), then
let the downstream operators act on it. In Flink, the "stability" of
input for downstream processing is ensured by a checkpoint. You would
therefore need to wait for a checkpoint, buffering intermediate data in
state (and emitting the particular watermark as a data element, because
watermarks in general need not be 'stable'). Once the checkpoint is
completed, you would flush the buffer to the downstream operators: one
would create the parquet files, the other would take whatever action is
needed based on the watermark. The checkpoint ensures that the two tasks
would be eventually consistent (if this is sufficient for your case).
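To make the buffering idea concrete, here is a minimal sketch in plain
Java, without any Flink dependency. The class StableInputBuffer and its
method names are hypothetical, not Flink API. In a real operator I would
expect onSnapshot to be driven from CheckpointedFunction#snapshotState,
onCheckpointComplete from CheckpointListener#notifyCheckpointComplete,
and both collections to live in Flink state so they survive a restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not a Flink class): holds elements back until a checkpoint completes. */
final class StableInputBuffer<T> {
    // Elements received since the last snapshot.
    private final List<T> pending = new ArrayList<>();
    // Elements captured by a snapshot whose checkpoint is not confirmed yet, keyed by checkpoint id.
    private final TreeMap<Long, List<T>> awaitingConfirmation = new TreeMap<>();

    /** Buffers one element; a watermark can be buffered as an ordinary element. */
    void add(T element) {
        pending.add(element);
    }

    /** Assumed to be called when the operator snapshots its state for the given checkpoint. */
    void onSnapshot(long checkpointId) {
        awaitingConfirmation.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    /** Assumed to be called on checkpoint completion; returns the elements that are now stable. */
    List<T> onCheckpointComplete(long checkpointId) {
        List<T> stable = new ArrayList<>();
        // A completed checkpoint also covers all earlier ones, so release every batch up to it.
        Map<Long, List<T>> confirmed = awaitingConfirmation.headMap(checkpointId, true);
        for (List<T> batch : confirmed.values()) {
            stable.addAll(batch);
        }
        confirmed.clear(); // the view is backed by the TreeMap, so this removes the released batches
        return stable;
    }
}
```

Whatever onCheckpointComplete returns would be emitted downstream, so
both the file-writing operator and the watermark-based action see the
same elements even after a failure and restart.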
In Apache Beam, we mark such a transform with '@RequiresStableInput' [1];
its implementation for Flink works as I described above.
Jan
[1]
https://beam.apache.org/releases/javadoc/2.44.0/org/apache/beam/sdk/transforms/DoFn.RequiresStableInput.html
On 2/14/23 13:23, Tobias Fröhlich wrote:
Dear flink team,
I am facing the following problem: I would need to write events to parquet
files using the FileSink. Subsequently, I want to do something else in a global
commit where I need the corresponding watermark. However, the
org.apache.flink.connector.file.sink.FileSink forces the type of the
committables to be org.apache.flink.connector.file.sink.FileSinkCommittable
which cannot carry watermarks.
Details:
As far as I understand the idea of a two-phase commit with a global committer,
the committables are used for passing information from the writer to the global
committer. This is done by implementing two methods in the writer and the
committer, respectively:
1. Collection<CommT>
TwoPhaseCommittingSink.PrecommittingSinkWriter::prepareCommit() that returns a
collection of committables of some type CommT and
2. void Committer::commit(Collection<CommitRequest<CommT>>) that uses this
collection.
In general, the type CommT can be chosen arbitrarily. So, if the watermark is needed
in the global commit, it is possible to use a customized object that contains a field
for the watermark. However, if the class
org.apache.flink.connector.file.sink.FileSink<IN> is used, the type for the
committables is always org.apache.flink.connector.file.sink.FileSinkCommittable which
does not have a field that can be used for the watermark.
The only solution I found was to fork the flink source code and augment
it in the following way:
1. adding a field to FileSinkCommittable ("private long watermark;" with
getter and setter)
2. changing the FileSinkCommittableSerializer accordingly (this makes it
necessary to define a new version)
3. in FileWriter::prepareCommit(), adding a loop over all committables to set
the watermark
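Steps 1 and 2 can be sketched in plain Java as follows. This is a
simplified illustration, not the actual patch: WatermarkedCommittable and
its serializer are hypothetical names, the real FileSinkCommittable
carries more than a file name, and IOExceptions are wrapped as unchecked
only to keep the example short. The method names mirror Flink's
SimpleVersionedSerializer (getVersion, serialize, deserialize with a
version argument).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, simplified stand-in for a committable that also carries a watermark. */
final class WatermarkedCommittable {
    static final long NO_WATERMARK = Long.MIN_VALUE;
    final String pendingFile; // simplification: the real committable holds more than a name
    final long watermark;

    WatermarkedCommittable(String pendingFile, long watermark) {
        this.pendingFile = pendingFile;
        this.watermark = watermark;
    }
}

/** Version 1 wrote only the file name; version 2 appends the watermark. */
final class WatermarkedCommittableSerializer {
    int getVersion() {
        return 2;
    }

    byte[] serialize(WatermarkedCommittable committable) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(committable.pendingFile);
            out.writeLong(committable.watermark);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    WatermarkedCommittable deserialize(int version, byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            String pendingFile = in.readUTF();
            switch (version) {
                case 1: // old format: no watermark was written
                    return new WatermarkedCommittable(pendingFile, WatermarkedCommittable.NO_WATERMARK);
                case 2:
                    return new WatermarkedCommittable(pendingFile, in.readLong());
                default:
                    throw new IllegalArgumentException("Unknown version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the version 1 branch is what allows committables written before
the change to be restored from an existing checkpoint or savepoint.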
Am I missing something? Is there an easier way to get the watermarks from the
writer to the global committer? If not, is it justified to propose a feature
request?
Best regards and thanks in advance
Tobias Fröhlich