Now I see the intention. In that case you must have a V2 sink, right? Maybe
you are looking for the following:

final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
final CustomSinkOperatorUidHashes operatorsUidHashes =
        CustomSinkOperatorUidHashes.builder()
                .setWriterUidHash(writerHash)
                .setCommitterUidHash(committerHash)
                .build();
src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
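
One thing worth double-checking before passing hashes in: as far as I know,
Flink only accepts uid hashes that are 32 hex characters, so a placeholder
like the "xyzbbbb..." string from the original question would be rejected on
that ground alone. Below is a minimal sketch of such a check in plain Java.
The class and method names are made up for illustration, and the
32-hex-character rule is my assumption about Flink's validation, not
something copied from its source.

```java
import java.util.regex.Pattern;

public class UidHashCheck {
    // Assumption: an operator uid hash is exactly 32 hex characters (128 bits).
    private static final Pattern HEX_32 = Pattern.compile("^[0-9A-Fa-f]{32}$");

    // Hypothetical helper: true if the string has the expected hash format.
    static boolean isValidUidHash(String hash) {
        return hash != null && HEX_32.matcher(hash).matches();
    }

    public static void main(String[] args) {
        // Writer hash from the snippet above.
        System.out.println(isValidUidHash("f6b178ce445dc3ffaa06bad27a51fead"));
        // Placeholder from the original question (contains non-hex characters).
        System.out.println(isValidUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"));
    }
}
```

Running such a check on hashes copied from old logs or savepoint metadata can
catch truncation or copy-paste mistakes before the job is submitted.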


On Sun, Jun 9, 2024 at 1:21 PM Zhanghao Chen <> wrote:

> Hi Salva,
> The SinkV2 transformation will be translated to multiple operators at the
> physical level. When setting a UID, Flink will automatically generate UID
> for sub-operators by filling the configured UID in a pre-defined naming
> template. The naming template is carefully maintained to ensure
> cross-version state compatibility. However, this cannot be easily done when
> setting the uidHash, that's why Flink currently does not support setting it
> for non-legacy sinks.
> A possible solution is to convert the uidHash back to a uid and apply the
> same strategy used for generating uids to compute the corresponding uidHash
> for each sub-operator. Maybe you can investigate this further and file a
> JIRA issue for it.
> Best,
> Zhanghao Chen
> ------------------------------
> *From:* Salva Alcántara <>
> *Sent:* Sunday, June 9, 2024 14:49
> *To:* Gabor Somogyi <>
> *Cc:* user <>
> *Subject:* Re: Setting uid hash for non-legacy sinks
> Hi Gabor,
> Yeah, I know this, but what if you initially forgot and now you want to
> add the uid "after the fact"?
> You need to get the operator/vertex id used by Flink for current
> savepoints and somehow set this id for the sink.
> With the uid method you would need to "hack" the existing hash (get a
> string which when hashed produces it). I guess this can be done since
> murmur3 is a non-cryptographic hash but Flink has the "setUidHash"
> precisely for that. From the javadocs:
> *Important*: this should be used as a workaround or for troubleshooting.
> The provided hash needs to be unique per transformation and job. A use case
> for this is in migration between Flink versions or changing the jobs in a
> way that changes the automatically generated hashes. In this case,
> providing the previous hashes directly through this method (e.g. obtained
> from old logs) can help to reestablish a lost mapping from states to their
> target operator.
> ...but as I said, it seems this method is not supported for new
> (non-legacy) sinks...
> Regards,
> Salva
> On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <>
> wrote:
> Hi Salva,
> Just wondering, why not set the uid like this?
> ```
> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
> ```
> Flink will compute the hash from this UID, and the UID -> hash
> transformation is consistent.
> BR,
> G
> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <>
> wrote:
> Hi,
> I want to add the uid for my Kafka sink in such a way that I can still use
> the existing savepoint. The problem I'm having is that I cannot set the uid
> hash. If I try something like this:
> ```
> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
> ```
> I get the following error:
> ```
> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
> set a custom UID hash on a non-legacy sink
> at
> org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(
> ```
> How can one set the operator id directly then for new (non-legacy) sinks?
> Kind regards,
> Salva
