Hi Gabor,

Yeah, I know this, but what if you initially forgot and now you want to add the uid "after the fact"?
You need to get the operator/vertex id used by Flink for current savepoints and somehow set this id for the sink. With the uid method you would need to "hack" the existing hash (get a string which when hashed produces it). I guess this can be done since murmur3 is a non-cryptographic hash but Flink has the "setUidHash" precisely for that.

From the javadocs:

> *Important*: this should be used as a workaround or for troubleshooting.
> The provided hash needs to be unique per transformation and job. A use case
> for this is in migration between Flink versions or changing the jobs in a
> way that changes the automatically generated hashes. In this case,
> providing the previous hashes directly through this method (e.g. obtained
> from old logs) can help to reestablish a lost mapping from states to their
> target operator.

...but as I said, it seems this method is not supported for new (non-legacy) sinks...

Regards,
Salva

On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Salva,
>
> Just wondering why not good to set the uid like this?
> ```
> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
> ```
>
> From the mentioned UID Flink is going to make the hash which is consistent
> from UID -> HASH transformation perspective.
>
> BR,
> G
>
> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I want to add the uid for my Kafka sink in such a way that I can still
>> use the existing savepoint. The problem I'm having is that I cannot set the
>> uid hash. If I try something like this:
>>
>> ```
>> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
>> ```
>>
>> I get the following error:
>>
>> ```
>> Exception in thread "main" java.lang.UnsupportedOperationException:
>> Cannot set a custom UID hash on a non-legacy sink
>>     at org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
>> ```
>>
>> How can one set the operator id directly then for new (non-legacy) sinks?
>>
>> Kind regards,
>>
>> Salva
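
A side note on the hash format used in this thread: as far as I recall, Flink only accepts a 32-character hexadecimal string as a user-provided hash, so a placeholder like the one in the original message would probably be rejected even on a legacy sink. Here is a minimal sketch of that kind of check. `UidHashCheck` and `isValidUidHash` are hypothetical names (not Flink API), and the regex is my assumption about the validation, not something confirmed in this thread.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink.
class UidHashCheck {
    // Assumption: a user-provided hash must be exactly 32 hex characters.
    private static final Pattern HEX_32 = Pattern.compile("^[0-9A-Fa-f]{32}$");

    // Returns true only for a non-null, 32-character hexadecimal string.
    static boolean isValidUidHash(String candidate) {
        return candidate != null && HEX_32.matcher(candidate).matches();
    }

    public static void main(String[] args) {
        // A well-formed hash, e.g. one copied from old logs.
        System.out.println(isValidUidHash("0123456789abcdef0123456789abcdef"));
        // The placeholder from the original message contains non-hex characters.
        System.out.println(isValidUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"));
    }
}
```

Running a check like this on a hash taken from logs catches copy/paste mistakes early, but it does not work around the non-legacy sink limitation itself.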