Hi Salva,

The SinkV2 transformation is translated into multiple operators at the 
physical level. When you set a UID, Flink automatically generates UIDs for the 
sub-operators by filling the configured UID into a predefined naming template. 
The naming template is carefully maintained to ensure cross-version state 
compatibility. However, the same cannot easily be done when setting the uidHash, 
which is why Flink currently does not support setting it for non-legacy sinks.
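
To make the template idea concrete, here is a minimal sketch. The template string, the sub-operator names and the class itself are made up for illustration; they are not Flink's actual template or API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SinkUidTemplateSketch {
    // Hypothetical template: "<sink uid>: <sub-operator name>".
    // Flink's real template may look different.
    private static final String TEMPLATE = "%s: %s";

    // Derives one uid per sub-operator from the single uid set on the sink.
    static Map<String, String> expand(String sinkUid, List<String> subOperators) {
        Map<String, String> uids = new LinkedHashMap<>();
        for (String name : subOperators) {
            uids.put(name, String.format(TEMPLATE, sinkUid, name));
        }
        return uids;
    }

    public static void main(String[] args) {
        // "Writer" and "Committer" are assumed sub-operator names.
        expand("my-sink", List.of("Writer", "Committer"))
                .forEach((op, uid) -> System.out.println(op + " -> " + uid));
    }
}
```

A uid is a free-form string, so it can be expanded this way. A uidHash is a fixed-size opaque value, so there is nothing meaningful to fill into a template.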

A possible solution is to convert the uidHash back to a uid and apply the same 
strategy used for generating uids to compute the corresponding uidHash for each 
sub-operator. Maybe you can investigate this further and file a JIRA issue for it.

Best,
Zhanghao Chen
________________________________
From: Salva Alcántara <salcantara...@gmail.com>
Sent: Sunday, June 9, 2024 14:49
To: Gabor Somogyi <gabor.g.somo...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Setting uid hash for non-legacy sinks

Hi Gabor,

Yeah, I know this, but what if you initially forgot and now you want to add the 
uid "after the fact"?

You need to get the operator/vertex id used by Flink for current savepoints and 
somehow set this id for the sink.

With the uid method you would need to "hack" the existing hash (get a string 
which when hashed produces it). I guess this can be done since murmur3 is a 
non-cryptographic hash but Flink has the "setUidHash" precisely for that. From 
the javadocs:

Important: this should be used as a workaround or for troubleshooting. The 
provided hash needs to be unique per transformation and job. A use case for 
this is in migration between Flink versions or changing the jobs in a way that 
changes the automatically generated hashes. In this case, providing the 
previous hashes directly through this method (e.g. obtained from old logs) can 
help to reestablish a lost mapping from states to their target operator.

...but as I said, it seems this method is not supported for new (non-legacy) 
sinks...

Regards,

Salva

On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
Hi Salva,

Just wondering, why not simply set the uid like this?
```
output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
```

Flink derives the hash from the UID you set, and this UID -> HASH 
transformation is consistent.

BR,
G



On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com> wrote:
Hi,

I want to add the uid for my Kafka sink in such a way that I can still use the 
existing savepoint. The problem I'm having is that I cannot set the uid hash. 
If I try something like this:

```
output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
```

I get the following error:

```
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot set a custom UID hash on a non-legacy sink
    at org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
```
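
A side note on the value itself, separate from the exception above: a Flink operator ID is 16 bytes, so a uid hash taken from logs or a savepoint is written as 32 hexadecimal characters. The placeholder in my snippet contains x, y and z, which are not hex digits. Below is a small, standalone sanity check for such a string. The class and method names are hypothetical (this is not Flink API), and the check is my assumption about the expected format. It does not work around the non-legacy sink limitation.

```java
import java.util.regex.Pattern;

final class UidHashFormat {
    // Assumption: a uid hash is a 16-byte operator ID rendered as 32 hex characters.
    private static final Pattern HEX_32 = Pattern.compile("[0-9a-fA-F]{32}");

    // Returns true if the candidate is exactly 32 hex characters.
    static boolean looksLikeUidHash(String candidate) {
        return candidate != null && HEX_32.matcher(candidate).matches();
    }

    public static void main(String[] args) {
        // 32 hex characters: accepted.
        System.out.println(looksLikeUidHash("0123456789abcdef0123456789abcdef"));
        // 32 characters, but x, y and z are not hex digits: rejected.
        System.out.println(looksLikeUidHash("xyz0123456789abcdef0123456789abc"));
    }
}
```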

How can one set the operator id directly then for new (non-legacy) sinks?

Kind regards,

Salva
