您好!
我在使用flink时遇到一些问题。
flink-1.14.4
sqlserver-cdc-2.2.1
yarn-per-job
我有一个任务,先是双流join,再与mysql维表lookup join,开启增量检查点;
sqlsever-cdc短暂故障,任务失败,自动恢复,但是lookup join对应task不再输出数据;
检查发现,加载维表数据为0,即任务恢复时未加载一次全量维表数据;
以上,可能是什么问题,应该如何解决呢?
期待回复!
best wishes!
附日志:
2022-06-24 14:55:45,950 ERROR com.ververica.cdc.debezium.internal.Handover
[] - Reporting error:
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
An exception occurred in the change event producer. This connector will be
stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at
io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_301]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_301]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_301]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数
cdc.fn_cdc_get_all_changes_ ... 提供的参数数目不足。
at
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at
com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at
com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at
com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at
io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at
io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
... 7 more
2022-06-24 14:55:45,953 INFO io.debezium.embedded.EmbeddedEngine
[] - Stopping the embedded engine
2022-06-24 14:55:45,954 INFO
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
[] - Source: TableSourceScan(table=[[default_catalog, default_database,
carflow]], fields=[id, plate_license, site_id, create_time, flow_type,
circle_id]) -> Calc(select=[id, plate_license, site_id, create_time,
(create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) ->
WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 discarding 0
drained requests
2022-06-24 14:55:45,955 INFO io.debezium.embedded.EmbeddedEngine
[] - Stopping the embedded engine
2022-06-24 14:55:45,957 WARN org.apache.flink.runtime.taskmanager.Task
[] - Source: TableSourceScan(table=[[default_catalog,
default_database, carflow]], fields=[id, plate_license, site_id, create_time,
flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time,
(create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) ->
WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0
(71206ba8149ac20bb39d8169ff3d2f02) switched from RUNNING to FAILED with failure
cause:
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
An exception occurred in the change event producer. This connector will be
stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
at
io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
at
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数
cdc.fn_cdc_get_all_changes_ ... 提供的参数数目不足。
at
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
at
com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
at
com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
at
com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
at
io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
at
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
at
io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
at
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
... 7 more
2022-06-24 14:55:45,957 INFO org.apache.flink.runtime.taskmanager.Task
[] - Freeing task resources for Source:
TableSourceScan(table=[[default_catalog, default_database, carflow]],
fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) ->
Calc(select=[id, plate_license, site_id, create_time, (create_time +
-28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) ->
WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0
(71206ba8149ac20bb39d8169ff3d2f02).
2022-06-24 15:03:57,819 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor [] -
Un-registering task and sending final execution state FINISHED to JobManager
for task Source: TableSourceScan(table=[[default_catalog, default_database,
sitecar]], fields=[car_number, site_id, first_oil_time, is_tq_car,
relation_id]) -> WatermarkAssigner(rowtime=[first_oil_time],
watermark=[first_oil_time]) (1/1)#1 9ebcb0fc15ced6db3f2a579510e415ee.
2022-06-24 15:06:35,005 INFO io.debezium.connector.common.BaseSourceTask
[] - 23591 records sent during previous 00:05:19.499, last recorded
offset: {transaction_id=null, event_serial_no=1,
commit_lsn=0000162c:00016205:00d0, change_lsn=0000162c:00016205:00c7}
[email protected]