[ https://issues.apache.org/jira/browse/FLINK-10863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685395#comment-16685395 ]
ASF GitHub Bot commented on FLINK-10863: ---------------------------------------- asfgit closed pull request #7085: [FLINK-10863][tests] Assign UIDs to all operators in DataStreamAllrou… URL: https://github.com/apache/flink/pull/7085 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java index b14e2af1b52..3c406c7598d 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java @@ -78,7 +78,7 @@ public static void main(String[] args) throws Exception { setupEnvironment(env, pt); // add a keyed stateful map operator, which uses Kryo for state serialization - DataStream<Event> eventStream = env.addSource(createEventSource(pt)) + DataStream<Event> eventStream = env.addSource(createEventSource(pt)).uid("0001") .assignTimestampsAndWatermarks(createTimestampExtractor(pt)) .keyBy(Event::getKey) .map(createArtificialKeyedStateMapper( @@ -97,7 +97,7 @@ public static void main(String[] args) throws Exception { new StatefulComplexPayloadSerializer()), // custom stateful serializer Collections.singletonList(ComplexPayload.class) // KryoSerializer via type extraction ) - ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo_and_Custom_Stateful"); + ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Kryo_and_Custom_Stateful").uid("0002"); // add a keyed 
stateful map operator, which uses Avro for state serialization eventStream = eventStream @@ -124,12 +124,12 @@ public static void main(String[] args) throws Exception { new AvroSerializer<>(ComplexPayloadAvro.class)), // custom AvroSerializer Collections.singletonList(ComplexPayloadAvro.class) // AvroSerializer via type extraction ) - ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro"); + ).returns(Event.class).name(KEYED_STATE_OPER_NAME + "_Avro").uid("0003"); DataStream<Event> eventStream2 = eventStream .map(createArtificialOperatorStateMapper((MapFunction<Event, Event>) in -> in)) - .name(OPERATOR_STATE_OPER_NAME) - .returns(Event.class); + .returns(Event.class) + .name(OPERATOR_STATE_OPER_NAME).uid("0004"); // apply a tumbling window that simply passes forward window elements; // this allows the job to cover timers state @@ -141,19 +141,20 @@ public void apply(Integer integer, TimeWindow window, Iterable<Event> input, Col out.collect(e); } } - }).name(TIME_WINDOW_OPER_NAME); + }).name(TIME_WINDOW_OPER_NAME).uid("0005"); if (isSimulateFailures(pt)) { eventStream3 = eventStream3 .map(createFailureMapper(pt)) .setParallelism(1) - .name(FAILURE_MAPPER_NAME); + .name(FAILURE_MAPPER_NAME).uid("0006"); } eventStream3.keyBy(Event::getKey) .flatMap(createSemanticsCheckMapper(pt)) .name(SEMANTICS_CHECK_MAPPER_NAME) - .addSink(new PrintSinkFunction<>()); + .uid("0007") + .addSink(new PrintSinkFunction<>()).uid("0008"); env.execute("General purpose test job"); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Assign uids to all operators
> ----------------------------
>
> Key: FLINK-10863
> URL: https://issues.apache.org/jira/browse/FLINK-10863
> Project: Flink
> Issue Type: Sub-task
> Components: E2E Tests
> Affects Versions: 1.5.5, 1.6.2, 1.7.0
> Reporter: Stefan Richter
> Assignee: Stefan Richter
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.0
>
>
> We should assign uids to operators in the test so that we can also properly
> test removing operators.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)