Hi Dan,

When I run into this kind of problem, I consider using the API levels that are 
not marked @Public:

  *   First of all, uids matter mainly for operators that hold state and are 
less important for operators that don’t hold state primitives. I am not sure 
what implications disableAutoGeneratedUIDs has here
  *   A DataStream is actually a wrapper around a Transformation that is 
registered with the StreamExecutionEnvironment (see 
DataStream#getTransformation())
  *   You can assign a name and a uid directly on a Transformation (setName() 
and setUid())
  *   Transformations expose their input transformations: 
Transformation#getInputs()
  *   This way you can locate the two Map transformations and assign uids
  *   However, the two maps are stateless and technically don’t need a uid
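
The walk described above could look roughly like the sketch below. To keep it 
runnable without Flink on the classpath, it uses a hypothetical stand-in class 
Node that only mimics getName(), getInputs(), getUid() and setUid(); the helper 
assignUidsByName and the "cogroup-tagger" prefix are likewise made up for 
illustration. With Flink you would start from the Transformation returned by 
DataStream#getTransformation() on the co-group result instead. I have not tried 
this against disableAutoGeneratedUIDs, so treat it as an idea rather than a 
verified fix.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class UidAssigner {

    /** Hypothetical stand-in for a Flink Transformation; only the parts used here. */
    static class Node {
        private final String name;
        private final List<Node> inputs = new ArrayList<>();
        private String uid; // null means "no uid assigned yet"

        Node(String name, Node... inputs) {
            this.name = name;
            for (Node in : inputs) {
                this.inputs.add(in);
            }
        }

        String getName() { return name; }
        List<Node> getInputs() { return inputs; }
        String getUid() { return uid; }
        void setUid(String uid) { this.uid = uid; }
    }

    /**
     * Walks upstream from root and gives every node that has the given name and
     * no uid yet a uid of the form prefix-n, numbered in visiting order.
     * Returns the number of uids assigned.
     */
    static int assignUidsByName(Node root, String name, String prefix) {
        Map<Node, Boolean> visited = new IdentityHashMap<>();
        int[] counter = {0};
        visit(root, name, prefix, visited, counter);
        return counter[0];
    }

    private static void visit(Node node, String name, String prefix,
                              Map<Node, Boolean> visited, int[] counter) {
        if (visited.put(node, Boolean.TRUE) != null) {
            return; // node already handled via another path
        }
        if (name.equals(node.getName()) && node.getUid() == null) {
            node.setUid(prefix + "-" + counter[0]);
            counter[0]++;
        }
        for (Node input : node.getInputs()) {
            visit(input, name, prefix, visited, counter);
        }
    }

    public static void main(String[] args) {
        // Shape of the graph built by CoGroupedStreams#apply: two sources, two
        // tagging maps without uid, a union, and the window operator.
        Node source1 = new Node("Source");
        source1.setUid("source-1");
        Node source2 = new Node("Source");
        source2.setUid("source-2");
        Node map1 = new Node("Map", source1);
        Node map2 = new Node("Map", source2);
        Node union = new Node("Union", map1, map2);
        Node window = new Node("Window", union);
        window.setUid("cogroup-window");

        assignUidsByName(window, "Map", "cogroup-tagger");
        System.out.println(map1.getUid() + ", " + map2.getUid());
    }
}
```

The numbering follows the order in which inputs are visited, so the uids only 
stay stable across job versions as long as the shape of the graph upstream of 
the co-group does not change.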

What do you think?

Thias

From: Dan Hill <quietgol...@gmail.com>
Sent: Monday, 13 December 2021 06:30
To: user <user@flink.apache.org>
Subject: CoGroupedStreams and disableAutoGeneratedUIDs

Hi.  I tried to use CoGroupedStreams w/ disableAutoGeneratedUIDs.  
CoGroupedStreams creates two map operators without the ability to set uids on 
them.  These appear as "Map" in my operator graph.  I noticed that the 
CoGroupedStreams.apply function has two map calls without setting uids.  If I 
try to run with disableAutoGeneratedUIDs, I get the following error 
"java.lang.IllegalStateException: Auto generated UIDs have been disabled but no 
UID or hash has been assigned to operator Map".

How can I fix this?  Extend the base CoGroupedStreams class?


```
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    function = (CoGroupFunction)this.input1.getExecutionEnvironment().clean(function);
    CoGroupedStreams.UnionTypeInfo<T1, T2> unionType =
            new CoGroupedStreams.UnionTypeInfo(this.input1.getType(), this.input2.getType());
    CoGroupedStreams.UnionKeySelector<T1, T2, KEY> unionKeySelector =
            new CoGroupedStreams.UnionKeySelector(this.keySelector1, this.keySelector2);
    DataStream<CoGroupedStreams.TaggedUnion<T1, T2>> taggedInput1 =
            this.input1.map(new CoGroupedStreams.Input1Tagger())
                    .setParallelism(this.input1.getParallelism())
                    .returns(unionType);
    DataStream<CoGroupedStreams.TaggedUnion<T1, T2>> taggedInput2 =
            this.input2.map(new CoGroupedStreams.Input2Tagger())
                    .setParallelism(this.input2.getParallelism())
                    .returns(unionType);
    DataStream<CoGroupedStreams.TaggedUnion<T1, T2>> unionStream =
            taggedInput1.union(new DataStream[]{taggedInput2});
    this.windowedStream =
            (new KeyedStream(unionStream, unionKeySelector, this.keyType)).window(this.windowAssigner);
    if (this.trigger != null) {
        this.windowedStream.trigger(this.trigger);
    }

    if (this.evictor != null) {
        this.windowedStream.evictor(this.evictor);
    }

    if (this.allowedLateness != null) {
        this.windowedStream.allowedLateness(this.allowedLateness);
    }

    return this.windowedStream.apply(new CoGroupedStreams.CoGroupWindowFunction(function), resultType);
}
```