[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15985377#comment-15985377 ]
ASF GitHub Bot commented on FLINK-5969: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3778#discussion_r113440360 --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingFrom12MigrationTest.java --- @@ -0,0 +1,378 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.hdfstests; + +import java.io.FileOutputStream; +import org.apache.commons.io.FileUtils; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.io.TextInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction; +import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; +import org.apache.flink.streaming.api.functions.source.FileProcessingMode; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OperatorSnapshotUtil; +import org.apache.flink.util.Preconditions; +import org.junit.Assert; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; + +public class ContinuousFileProcessingFrom12MigrationTest { + + private static final int LINES_PER_FILE = 10; + + private static final long INTERVAL = 100; + + @ClassRule + public static TemporaryFolder tempFolder = 
new TemporaryFolder(); + + /** + * Manually run this to write binary snapshot data. Remove @Ignore to run. + */ + @Ignore + @Test + public void writeReaderSnapshot() throws Exception { + + File testFolder = tempFolder.newFolder(); + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + final OneShotLatch latch = new OneShotLatch(); + BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath())); + TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); + ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>( + format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = + new OneInputStreamOperatorTestHarness<>(initReader); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + testHarness.open(); + // create some state in the reader + testHarness.processElement(new StreamRecord<>(split1)); + testHarness.processElement(new StreamRecord<>(split2)); + testHarness.processElement(new StreamRecord<>(split3)); + testHarness.processElement(new StreamRecord<>(split4)); + // take a snapshot of the operator's state. This will be used + // to initialize another reader and compare the results of the + // two operators. 
+ + final OperatorStateHandles snapshot; + synchronized (testHarness.getCheckpointLock()) { + snapshot = testHarness.snapshot(0L, 0L); + } + + OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/reader-migration-test-flink1.2-snapshot"); + } + + @Test + public void testReaderRestore() throws Exception { + File testFolder = tempFolder.newFolder(); + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + + final OneShotLatch latch = new OneShotLatch(); + + BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(testFolder.getAbsolutePath())); + TypeInformation<FileInputSplit> typeInfo = TypeExtractor.getInputFormatTypes(format); + + ContinuousFileReaderOperator<FileInputSplit> initReader = new ContinuousFileReaderOperator<>(format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + + OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = + new OneInputStreamOperatorTestHarness<>(initReader); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + + testHarness.setup(); + OperatorStateHandles operatorStateHandles = OperatorSnapshotUtil.readStateHandle( + OperatorSnapshotUtil.getResourceFilename( + "reader-migration-test-flink1.2-snapshot")); + testHarness.initializeState(operatorStateHandles); + testHarness.open(); + + latch.trigger(); + + // ... 
and wait for the operators to close gracefully + + synchronized (testHarness.getCheckpointLock()) { + testHarness.close(); + } + + FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1); --- End diff -- You could create the FileInputSplits directly here, remove the TimestampedFileInputSplit instantiations at the start of this method, and then remove the createSplitFromTimestampedSplit helper function. > Add savepoint backwards compatibility tests from 1.2 to 1.3 > ----------------------------------------------------------- > > Key: FLINK-5969 > URL: https://issues.apache.org/jira/browse/FLINK-5969 > Project: Flink > Issue Type: Improvement > Components: Tests > Affects Versions: 1.3.0 > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > Priority: Blocker > Fix For: 1.3.0 > > > We currently only have tests that test migration from 1.1 to 1.3, because we > added these tests when releasing Flink 1.2. > We have to copy/migrate those tests: > - {{StatefulUDFSavepointMigrationITCase}} > - {{*MigrationTest}} > - {{AbstractKeyedCEPPatternOperator}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)