asfgit closed pull request #7267: [FLINK-11083][Table&SQL] CRowSerializerConfigSnapshot is not instantiable. URL: https://github.com/apache/flink/pull/7267
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala index 0ce3aee3739..b3fe5085151 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala @@ -81,7 +81,7 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali // -------------------------------------------------------------------------------------------- override def snapshotConfiguration(): TypeSerializerConfigSnapshot[CRow] = { - new CRowSerializer.CRowSerializerConfigSnapshot(rowSerializer) + new CRowSerializer.CRowSerializerConfigSnapshot(Array(rowSerializer)) } override def ensureCompatibility( @@ -115,9 +115,13 @@ class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSeriali object CRowSerializer { - class CRowSerializerConfigSnapshot(rowSerializers: TypeSerializer[Row]*) + class CRowSerializerConfigSnapshot(rowSerializers: Array[TypeSerializer[Row]]) extends CompositeTypeSerializerConfigSnapshot[CRow](rowSerializers: _*) { + def this() { + this(Array.empty) + } + override def getVersion: Int = CRowSerializerConfigSnapshot.VERSION } diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala index 7483b04d9ca..055501a6a01 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowSerializerTest.scala @@ -18,8 +18,20 @@ package org.apache.flink.table.runtime.types -import org.apache.flink.util.TestLogger -import org.junit.Test +import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} +import org.apache.flink.api.common.typeinfo.Types +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, KeyedProcessOperator} +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness +import org.apache.flink.types.Row +import org.apache.flink.util.{Collector, InstantiationUtil, TestLogger} + +import org.junit.{Assert, Test} class CRowSerializerTest extends TestLogger { @@ -29,6 +41,70 @@ class CRowSerializerTest extends TestLogger { @Test def testDefaultConstructor(): Unit = { new CRowSerializer.CRowSerializerConfigSnapshot() + + InstantiationUtil.instantiate(classOf[CRowSerializer.CRowSerializerConfigSnapshot]) + } + + @Test + def testStateRestore(): Unit = { + + class IKeyedProcessFunction extends KeyedProcessFunction[Integer, Integer, Integer] { + var state: ListState[CRow] = _ + override def open(parameters: Configuration): Unit = { + val stateDesc = new ListStateDescriptor[CRow]("CRow", + new CRowTypeInfo(new RowTypeInfo(Types.INT))) + state = getRuntimeContext.getListState(stateDesc) + } + override def processElement(value: Integer, + ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context, + out: Collector[Integer]): Unit = { + 
state.add(new CRow(Row.of(value), true)) + } + } + + val operator = new KeyedProcessOperator[Integer, Integer, Integer](new IKeyedProcessFunction) + + var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer]( + operator, + new KeySelector[Integer, Integer] { + override def getKey(value: Integer): Integer= -1 + }, + Types.INT, 1, 1, 0) + testHarness.setup() + testHarness.open() + + testHarness.processElement(new StreamRecord[Integer](1, 1L)) + testHarness.processElement(new StreamRecord[Integer](2, 1L)) + testHarness.processElement(new StreamRecord[Integer](3, 1L)) + + Assert.assertEquals(1, numKeyedStateEntries(operator)) + + val snapshot = testHarness.snapshot(0L, 0L) + testHarness.close() + + testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, Integer, Integer]( + operator, + new KeySelector[Integer, Integer] { + override def getKey(value: Integer): Integer= -1 + }, + Types.INT, 1, 1, 0) + testHarness.setup() + + testHarness.initializeState(snapshot) + + testHarness.open() + + Assert.assertEquals(1, numKeyedStateEntries(operator)) + + testHarness.close() + } + + def numKeyedStateEntries(operator: AbstractStreamOperator[_]): Int = { + val keyedStateBackend = operator.getKeyedStateBackend + keyedStateBackend match { + case hksb: HeapKeyedStateBackend[_] => hksb.numKeyValueStateEntries + case _ => throw new UnsupportedOperationException + } } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services