Hi Gabor,

Yeah, I know this, but what if you initially forgot and now you want to add
the uid "after the fact"?

You need to get the operator/vertex id used by Flink for current savepoints
and somehow set this id for the sink.

With the uid method you would need to "hack" the existing hash, i.e. find a
string that produces it when hashed. I guess this can be done, since
murmur3 is a non-cryptographic hash, but Flink has the "setUidHash" method
precisely for this. From the javadocs:

> *Important*: this should be used as a workaround or for troubleshooting.
> The provided hash needs to be unique per transformation and job. A use case
> for this is in migration between Flink versions or changing the jobs in a
> way that changes the automatically generated hashes. In this case,
> providing the previous hashes directly through this method (e.g. obtained
> from old logs) can help to reestablish a lost mapping from states to their
> target operator.
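
For reference, a minimal sketch of the kind of format check a hash passed to
"setUidHash" is expected to satisfy. It assumes the hash must be a
32-character hex string (128 bits); that rule and the regex are assumptions
about Flink's validation, not copied from its source, and the class and
method names are hypothetical:

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink.
final class UidHashCheck {

    // Assumption: an operator hash is written as exactly 32 hex characters.
    private static final Pattern HEX_32 = Pattern.compile("^[0-9A-Fa-f]{32}$");

    // Returns true if the candidate looks like a 32-character hex hash.
    static boolean isValidUidHash(String candidate) {
        return candidate != null && HEX_32.matcher(candidate).matches();
    }

    public static void main(String[] args) {
        // A 32-character hex string passes the check.
        System.out.println(isValidUidHash("0123456789abcdef0123456789abcdef")); // prints "true"
        // A human-readable uid is not a hash and fails the check.
        System.out.println(isValidUidHash("my-human-readable-sink-uid")); // prints "false"
    }
}
```

Under that assumption, a hash taken from old logs or from the savepoint
metadata can be checked this way before being passed to the job.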


...but as I said, it seems this method is not supported for new
(non-legacy) sinks...

Regards,

Salva

On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Salva,
>
> Just wondering, why isn't it good enough to set the uid like this?
> ```
> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
> ```
>
> Flink derives the hash from that UID, and the UID -> HASH transformation
> is consistent: the same UID always produces the same hash.
>
> BR,
> G
>
>
>
> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I want to add the uid for my Kafka sink in such a way that I can still
>> use the existing savepoint. The problem I'm having is that I cannot set the
>> uid hash. If I try something like this:
>>
>> ```
>> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
>> ```
>>
>> I get the following error:
>>
>> ```
>> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot set a custom UID hash on a non-legacy sink
>>     at org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
>> ```
>>
>> How can one set the operator id directly then for new (non-legacy) sinks?
>>
>> Kind regards,
>>
>> Salva
>>
>>
