[ 
https://issues.apache.org/jira/browse/FLINK-10863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685395#comment-16685395
 ] 

ASF GitHub Bot commented on FLINK-10863:
----------------------------------------

asfgit closed pull request #7085: [FLINK-10863][tests] Assign UIDs to all 
operators in DataStreamAllroundTestProgram
URL: https://github.com/apache/flink/pull/7085
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index b14e2af1b52..3c406c7598d 100644
--- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception {
                setupEnvironment(env, pt);
 
                // add a keyed stateful map operator, which uses Kryo for state 
serialization
-               DataStream<Event> eventStream = 
env.addSource(createEventSource(pt))
+               DataStream<Event> eventStream = 
env.addSource(createEventSource(pt)).uid("0001")
                        
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
                        .keyBy(Event::getKey)
                        .map(createArtificialKeyedStateMapper(
@@ -97,7 +97,7 @@ public static void main(String[] args) throws Exception {
                                                new 
StatefulComplexPayloadSerializer()), // custom stateful serializer
                                        
Collections.singletonList(ComplexPayload.class) // KryoSerializer via type 
extraction
                                )
-                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + 
"_Kryo_and_Custom_Stateful");
+                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + 
"_Kryo_and_Custom_Stateful").uid("0002");
 
                // add a keyed stateful map operator, which uses Avro for state 
serialization
                eventStream = eventStream
@@ -124,12 +124,12 @@ public static void main(String[] args) throws Exception {
                                                new 
AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer
                                        
Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type 
extraction
                                )
-                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + 
"_Avro");
+                       ).returns(Event.class).name(KEYED_STATE_OPER_NAME + 
"_Avro").uid("0003");
 
                DataStream<Event> eventStream2 = eventStream
                        
.map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in))
-                       .name(OPERATOR_STATE_OPER_NAME)
-                       .returns(Event.class);
+                       .returns(Event.class)
+                       .name(OPERATOR_STATE_OPER_NAME).uid("0004");
 
                // apply a tumbling window that simply passes forward window 
elements;
                // this allows the job to cover timers state
@@ -141,19 +141,20 @@ public void apply(Integer integer, TimeWindow window, 
Iterable<Event> input, Col
                                                out.collect(e);
                                        }
                                }
-                       }).name(TIME_WINDOW_OPER_NAME);
+                       }).name(TIME_WINDOW_OPER_NAME).uid("0005");
 
                if (isSimulateFailures(pt)) {
                        eventStream3 = eventStream3
                                .map(createFailureMapper(pt))
                                .setParallelism(1)
-                               .name(FAILURE_MAPPER_NAME);
+                               .name(FAILURE_MAPPER_NAME).uid("0006");
                }
 
                eventStream3.keyBy(Event::getKey)
                        .flatMap(createSemanticsCheckMapper(pt))
                        .name(SEMANTICS_CHECK_MAPPER_NAME)
-                       .addSink(new PrintSinkFunction<>());
+                       .uid("0007")
+                       .addSink(new PrintSinkFunction<>()).uid("0008");
 
                env.execute("General purpose test job");
        }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Assign uids to all operators
> ----------------------------
>
>                 Key: FLINK-10863
>                 URL: https://issues.apache.org/jira/browse/FLINK-10863
>             Project: Flink
>          Issue Type: Sub-task
>          Components: E2E Tests
>    Affects Versions: 1.5.5, 1.6.2, 1.7.0
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> We should assign uids to operators in the test so that we can also properly 
> test removing operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to