Hi Eron,

I think this is a useful addition for storage systems that act as a
pass-through for Flink, to reduce recovery time. It is only useful in
combination with regional failover, so that only a small part of the
pipeline is restarted.

A couple of thoughts on the implications:
1. Watermarks are closely coupled to the system that produces them (here:
Flink). I have a hard time imagining that it's useful to consume them with
a different stream processor downstream, so for now I'm assuming that both
the upstream and the downstream applications are Flink jobs. In that case,
we would probably define both parts of the pipeline in the same Flink job,
similar to KafkaStreams' #through (see the first sketch after this list).
2. The schema of the respective intermediate stream/topic would need to be
managed by Flink so that it can encode both records and watermarks. This
reduces usability quite a bit and needs to be carefully designed (a
possible envelope encoding is sketched after this list).
3. It's not clear to me whether constructs like SchemaRegistry can be
properly supported (or whether they even should be) in terms of schema
evolution.
4. Potentially, StreamStatus and LatencyMarker would also need to be
encoded (the envelope sketch below leaves room for them).
5. It's important to have some way to propagate backpressure from the
downstream part to the upstream part. Otherwise you get the same issue as
in KafkaStreams, where two separate pipelines can drift so far apart that
you experience data loss once the retention period of the intermediate
topic is smaller than the drift.
6. It's clear that you trade a large chunk of throughput for lower overall
latency in case of failure, so it's an interesting feature for use cases
with strict latency SLAs.
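
For reference, the KafkaStreams analogue mentioned in point 1 looks roughly
like this (topic names and the toy topology are made up for illustration):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> upstream = builder.stream("input");

    // #through writes to the intermediate topic and returns a new
    // KStream reading from it, so both stages live in one topology.
    KStream<String, String> downstream =
            upstream.mapValues(v -> v.toUpperCase())
                    .through("intermediate-topic");

    downstream.to("output");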
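
For point 2 (and 4), a hypothetical envelope could look like the following;
all names here are mine, nothing like this exists yet:

    // Hypothetical wire format for the intermediate topic: each entry
    // is either a user record or one of Flink's internal stream elements.
    enum ElementKind { RECORD, WATERMARK, STREAM_STATUS, LATENCY_MARKER }

    class StreamElementEnvelope<T> {
        ElementKind kind;
        T record;       // set iff kind == RECORD
        long timestamp; // watermark timestamp or latency marker emit time
        boolean idle;   // set iff kind == STREAM_STATUS
    }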

Since we are phasing out SinkFunction, I'd prefer to support only
SinkWriter. Having a no-op default sounds good to me; a sketch follows
below.
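
Concretely, the addition could look like this sketch (the method name is
taken from your proposal; the existing SinkWriter methods are elided):

    import java.io.IOException;
    import org.apache.flink.api.common.eventtime.Watermark;

    public interface SinkWriter<InputT, CommT, WriterStateT>
            extends AutoCloseable {

        // ... existing write/prepareCommit/snapshotState/close ...

        // Proposed addition: invoked whenever the event-time watermark
        // advances. The no-op default keeps existing writers compatible.
        default void writeWatermark(Watermark watermark) throws IOException {}
    }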

We have an experimental feature for Kafka [1] that pretty much reflects
your idea. There we resort to an ugly workaround, a custom StreamSink task,
to be able to process the watermark. We could also try to create a FLIP
that abstracts the actual system away, so that the approach works for both
Pulsar and Kafka.

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffle.java#L103


On Mon, May 17, 2021 at 10:44 PM Eron Wright
<ewri...@streamnative.io.invalid> wrote:

> I would like to propose an enhancement to the Sink API, the ability to
> receive upstream watermarks.   I'm aware that the sink context provides the
> current watermark for a given record.  I'd like to be able to write a sink
> function that is invoked whenever the watermark changes.  Out of scope
> would be event-time timers (since sinks aren't keyed).
>
> For context, imagine that a stream storage system had the ability to
> persist watermarks in addition to ordinary elements, e.g. to serve as
> source watermarks in a downstream processor.  Ideally one could compose a
> multi-stage, event-driven application, with watermarks flowing end-to-end
> without need for a heuristics-based watermark at each stage.
>
> The specific proposal would be a new method on `SinkFunction` and/or on
> `SinkWriter`, called 'processWatermark' or 'writeWatermark', with a default
> implementation that does nothing.
>
> Thoughts?
>
> Thanks!
> Eron Wright
> StreamNative
>
