kl0u closed pull request #6977: [FLINK-8987] Add end-to-end test for state migration/evolution. URL: https://github.com/apache/flink/pull/6977
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-end-to-end-tests/flink-state-evolution-test/pom.xml b/flink-end-to-end-tests/flink-state-evolution-test/pom.xml new file mode 100644 index 00000000000..aeeb054fd2b --- /dev/null +++ b/flink-end-to-end-tests/flink-state-evolution-test/pom.xml @@ -0,0 +1,113 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.7-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-state-evolution-test</artifactId> + <name>flink-state-evolution-test</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <id>StatefulStreamingJob</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <finalName>StatefulStreamingJob</finalName> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass>org.apache.flink.test.StatefulStreamingJob</mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>${avro.version}</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + 
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory> + <outputDirectory>${project.basedir}/src/main/java/</outputDirectory> + <fieldVisibility>PRIVATE</fieldVisibility> + <includes> + <include>**/*.avsc</include> + </includes> + <!-- + This forces Avro to use Java Strings instead of Avro's Utf8. + This is required since the job relies on equals checks on some String fields + to verify that state restore was successful. + --> + <stringType>String</stringType> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <excludes>**/org/apache/flink/avro/generated/*</excludes> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink-end-to-end-tests/flink-state-evolution-test/savepoints/1.6/_metadata b/flink-end-to-end-tests/flink-state-evolution-test/savepoints/1.6/_metadata new file mode 100644 index 00000000000..32b442943a0 Binary files /dev/null and b/flink-end-to-end-tests/flink-state-evolution-test/savepoints/1.6/_metadata differ diff --git a/flink-end-to-end-tests/flink-state-evolution-test/savepoints/1.7/_metadata b/flink-end-to-end-tests/flink-state-evolution-test/savepoints/1.7/_metadata new file mode 100644 index 00000000000..287c05fb174 Binary files /dev/null and b/flink-end-to-end-tests/flink-state-evolution-test/savepoints/1.7/_metadata differ diff --git a/flink-end-to-end-tests/flink-state-evolution-test/src/main/avro/Address.avsc b/flink-end-to-end-tests/flink-state-evolution-test/src/main/avro/Address.avsc new file mode 100644 index 00000000000..453b3242d8c --- /dev/null +++ b/flink-end-to-end-tests/flink-state-evolution-test/src/main/avro/Address.avsc @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + {"namespace": "org.apache.flink.avro.generated", + "type": "record", + "name": "Address", + "fields": [ + {"name": "num", "type": "int"}, + {"name": "street", "type": "string"}, + {"name": "city", "type": "string"}, + {"name": "state", "type": "string"}, + {"name": "zip", "type": "string"}, + {"name": "appno", "type": [ "string", "null" ], "default": "123"} + ] +} \ No newline at end of file diff --git a/flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/avro/generated/Address.java b/flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/avro/generated/Address.java new file mode 100644 index 00000000000..fe6358df143 --- /dev/null +++ b/flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/avro/generated/Address.java @@ -0,0 +1,587 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.flink.avro.generated; + +import org.apache.avro.specific.SpecificData; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@SuppressWarnings("all") +@org.apache.avro.specific.AvroGenerated +public class Address extends org.apache.avro.specific.SpecificRecordBase implements 
org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = 6397222640020965888L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Address\",\"namespace\":\"org.apache.flink.avro.generated\",\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"street\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"city\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"state\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"zip\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"appno\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\"},\"null\"],\"default\":\"123\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder<Address> ENCODER = + new BinaryMessageEncoder<Address>(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder<Address> DECODER = + new BinaryMessageDecoder<Address>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageDecoder instance used by this class. + */ + public static BinaryMessageDecoder<Address> getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + */ + public static BinaryMessageDecoder<Address> createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<Address>(MODEL$, SCHEMA$, resolver); + } + + /** Serializes this Address to a ByteBuffer. */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** Deserializes a Address from a ByteBuffer. 
*/ + public static Address fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + private int num; + private java.lang.String street; + private java.lang.String city; + private java.lang.String state; + private java.lang.String zip; + private java.lang.String appno; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use <code>newBuilder()</code>. + */ + public Address() {} + + /** + * All-args constructor. + * @param num The new value for num + * @param street The new value for street + * @param city The new value for city + * @param state The new value for state + * @param zip The new value for zip + * @param appno The new value for appno + */ + public Address(java.lang.Integer num, java.lang.String street, java.lang.String city, java.lang.String state, java.lang.String zip, java.lang.String appno) { + this.num = num; + this.street = street; + this.city = city; + this.state = state; + this.zip = zip; + this.appno = appno; + } + + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return num; + case 1: return street; + case 2: return city; + case 3: return state; + case 4: return zip; + case 5: return appno; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + // Used by DatumReader. Applications should not call. 
+ @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: num = (java.lang.Integer)value$; break; + case 1: street = (java.lang.String)value$; break; + case 2: city = (java.lang.String)value$; break; + case 3: state = (java.lang.String)value$; break; + case 4: zip = (java.lang.String)value$; break; + case 5: appno = (java.lang.String)value$; break; + default: throw new org.apache.avro.AvroRuntimeException("Bad index"); + } + } + + /** + * Gets the value of the 'num' field. + * @return The value of the 'num' field. + */ + public java.lang.Integer getNum() { + return num; + } + + /** + * Sets the value of the 'num' field. + * @param value the value to set. + */ + public void setNum(java.lang.Integer value) { + this.num = value; + } + + /** + * Gets the value of the 'street' field. + * @return The value of the 'street' field. + */ + public java.lang.String getStreet() { + return street; + } + + /** + * Sets the value of the 'street' field. + * @param value the value to set. + */ + public void setStreet(java.lang.String value) { + this.street = value; + } + + /** + * Gets the value of the 'city' field. + * @return The value of the 'city' field. + */ + public java.lang.String getCity() { + return city; + } + + /** + * Sets the value of the 'city' field. + * @param value the value to set. + */ + public void setCity(java.lang.String value) { + this.city = value; + } + + /** + * Gets the value of the 'state' field. + * @return The value of the 'state' field. + */ + public java.lang.String getState() { + return state; + } + + /** + * Sets the value of the 'state' field. + * @param value the value to set. + */ + public void setState(java.lang.String value) { + this.state = value; + } + + /** + * Gets the value of the 'zip' field. + * @return The value of the 'zip' field. + */ + public java.lang.String getZip() { + return zip; + } + + /** + * Sets the value of the 'zip' field. 
+ * @param value the value to set. + */ + public void setZip(java.lang.String value) { + this.zip = value; + } + + /** + * Gets the value of the 'appno' field. + * @return The value of the 'appno' field. + */ + public java.lang.String getAppno() { + return appno; + } + + /** + * Sets the value of the 'appno' field. + * @param value the value to set. + */ + public void setAppno(java.lang.String value) { + this.appno = value; + } + + /** + * Creates a new Address RecordBuilder. + * @return A new Address RecordBuilder + */ + public static org.apache.flink.avro.generated.Address.Builder newBuilder() { + return new org.apache.flink.avro.generated.Address.Builder(); + } + + /** + * Creates a new Address RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new Address RecordBuilder + */ + public static org.apache.flink.avro.generated.Address.Builder newBuilder(org.apache.flink.avro.generated.Address.Builder other) { + return new org.apache.flink.avro.generated.Address.Builder(other); + } + + /** + * Creates a new Address RecordBuilder by copying an existing Address instance. + * @param other The existing instance to copy. + * @return A new Address RecordBuilder + */ + public static org.apache.flink.avro.generated.Address.Builder newBuilder(org.apache.flink.avro.generated.Address other) { + return new org.apache.flink.avro.generated.Address.Builder(other); + } + + /** + * RecordBuilder for Address instances. + */ + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Address> + implements org.apache.avro.data.RecordBuilder<Address> { + + private int num; + private java.lang.String street; + private java.lang.String city; + private java.lang.String state; + private java.lang.String zip; + private java.lang.String appno; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. 
+ * @param other The existing Builder to copy. + */ + private Builder(org.apache.flink.avro.generated.Address.Builder other) { + super(other); + if (isValidValue(fields()[0], other.num)) { + this.num = data().deepCopy(fields()[0].schema(), other.num); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.street)) { + this.street = data().deepCopy(fields()[1].schema(), other.street); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.city)) { + this.city = data().deepCopy(fields()[2].schema(), other.city); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.state)) { + this.state = data().deepCopy(fields()[3].schema(), other.state); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.zip)) { + this.zip = data().deepCopy(fields()[4].schema(), other.zip); + fieldSetFlags()[4] = true; + } + if (isValidValue(fields()[5], other.appno)) { + this.appno = data().deepCopy(fields()[5].schema(), other.appno); + fieldSetFlags()[5] = true; + } + } + + /** + * Creates a Builder by copying an existing Address instance + * @param other The existing instance to copy. 
+ */ + private Builder(org.apache.flink.avro.generated.Address other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.num)) { + this.num = data().deepCopy(fields()[0].schema(), other.num); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.street)) { + this.street = data().deepCopy(fields()[1].schema(), other.street); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.city)) { + this.city = data().deepCopy(fields()[2].schema(), other.city); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.state)) { + this.state = data().deepCopy(fields()[3].schema(), other.state); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.zip)) { + this.zip = data().deepCopy(fields()[4].schema(), other.zip); + fieldSetFlags()[4] = true; + } + if (isValidValue(fields()[5], other.appno)) { + this.appno = data().deepCopy(fields()[5].schema(), other.appno); + fieldSetFlags()[5] = true; + } + } + + /** + * Gets the value of the 'num' field. + * @return The value. + */ + public java.lang.Integer getNum() { + return num; + } + + /** + * Sets the value of the 'num' field. + * @param value The value of 'num'. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder setNum(int value) { + validate(fields()[0], value); + this.num = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'num' field has been set. + * @return True if the 'num' field has been set, false otherwise. + */ + public boolean hasNum() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'num' field. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder clearNum() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'street' field. + * @return The value. + */ + public java.lang.String getStreet() { + return street; + } + + /** + * Sets the value of the 'street' field. 
+ * @param value The value of 'street'. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder setStreet(java.lang.String value) { + validate(fields()[1], value); + this.street = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'street' field has been set. + * @return True if the 'street' field has been set, false otherwise. + */ + public boolean hasStreet() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'street' field. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder clearStreet() { + street = null; + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'city' field. + * @return The value. + */ + public java.lang.String getCity() { + return city; + } + + /** + * Sets the value of the 'city' field. + * @param value The value of 'city'. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder setCity(java.lang.String value) { + validate(fields()[2], value); + this.city = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'city' field has been set. + * @return True if the 'city' field has been set, false otherwise. + */ + public boolean hasCity() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'city' field. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder clearCity() { + city = null; + fieldSetFlags()[2] = false; + return this; + } + + /** + * Gets the value of the 'state' field. + * @return The value. + */ + public java.lang.String getState() { + return state; + } + + /** + * Sets the value of the 'state' field. + * @param value The value of 'state'. + * @return This builder. 
+ */ + public org.apache.flink.avro.generated.Address.Builder setState(java.lang.String value) { + validate(fields()[3], value); + this.state = value; + fieldSetFlags()[3] = true; + return this; + } + + /** + * Checks whether the 'state' field has been set. + * @return True if the 'state' field has been set, false otherwise. + */ + public boolean hasState() { + return fieldSetFlags()[3]; + } + + + /** + * Clears the value of the 'state' field. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder clearState() { + state = null; + fieldSetFlags()[3] = false; + return this; + } + + /** + * Gets the value of the 'zip' field. + * @return The value. + */ + public java.lang.String getZip() { + return zip; + } + + /** + * Sets the value of the 'zip' field. + * @param value The value of 'zip'. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder setZip(java.lang.String value) { + validate(fields()[4], value); + this.zip = value; + fieldSetFlags()[4] = true; + return this; + } + + /** + * Checks whether the 'zip' field has been set. + * @return True if the 'zip' field has been set, false otherwise. + */ + public boolean hasZip() { + return fieldSetFlags()[4]; + } + + + /** + * Clears the value of the 'zip' field. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder clearZip() { + zip = null; + fieldSetFlags()[4] = false; + return this; + } + + /** + * Gets the value of the 'appno' field. + * @return The value. + */ + public java.lang.String getAppno() { + return appno; + } + + /** + * Sets the value of the 'appno' field. + * @param value The value of 'appno'. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder setAppno(java.lang.String value) { + validate(fields()[5], value); + this.appno = value; + fieldSetFlags()[5] = true; + return this; + } + + /** + * Checks whether the 'appno' field has been set. 
+ * @return True if the 'appno' field has been set, false otherwise. + */ + public boolean hasAppno() { + return fieldSetFlags()[5]; + } + + + /** + * Clears the value of the 'appno' field. + * @return This builder. + */ + public org.apache.flink.avro.generated.Address.Builder clearAppno() { + appno = null; + fieldSetFlags()[5] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public Address build() { + try { + Address record = new Address(); + record.num = fieldSetFlags()[0] ? this.num : (java.lang.Integer) defaultValue(fields()[0]); + record.street = fieldSetFlags()[1] ? this.street : (java.lang.String) defaultValue(fields()[1]); + record.city = fieldSetFlags()[2] ? this.city : (java.lang.String) defaultValue(fields()[2]); + record.state = fieldSetFlags()[3] ? this.state : (java.lang.String) defaultValue(fields()[3]); + record.zip = fieldSetFlags()[4] ? this.zip : (java.lang.String) defaultValue(fields()[4]); + record.appno = fieldSetFlags()[5] ? this.appno : (java.lang.String) defaultValue(fields()[5]); + return record; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter<Address> + WRITER$ = (org.apache.avro.io.DatumWriter<Address>)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader<Address> + READER$ = (org.apache.avro.io.DatumReader<Address>)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + +} diff --git a/flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/test/StatefulStreamingJob.java 
b/flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/test/StatefulStreamingJob.java new file mode 100644 index 00000000000..198b2c827c2 --- /dev/null +++ b/flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/test/StatefulStreamingJob.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.avro.generated.Address; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.types.Either; + +import java.io.IOException; +import java.util.Objects; +import java.util.function.Supplier; + +/** + * A simple stateful job that will be used to test avro state evolution and + * general state migration. + */ +public class StatefulStreamingJob { + + private static final String EXPECTED_DEFAULT_VALUE = "123"; + + /** + * Stub source that emits one record per second. + */ + public static class MySource extends RichParallelSourceFunction<Integer> { + + private static final long serialVersionUID = 1L; + + private volatile boolean cancel; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + while (!cancel) { + synchronized (ctx.getCheckpointLock()) { + ctx.collect(1); + } + Thread.sleep(100L); + } + } + + @Override + public void cancel() { + cancel = true; + } + } + + /** + * A stateful {@link RichMapFunction} that keeps the required types of state. 
+ * That is: + * <ol> + * <li>an Avro,</li> + * <li>a Tuple2, and</li> + * <li>an Either type.</li> + * </ol> + */ + public static class MyStatefulFunction extends RichMapFunction<Integer, String> { + + private static final long serialVersionUID = 1L; + + private static final ValueStateDescriptor<Address> AVRO_DESCRIPTOR = + new ValueStateDescriptor<>("test-state", Address.class); + + private static final ValueStateDescriptor<Tuple2<String, Integer>> TUPLE_DESCRIPTOR = + new ValueStateDescriptor<>("tuple-state", + TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); + + private static final ValueStateDescriptor<Either<String, Boolean>> EITHER_DESCRIPTOR = + new ValueStateDescriptor<>("either-state", + TypeInformation.of(new TypeHint<Either<String, Boolean>>() {})); + + private transient ValueState<Address> avroState; + private transient ValueState<Tuple2<String, Integer>> tupleState; + private transient ValueState<Either<String, Boolean>> eitherState; + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.avroState = getRuntimeContext().getState(AVRO_DESCRIPTOR); + this.tupleState = getRuntimeContext().getState(TUPLE_DESCRIPTOR); + this.eitherState = getRuntimeContext().getState(EITHER_DESCRIPTOR); + } + + @Override + public String map(Integer value) throws Exception { + touchState(tupleState, () -> Tuple2.of("19", 19)); + touchState(eitherState, () -> Either.Left("255")); + + final Address newAddress = Address.newBuilder() + .setCity("New York") + .setZip("10036") + .setStreet("555 W 42nd St") + .setState("NY") + .setNum(555) + .build(); + + Address existingAddress = avroState.value(); + if (existingAddress != null) { + if (!Objects.equals(existingAddress.getAppno(), EXPECTED_DEFAULT_VALUE)) { + // this is expected to fail the job, if found in the output files. 
+ System.out.println("Wrong Default Value."); + } + } + avroState.update(newAddress); + + return ""; + } + + private static <T> void touchState(ValueState<T> state, Supplier<T> elements) throws IOException { + T elem = state.value(); + if (elem == null) { + elem = elements.get(); + } + state.update(elem); + } + } + + public static void main(String[] args) throws Exception { + final ParameterTool pt = ParameterTool.fromArgs(args); + final String checkpointDir = pt.getRequired("checkpoint.dir"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(new FsStateBackend(checkpointDir)); + env.setRestartStrategy(RestartStrategies.noRestart()); + env.enableCheckpointing(1000L); + env.getConfig().disableGenericTypes(); + + env.addSource(new MySource()).uid("my-source") + .keyBy(anInt -> 0) + .map(new MyStatefulFunction()).uid("my-map") + .addSink(new DiscardingSink<>()).uid("my-sink"); + env.execute(); + } +} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 2e060da3a4d..6b31881f684 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -54,6 +54,7 @@ under the License. 
<module>flink-stream-state-ttl-test</module> <module>flink-sql-client-test</module> <module>flink-streaming-file-sink-test</module> + <module>flink-state-evolution-test</module> </modules> <build> diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh index ce26348978e..b4e375d8456 100755 --- a/flink-end-to-end-tests/run-pre-commit-tests.sh +++ b/flink-end-to-end-tests/run-pre-commit-tests.sh @@ -42,6 +42,8 @@ echo "Flink distribution directory: $FLINK_DIR" # Template for adding a test: # run_test "<description>" "$END_TO_END_DIR/test-scripts/<script_name>" +run_test "State Migration end-to-end test from 1.6" "$END_TO_END_DIR/test-scripts/test_state_migration.sh" +run_test "State Evolution end-to-end test" "$END_TO_END_DIR/test-scripts/test_state_evolution.sh" run_test "Batch Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_python_wordcount.sh" run_test "Streaming Python Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_python_wordcount.sh" run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wordcount.sh" diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index 5eb2c93a0f6..e815a85afdd 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -341,6 +341,7 @@ function check_logs_for_exceptions { | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \ | grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \ | grep -v "java.lang.Exception: Execution was suspended" \ + | grep -v "java.io.InvalidClassException: org.apache.flink.formats.avro.typeutils.AvroSerializer" \ | grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \ | grep -ic "exception") if [[ ${exception_count} -gt 0 ]]; then diff --git 
a/flink-end-to-end-tests/test-scripts/test_state_evolution.sh b/flink-end-to-end-tests/test-scripts/test_state_evolution.sh new file mode 100755 index 00000000000..75e6749162f --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_state_evolution.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh + +# For the case of evolution, this test will try to start from a savepoint taken +# with 1.7 (same version) but in which the schema of the (old) avro type differs +# from the current one. The new schema adds an optional field with a default value +# to the old one and verifies that it can see the old value. 
+# +# The old avro schema of the Address data type was: +# +# {"namespace": "org.apache.flink.formats.parquet.generated", +# "type": "record", +# "name": "Address", +# "fields": [ +# {"name": "num", "type": "int"}, +# {"name": "street", "type": "string"}, +# {"name": "city", "type": "string"}, +# {"name": "state", "type": "string"}, +# {"name": "zip", "type": "string"} +# ] +# } +# +# while the new is: +# +# {"namespace": "org.apache.flink.avro.generated", +# "type": "record", +# "name": "Address", +# "fields": [ +# {"name": "num", "type": "int"}, +# {"name": "street", "type": "string"}, +# {"name": "city", "type": "string"}, +# {"name": "state", "type": "string"}, +# {"name": "zip", "type": "string"}, +# {"name": "appno", "type": [ "string", "null" ], "default": "123"} +# ] +# } +function run_test { + PARALLELISM=$1 + TEST_JOB_JAR="${END_TO_END_DIR}/flink-state-evolution-test/target/StatefulStreamingJob.jar" + BASE_DIR="${TEST_DATA_DIR}/flink-state-evolution-test" + CHECKPOINT_DIR="file://${BASE_DIR}/checkpoints/" + SAVEPOINT_PARENT_DIR="file://${BASE_DIR}/savepoints/" + + SAVEPOINT_DIR="file:///${END_TO_END_DIR}/flink-state-evolution-test/savepoints/1.7/" + + set_conf "state.backend.fs.memory-threshold" 1048576 + start_cluster + + # restart the job from the savepoint from Flink 1.7 with the old Avro Schema + JOB_ID=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_DIR -p $PARALLELISM -d $TEST_JOB_JAR --checkpoint.dir $CHECKPOINT_DIR \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + + wait_job_running ${JOB_ID} + wait_num_checkpoints ${JOB_ID} 3 + + SECOND_SAVEPOINT_DIR=$($FLINK_DIR/bin/flink cancel -s $SAVEPOINT_PARENT_DIR $JOB_ID | grep "Savepoint stored in file" | sed 's/.* //g' | sed 's/\.$//') + echo "Took Savepoint @ ${SECOND_SAVEPOINT_DIR}" +} + +run_test 1 diff --git a/flink-end-to-end-tests/test-scripts/test_state_migration.sh b/flink-end-to-end-tests/test-scripts/test_state_migration.sh new file mode 100755 index 00000000000..a19c0dfb4d7 --- 
/dev/null +++ b/flink-end-to-end-tests/test-scripts/test_state_migration.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common.sh + +function run_test { + PARALLELISM=$1 + TEST_JOB_JAR="${END_TO_END_DIR}/flink-state-evolution-test/target/StatefulStreamingJob.jar" + BASE_DIR="${TEST_DATA_DIR}/flink-state-evolution-test" + CHECKPOINT_DIR="file://${BASE_DIR}/checkpoints/" + SAVEPOINT_PARENT_DIR="file://${BASE_DIR}/savepoints/" + + SAVEPOINT_DIR="file:///${END_TO_END_DIR}/flink-state-evolution-test/savepoints/1.6/" + + set_conf "state.backend.fs.memory-threshold" 1048576 + start_cluster + + # restart the job from the savepoint from Flink 1.6 + JOB_ID=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_DIR -p $PARALLELISM -d $TEST_JOB_JAR --checkpoint.dir $CHECKPOINT_DIR \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + + wait_job_running ${JOB_ID} + wait_num_checkpoints ${JOB_ID} 3 + + SECOND_SAVEPOINT_DIR=$($FLINK_DIR/bin/flink cancel -s $SAVEPOINT_PARENT_DIR $JOB_ID | grep "Savepoint stored in file" | 
sed 's/.* //g' | sed 's/\.$//') + + # restart the job from the savepoint from Flink 1.7 + FINAL_JOB_ID=$($FLINK_DIR/bin/flink run -s $SECOND_SAVEPOINT_DIR -p $PARALLELISM -d $TEST_JOB_JAR --checkpoint.dir $CHECKPOINT_DIR \ + | grep "Job has been submitted with JobID" | sed 's/.* //g') + + wait_job_running ${FINAL_JOB_ID} + wait_num_checkpoints ${FINAL_JOB_ID} 3 + + cancel_job ${FINAL_JOB_ID} +} + +run_test 1 diff --git a/pom.xml b/pom.xml index 2a5dfced140..055e64896e5 100644 --- a/pom.xml +++ b/pom.xml @@ -1294,6 +1294,8 @@ under the License. <exclude>flink-core/src/test/resources/type-without-avro-serialized-using-kryo</exclude> <exclude>flink-formats/flink-avro/src/test/resources/flink-1.4-serializer-java-serialized</exclude> + <exclude>flink-end-to-end-tests/flink-state-evolution-test/src/main/java/org/apache/flink/avro/generated/*</exclude> + <exclude>flink-end-to-end-tests/flink-state-evolution-test/savepoints/*</exclude> <exclude>flink-formats/flink-avro/src/test/resources/testdata.avro</exclude> <exclude>flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/*.java</exclude> <exclude>flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/*.java</exclude> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services