[ https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993146#comment-15993146 ]
ASF GitHub Bot commented on FLINK-5969:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3778#discussion_r114353483

    --- Diff: flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.fs.bucketing;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.OperatorStateStore;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.connectors.fs.StringWriter;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OperatorSnapshotUtil;
    +import org.apache.hadoop.fs.Path;
    +import org.junit.Assert;
    +import org.junit.ClassRule;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +/**
    + * Tests for checking whether {@link BucketingSink} can restore from snapshots that were taken
    + * using the Flink 1.2 {@link BucketingSink}.
    + *
    + * <p>For regenerating the binary snapshot file you have to run the {@code write*()} method on
    + * the Flink 1.2 branch.
    + */
    +public class BucketingSinkFrom12MigrationTest {
    +
    +	@ClassRule
    +	public static TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	private static final String PART_PREFIX = "part";
    +	private static final String PENDING_SUFFIX = ".pending";
    +	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
    +	private static final String VALID_LENGTH_SUFFIX = ".valid";
    +
    +	/**
    +	 * Manually run this to write binary snapshot data. Remove the {@code @Ignore} annotation
    +	 * to run it.
    +	 */
    +	@Ignore
    +	@Test
    +	public void writeSnapshot() throws Exception {
    +
    +		final File outDir = tempFolder.newFolder();
    +
    +		BucketingSink<String> sink = new BucketingSink<String>(outDir.getAbsolutePath())
    +			.setWriter(new StringWriter<String>())
    +			.setBatchSize(5)
    +			.setPartPrefix(PART_PREFIX)
    +			.setInProgressPrefix("")
    +			.setPendingPrefix("")
    +			.setValidLengthPrefix("")
    +			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
    +			.setPendingSuffix(PENDING_SUFFIX)
    +			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +			new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
    +
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.processElement(new StreamRecord<>("test1", 0L));
    +		testHarness.processElement(new StreamRecord<>("test2", 0L));
    +
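    +		// each record overflows the 5-byte batch size, so the first part file
    +		// has rolled to pending and the second is still in progress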
    +		checkFs(outDir, 1, 1, 0, 0);
    +
    +		testHarness.processElement(new StreamRecord<>("test3", 0L));
    +		testHarness.processElement(new StreamRecord<>("test4", 0L));
    +		testHarness.processElement(new StreamRecord<>("test5", 0L));
    +
    +		checkFs(outDir, 1, 4, 0, 0);
    +
    +		OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
    +
    +		OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot");
    +		testHarness.close();
    +	}
    +
    +	@Test
    +	public void testRestore() throws Exception {
    +		final File outDir = tempFolder.newFolder();
    +
    +		ValidatingBucketingSink<String> sink = (ValidatingBucketingSink<String>) new ValidatingBucketingSink<String>(outDir.getAbsolutePath())
    +			.setWriter(new StringWriter<String>())
    +			.setBatchSize(5)
    +			.setPartPrefix(PART_PREFIX)
    +			.setInProgressPrefix("")
    +			.setPendingPrefix("")
    +			.setValidLengthPrefix("")
    +			.setInProgressSuffix(IN_PROGRESS_SUFFIX)
    +			.setPendingSuffix(PENDING_SUFFIX)
    +			.setValidLengthSuffix(VALID_LENGTH_SUFFIX);
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(
    +			new StreamSink<>(sink), 10, 1, 0);
    +		testHarness.setup();
    +		testHarness.initializeState(
    +			OperatorSnapshotUtil.readStateHandle(
    +				OperatorSnapshotUtil.getResourceFilename("bucketing-sink-migration-test-flink1.2-snapshot")));
    +		testHarness.open();
    +
    +		assertTrue(sink.initializeCalled);
    +
    +		testHarness.processElement(new StreamRecord<>("test1", 0L));
    +		testHarness.processElement(new StreamRecord<>("test2", 0L));
    +
    +		checkFs(outDir, 1, 1, 0, 0);
    +
    +		testHarness.close();
    +	}
    +
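    +	/**
    +	 * Counts the files below {@code outDir} by suffix (ignoring checksum files) and
    +	 * asserts that the expected numbers of in-progress, pending, completed and
    +	 * valid-length files are present.
    +	 */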
    +	private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException {
    +		int inProg = 0;
    +		int pend = 0;
    +		int compl = 0;
    +		int val = 0;
    +
    +		for (File file: FileUtils.listFiles(outDir, null, true)) {
    +			if (file.getAbsolutePath().endsWith("crc")) {
    +				continue;
    +			}
    +			String path = file.getPath();
    +			if (path.endsWith(IN_PROGRESS_SUFFIX)) {
    +				inProg++;
    +			} else if (path.endsWith(PENDING_SUFFIX)) {
    +				pend++;
    +			} else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
    +				val++;
    +			} else if (path.contains(PART_PREFIX)) {
    +				compl++;
    +			}
    +		}
    +
    +		Assert.assertEquals(inprogress, inProg);
    +		Assert.assertEquals(pending, pend);
    +		Assert.assertEquals(completed, compl);
    +		Assert.assertEquals(valid, val);
    +	}
    +
    +	static class ValidatingBucketingSink<T> extends BucketingSink<T> {
    +
    +		private static final long serialVersionUID = -4263974081712009141L;
    +
    +		public boolean initializeCalled = false;
    +
    +		ValidatingBucketingSink(String basePath) {
    +			super(basePath);
    +		}
    +
    +		/**
    +		 * The actual paths in this test depend on the binary checkpoint, so if you update the
    +		 * checkpoint the paths here have to be updated as well.
    +		 */
    +		@Override
    +		public void initializeState(FunctionInitializationContext context) throws Exception {
    +			OperatorStateStore stateStore = context.getOperatorStateStore();
    +
    +			ListState<State<T>> restoredBucketStates = stateStore.getSerializableListState("bucket-states");
    +
    +			int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    --- End diff --

    Fixing


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> -----------------------------------------------------------
>
>                 Key: FLINK-5969
>                 URL: https://issues.apache.org/jira/browse/FLINK-5969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.3.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> We currently only have tests covering migration from 1.1 to 1.3, because we
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
> - {{StatefulUDFSavepointMigrationITCase}}
> - {{*MigrationTest}}
> - {{AbstractKeyedCEPPatternOperator}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
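The round trip exercised by the two tests in the diff above is the general template the issue asks to replicate for the other {{*MigrationTest}} classes. A minimal sketch of that pattern, assuming a hypothetical {{MyStatefulSink}} as the operator under test (the harness and {{OperatorSnapshotUtil}} calls are the same ones used in the diff; the snapshot file name is illustrative):

    // Phase 1, run once on the release-1.2 branch: feed some elements, take a
    // snapshot and persist it as a binary test resource.
    OneInputStreamOperatorTestHarness<String, Object> harness =
        new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MyStatefulSink()));
    harness.setup();
    harness.open();
    harness.processElement(new StreamRecord<>("element", 0L));
    OperatorStateHandles snapshot = harness.snapshot(0L, 0L);
    OperatorSnapshotUtil.writeStateHandle(snapshot, "src/test/resources/my-sink-migration-test-flink1.2-snapshot");
    harness.close();

    // Phase 2, the actual test on the current branch: restore from the persisted
    // snapshot before open() and assert that the operator picked up its old state.
    harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(new MyStatefulSink()));
    harness.setup();
    harness.initializeState(
        OperatorSnapshotUtil.readStateHandle(
            OperatorSnapshotUtil.getResourceFilename("my-sink-migration-test-flink1.2-snapshot")));
    harness.open();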