Hey Salva,

Good to hear your issue has been resolved one way or another! Thanks for confirming that this operator hash trick works with V2 sinks as well.
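For anyone reusing the CustomSinkOperatorUidHashes recipe quoted below: the writer/committer hashes you pass in (e.g. recovered from old logs) have to be well-formed operator IDs. As far as I know, Flink's Transformation#setUidHash only accepts a 32-character hex string (16 bytes). A placeholder like the "xyzbbb..." one in the original question would therefore be rejected even on a legacy sink. Below is a minimal pre-flight check in plain Java, assuming that format. The class and method names are hypothetical and not part of Flink:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class UidHashCheck {
    // Assumption: an operator uid hash is 16 bytes rendered as 32 hex characters,
    // mirroring the format check we believe Transformation#setUidHash performs.
    private static final Pattern UID_HASH = Pattern.compile("^[0-9A-Fa-f]{32}$");

    // Hypothetical helper: true if the string looks like a usable uid hash.
    static boolean isValidUidHash(String hash) {
        return hash != null && UID_HASH.matcher(hash).matches();
    }

    public static void main(String[] args) {
        // Hashes from the recipe in this thread, plus the placeholder from the original question.
        Map<String, String> candidates = new LinkedHashMap<>();
        candidates.put("writer", "f6b178ce445dc3ffaa06bad27a51fead");
        candidates.put("committer", "68ac8ae79eae4e3135a54f9689c4aa10");
        candidates.put("placeholder", "xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");

        for (Map.Entry<String, String> e : candidates.entrySet()) {
            String verdict = isValidUidHash(e.getValue()) ? "ok" : "rejected";
            System.out.println(e.getKey() + ": " + verdict);
        }
    }
}
```

Running it prints "ok" for the writer and committer hashes and "rejected" for the placeholder. Checking up front presumably gives a clearer error than a failure later, when the job graph is built.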
G

On Wed, Jun 12, 2024 at 5:20 AM Salva Alcántara <salcantara...@gmail.com> wrote:

> Hey Gabor,
>
> In the end I didn't need to keep compatibility with existing savepoints, so I restarted my jobs from scratch, so to speak.
>
> However, I gave it a try locally, and by debugging the job graph I can see the hashes set manually following your recipe (see the userDefinedOperatorIDs), so it seems to work fine!
>
> Regards,
>
> Salva
>
> On Mon, Jun 10, 2024 at 9:49 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> YW, ping me back on whether it works, because it's a nifty feature.
>>
>> G
>>
>> On Mon, Jun 10, 2024 at 9:26 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>
>>> Thanks Gabor, I will give it a try!
>>>
>>> On Mon, Jun 10, 2024 at 12:01 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> Now I see the intention. Then you must have a V2 sink, right? Maybe you are looking for the following:
>>>>
>>>> final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
>>>> final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
>>>> final CustomSinkOperatorUidHashes operatorsUidHashes =
>>>>         CustomSinkOperatorUidHashes.builder()
>>>>                 .setWriterUidHash(writerHash)
>>>>                 .setCommitterUidHash(committerHash)
>>>>                 .build();
>>>> src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
>>>>
>>>> G
>>>>
>>>>
>>>> On Sun, Jun 9, 2024 at 1:21 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
>>>>
>>>>> Hi Salva,
>>>>>
>>>>> The SinkV2 transformation is translated into multiple operators at the physical level. When a UID is set, Flink automatically generates a UID for each sub-operator by filling the configured UID into a predefined naming template. The naming template is carefully maintained to ensure cross-version state compatibility.
>>>>> However, this cannot easily be done when setting the uidHash; that's why Flink currently does not support setting it for non-legacy sinks.
>>>>>
>>>>> A possible solution is to convert the uidHash back to the uid and apply the same strategy for generating uids to compute the corresponding uidHash for each sub-operator. Maybe you can investigate it further and file a JIRA issue for it.
>>>>>
>>>>> Best,
>>>>> Zhanghao Chen
>>>>> ------------------------------
>>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>>> *Sent:* Sunday, June 9, 2024 14:49
>>>>> *To:* Gabor Somogyi <gabor.g.somo...@gmail.com>
>>>>> *Cc:* user <user@flink.apache.org>
>>>>> *Subject:* Re: Setting uid hash for non-legacy sinks
>>>>>
>>>>> Hi Gabor,
>>>>>
>>>>> Yeah, I know this, but what if you initially forgot and now you want to add the uid "after the fact"?
>>>>>
>>>>> You need to get the operator/vertex id that Flink used for the existing savepoints and somehow set this id for the sink.
>>>>>
>>>>> With the uid method you would need to "hack" the existing hash (find a string which, when hashed, produces it). I guess this can be done since murmur3 is a non-cryptographic hash, but Flink has setUidHash precisely for that. From the javadocs:
>>>>>
>>>>> *Important*: this should be used as a workaround or for troubleshooting. The provided hash needs to be unique per transformation and job. A use case for this is in migration between Flink versions or changing the jobs in a way that changes the automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g. obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
>>>>>
>>>>>
>>>>> ...but as I said, it seems this method is not supported for new (non-legacy) sinks...
>>>>>
>>>>> Regards,
>>>>>
>>>>> Salva
>>>>>
>>>>> On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>>>
>>>>> Hi Salva,
>>>>>
>>>>> Just wondering, why not set the uid like this?
>>>>> ```
>>>>> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
>>>>> ```
>>>>>
>>>>> Flink generates the hash from that UID, and the UID -> HASH transformation is consistent (the same UID always yields the same hash).
>>>>>
>>>>> BR,
>>>>> G
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I want to add the uid for my Kafka sink in such a way that I can still use the existing savepoint. The problem I'm having is that I cannot set the uid hash. If I try something like this:
>>>>>
>>>>> ```
>>>>> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
>>>>> ```
>>>>>
>>>>> I get the following error:
>>>>>
>>>>> ```
>>>>> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot set a custom UID hash on a non-legacy sink
>>>>>     at org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
>>>>> ```
>>>>>
>>>>> How can one then set the operator ID directly for new (non-legacy) sinks?
>>>>>
>>>>> Kind regards,
>>>>>
>>>>> Salva
>>>>>
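Gabor's point above (that the UID -> HASH mapping is consistent) and Salva's remark that the hash is murmur3-based can be illustrated outside Flink. As far as I can tell from Flink's StreamGraphHasherV2, a user-specified UID is hashed with Guava's 128-bit Murmur3 (seed 0) over its UTF-8 bytes. The 16 resulting bytes, hex-encoded, are the operator ID stored in savepoints. The sketch below re-implements MurmurHash3 x64_128 in plain Java under that assumption. The class name is hypothetical. For SinkV2, the writer and committer are hashed from UIDs that Flink derives through its internal naming template, and this sketch does not model that template. Compare its output against the IDs in a real job graph before relying on it:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class UidToHash {
    private static final long C1 = 0x87c37b91114253d5L;
    private static final long C2 = 0x4cf5ad432745937fL;

    // Final avalanche step of MurmurHash3 (fmix64).
    private static long fmix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    // MurmurHash3 x64_128; returns h1 then h2 as little-endian bytes (Guava-style layout).
    static byte[] murmur3x64_128(byte[] data, int seed) {
        long h1 = seed & 0xffffffffL;
        long h2 = seed & 0xffffffffL;
        int len = data.length;
        int nblocks = len / 16;
        ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);

        // Body: process 16-byte blocks as two little-endian longs.
        for (int i = 0; i < nblocks; i++) {
            long k1 = buf.getLong(i * 16);
            long k2 = buf.getLong(i * 16 + 8);
            k1 *= C1; k1 = Long.rotateLeft(k1, 31); k1 *= C2; h1 ^= k1;
            h1 = Long.rotateLeft(h1, 27); h1 += h2; h1 = h1 * 5 + 0x52dce729;
            k2 *= C2; k2 = Long.rotateLeft(k2, 33); k2 *= C1; h2 ^= k2;
            h2 = Long.rotateLeft(h2, 31); h2 += h1; h2 = h2 * 5 + 0x38495ab5;
        }

        // Tail: up to 15 remaining bytes; bytes 8..14 feed k2, bytes 0..7 feed k1.
        int tail = nblocks * 16;
        int rem = len & 15;
        long k1 = 0;
        long k2 = 0;
        for (int i = rem - 1; i >= 8; i--) {
            k2 ^= (data[tail + i] & 0xffL) << ((i - 8) * 8);
        }
        if (rem > 8) {
            k2 *= C2; k2 = Long.rotateLeft(k2, 33); k2 *= C1; h2 ^= k2;
        }
        for (int i = Math.min(rem, 8) - 1; i >= 0; i--) {
            k1 ^= (data[tail + i] & 0xffL) << (i * 8);
        }
        if (rem > 0) {
            k1 *= C1; k1 = Long.rotateLeft(k1, 31); k1 *= C2; h1 ^= k1;
        }

        // Finalization: mix in the length, then avalanche both halves.
        h1 ^= len; h2 ^= len;
        h1 += h2; h2 += h1;
        h1 = fmix64(h1); h2 = fmix64(h2);
        h1 += h2; h2 += h1;
        return ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN).putLong(h1).putLong(h2).array();
    }

    // Assumed mapping: UTF-8 bytes of the uid, seed 0, hex-encoded 16-byte result.
    static String uidToHash(String uid) {
        byte[] hash = murmur3x64_128(uid.getBytes(StandardCharsets.UTF_8), 0);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : hash) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(uidToHash("my-human-readable-sink-uid"));
    }
}
```

The function is one-way by design. Going from an existing hash back to a UID is therefore not a straightforward lookup, which is the gap setUidHash (and, for SinkV2, CustomSinkOperatorUidHashes) is meant to fill.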