Hey Salva,

Good to hear your issue has been resolved one way or another!
Thanks for confirming that this operator hash trick is working on V2 as
well.

G


On Wed, Jun 12, 2024 at 5:20 AM Salva Alcántara <salcantara...@gmail.com>
wrote:

> Hey Gabor,
>
> In the end, I didn't need to keep compatibility with existing savepoints, so
> I restarted my jobs from scratch, so to speak.
>
> However, I have given it a try locally, and by debugging the job graph I can
> see the hashes that were set manually following your recipe (see the
> userDefinedOperatorIDs), so it seems to work fine!
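>
> For reference, this is roughly how I inspected it; a minimal sketch, assuming
> the (internal) StreamGraph/JobGraph accessors behave as in my Flink version:
>
> ```
> import org.apache.flink.runtime.OperatorIDPair;
> import org.apache.flink.runtime.jobgraph.JobGraph;
> import org.apache.flink.runtime.jobgraph.JobVertex;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class DumpOperatorIds {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         // ... build the pipeline here, including
>         // sinkTo(sink, operatorsUidHashes) with the custom hashes ...
>
>         // Translate to a JobGraph and print generated vs user-defined ids.
>         JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>         for (JobVertex vertex : jobGraph.getVertices()) {
>             for (OperatorIDPair ids : vertex.getOperatorIDs()) {
>                 System.out.printf(
>                         "%s: generated=%s userDefined=%s%n",
>                         vertex.getName(),
>                         ids.getGeneratedOperatorID(),
>                         ids.getUserDefinedOperatorID().orElse(null));
>             }
>         }
>     }
> }
> ```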
>
> Regards,
>
> Salva
>
> On Mon, Jun 10, 2024 at 9:49 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> YW, ping me back on whether it works, because it's a nifty feature.
>>
>> G
>>
>> On Mon, Jun 10, 2024 at 9:26 AM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Thanks Gabor, I will give it a try!
>>>
>>> On Mon, Jun 10, 2024 at 12:01 AM Gabor Somogyi <
>>> gabor.g.somo...@gmail.com> wrote:
>>>
>>>> Now I see the intention, so you must have a V2 sink, right? Maybe you are
>>>> looking for the following:
>>>>
>>>> ```
>>>> // Example hashes; in practice, use the writer/committer operator hashes
>>>> // taken from the existing savepoint.
>>>> final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
>>>> final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
>>>> final CustomSinkOperatorUidHashes operatorsUidHashes =
>>>>         CustomSinkOperatorUidHashes.builder()
>>>>                 .setWriterUidHash(writerHash)
>>>>                 .setCommitterUidHash(committerHash)
>>>>                 .build();
>>>> src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
>>>> ```
>>>>
>>>> G
>>>>
>>>>
>>>> On Sun, Jun 9, 2024 at 1:21 PM Zhanghao Chen <zhanghao.c...@outlook.com>
>>>> wrote:
>>>>
>>>>> Hi Salva,
>>>>>
>>>>> The SinkV2 transformation is translated into multiple operators at the
>>>>> physical level. When a UID is set, Flink automatically generates UIDs for
>>>>> the sub-operators by filling the configured UID into a pre-defined naming
>>>>> template. That template is carefully maintained to ensure cross-version
>>>>> state compatibility. However, this cannot easily be done when setting the
>>>>> uidHash, which is why Flink currently does not support setting it for
>>>>> non-legacy sinks.
>>>>>
>>>>> A possible solution is to convert the uidHash back to a uid and apply the
>>>>> same strategy used for generating sub-operator uids to compute the
>>>>> corresponding uidHash for each sub-operator. Maybe you can investigate it
>>>>> further and file a JIRA issue for it.
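>>>>>
>>>>> To illustrate the second half of that idea (this is not the actual Flink
>>>>> naming template, just a hypothetical sketch): once the sub-operator uids
>>>>> are known, their hashes could be derived with the same Murmur3 scheme
>>>>> Flink uses for user-specified uids, i.e. 128-bit Murmur3 with seed 0 over
>>>>> the UTF-8 bytes of the uid. The " Writer"/" Committer" suffixes below are
>>>>> placeholders, not the real template:
>>>>>
>>>>> ```
>>>>> import com.google.common.hash.Hashing;
>>>>> import java.nio.charset.StandardCharsets;
>>>>>
>>>>> public class SubOperatorHashSketch {
>>>>>     // Assumed to match Flink's hashing of user-specified uids
>>>>>     // (StreamGraphHasherV2): Murmur3_128, seed 0, UTF-8 bytes.
>>>>>     static String uidHash(String uid) {
>>>>>         return Hashing.murmur3_128(0)
>>>>>                 .hashString(uid, StandardCharsets.UTF_8)
>>>>>                 .toString();
>>>>>     }
>>>>>
>>>>>     public static void main(String[] args) {
>>>>>         String baseUid = "my-sink-uid";
>>>>>         // Placeholder template strings, for illustration only.
>>>>>         System.out.println("writer    -> " + uidHash(baseUid + " Writer"));
>>>>>         System.out.println("committer -> " + uidHash(baseUid + " Committer"));
>>>>>     }
>>>>> }
>>>>> ```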
>>>>>
>>>>> Best,
>>>>> Zhanghao Chen
>>>>> ------------------------------
>>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>>> *Sent:* Sunday, June 9, 2024 14:49
>>>>> *To:* Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>>> *Cc:* user <user@flink.apache.org>
>>>>> *Subject:* Re: Setting uid hash for non-legacy sinks
>>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> Yeah, I know this, but what if you initially forgot and now you want
>>>>> to add the uid "after the fact"?
>>>>>
>>>>> You need to get the operator/vertex id used by Flink for current
>>>>> savepoints and somehow set this id for the sink.
>>>>>
>>>>> With the uid method you would need to "hack" the existing hash (i.e. find
>>>>> a string which, when hashed, produces it). I guess this could be done,
>>>>> since murmur3 is a non-cryptographic hash, but Flink has "setUidHash"
>>>>> precisely for that. From the javadocs:
>>>>>
>>>>> *Important*: this should be used as a workaround or for
>>>>> troubleshooting. The provided hash needs to be unique per transformation
>>>>> and job. A use case for this is in migration between Flink versions or
>>>>> changing the jobs in a way that changes the automatically generated 
>>>>> hashes.
>>>>> In this case, providing the previous hashes directly through this method
>>>>> (e.g. obtained from old logs) can help to reestablish a lost mapping from
>>>>> states to their target operator.
>>>>>
>>>>>
>>>>> ...but as I said, it seems this method is not supported for new
>>>>> (non-legacy) sinks...
>>>>>
>>>>> Regards,
>>>>>
>>>>> Salva
>>>>>
>>>>> On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <
>>>>> gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>> Hi Salva,
>>>>>
>>>>> Just wondering: why is it not good enough to set the uid like this?
>>>>> ```
>>>>> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
>>>>> ```
>>>>>
>>>>> From the mentioned UID, Flink will generate the hash, which is consistent
>>>>> from the UID -> HASH transformation perspective.
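>>>>>
>>>>> If it helps, that uid -> hash mapping can be reproduced offline; a small
>>>>> sketch, assuming the hashing is Guava's 128-bit Murmur3 with seed 0 over
>>>>> the UTF-8 bytes of the uid (as done for user-specified uids in
>>>>> StreamGraphHasherV2):
>>>>>
>>>>> ```
>>>>> import com.google.common.hash.Hashing;
>>>>> import java.nio.charset.StandardCharsets;
>>>>>
>>>>> public class UidToHash {
>>>>>     public static void main(String[] args) {
>>>>>         String uid = "my-human-readable-sink-uid";
>>>>>         // The resulting hex string is what should show up as the
>>>>>         // operator id in savepoints and logs for this uid.
>>>>>         System.out.println(
>>>>>                 Hashing.murmur3_128(0)
>>>>>                         .hashString(uid, StandardCharsets.UTF_8));
>>>>>     }
>>>>> }
>>>>> ```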
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <
>>>>> salcantara...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I want to add the uid for my Kafka sink in such a way that I can still
>>>>> use the existing savepoint. The problem I'm having is that I cannot set 
>>>>> the
>>>>> uid hash. If I try something like this:
>>>>>
>>>>> ```
>>>>>
>>>>> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
>>>>> ```
>>>>>
>>>>> I get the following error:
>>>>>
>>>>> ```
>>>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>>>> Cannot set a custom UID hash on a non-legacy sink
>>>>> at
>>>>> org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
>>>>> ```
>>>>>
>>>>> How can one then set the operator id directly for new (non-legacy)
>>>>> sinks?
>>>>>
>>>>> Kind regards,
>>>>>
>>>>> Salva
>>>>>
>>>>>
