Hi Gabor,

Thanks for the insights! However, after inspecting the contents of the latest savepoint, I double-checked that the hash value for my source operator was correct ("cbc357ccb763df2852fee8c4fc7d55f2"). This is also consistent with the ID shown in the jobmanager UI. So, again, unless I'm missing something, I don't see `setUidHash` having any effect in practice.

The only way I've found to work around the problem in production is by enabling `allowNonRestoredState`, which seems to confirm my thesis. Indeed, the new ID shown in the jobmanager UI is "bc764cd8ddf7a0cff126f51c16239658" (vs the intended "cbc357ccb763df2852fee8c4fc7d55f2"), as reported in my original test. This makes me think there is something wrong with `setUidHash`...

BTW I'm running on Flink 1.18.1.

Regards,

Salva

On Tue, Sep 24, 2024 at 1:07 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Now I see what you're doing. In general you should look for the UID pair
> in the vertex, like this (several operators can belong to a vertex):
>
> Optional<OperatorIDPair> pair =
>     vertex.get().getOperatorIDs().stream()
>         .filter(o -> o.getUserDefinedOperatorID().get().toString().equals(uidHash))
>         .findFirst();
>
> assertEquals(uidHash, pair.get().getUserDefinedOperatorID().get().toString());
>
> Please load your savepoint with SavepointLoader and analyze what kind of
> operators are inside and why is your app blowing up.
>
> G
>
> On Tue, Sep 24, 2024 at 9:29 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> Hi Salva,
>>
>> Which version is this?
>>
>> BR,
>> G
>>
>> On Mon, Sep 23, 2024 at 8:46 PM Salva Alcántara <salcantara...@gmail.com> wrote:
>>
>>> I have a pipeline where I'm trying to add a new operator. The problem is
>>> that originally I forgot to specify the uid for one source. To remedy this,
>>> I'm using setUidHash, providing the operator id to match that in my
>>> savepoints / current job graph.
>>>
>>> Unless I'm doing something wrong, what I've observed is that the value
>>> provided to setUidHash is completely ignored by Flink.
>>> I've tried to summarize it in the following test:
>>>
>>> ```java
>>> @Test
>>> public void testSetUidHash() throws Exception {
>>>   var uidHash = "cbc357ccb763df2852fee8c4fc7d55f2";
>>>   env.fromElements(1, 2, 3)
>>>       .name("123")
>>>       .setUidHash(uidHash)
>>>       //.uid("123")
>>>       .print()
>>>       .name("print")
>>>       .uid("print");
>>>
>>>   var vertex = StreamSupport
>>>       .stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(), false)
>>>       .filter(v -> v.getName().equals("Source: 123"))
>>>       .findFirst()
>>>       .orElseThrow();
>>>
>>>   assertEquals(uidHash, vertex.getID());
>>>   // Fails with java.lang.AssertionError (if setUidHash is used):
>>>   // expected:<cbc357ccb763df2852fee8c4fc7d55f2> but was:<bc764cd8ddf7a0cff126f51c16239658>
>>>   // Interestingly, the actual value does not even depend on the provided value (uidHash)!
>>>
>>>   // assertEquals("6a7f660d1b2d5b9869cf0ecee3a17e42", vertex.getID());
>>>   // Passes (if uid is used instead)
>>> }
>>> ```
>>>
>>> Can anyone confirm whether this is a bug?
>>>
>>> Regards,
>>>
>>> Salva
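
To make the distinction between a vertex ID and a user-defined operator ID concrete, here is a minimal sketch in plain Java. It is not Flink code: `IdPair`, `Vertex`, `indexOperators`, and `unmatched` are hypothetical stand-ins. The lookup rule it models (savepoint state is matched against either the generated or the user-provided operator ID) is an assumption about how restore behaves, not a copy of the actual implementation. The two hash values are the ones quoted in this thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class RestoreMappingSketch {

    /** Hypothetical stand-in for an operator's ID pair: generated hash plus optional user-provided hash. */
    public static final class IdPair {
        final String generatedId;
        final Optional<String> userDefinedId;

        public IdPair(String generatedId, Optional<String> userDefinedId) {
            this.generatedId = generatedId;
            this.userDefinedId = userDefinedId;
        }
    }

    /** Hypothetical stand-in for a job vertex, which may chain several operators. */
    public static final class Vertex {
        final String vertexId;
        final List<IdPair> operators;

        public Vertex(String vertexId, List<IdPair> operators) {
            this.vertexId = vertexId;
            this.operators = operators;
        }
    }

    /** Indexes each vertex under every ID its operators answer to (generated and user-defined). */
    public static Map<String, Vertex> indexOperators(List<Vertex> vertices) {
        Map<String, Vertex> index = new HashMap<>();
        for (Vertex vertex : vertices) {
            for (IdPair pair : vertex.operators) {
                index.put(pair.generatedId, vertex);
                pair.userDefinedId.ifPresent(id -> index.put(id, vertex));
            }
        }
        return index;
    }

    /** Returns the savepoint operator IDs that no operator in the new job answers to. */
    public static List<String> unmatched(List<String> savepointOperatorIds, List<Vertex> vertices) {
        Map<String, Vertex> index = indexOperators(vertices);
        List<String> missing = new ArrayList<>();
        for (String id : savepointOperatorIds) {
            if (!index.containsKey(id)) {
                missing.add(id);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String uidHash = "cbc357ccb763df2852fee8c4fc7d55f2";   // value passed to setUidHash in the thread
        String vertexId = "bc764cd8ddf7a0cff126f51c16239658";  // vertex ID reported in the thread

        // Assumption for illustration: the source's generated operator ID equals the vertex ID.
        Vertex source = new Vertex(vertexId, List.of(new IdPair(vertexId, Optional.of(uidHash))));

        System.out.println("vertex ID equals uidHash: " + source.vertexId.equals(uidHash));
        System.out.println("unmatched savepoint operators: "
                + unmatched(List.of(uidHash), List.of(source)));
    }
}
```

In this model the vertex ID never equals the provided hash, yet the savepoint entry for that hash still finds its operator through the user-defined ID. If restore works along these lines, an assertion on `vertex.getID()` cannot tell whether `setUidHash` took effect, which is why the check has to go through the operator ID pairs of the vertex.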