[ https://issues.apache.org/jira/browse/FLINK-27652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee reassigned FLINK-27652: ------------------------------------ Assignee: Jane Chan > CompactManager.Rewriter cannot handle different partition keys invoked > compaction > --------------------------------------------------------------------------------- > > Key: FLINK-27652 > URL: https://issues.apache.org/jira/browse/FLINK-27652 > Project: Flink > Issue Type: Bug > Components: Table Store > Affects Versions: table-store-0.2.0 > Reporter: Jane Chan > Assignee: Jane Chan > Priority: Major > Fix For: table-store-0.2.0 > > > h3. Issue Description > When enabling {{commit.force-compact}} for the partitioned managed table, > there had a chance that the successive synchronized > writes got failure. The current impl of {{CompactManager.Rewriter}} is an > anonymous class in {{FileStoreWriteImpl}}. However, the {{partition}} and > {{bucket}} are referenced as local variables, and the dataFileReader is > initiliazed when rewrite is called; and this may lead to the {{rewrite}} > method messing up with the wrong data file with the {{partition}} and > {{bucket}}. > h3. Root Cause > {code:java} > Caused by: java.io.IOException: java.util.concurrent.ExecutionException: > java.lang.RuntimeException: java.io.FileNotFoundException: File > file:/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gn/T/junit5920507275110651781/junit4163667468681653619/default_catalog.catalog/default_database.db/T1/f1=Autumn/bucket-0/data-59826283-c5d1-4344-96ae-2203d4e60a57-0 > does not exist or the user running Flink ('jane.cjm') has insufficient > permissions to access it. at > org.apache.flink.table.store.connector.sink.StoreSinkWriter.prepareCommit(StoreSinkWriter.java:172) > {code} > However, data-59826283-c5d1-4344-96ae-2203d4e60a57-0 does not belong to > partition Autumn. It seems like the rewriter found the wrong partition/bucket > with the wrong file. > h3. How to Reproduce > {code:java} > /* > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > package org.apache.flink.table.store.connector; > import org.junit.Test; > import java.util.Collections; > import java.util.List; > import java.util.concurrent.ExecutionException; > /** A reproducible case. */ > public class ForceCompactionITCase extends FileStoreTableITCase { > @Override > protected List<String> ddl() { > return Collections.singletonList( > "CREATE TABLE IF NOT EXISTS T1 (" > + "f0 INT, f1 STRING, f2 STRING) PARTITIONED BY > (f1)"); > } > @Test > public void test() throws ExecutionException, InterruptedException { > bEnv.executeSql("ALTER TABLE T1 SET ('num-levels' = '3')"); > bEnv.executeSql("ALTER TABLE T1 SET ('commit.force-compact' = > 'true')"); > bEnv.executeSql( > "INSERT INTO T1 VALUES(1, 'Winter', 'Winter is > Coming')" > + ",(2, 'Winter', 'The First Snowflake'), " > + "(2, 'Spring', 'The First Rose in Spring'), > " > + "(7, 'Summer', 'Summertime Sadness')") > .await(); > bEnv.executeSql("INSERT INTO T1 VALUES(12, 'Winter', 'Last > Christmas')").await(); > bEnv.executeSql("INSERT INTO T1 VALUES(11, 'Winter', 'Winter is > Coming')").await(); > bEnv.executeSql("INSERT INTO T1 VALUES(10, 'Autumn', > 'Refrain')").await(); > bEnv.executeSql( > "INSERT INTO T1 VALUES(6, 'Summer', 'Watermelon > Sugar'), " > + "(4, 'Spring', 'Spring Water')") > .await(); > bEnv.executeSql( > "INSERT INTO T1 VALUES(66, 'Summer', 'Summer Vibe')," > + " (9, 'Autumn', 'Wake Me Up When September > Ends')") > .await(); > bEnv.executeSql( > "INSERT INTO T1 VALUES(666, 'Summer', 'Summer Vibe')," > + " (9, 'Autumn', 'Wake Me Up When September > Ends')") > .await(); > bEnv.executeSql( > "INSERT INTO T1 VALUES(6666, 'Summer', 'Summer > Vibe')," > + " (9, 'Autumn', 'Wake Me Up When September > Ends')") > .await(); > bEnv.executeSql( > "INSERT INTO T1 VALUES(66666, 'Summer', 'Summer > Vibe')," > + " (9, 'Autumn', 'Wake Me Up When September > Ends')") > .await(); > bEnv.executeSql( > "INSERT INTO T1 VALUES(666666, 'Summer', 'Summer > Vibe')," > + " (9, 'Autumn', 'Wake Me Up When September > Ends')") > .await(); > bEnv.executeSql( > "INSERT INTO T1 VALUES(6666666, 'Summer', 'Summer > Vibe')," > + " (9, 'Autumn', 'Wake Me Up When September > Ends')") > .await(); > } > } > {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)