[ 
https://issues.apache.org/jira/browse/FLINK-37587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu updated FLINK-37587:
-------------------------------
    Fix Version/s: cdc-3.5.0
                       (was: cdc-3.4.0)

> Fail to add a new column that has the same column name with source table 
> using transform
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-37587
>                 URL: https://issues.apache.org/jira/browse/FLINK-37587
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.3.0
>            Reporter: Yanquan Lv
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: cdc-3.5.0
>
>
> YAML configuration:
> {code:java}
> transform:
>   - source-table: gc_fps_receivable_[0-9]+_[0-9]+.account_flow_[0-9]+
>     projection: \*, __schema_name__ || '.' || __table_name__ as 
> identifier,'0' as instance,DATE_FORMAT(transaction_time, 'yyyy-MM') 
> part_month,op_ts,    __data_event_type__ AS op_type {code}
> However, one source table has the same column name op_type with tiny_int 
> type, and there is a exception raised:
> org.apache.flink.cdc.runtime.operators.transform.exceptions.TransformException:
>  Failed to post-transform with
>     DataChangeEvent\{xxx}
> for table
>     gc_fps_receivable.invoice_manage_log
> from schema
>     columns=\{`id` INT NOT NULL,`ref_id` BIGINT,`log_type` TINYINT,`operator` 
> VARCHAR(255),`op_type` TINYINT,`create_time` TIMESTAMP(0)}, primaryKeys=id, 
> options=()
> to schema
>     columns=\{`id` INT NOT NULL,`ref_id` BIGINT,`log_type` TINYINT,`operator` 
> VARCHAR(255),`op_type` TINYINT NOT NULL,`create_time` TIMESTAMP(0),`op_ts` 
> BIGINT NOT NULL}, primaryKeys=id, options=().
>     at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:266)
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:245)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:217)
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:169)
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:617)
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1072)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1021)
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
>     at java.lang.Thread.run(Thread.java:879)
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to compile 
> expression TransformExpressionKey\{expression='import static 
> org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;$0', 
> argumentNames=[$0, __time_zone__, __epoch_time__], argumentClasses=[class 
> java.lang.String, class java.lang.String, class java.lang.Long], 
> returnClass=class java.lang.Byte, columnNameMap={__data_event_type__=$0}}
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.compileExpression(TransformExpressionCompiler.java:88)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.ProjectionColumnProcessor.<init>(ProjectionColumnProcessor.java:71)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.ProjectionColumnProcessor.of(ProjectionColumnProcessor.java:83)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor.lambda$cacheProjectionColumnProcessors$1(TransformProjectionProcessor.java:227)
>     at java.util.Optional.map(Optional.java:215)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor.cacheProjectionColumnProcessors(TransformProjectionProcessor.java:225)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor.<init>(TransformProjectionProcessor.java:76)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor.of(TransformProjectionProcessor.java:95)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processDataChangeEvent(PostTransformOperator.java:482)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processEvent(PostTransformOperator.java:283)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:251)
>     ... 13 more
> Caused by: 
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
>  org.apache.flink.api.common.InvalidProgramException: Expression cannot be 
> compiled. This is a bug. Please file an issue.
>     Expression: import static 
> org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;$0
>     Column name map: \{$0 -> __data_event_type__}
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.compileExpression(TransformExpressionCompiler.java:53)
>     ... 23 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Expression 
> cannot be compiled. This is a bug. Please file an issue.
>     Expression: import static 
> org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;$0
>     Column name map: \{$0 -> __data_event_type__}
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:78)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
>     at 
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
>     ... 26 more
> Caused by: 
> org.apache.flink.cdc.shaded.org.codehaus.commons.compiler.CompileException: 
> Line 1, Column 76: Assignment conversion not possible from type 
> "java.lang.String" to type "java.lang.Byte"
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13080)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.assignmentConversion(UnitCompiler.java:11297)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2850)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.access$2800(UnitCompiler.java:237)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler$6.visitReturnStatement(UnitCompiler.java:1591)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler$6.visitReturnStatement(UnitCompiler.java:1576)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.Java$ReturnStatement.accept(Java.java:3888)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1576)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1662)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3665)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3330)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1448)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1421)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:830)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:443)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:237)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler$3.visitPackageMemberClassDeclaration(UnitCompiler.java:423)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler$3.visitPackageMemberClassDeclaration(UnitCompiler.java:419)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1688)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:419)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.access$000(UnitCompiler.java:237)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:364)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:362)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.Java$CompilationUnit.accept(Java.java:371)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:362)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:273)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:303)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:870)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:787)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:771)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ExpressionEvaluator.cook2(ExpressionEvaluator.java:497)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ExpressionEvaluator.cook(ExpressionEvaluator.java:452)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ExpressionEvaluator.cook(ExpressionEvaluator.java:443)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ExpressionEvaluator.cook(ExpressionEvaluator.java:409)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.janino.ExpressionEvaluator.cook(ExpressionEvaluator.java:394)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.commons.compiler.Cookable.cook(Cookable.java:82)
>     at 
> org.apache.flink.cdc.shaded.org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77)
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:75)
>     ... 31 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to