Hi Salva, The SinkV2 transformation will be translated to multiple operators at the physical level. When setting a UID, Flink will automatically generate UID for sub-operators by filling the configured UID in a pre-defined naming template. The naming template is carefully maintained to ensure cross-version state compatibility. However, this cannot be easily done when setting the uidHash, that's why Flink currently does not support setting it for non-legacy sinks.
A possible solution is to convert uidHash back to the uid and apply the same strategy for generating uids to compute the corresponding uidHash for each suboperator. Maybe you can further investigate it and fire a JIRA issue on it. Best, Zhanghao Chen ________________________________ From: Salva Alcántara <salcantara...@gmail.com> Sent: Sunday, June 9, 2024 14:49 To: Gabor Somogyi <gabor.g.somo...@gmail.com> Cc: user <user@flink.apache.org> Subject: Re: Setting uid hash for non-legacy sinks Hi Gabor, Yeah, I know this, but what if you initially forgot and now you want to add the uid "after the fact"? You need to get the operator/vertex id used by Flink for current savepoints and somehow set this id for the sink. With the uid method you would need to "hack" the existing hash (get a string which when hashed produces it). I guess this can be done since murmur3 is a non-cryptographic hash but Flink has the "setUidHash" precisely for that. From the javadocs: Important: this should be used as a workaround or for troubleshooting. The provided hash needs to be unique per transformation and job. A use case for this is in migration between Flink versions or changing the jobs in a way that changes the automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g. obtained from old logs) can help to reestablish a lost mapping from states to their target operator. ...but as I said, it seems this method is not supported for new (non-legacy) sinks... Regards, Salva On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com<mailto:gabor.g.somo...@gmail.com>> wrote: Hi Salva, Just wondering why not good to set the uid like this? ``` output.sinkTo(outputSink).uid("my-human-readable-sink-uid"); ``` From the mentioned UID Flink is going to make the hash which is consistent from UID -> HASH transformation perspective. BR, G On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com<mailto:salcantara...@gmail.com>> wrote: Hi, I want to add the uid for my Kafka sink in such a way that I can still use the existing savepoint. The problem I'm having is that I cannot set the uid hash. If I try something like this: ``` output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); ``` I get the following error: ``` Exception in thread "main" java.lang.UnsupportedOperationException: Cannot set a custom UID hash on a non-legacy sink at org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163) ``` How can one set the operator id directly then for new (non-legacy) sinks? Kind regards, Salva