Phillippko commented on code in PR #6643:
URL: https://github.com/apache/ignite-3/pull/6643#discussion_r2381221857

##########
modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/PersistentCompatibilityTest.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import io.micronaut.http.HttpRequest;
+import io.micronaut.http.HttpResponse;
+import io.micronaut.http.HttpStatus;
+import io.micronaut.http.MediaType;
+import io.micronaut.http.MutableHttpRequest;
+import io.micronaut.http.client.HttpClient;
+import io.micronaut.http.client.annotation.Client;
+import io.micronaut.http.client.multipart.MultipartBody;
+import io.micronaut.http.client.multipart.MultipartBody.Builder;
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
+import jakarta.inject.Inject;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.jar.JarEntry;
+import java.util.jar.JarOutputStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.deployment.DeploymentUnit;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Legend: v1 is the original version, v2 is the current version (the version from the main branch).
+ * <p>
+ * It is assumed that data is written in v1, then the storage is flushed and the node is stopped, its binary is replaced with v2,
+ * and the node is started.
+ * <p>
+ * The tests have to ensure that aipersist-based tables work correctly:
+ * <ul>
+ *     <li>All data written in v1 can be read in v2, including overwritten versions.</li>
+ *     <li>Deleted data cannot be read; updates that were rolled back cannot be read.</li>
+ * </ul>
+ * <p>
+ * Variations:
+ * <ul>
+ *     <li>When there were no unmerged delta files in v1 (that is, after the writes are finished on v1, we wait for all
+ *     delta files to be merged and removed).</li>
+ *     <li>When there were some unmerged delta files.</li>
+ *     <li>When unmerged delta files remain from v1 and we start creating new delta files immediately after starting on v2,
+ *     and THEN we start the verifications listed above (while the delta files are still there).</li>
+ * </ul>
+ */
+@ParameterizedClass
+@MethodSource("baseVersions")
+@MicronautTest(rebuildContext = true)
+@WithSystemProperty(key = IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED, value = "false")
+public class PersistentCompatibilityTest extends CompatibilityTestBase {
+    private static final String NODE_URL = "http://localhost:" + ClusterConfiguration.DEFAULT_BASE_HTTP_PORT;
+
+    /** Delta files are not compacted before updating the cluster. */
+    private static final String TABLE_WITH_DELTA_FILES = "TEST_WITH_DELTA_FILES";
+
+    /** Delta files are compacted before updating the cluster. */
+    private static final String TABLE_WITHOUT_DELTA_FILES = "TEST_WITHOUT_DELTA_FILES";
+
+    /** Delta files are not compacted before updating the cluster, and new ones are created after. */
+    private static final String TABLE_WITH_NEW_DELTA_FILES = "TEST_WITH_NEW_DELTA_FILES";
+
+    @Inject
+    @Client(NODE_URL + "/management/v1/deployment")
+    private HttpClient deploymentClient;
+
+    @Override
+    protected int nodesCount() {
+        return 1;
+    }
+
+    @Override
+    protected void setupBaseVersion(Ignite baseIgnite) {
+        try {
+            deployCheckpointJob();
+
+            createAndPopulateTable(baseIgnite, TABLE_WITHOUT_DELTA_FILES);
+            createAndPopulateTable(baseIgnite, TABLE_WITH_DELTA_FILES);
+            createAndPopulateTable(baseIgnite, TABLE_WITH_NEW_DELTA_FILES);
+
+            // Newly allocated pages are written straight to the page files; we need to avoid this.
+            doCheckpoint(false);
+
+            prepareTable(baseIgnite, TABLE_WITHOUT_DELTA_FILES);
+
+            // Checkpoint for TABLE_WITHOUT_DELTA_FILES and compact all delta files.
+            doCheckpoint(false);
+
+            prepareTable(baseIgnite, TABLE_WITH_DELTA_FILES);
+            prepareTable(baseIgnite, TABLE_WITH_NEW_DELTA_FILES);
+
+            // Checkpoint for TABLE_WITH_DELTA_FILES and TABLE_WITH_NEW_DELTA_FILES; cancel compaction to leave the delta files.
+            doCheckpoint(true);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void createAndPopulateTable(Ignite baseIgnite, String tableName) {
+        sql(baseIgnite, "CREATE TABLE " + tableName + " (id INT PRIMARY KEY, name VARCHAR)");
+        sql(baseIgnite, "INSERT INTO " + tableName + " (id, name) VALUES (1, 'test'), (2, 'deleted_row'), (3, 'original_value')");
+    }
+
+    private static void prepareTable(Ignite baseIgnite, String tableName) {
+        sql(baseIgnite, "DELETE FROM " + tableName + " WHERE id = 2");
+
+        Transaction tx = baseIgnite.transactions().begin();
+        baseIgnite.sql().execute(tx, "UPDATE " + tableName + " SET name = 'rolled_back_value' WHERE id = 3").close();
+        tx.rollback();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {TABLE_WITH_DELTA_FILES, TABLE_WITHOUT_DELTA_FILES})
+    void testNewVersion(String tableName) throws IOException {
+        checkRows(tableName);
+    }
+
+    @Test
+    void testNewVersionWithNewDeltaFiles() throws IOException {
+        sql("INSERT INTO " + TABLE_WITH_NEW_DELTA_FILES + " (id, name) VALUES (4, 'new_row')");
+        sql("UPDATE " + TABLE_WITH_NEW_DELTA_FILES + " SET name = 'updated_value' WHERE id = 1");
+
+        doCheckpoint(true);
+
+        List<List<Object>> rows = sql("SELECT * FROM " + TABLE_WITH_NEW_DELTA_FILES + " ORDER BY id");
+
+        assertThat(rows.size(), is(3));
+        assertThat(rows.get(0).get(1), is("updated_value"));
+        assertThat(rows.get(1).get(1), is("original_value"));
+        assertThat(rows.get(2).get(1), is("new_row"));
+    }
+
+    private void doCheckpoint(boolean cancelCompaction) throws IOException {
+        try (IgniteClient client = cluster.createClient()) {
+            JobDescriptor<Boolean, Void> job = JobDescriptor.builder(CheckpointJob.class)
+                    .units(new DeploymentUnit(CheckpointJob.class.getName(), "1.0.0")).build();
+
+            JobTarget jobTarget = JobTarget.anyNode(client.cluster().nodes());
+
+            client.compute().execute(jobTarget, job, cancelCompaction);
+        }
+    }
+
+    private void deployCheckpointJob() throws IOException {
+        Path jarFile = createJar(CheckpointJob.class);
+
+        HttpResponse<Object> deploy = deploy(CheckpointJob.class.getName(), "1.0.0", jarFile.toFile());
+        assertThat(deploy.status(), is(HttpStatus.OK));
+    }
+
+    private Path createJar(Class<?> clazz) throws IOException {

Review Comment:
   As you like my approach, let's leave it like that! Will change if needed
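For context on the approach being discussed: the `createJar` body is cut off at the comment anchor above, so the following is only a minimal sketch of what a single-class JAR-packaging helper of this shape could look like. It is hypothetical, not the PR's actual implementation, and it assumes the job's `.class` file is loadable as a classpath resource (it also needs a `java.io.InputStream` import on top of the jar/file imports already in the test).

```java
// Hypothetical sketch only; the PR's real createJar body is truncated above.
private Path createJar(Class<?> clazz) throws IOException {
    // The compiled job class is looked up as a classpath resource.
    String entryName = clazz.getName().replace('.', '/') + ".class";
    Path jarFile = Files.createTempFile(clazz.getSimpleName(), ".jar");

    try (JarOutputStream jar = new JarOutputStream(new FileOutputStream(jarFile.toFile()));
            InputStream classBytes = clazz.getClassLoader().getResourceAsStream(entryName)) {
        // The deployment unit only needs the job class itself, so the JAR has a single entry.
        jar.putNextEntry(new JarEntry(entryName));
        classBytes.transferTo(jar);
        jar.closeEntry();
    }

    return jarFile;
}
```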
##########
modules/compatibility-tests/src/integrationTest/java/org/apache/ignite/internal/PersistentCompatibilityTest.java:
##########
@@ -0,0 +1,218 @@
(same hunk as above)

Review Comment:
   As you like my approach, let's leave it like that! Will change if needed in future
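The `CheckpointJob` class that the test deploys and invokes is also not part of this hunk. Judging by the `JobDescriptor<Boolean, Void>` built in `doCheckpoint`, its shape is presumably along the lines of the sketch below. This is an inferred, hypothetical outline assuming Ignite 3's `ComputeJob` API; the checkpoint-triggering internals are not shown anywhere in this diff and are left as a comment rather than invented.

```java
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;

// Hypothetical shape inferred from JobDescriptor<Boolean, Void> in doCheckpoint();
// not the PR's actual implementation.
public class CheckpointJob implements ComputeJob<Boolean, Void> {
    @Override
    public CompletableFuture<Void> executeAsync(JobExecutionContext context, Boolean cancelCompaction) {
        // The real job would reach into the node's internals to force a checkpoint,
        // optionally cancelling delta-file compaction so the delta files survive the upgrade.
        return CompletableFuture.completedFuture(null);
    }
}
```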
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
