[ 
https://issues.apache.org/jira/browse/FLINK-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993146#comment-15993146
 ] 

ASF GitHub Bot commented on FLINK-5969:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3778#discussion_r114353483
  
    --- Diff: 
flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkFrom12MigrationTest.java
 ---
    @@ -0,0 +1,224 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.streaming.connectors.fs.bucketing;
    +
    +import static org.junit.Assert.assertTrue;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +import org.apache.commons.io.FileUtils;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.OperatorStateStore;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.streaming.api.operators.StreamSink;
    +import org.apache.flink.streaming.connectors.fs.StringWriter;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
    +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
    +import org.apache.flink.streaming.util.OperatorSnapshotUtil;
    +import org.apache.hadoop.fs.Path;
    +import org.junit.Assert;
    +import org.junit.ClassRule;
    +import org.junit.Ignore;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +/**
    + * Tests for checking whether {@link BucketingSink} can restore from 
snapshots that were done
    + * using the Flink 1.2 {@link BucketingSink}.
    + *
    + * <p>For regenerating the binary snapshot file you have to run the {@code 
write*()} method on
    + * the Flink 1.2 branch.
    + */
    +
    +public class BucketingSinkFrom12MigrationTest {
    +
    +   @ClassRule
    +   public static TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +   private static final String PART_PREFIX = "part";
    +   private static final String PENDING_SUFFIX = ".pending";
    +   private static final String IN_PROGRESS_SUFFIX = ".in-progress";
    +   private static final String VALID_LENGTH_SUFFIX = ".valid";
    +
    +   /**
    +    * Manually run this to write binary snapshot data. Remove @Ignore to 
run.
    +    */
    +   @Ignore
    +   @Test
    +   public void writeSnapshot() throws Exception {
    +
    +           final File outDir = tempFolder.newFolder();
    +
    +           BucketingSink<String> sink = new 
BucketingSink<String>(outDir.getAbsolutePath())
    +                   .setWriter(new StringWriter<String>())
    +                   .setBatchSize(5)
    +                   .setPartPrefix(PART_PREFIX)
    +                   .setInProgressPrefix("")
    +                   .setPendingPrefix("")
    +                   .setValidLengthPrefix("")
    +                   .setInProgressSuffix(IN_PROGRESS_SUFFIX)
    +                   .setPendingSuffix(PENDING_SUFFIX)
    +                   .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                   new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(sink));
    +
    +           testHarness.setup();
    +           testHarness.open();
    +
    +           testHarness.processElement(new StreamRecord<>("test1", 0L));
    +           testHarness.processElement(new StreamRecord<>("test2", 0L));
    +
    +           checkFs(outDir, 1, 1, 0, 0);
    +
    +           testHarness.processElement(new StreamRecord<>("test3", 0L));
    +           testHarness.processElement(new StreamRecord<>("test4", 0L));
    +           testHarness.processElement(new StreamRecord<>("test5", 0L));
    +
    +           checkFs(outDir, 1, 4, 0, 0);
    +
    +           OperatorStateHandles snapshot = testHarness.snapshot(0L, 0L);
    +
    +           OperatorSnapshotUtil.writeStateHandle(snapshot, 
"src/test/resources/bucketing-sink-migration-test-flink1.2-snapshot");
    +           testHarness.close();
    +   }
    +
    +   @Test
    +   public void testRestore() throws Exception {
    +           final File outDir = tempFolder.newFolder();
    +
    +           ValidatingBucketingSink<String> sink = 
(ValidatingBucketingSink<String>) new 
ValidatingBucketingSink<String>(outDir.getAbsolutePath())
    +                   .setWriter(new StringWriter<String>())
    +                   .setBatchSize(5)
    +                   .setPartPrefix(PART_PREFIX)
    +                   .setInProgressPrefix("")
    +                   .setPendingPrefix("")
    +                   .setValidLengthPrefix("")
    +                   .setInProgressSuffix(IN_PROGRESS_SUFFIX)
    +                   .setPendingSuffix(PENDING_SUFFIX)
    +                   .setValidLengthSuffix(VALID_LENGTH_SUFFIX);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness = 
new OneInputStreamOperatorTestHarness<>(
    +                   new StreamSink<>(sink), 10, 1, 0);
    +           testHarness.setup();
    +           testHarness.initializeState(
    +                           OperatorSnapshotUtil.readStateHandle(
    +                                           
OperatorSnapshotUtil.getResourceFilename("bucketing-sink-migration-test-flink1.2-snapshot")));
    +           testHarness.open();
    +
    +           assertTrue(sink.initializeCalled);
    +
    +           testHarness.processElement(new StreamRecord<>("test1", 0L));
    +           testHarness.processElement(new StreamRecord<>("test2", 0L));
    +
    +           checkFs(outDir, 1, 1, 0, 0);
    +
    +           testHarness.close();
    +   }
    +
    +   private void checkFs(File outDir, int inprogress, int pending, int 
completed, int valid) throws IOException {
    +           int inProg = 0;
    +           int pend = 0;
    +           int compl = 0;
    +           int val = 0;
    +
    +           for (File file: FileUtils.listFiles(outDir, null, true)) {
    +                   if (file.getAbsolutePath().endsWith("crc")) {
    +                           continue;
    +                   }
    +                   String path = file.getPath();
    +                   if (path.endsWith(IN_PROGRESS_SUFFIX)) {
    +                           inProg++;
    +                   } else if (path.endsWith(PENDING_SUFFIX)) {
    +                           pend++;
    +                   } else if (path.endsWith(VALID_LENGTH_SUFFIX)) {
    +                           val++;
    +                   } else if (path.contains(PART_PREFIX)) {
    +                           compl++;
    +                   }
    +           }
    +
    +           Assert.assertEquals(inprogress, inProg);
    +           Assert.assertEquals(pending, pend);
    +           Assert.assertEquals(completed, compl);
    +           Assert.assertEquals(valid, val);
    +   }
    +
    +   static class ValidatingBucketingSink<T> extends BucketingSink<T> {
    +
    +           private static final long serialVersionUID = 
-4263974081712009141L;
    +
    +           public boolean initializeCalled = false;
    +
    +           ValidatingBucketingSink(String basePath) {
    +                   super(basePath);
    +           }
    +
    +           /**
    +            * The actual paths in this depend on the binary checkpoint so 
if you update this, the paths
    +            * here have to be updated as well.
    +            */
    +           @Override
    +           public void initializeState(FunctionInitializationContext 
context) throws Exception {
    +                   OperatorStateStore stateStore = 
context.getOperatorStateStore();
    +
    +                   ListState<State<T>> restoredBucketStates = 
stateStore.getSerializableListState("bucket-states");
    +
    +                   int subtaskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
    --- End diff --
    
    Fixing


> Add savepoint backwards compatibility tests from 1.2 to 1.3
> -----------------------------------------------------------
>
>                 Key: FLINK-5969
>                 URL: https://issues.apache.org/jira/browse/FLINK-5969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.3.0
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> We currently only have tests that test migration from 1.1 to 1.3, because we 
> added these tests when releasing Flink 1.2.
> We have to copy/migrate those tests:
>  - {{StatefulUDFSavepointMigrationITCase}}
>  - {{*MigrationTest}}
>  - {{AbstractKeyedCEPPatternOperator}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to