[ https://issues.apache.org/jira/browse/FLINK-10275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16613554#comment-16613554 ]
ASF GitHub Bot commented on FLINK-10275: ---------------------------------------- TisonKun closed pull request #6643: [FLINK-10275] StreamTask support object reuse URL: https://github.com/apache/flink/pull/6643 This is a PR merged from a forked repository. As GitHub hides the original diff of a foreign pull request (one opened from a fork) on merge, the diff is reproduced below for the sake of provenance: diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md index f0103b0f39f..3991ab59ac5 100644 --- a/docs/dev/execution_configuration.md +++ b/docs/dev/execution_configuration.md @@ -59,7 +59,7 @@ With the closure cleaner disabled, it might happen that an anonymous user functi - `enableForceAvro()` / **`disableForceAvro()`**. Avro is not forced by default. Forces the Flink AvroTypeInformation to use the Avro serializer instead of Kryo for serializing Avro POJOs. -- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior. +- `enableObjectReuse()` / **`disableObjectReuse()`** By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-defined function of an operation is not aware of this behavior. - **`enableSysoutLogging()`** / `disableSysoutLogging()` JobManager status updates are printed to `System.out` by default. This setting allows to disable this behavior. 
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 59fa803791a..d36fd296562 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -601,8 +601,8 @@ public boolean isForceAvroEnabled() { /** * Enables reusing objects that Flink internally uses for deserialization and passing - * data to user-code functions. Keep in mind that this can lead to bugs when the - * user-code function of an operation is not aware of this behaviour. + * data to user-defined functions. Keep in mind that this can lead to bugs when the + * user-defined function of an operation is not aware of this behaviour. */ public ExecutionConfig enableObjectReuse() { objectReuse = true; @@ -611,7 +611,7 @@ public ExecutionConfig enableObjectReuse() { /** * Disables reusing objects that Flink internally uses for deserialization and passing - * data to user-code functions. @see #enableObjectReuse() + * data to user-defined functions. 
@see #enableObjectReuse() */ public ExecutionConfig disableObjectReuse() { objectReuse = false; diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala index d8ba29ae478..f4185f4c129 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala @@ -51,7 +51,7 @@ class TableSinkITCase( val input = CollectionDataSets.get3TupleDataSet(env) .map(x => x).setParallelism(4) // increase DOP to 4 - val results = input.toTable(tEnv, 'a, 'b, 'c) + input.toTable(tEnv, 'a, 'b, 'c) .where('a < 5 || 'a > 17) .select('c, 'b) .writeToSink(new CsvTableSink(path, fieldDelim = "|")) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 70e59f3d24d..95cb1df1b04 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -21,10 +21,12 @@ package org.apache.flink.table.runtime.stream.table import java.io.File import java.lang.{Boolean => JBool} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic 
import org.apache.flink.streaming.api.datastream.DataStream @@ -54,7 +56,7 @@ class TableSinkITCase extends AbstractTestBase { env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tEnv = TableEnvironment.getTableEnvironment(env) - MemoryTableSourceSinkUtil.clear + MemoryTableSourceSinkUtil.clear() val input = StreamTestData.get3TupleDataStream(env) .assignAscendingTimestamps(r => r._2) @@ -560,8 +562,7 @@ private[flink] class TestAppendSink extends AppendStreamTableSink[Row] { var fTypes: Array[TypeInformation[_]] = _ override def emitDataStream(s: DataStream[Row]): Unit = { - s.map( - new MapFunction[Row, JTuple2[JBool, Row]] { + s.map(new MapFunction[Row, JTuple2[JBool, Row]] { override def map(value: Row): JTuple2[JBool, Row] = new JTuple2(true, value) }) .addSink(new RowSink) @@ -661,12 +662,11 @@ object RowCollector { new mutable.ArrayBuffer[JTuple2[JBool, Row]]() def addValue(value: JTuple2[JBool, Row]): Unit = { + // make a deep copy + val copy = new JTuple2[JBool, Row](value.f0, + new KryoSerializer(classOf[Row], new ExecutionConfig()).copy(value.f1)) - // make a copy - val copy = new JTuple2[JBool, Row](value.f0, Row.copy(value.f1)) - sink.synchronized { - sink += copy - } + sink.synchronized { sink += copy } } def getAndClearValues: List[JTuple2[JBool, Row]] = { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index a9c64b5f6fe..769605ba704 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; 
+import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -109,6 +110,8 @@ private boolean isFinished; + private IN reusedObject; + @SuppressWarnings("unchecked") public StreamInputProcessor( InputGate[] inputGates, @@ -121,7 +124,8 @@ public StreamInputProcessor( StreamStatusMaintainer streamStatusMaintainer, OneInputStreamOperator<IN, ?> streamOperator, TaskIOMetricGroup metrics, - WatermarkGauge watermarkGauge) throws IOException { + WatermarkGauge watermarkGauge, + boolean objectReuse) throws IOException { InputGate inputGate = InputGateUtil.createInputGate(inputGates); @@ -131,7 +135,10 @@ public StreamInputProcessor( this.lock = checkNotNull(lock); StreamElementSerializer<IN> ser = new StreamElementSerializer<>(inputSerializer); - this.deserializationDelegate = new NonReusingDeserializationDelegate<>(ser); + this.deserializationDelegate = objectReuse ? + new ReusingDeserializationDelegate<>(ser) : + new NonReusingDeserializationDelegate<>(ser); + this.reusedObject = objectReuse ? 
inputSerializer.createInstance() : null; // Initialize one deserializer per input channel this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; @@ -169,6 +176,10 @@ public boolean processInput() throws Exception { while (true) { if (currentRecordDeserializer != null) { + if (reusedObject != null){ + deserializationDelegate.setInstance(new StreamRecord<>(reusedObject)); + } + DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate); if (result.isBufferConsumed()) { @@ -194,6 +205,8 @@ public boolean processInput() throws Exception { } continue; } else { + reusedObject = ((StreamRecord<IN>) recordOrMark).getValue(); + // now we can do the actual processing StreamRecord<IN> record = recordOrMark.asRecord(); synchronized (lock) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index ab4f90dcf23..531d6cbf28c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.plugable.DeserializationDelegate; import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate; +import org.apache.flink.runtime.plugable.ReusingDeserializationDelegate; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -125,6 +126,9 @@ private boolean isFinished; + private IN1 reusedObject1; + private IN2 reusedObject2; + @SuppressWarnings("unchecked") public StreamTwoInputProcessor( Collection<InputGate> 
inputGates1, @@ -140,7 +144,8 @@ public StreamTwoInputProcessor( TwoInputStreamOperator<IN1, IN2, ?> streamOperator, TaskIOMetricGroup metrics, WatermarkGauge input1WatermarkGauge, - WatermarkGauge input2WatermarkGauge) throws IOException { + WatermarkGauge input2WatermarkGauge, + boolean objectReuse) throws IOException { final InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2); @@ -150,10 +155,17 @@ public StreamTwoInputProcessor( this.lock = checkNotNull(lock); StreamElementSerializer<IN1> ser1 = new StreamElementSerializer<>(inputSerializer1); - this.deserializationDelegate1 = new NonReusingDeserializationDelegate<>(ser1); + this.deserializationDelegate1 = objectReuse ? + new ReusingDeserializationDelegate<>(ser1) : + new NonReusingDeserializationDelegate<>(ser1); + this.reusedObject1 = objectReuse ? inputSerializer1.createInstance() : null; StreamElementSerializer<IN2> ser2 = new StreamElementSerializer<>(inputSerializer2); - this.deserializationDelegate2 = new NonReusingDeserializationDelegate<>(ser2); + this.deserializationDelegate2 = objectReuse ? + new ReusingDeserializationDelegate<>(ser2) : + new NonReusingDeserializationDelegate<>(ser2); + + this.reusedObject2 = objectReuse ? 
inputSerializer2.createInstance() : null; // Initialize one deserializer per input channel this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()]; @@ -203,8 +215,14 @@ public boolean processInput() throws Exception { if (currentRecordDeserializer != null) { DeserializationResult result; if (currentChannel < numInputChannels1) { + if (reusedObject1 != null){ + deserializationDelegate1.setInstance(new StreamRecord<>(reusedObject1)); + } result = currentRecordDeserializer.getNextRecord(deserializationDelegate1); } else { + if (reusedObject2 != null){ + deserializationDelegate2.setInstance(new StreamRecord<>(reusedObject2)); + } result = currentRecordDeserializer.getNextRecord(deserializationDelegate2); } @@ -231,6 +249,8 @@ else if (recordOrWatermark.isLatencyMarker()) { continue; } else { + reusedObject1 = ((StreamRecord<IN1>) recordOrWatermark).getValue(); + StreamRecord<IN1> record = recordOrWatermark.asRecord(); synchronized (lock) { numRecordsIn.inc(); @@ -258,6 +278,8 @@ else if (recordOrWatermark.isLatencyMarker()) { continue; } else { + reusedObject2 = ((StreamRecord<IN2>) recordOrWatermark).getValue(); + StreamRecord<IN2> record = recordOrWatermark.asRecord(); synchronized (lock) { numRecordsIn.inc(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index ed6022ff592..8503eb21d70 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -225,15 +225,15 @@ else if (tag == TAG_LATENCY_MARKER) { public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException { int tag = source.readByte(); if (tag == 
TAG_REC_WITH_TIMESTAMP) { - long timestamp = source.readLong(); - T value = typeSerializer.deserialize(source); StreamRecord<T> reuseRecord = reuse.asRecord(); + long timestamp = source.readLong(); + T value = typeSerializer.deserialize(reuseRecord.getValue(), source); reuseRecord.replace(value, timestamp); return reuseRecord; } else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { - T value = typeSerializer.deserialize(source); StreamRecord<T> reuseRecord = reuse.asRecord(); + T value = typeSerializer.deserialize(reuseRecord.getValue(), source); reuseRecord.replace(value); return reuseRecord; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java index 74985186836..91671be0a5c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java @@ -90,7 +90,8 @@ public void init() throws Exception { getStreamStatusMaintainer(), this.headOperator, getEnvironment().getMetricGroup().getIOMetricGroup(), - inputWatermarkGauge); + inputWatermarkGauge, + getExecutionConfig().isObjectReuseEnabled()); } headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge); // wrap watermark gauge since registered metrics must be unique diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 546ccdb3bfc..1199251dd44 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -100,7 +100,8 @@ public void init() throws 
Exception { this.headOperator, getEnvironment().getMetricGroup().getIOMetricGroup(), input1WatermarkGauge, - input2WatermarkGauge); + input2WatermarkGauge, + getExecutionConfig().isObjectReuseEnabled()); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index 96eaa78ed1a..94fdd0fc1b5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -24,9 +24,13 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.Counter; @@ -79,6 +83,7 @@ import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -108,20 +113,20 @@ public void testOpenCloseAndTimestamps() throws Exception { 
testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction()); + StreamMap<String, String> mapOperator = new StreamMap<>(new TestOpenCloseMapFunction()); streamConfig.setStreamOperator(mapOperator); streamConfig.setOperatorID(new OperatorID()); long initialTime = 0L; - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.invoke(); testHarness.waitForTaskRunning(); - testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1)); - testHarness.processElement(new StreamRecord<String>("Ciao", initialTime + 2)); - expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1)); - expectedOutput.add(new StreamRecord<String>("Ciao", initialTime + 2)); + testHarness.processElement(new StreamRecord<>("Hello", initialTime + 1)); + testHarness.processElement(new StreamRecord<>("Ciao", initialTime + 2)); + expectedOutput.add(new StreamRecord<>("Hello", initialTime + 1)); + expectedOutput.add(new StreamRecord<>("Ciao", initialTime + 2)); testHarness.endInput(); @@ -152,11 +157,11 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap()); + StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap()); streamConfig.setStreamOperator(mapOperator); streamConfig.setOperatorID(new OperatorID()); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); long initialTime = 0L; testHarness.invoke(); @@ -180,10 +185,10 @@ public void 
testWatermarkAndStreamStatusForwarding() throws Exception { testHarness.getOutput()); // contrary to checkpoint barriers these elements are not blocked by watermarks - testHarness.processElement(new StreamRecord<String>("Hello", initialTime)); - testHarness.processElement(new StreamRecord<String>("Ciao", initialTime)); - expectedOutput.add(new StreamRecord<String>("Hello", initialTime)); - expectedOutput.add(new StreamRecord<String>("Ciao", initialTime)); + testHarness.processElement(new StreamRecord<>("Hello", initialTime)); + testHarness.processElement(new StreamRecord<>("Ciao", initialTime)); + expectedOutput.add(new StreamRecord<>("Hello", initialTime)); + expectedOutput.add(new StreamRecord<>("Ciao", initialTime)); testHarness.processElement(new Watermark(initialTime + 4), 0, 0); testHarness.processElement(new Watermark(initialTime + 3), 0, 1); @@ -274,7 +279,7 @@ public void testWatermarksNotForwardedWithinChainWhenIdle() throws Exception { // --------------------- begin test --------------------- - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.invoke(); testHarness.waitForTaskRunning(); @@ -364,11 +369,11 @@ public void testCheckpointBarriers() throws Exception { testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap()); + StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap()); streamConfig.setStreamOperator(mapOperator); streamConfig.setOperatorID(new OperatorID()); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); long initialTime = 0L; testHarness.invoke(); @@ -378,16 +383,16 @@ public void testCheckpointBarriers() throws Exception { // These elements 
should be buffered until we receive barriers from // all inputs - testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0); - testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 0); // These elements should be forwarded, since we did not yet receive a checkpoint barrier // on that input, only add to same input, otherwise we would not know the ordering // of the output since the Task might read the inputs in any order - testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1); - testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1); - expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime)); - expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime)); + testHarness.processElement(new StreamRecord<>("Hello-1-1", initialTime), 1, 1); + testHarness.processElement(new StreamRecord<>("Ciao-1-1", initialTime), 1, 1); + expectedOutput.add(new StreamRecord<>("Hello-1-1", initialTime)); + expectedOutput.add(new StreamRecord<>("Ciao-1-1", initialTime)); testHarness.waitForInputProcessing(); // we should not yet see the barrier, only the two elements from non-blocked input @@ -401,8 +406,8 @@ public void testCheckpointBarriers() throws Exception { // now we should see the barrier and after that the buffered elements expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation())); - expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); - expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); + expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime)); + expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime)); testHarness.endInput(); @@ -427,11 +432,11 @@ public void 
testOvertakingCheckpointBarriers() throws Exception { testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap()); + StreamMap<String, String> mapOperator = new StreamMap<>(new IdentityMap()); streamConfig.setStreamOperator(mapOperator); streamConfig.setOperatorID(new OperatorID()); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); long initialTime = 0L; testHarness.invoke(); @@ -441,16 +446,16 @@ public void testOvertakingCheckpointBarriers() throws Exception { // These elements should be buffered until we receive barriers from // all inputs - testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0); - testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 0); // These elements should be forwarded, since we did not yet receive a checkpoint barrier // on that input, only add to same input, otherwise we would not know the ordering // of the output since the Task might read the inputs in any order - testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1); - testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1); - expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime)); - expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime)); + testHarness.processElement(new StreamRecord<>("Hello-1-1", initialTime), 1, 1); + testHarness.processElement(new StreamRecord<>("Ciao-1-1", initialTime), 1, 1); + expectedOutput.add(new StreamRecord<>("Hello-1-1", initialTime)); + expectedOutput.add(new StreamRecord<>("Ciao-1-1", 
initialTime)); testHarness.waitForInputProcessing(); // we should not yet see the barrier, only the two elements from non-blocked input @@ -464,8 +469,8 @@ public void testOvertakingCheckpointBarriers() throws Exception { testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1); expectedOutput.add(new CancelCheckpointMarker(0)); - expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); - expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); + expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime)); + expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime)); expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation())); testHarness.waitForInputProcessing(); @@ -759,6 +764,95 @@ public void processWatermark(Watermark mark) throws Exception { } } + @Test + public void testMutableObjectReuse() throws Exception { + final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( + OneInputStreamTask::new, + new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), + new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)); + + testHarness.setupOperatorChain(new OperatorID(), new TestMutableObjectReuseOperator(true)) + .chain(new OperatorID(), new TestMutableObjectReuseOperator(), + new TupleSerializer(Tuple2.class, + new TypeSerializer<?>[]{ + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())})) + .finish(); + + ExecutionConfig executionConfig = testHarness.getExecutionConfig(); + executionConfig.enableObjectReuse(); + + long initialTime = 0L; + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + testHarness.processElement(new 
StreamRecord<>(Tuple2.of("Hello", 1))); + testHarness.processElement(new StreamRecord<>(Tuple2.of("Hello", 2), initialTime + 1)); + testHarness.processElement(new Watermark(initialTime + 1)); + testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao", 1), initialTime + 2)); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation())); + testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao", 2), initialTime + 3)); + + expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 1))); + expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 2), initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 1)); + expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 1), initialTime + 2)); + expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation())); + expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 2), initialTime + 3)); + + testHarness.endInput(); + + testHarness.waitForTaskCompletion(); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", + expectedOutput, + testHarness.getOutput()); + } + + // This must only be used in one test, otherwise the static fields will be changed + // by several tests concurrently + private static class TestMutableObjectReuseOperator + extends AbstractStreamOperator<Tuple2<String, Integer>> + implements OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> { + + private static final long serialVersionUID = 1L; + + private final boolean isHeadOperator; + private static Object headOperatorValue; + + private Object prevRecord = null; + private Object prevValue = null; + + public TestMutableObjectReuseOperator() { + this.isHeadOperator = false; + } + + public TestMutableObjectReuseOperator(boolean isHeadOperator) { + this.isHeadOperator = isHeadOperator; + } + + @Override + public void processElement(StreamRecord<Tuple2<String, Integer>> element) throws Exception { + if (isHeadOperator) { + if 
(prevRecord != null) { + assertNotEquals("Reuse StreamRecord object in the head operator.", prevRecord, element); + assertEquals("No reuse value object in the head operator.", prevValue, element.getValue()); + } + + prevRecord = element; + prevValue = element.getValue(); + + headOperatorValue = element.getValue(); + } else { + assertEquals("No reuse value object in chain.", headOperatorValue, element.getValue()); + } + + output.collect(element); + } + } + //============================================================================================== // Utility functions and classes //============================================================================================== @@ -807,7 +901,7 @@ private void configureChainedTestingStreamOperator( null ), 0, - Collections.<String>emptyList(), + Collections.emptyList(), null, null ); @@ -973,12 +1067,16 @@ protected void handleWatermark(Watermark mark) { public void processElement(StreamRecord<String> element) throws Exception { output.collect(element); - if (element.getValue().equals(EXPECT_FORWARDED_WATERMARKS_MARKER)) { - this.expectForwardedWatermarks = true; - } else if (element.getValue().equals(NO_FORWARDED_WATERMARKS_MARKER)) { - this.expectForwardedWatermarks = false; - } else { - handleElement(element); + switch (element.getValue()) { + case EXPECT_FORWARDED_WATERMARKS_MARKER: + this.expectForwardedWatermarks = true; + break; + case NO_FORWARDED_WATERMARKS_MARKER: + this.expectForwardedWatermarks = false; + break; + default: + handleElement(element); + break; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java index 5d151573582..e80fdc17154 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java @@ -20,6 +20,10 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; @@ -40,6 +44,7 @@ import org.apache.flink.streaming.api.functions.co.RichCoMapFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.operators.co.CoStreamMap; import org.apache.flink.streaming.api.watermark.Watermark; @@ -56,6 +61,8 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; /** * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. 
Theses tests @@ -83,31 +90,31 @@ public void testOpenCloseAndTimestamps() throws Exception { testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction()); + CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new TestOpenCloseMapFunction()); streamConfig.setStreamOperator(coMapOperator); streamConfig.setOperatorID(new OperatorID()); long initialTime = 0L; - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); testHarness.invoke(); testHarness.waitForTaskRunning(); - testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1), 0, 0); - expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1)); + testHarness.processElement(new StreamRecord<>("Hello", initialTime + 1), 0, 0); + expectedOutput.add(new StreamRecord<>("Hello", initialTime + 1)); // wait until the input is processed to ensure ordering of the output testHarness.waitForInputProcessing(); - testHarness.processElement(new StreamRecord<Integer>(1337, initialTime + 2), 1, 0); + testHarness.processElement(new StreamRecord<>(1337, initialTime + 2), 1, 0); - expectedOutput.add(new StreamRecord<String>("1337", initialTime + 2)); + expectedOutput.add(new StreamRecord<>("1337", initialTime + 2)); testHarness.endInput(); testHarness.waitForTaskCompletion(); - Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled); + assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); } @@ -122,7 +129,7 @@ public void testOpenCloseAndTimestamps() throws Exception { public void testWatermarkAndStreamStatusForwarding() 
throws Exception { final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = - new TwoInputStreamTaskTestHarness<String, Integer, String>( + new TwoInputStreamTaskTestHarness<>( TwoInputStreamTask::new, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, @@ -131,11 +138,11 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap()); + CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap()); streamConfig.setStreamOperator(coMapOperator); streamConfig.setOperatorID(new OperatorID()); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); long initialTime = 0L; testHarness.invoke(); @@ -158,10 +165,10 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); // contrary to checkpoint barriers these elements are not blocked by watermarks - testHarness.processElement(new StreamRecord<String>("Hello", initialTime), 0, 0); - testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1); - expectedOutput.add(new StreamRecord<String>("Hello", initialTime)); - expectedOutput.add(new StreamRecord<String>("42", initialTime)); + testHarness.processElement(new StreamRecord<>("Hello", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>(42, initialTime), 1, 1); + expectedOutput.add(new StreamRecord<>("Hello", initialTime)); + expectedOutput.add(new StreamRecord<>("42", initialTime)); testHarness.waitForInputProcessing(); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); @@ 
-234,18 +241,18 @@ public void testWatermarkAndStreamStatusForwarding() throws Exception { @SuppressWarnings("unchecked") public void testCheckpointBarriers() throws Exception { final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = - new TwoInputStreamTaskTestHarness<String, Integer, String>( + new TwoInputStreamTaskTestHarness<>( TwoInputStreamTask::new, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap()); + CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap()); streamConfig.setStreamOperator(coMapOperator); streamConfig.setOperatorID(new OperatorID()); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); long initialTime = 0L; testHarness.invoke(); @@ -255,21 +262,21 @@ public void testCheckpointBarriers() throws Exception { // This element should be buffered since we received a checkpoint barrier on // this input - testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0); // This one should go through - testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1); - expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); + testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 1); + expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime)); testHarness.waitForInputProcessing(); // These elements should be forwarded, since we did not yet receive a checkpoint barrier // on that input, only add to same input, otherwise we would not know the 
ordering // of the output since the Task might read the inputs in any order - testHarness.processElement(new StreamRecord<Integer>(11, initialTime), 1, 1); - testHarness.processElement(new StreamRecord<Integer>(111, initialTime), 1, 1); - expectedOutput.add(new StreamRecord<String>("11", initialTime)); - expectedOutput.add(new StreamRecord<String>("111", initialTime)); + testHarness.processElement(new StreamRecord<>(11, initialTime), 1, 1); + testHarness.processElement(new StreamRecord<>(111, initialTime), 1, 1); + expectedOutput.add(new StreamRecord<>("11", initialTime)); + expectedOutput.add(new StreamRecord<>("111", initialTime)); testHarness.waitForInputProcessing(); @@ -298,7 +305,7 @@ public void testCheckpointBarriers() throws Exception { // now we should see the barrier and after that the buffered elements expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation())); - expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); + expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime)); TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, @@ -326,11 +333,11 @@ public void testOvertakingCheckpointBarriers() throws Exception { testHarness.setupOutputForSingletonOperatorChain(); StreamConfig streamConfig = testHarness.getStreamConfig(); - CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap()); + CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<>(new IdentityMap()); streamConfig.setStreamOperator(coMapOperator); streamConfig.setOperatorID(new OperatorID()); - ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); long initialTime = 0L; testHarness.invoke(); @@ -340,16 +347,16 @@ public void testOvertakingCheckpointBarriers() throws Exception { // These elements should be buffered until we 
receive barriers from // all inputs - testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0); - testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>("Hello-0-0", initialTime), 0, 0); + testHarness.processElement(new StreamRecord<>("Ciao-0-0", initialTime), 0, 0); // These elements should be forwarded, since we did not yet receive a checkpoint barrier // on that input, only add to same input, otherwise we would not know the ordering // of the output since the Task might read the inputs in any order - testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1); - testHarness.processElement(new StreamRecord<Integer>(1337, initialTime), 1, 1); - expectedOutput.add(new StreamRecord<String>("42", initialTime)); - expectedOutput.add(new StreamRecord<String>("1337", initialTime)); + testHarness.processElement(new StreamRecord<>(42, initialTime), 1, 1); + testHarness.processElement(new StreamRecord<>(1337, initialTime), 1, 1); + expectedOutput.add(new StreamRecord<>("42", initialTime)); + expectedOutput.add(new StreamRecord<>("1337", initialTime)); testHarness.waitForInputProcessing(); // we should not yet see the barrier, only the two elements from non-blocked input @@ -365,8 +372,8 @@ public void testOvertakingCheckpointBarriers() throws Exception { testHarness.processEvent(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1); expectedOutput.add(new CancelCheckpointMarker(0)); - expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime)); - expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime)); + expectedOutput.add(new StreamRecord<>("Hello-0-0", initialTime)); + expectedOutput.add(new StreamRecord<>("Ciao-0-0", initialTime)); expectedOutput.add(new CheckpointBarrier(1, 1, CheckpointOptions.forCheckpointWithDefaultLocation())); testHarness.waitForInputProcessing(); @@ -611,5 +618,118 
@@ public String map2(Integer value) throws Exception { return value.toString(); } } + + @Test + public void testMutableObjectReuse() throws Exception { + final TwoInputStreamTaskTestHarness<String, String, String> testHarness = new TwoInputStreamTaskTestHarness<>( + TwoInputStreamTask::new, + new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), + new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO), + new TupleTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)); + + testHarness.setupOperatorChain(new OperatorID(), new TestMutableObjectReuseHeadOperator()) + .chain(new OperatorID(), new TestMutableObjectReuseHeadOperator.TestMutableObjectReuseNextOperator(), + new TupleSerializer(Tuple2.class, + new TypeSerializer<?>[]{BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig())})) + .finish(); + + ExecutionConfig executionConfig = testHarness.getExecutionConfig(); + executionConfig.enableObjectReuse(); + + long initialTime = 0L; + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>(); + + testHarness.invoke(); + testHarness.waitForTaskRunning(); + + testHarness.processElement(new StreamRecord<>(Tuple2.of("Hello", 1)), 0, 0); + testHarness.processElement(new StreamRecord<>(Tuple2.of("Hello", 2), initialTime + 1), 1, 0); + testHarness.processElement(new Watermark(initialTime + 1), 0, 0); + testHarness.processElement(new Watermark(initialTime + 1), 1, 0); + testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao", 1), initialTime + 2), 0, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0); + testHarness.processEvent(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0); + testHarness.processElement(new StreamRecord<>(Tuple2.of("Ciao", 2), initialTime + 3), 1, 0); + + 
expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 1))); + expectedOutput.add(new StreamRecord<>(Tuple2.of("Hello", 2), initialTime + 1)); + expectedOutput.add(new Watermark(initialTime + 1)); + expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 1), initialTime + 2)); + expectedOutput.add(new CheckpointBarrier(0, 0, CheckpointOptions.forCheckpointWithDefaultLocation())); + expectedOutput.add(new StreamRecord<>(Tuple2.of("Ciao", 2), initialTime + 3)); + + testHarness.endInput(); + + testHarness.waitForTaskCompletion(); + + TestHarnessUtil.assertOutputEquals("Output was not correct.", + expectedOutput, + testHarness.getOutput()); + } + + // This must only be used in one test, otherwise the static fields will be changed + // by several tests concurrently + private static class TestMutableObjectReuseHeadOperator + extends AbstractStreamOperator<Tuple2<String, Integer>> + implements TwoInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> { + + private static final long serialVersionUID = 1L; + + private static Object headOperatorValue; + + private Object prevRecord1 = null; + private Object prevValue1 = null; + + private Object prevRecord2 = null; + private Object prevValue2 = null; + + @Override + public void processElement1(StreamRecord<Tuple2<String, Integer>> element) throws Exception { + if (prevRecord1 != null) { + assertNotEquals("Reuse StreamRecord object in the 1th input of the head operator.", prevRecord1, element); + assertEquals("No reuse value object in the 1th input of the head operator.", prevValue1, element.getValue()); + } + + prevRecord1 = element; + prevValue1 = element.getValue(); + + headOperatorValue = element.getValue(); + + output.collect(element); + } + + @Override + public void processElement2(StreamRecord<Tuple2<String, Integer>> element) { + if (prevRecord2 != null) { + assertNotEquals("Reuse StreamRecord object in the 2th input of the head operator.", prevRecord2, element); + 
assertEquals("No reuse value object in the 2th input of the head operator.", prevValue2, element.getValue()); + + if (prevValue1 != null) { + assertTrue("Reuse the same value object in two inputs of the head operator.", prevValue1 != prevValue2); + } + } + + prevRecord2 = element; + prevValue2 = element.getValue(); + + headOperatorValue = element.getValue(); + + output.collect(element); + } + + private static class TestMutableObjectReuseNextOperator + extends AbstractStreamOperator<Tuple2<String, Integer>> + implements OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> { + + private static final long serialVersionUID = 1L; + + @Override + public void processElement(StreamRecord<Tuple2<String, Integer>> element) throws Exception { + assertEquals("No reuse value object in chain.", headOperatorValue, element.getValue()); + + output.collect(element); + } + } + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > StreamTask support object reuse > ------------------------------- > > Key: FLINK-10275 > URL: https://issues.apache.org/jira/browse/FLINK-10275 > Project: Flink > Issue Type: Improvement > Components: Streaming > Affects Versions: 1.7.0 > Reporter: 陈梓立 > Assignee: 陈梓立 > Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > StreamTask should support efficient object reuse. The purpose behind this is to > reduce pressure on the garbage collector. > All objects are reused, without backup copies. The operators and UDFs must be > careful not to keep any objects as state and not to modify the objects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)