Finally I found that, in order to execute multiple INSERT statements in one job, I need to use a STATEMENT SET. This solved the problem: both inserts now run inside a single job that shares one Kafka source, so a failure in either branch blocks the checkpoint and therefore the offset commit.
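For reference, this is roughly what the fixed script looks like (a sketch; the SELECT bodies are abbreviated, the full versions are in the script quoted below):

EXECUTE STATEMENT SET
BEGIN
  -- write data into the clickhouse `mobile` table
  INSERT INTO mobile_hits_medium
  SELECT
    JSON_VALUE(message, '$.id' RETURNING STRING)
    , JSON_VALUE(message, '$.type' RETURNING STRING)
    , JSON_VALUE(message, '$.platform' RETURNING STRING)
    , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
  FROM default_catalog.default_database.events
  WHERE JSON_VALUE(message, '$.platform') = 'mobile';

  -- write data into the clickhouse `web` table
  INSERT INTO web_hits
  SELECT ... -- same SELECT as in the script quoted below
  FROM default_catalog.default_database.events
  WHERE JSON_VALUE(message, '$.platform') = 'web';
END;

With the statement set, both inserts are compiled into one job graph instead of two independent jobs that each commit offsets for the same consumer group.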
Wed, 9 Oct 2024 at 12:17, Ilya Karpov <idkf...@gmail.com>:

> During this morning's debugging I found that if I comment out one of the
> two insert expressions and submit the sql, then only one job is created in
> the flink cluster. If a corrupted message causes a failure of this job,
> then flink behaves correctly: the checkpoint does not happen and the
> offset is not committed! The modified sql is:
>
> CREATE TABLE IF NOT EXISTS events (
>   message STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'events',
>   'properties.bootstrap.servers' = 'localhost:19092',
>   'properties.group.id' = 'flink-kafka-to-ch-connector',
>   'properties.auto.offset.reset' = 'latest', -- if the group is used for the first time, consumption starts from the latest offset
>   'format' = 'raw'
> );
>
> CREATE CATALOG clickhouse WITH (
>   'type' = 'clickhouse',
>   'url' = 'clickhouse://localhost:8123',
>   'username' = 'default',
>   'password' = 'secret',
>   'database-name' = 'default',
>   'use-local' = 'false',
>   'sink.max-retries' = '-1'
> );
>
> USE CATALOG clickhouse;
>
> -- write data into the clickhouse `mobile` table
> -- INSERT INTO mobile_hits_medium
> -- SELECT
> --   JSON_VALUE(message, '$.id' RETURNING STRING)
> --   , JSON_VALUE(message, '$.type' RETURNING STRING)
> --   , JSON_VALUE(message, '$.platform' RETURNING STRING)
> --   , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
> -- FROM default_catalog.default_database.events
> -- WHERE JSON_VALUE(message, '$.platform') = 'mobile';
>
> -- write data into the clickhouse `web` table
> INSERT INTO web_hits
> SELECT
>   JSON_VALUE(message, '$.id' RETURNING STRING) as id
>   , JSON_VALUE(message, '$.type' RETURNING STRING) as type
>   , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
>   , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
>   , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version
>   , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
>   , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
> FROM default_catalog.default_database.events
> WHERE JSON_VALUE(message, '$.platform') = 'web';
>
> My current guess is that because the *two inserts* (which use the *same
> kafka table as their source*) run simultaneously as separate jobs, and
> *one of them is not failing*, the healthy job successfully commits the
> offset. This causes the buggy situation.
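> One way to double-check this guess is to point each insert at its own
> consumer group with a dynamic table options hint, so kafka shows
> separately which job's offset is actually advancing. An untested sketch
> (it relies on the OPTIONS hint, which needs
> table.dynamic-table-options.enabled - I believe it is on by default in
> recent versions; the group name here is made up):
>
> INSERT INTO web_hits
> SELECT ... -- same SELECT as above
> FROM default_catalog.default_database.events
>   /*+ OPTIONS('properties.group.id' = 'flink-kafka-to-ch-web') */
> WHERE JSON_VALUE(message, '$.platform') = 'web';
>
> With distinct groups, the offsets committed by the healthy job no longer
> mask the stuck offset of the failing one.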
> Wed, 9 Oct 2024 at 10:06, Ilya Karpov <idkf...@gmail.com>:
>
>> Hi,
>> I have a local flink-1.20.0 setup where I test the clickhouse connector
>> <https://github.com/itinycheng/flink-connector-clickhouse>. The problem
>> occurred in one of the test cases: when I push a corrupted message to
>> kafka (the json field `browser_version` is expected to be an integer but
>> is actually a string), the task fails with an exception and restarts over
>> and over (which is ok and expected). But after some time passes and a
>> checkpoint occurs, the offset value is incremented and committed to kafka
>> (this is *not expected*!); the task then restarts, reads the updated
>> offset and is ready to handle new data - it has effectively skipped the
>> corrupted message! What I expect is: the checkpoint is stuck/skipped
>> because one of the tasks is restarting, the offset value is NOT committed
>> to kafka, and the task keeps restarting until a manual increment of the
>> offset or a change in the flinksql script.
>> Checkpointing is default (hashmap, one every 3 min), number of slots =
>> 10 - nothing else is changed in the default flink conf.
>>
>> Please help me figure out where the problem is: in my expectations, in
>> the configuration, or is it a bug.
>>
>> Full logs and flink conf attached.
>>
>> Details:
>> Exception:
>> 2024-10-09 08:37:58,657 WARN org.apache.flink.runtime.taskmanager.Task
>> [] - Source: events[4] -> Calc[5] -> Sink: web_hits[6] (1/1)#12
>> (7f77085ad8b8d671723bf6bf5a8f6493_cbc357ccb763df2852fee8c4fc7d55f2_0_12)
>> switched from RUNNING to FAILED with failure cause: java.io.IOException:
>> Failed to deserialize consumer record due to
>>   at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
>>   at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
>>   at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
>>   at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>>   at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>>   at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>>   at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>>   at java.lang.Thread.run(Thread.java:750)
>> Caused by: java.io.IOException: Failed to deserialize consumer record
>> ConsumerRecord(topic = events, partition = 0, leaderEpoch = 2, offset =
>> 90, CreateTime = 1728452261616, serialized key size = -1, serialized
>> value size = 201, headers = RecordHeaders(headers = [], isReadOnly =
>> false), key = null, value = [B@574ffd26).
>>   at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
>>   at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
>>   ... 14 more
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:96)
>>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>>   at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
>>   at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
>>   at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
>>   at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
>>   at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>>   at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
>>   ... 15 more
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
>>   at StreamExecCalc$71.processElement_trueFilter13_split15(Unknown Source)
>>   at StreamExecCalc$71.processElement_trueFilter13(Unknown Source)
>>   at StreamExecCalc$71.processElement(Unknown Source)
>>   at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>>   ... 23 more
>>
>> flinksql script:
>> CREATE TABLE IF NOT EXISTS events (
>>   message STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'events',
>>   'properties.bootstrap.servers' = 'localhost:19092',
>>   'properties.group.id' = 'flink-kafka-to-ch-connector',
>>   'properties.auto.offset.reset' = 'latest', -- if the group is used for the first time, consumption starts from the latest offset
>>   'format' = 'raw'
>> );
>>
>> CREATE CATALOG clickhouse WITH (
>>   'type' = 'clickhouse',
>>   'url' = 'clickhouse://localhost:8123',
>>   'username' = 'default',
>>   'password' = 'secret',
>>   'database-name' = 'default',
>>   'use-local' = 'false',
>>   'sink.max-retries' = '-1'
>> );
>>
>> INSERT INTO web_hits
>> SELECT
>>   JSON_VALUE(message, '$.id' RETURNING STRING) as id
>>   , JSON_VALUE(message, '$.type' RETURNING STRING) as type
>>   , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
>>   , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
>>   , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version -- this column fails to deserialize due to the corrupted msg
>>   , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
>>   , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
>> FROM default_catalog.default_database.events
>> WHERE JSON_VALUE(message, '$.platform') = 'web';
>>
>> Corrupted message:
>> {"id":"c7ba28aa-73f2-4e29-bfd5-5a9b1fbe20c0","type":"view","platform":"web","payload":{"browser_name":"safari_1","browser_version":"abracadabra","fp_score":0.92},"created_at":"2024-09-30 11:07:10.332"}
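PS: independent of the offset-commit behaviour, the ClassCastException itself can be avoided by parsing the suspect field leniently. A minimal untested sketch, replacing the strict RETURNING INTEGER with TRY_CAST, which yields NULL for values like "abracadabra" instead of failing the task:

  , TRY_CAST(JSON_VALUE(message, '$.payload.browser_version') AS INT) as payload_browser_version

Whether silently nulling out bad values is acceptable, rather than failing the job and fixing the data, is of course a separate decision.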