During this morning's debugging I found that if I comment out one of the two INSERT statements and submit the SQL, only one job is created in the Flink cluster. If a corrupted message causes that job to fail, Flink behaves correctly: the checkpoint does not complete and the offset is not committed!

The modified SQL is:

CREATE TABLE IF NOT EXISTS events (
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'events',
    'properties.bootstrap.servers' = 'localhost:19092',
    'properties.group.id' = 'flink-kafka-to-ch-connector',
    'properties.auto.offset.reset' = 'latest', -- if the group is used for the first time, consumption starts from the latest offset
    'format' = 'raw'
);
CREATE CATALOG clickhouse WITH (
    'type' = 'clickhouse',
    'url' = 'clickhouse://localhost:8123',
    'username' = 'default',
    'password' = 'secret',
    'database-name' = 'default',
    'use-local' = 'false',
    'sink.max-retries' = '-1'
);

USE CATALOG clickhouse;

-- write data into the clickhouse `mobile` table
-- INSERT INTO mobile_hits_medium
-- SELECT
--     JSON_VALUE(message, '$.id' RETURNING STRING)
--     , JSON_VALUE(message, '$.type' RETURNING STRING)
--     , JSON_VALUE(message, '$.platform' RETURNING STRING)
--     , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
-- FROM default_catalog.default_database.events
-- where JSON_VALUE(message, '$.platform') = 'mobile';

-- write data into the clickhouse `web` table
INSERT INTO web_hits
SELECT
    JSON_VALUE(message, '$.id' RETURNING STRING) as id
    , JSON_VALUE(message, '$.type' RETURNING STRING) as type
    , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
    , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
    , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version
    , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
    , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
FROM default_catalog.default_database.events
where JSON_VALUE(message, '$.platform') = 'web';

My current guess is that because the *two INSERTs* (which use the *same Kafka table as the source*) run simultaneously and *one of them does not fail*, that job successfully commits the offset. This causes the buggy situation. A sketch of running both INSERTs as a single job is at the end of this message.

Wed, 9 Oct 2024 at 10:06, Ilya Karpov <idkf...@gmail.com>:

> Hi,
> I have a local flink-1.20.0 setup where I am testing the clickhouse connector
> <https://github.com/itinycheng/flink-connector-clickhouse>. The problem
> occurs in one of the test cases: when I push a corrupted message to Kafka
> (the JSON field `browser_version` is expected to be an integer but is actually a
> string), the task fails with an exception and restarts over and over (which is
> ok and expected). But after some time passes and a checkpoint occurs, the
> offset value is incremented and committed to Kafka (this is *not expected*!);
> the task then restarts, reads the updated offset value and is ready to handle
> new data - it has effectively skipped the corrupted message. What I expect is:
> the checkpoint is stuck/skipped because one of the tasks is restarting, the
> offset value is NOT committed to Kafka, and the task keeps restarting
> indefinitely until the offset is manually incremented or the flinksql script is
> changed.
> Checkpointing is default (uses hash, one every 3 min), number of slots = 10 -
> nothing else is changed in the default Flink conf.
>
> Please help me figure out where the problem is: in my expectations, in the
> configuration, or whether it is a bug.
>
> Full logs and flink conf attached.
>
> Details:
> Exception:
> 2024-10-09 08:37:58,657 WARN org.apache.flink.runtime.taskmanager.Task []
> - Source: events[4] -> Calc[5] -> Sink: web_hits[6] (1/1)#12
> (7f77085ad8b8d671723bf6bf5a8f6493_cbc357ccb763df2852fee8c4fc7d55f2_0_12)
> switched from RUNNING to FAILED with failure cause: java.io.IOException:
> Failed to deserialize consumer record due to
>   at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
>   at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
>   at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
>   at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>   at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>   at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>   at java.lang.Thread.run(Thread.java:750)
> Caused by: java.io.IOException: Failed to deserialize consumer record
> ConsumerRecord(topic = events, partition = 0, leaderEpoch = 2, offset = 90,
> CreateTime = 1728452261616, serialized key size = -1, serialized value size = 201,
> headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@574ffd26).
>   at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
>   at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
>   ... 14 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:96)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>   at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
>   at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
>   at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
>   ... 15 more
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
>   at StreamExecCalc$71.processElement_trueFilter13_split15(Unknown Source)
>   at StreamExecCalc$71.processElement_trueFilter13(Unknown Source)
>   at StreamExecCalc$71.processElement(Unknown Source)
>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>   ... 23 more
>
> flinksql script:
> CREATE TABLE IF NOT EXISTS events (
>     message STRING
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'events',
>     'properties.bootstrap.servers' = 'localhost:19092',
>     'properties.group.id' = 'flink-kafka-to-ch-connector',
>     'properties.auto.offset.reset' = 'latest', -- if the group is used for the first time, consumption starts from the latest offset
>     'format' = 'raw'
> );
>
> CREATE CATALOG clickhouse WITH (
>     'type' = 'clickhouse',
>     'url' = 'clickhouse://localhost:8123',
>     'username' = 'default',
>     'password' = 'secret',
>     'database-name' = 'default',
>     'use-local' = 'false',
>     'sink.max-retries' = '-1'
> );
>
> INSERT INTO web_hits
> SELECT
>     JSON_VALUE(message, '$.id' RETURNING STRING) as id
>     , JSON_VALUE(message, '$.type' RETURNING STRING) as type
>     , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
>     , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
>     , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version -- this column fails to deserialize due to the corrupted msg
>     , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
>     , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
> FROM default_catalog.default_database.events
> where JSON_VALUE(message, '$.platform') = 'web';
>
> Corrupted message:
> {"id":"c7ba28aa-73f2-4e29-bfd5-5a9b1fbe20c0","type":"view","platform":"web","payload":{"browser_name":"safari_1","browser_version":"abracadabra","fp_score":0.92},"created_at":"2024-09-30 11:07:10.332"}
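P.S. If the two-INSERT setup really is the trigger, one thing I plan to try (untested, just a sketch based on the scripts above) is submitting both INSERTs as a single statement set so they form one job with one shared checkpoint. Assuming the same `events` source table and the existing `mobile_hits_medium` / `web_hits` sinks:

-- sketch: run both INSERTs as one job via a statement set in the Flink SQL client
EXECUTE STATEMENT SET
BEGIN
    -- mobile branch
    INSERT INTO mobile_hits_medium
    SELECT
        JSON_VALUE(message, '$.id' RETURNING STRING)
        , JSON_VALUE(message, '$.type' RETURNING STRING)
        , JSON_VALUE(message, '$.platform' RETURNING STRING)
        , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
    FROM default_catalog.default_database.events
    WHERE JSON_VALUE(message, '$.platform') = 'mobile';

    -- web branch
    INSERT INTO web_hits
    SELECT
        JSON_VALUE(message, '$.id' RETURNING STRING) as id
        , JSON_VALUE(message, '$.type' RETURNING STRING) as type
        , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
        , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
        , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version
        , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
        , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
    FROM default_catalog.default_database.events
    WHERE JSON_VALUE(message, '$.platform') = 'web';
END;

With both branches in one job there is only a single checkpoint to complete, so (if my guess above is right) a failing web branch should also block the offset commit for the mobile branch.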