Finally, I found that in order to execute multiple INSERT statements in one job
I need to use a STATEMENT SET. This solved the problem.
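
For reference, a minimal sketch of the combined script (the SELECT lists are
elided here; the full statements are quoted below in the thread):

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO mobile_hits_medium
  SELECT ... FROM default_catalog.default_database.events
  WHERE JSON_VALUE(message, '$.platform') = 'mobile';

  INSERT INTO web_hits
  SELECT ... FROM default_catalog.default_database.events
  WHERE JSON_VALUE(message, '$.platform') = 'web';
END;

Both inserts then compile into a single job that shares the Kafka source, so a
failure in either branch fails the whole job, the shared checkpoint cannot
complete, and the offset is not committed.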

Wed, 9 Oct 2024 at 12:17, Ilya Karpov <idkf...@gmail.com>:

> During this morning's debugging I found that if I comment out one of the two
> INSERT statements and submit the SQL, only one job is created in the Flink
> cluster. If a corrupted message causes that job to fail, then Flink behaves
> correctly: the checkpoint does not happen and the offset is not committed!
> The modified SQL is:
> CREATE TABLE IF NOT EXISTS events (
>   message STRING
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'events',
>   'properties.bootstrap.servers' = 'localhost:19092',
>   'properties.group.id' = 'flink-kafka-to-ch-connector',
>   'properties.auto.offset.reset' = 'latest', -- if the group is used for the first time, consumption starts from the latest offset
>   'format' = 'raw'
> );
>
> CREATE CATALOG clickhouse WITH (
>   'type' = 'clickhouse',
>   'url' = 'clickhouse://localhost:8123',
>   'username' = 'default',
>   'password' = 'secret',
>   'database-name' = 'default',
>   'use-local' = 'false',
>   'sink.max-retries' = '-1'
> );
>
> USE CATALOG clickhouse;
>
> -- write data into the clickhouse `mobile` table
> -- INSERT INTO mobile_hits_medium
> -- SELECT
> -- JSON_VALUE(message, '$.id' RETURNING STRING)
> -- , JSON_VALUE(message, '$.type' RETURNING STRING)
> -- , JSON_VALUE(message, '$.platform' RETURNING STRING)
> -- , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
> -- FROM default_catalog.default_database.events
> -- where JSON_VALUE(message, '$.platform') = 'mobile';
>
> -- write data into the clickhouse `web` table
> INSERT INTO web_hits
> SELECT
>   JSON_VALUE(message, '$.id' RETURNING STRING) as id
>   , JSON_VALUE(message, '$.type' RETURNING STRING) as type
>   , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
>   , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
>   , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version
>   , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
>   , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
> FROM default_catalog.default_database.events
> WHERE JSON_VALUE(message, '$.platform') = 'web';
>
>
> My current guess: because the *two inserts* (which use the *same Kafka table
> as their source*) run as two separate jobs and *one of them is not failing*,
> the healthy job successfully commits the offset. This causes the buggy
> situation.
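>
> One way to check this guess (assuming the stock Kafka CLI tools are at hand)
> is to watch the committed offset of the shared consumer group while the
> failing job keeps restarting:
>
> kafka-consumer-groups.sh --bootstrap-server localhost:19092 \
>   --describe --group flink-kafka-to-ch-connector
>
> If it is the healthy job's checkpoints doing the commit, CURRENT-OFFSET
> should advance past the corrupted record even while the other job restarts.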
>
Wed, 9 Oct 2024 at 10:06, Ilya Karpov <idkf...@gmail.com>:
>
>> Hi,
>> I have a local flink-1.20.0 setup where I am testing the clickhouse connector
>> <https://github.com/itinycheng/flink-connector-clickhouse>. The problem
>> occurred in one of the test cases: when I push a corrupted message to Kafka
>> (the JSON field `browser_version` is expected to be an integer but is
>> actually a string), the task fails with an exception and restarts over and
>> over (which is ok and expected). But after some time passes and a checkpoint
>> occurs, the offset is incremented and committed to Kafka (this is *not
>> expected*!); the task then restarts, reads the updated offset, and is ready
>> to handle new data - it has effectively skipped the corrupted message! What
>> I expect is: the checkpoint is stuck/skipped because one of the tasks is
>> restarting, the offset is NOT committed to Kafka, and the task keeps
>> restarting until I manually advance the offset or change the Flink SQL
>> script.
>> Checkpointing is at defaults (hashmap state backend, one checkpoint every
>> 3 min), number of slots = 10 - nothing else is changed in the default Flink
>> conf.
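>>
>> The settings in play, as a sketch (assuming Flink 1.20 option names; the
>> exact values are in the attached conf):
>>
>> state.backend.type: hashmap
>> execution.checkpointing.interval: 3min
>> taskmanager.numberOfTaskSlots: 10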
>>
>> Please help me figure out where the problem is: in my expectations, in my
>> configuration, or a bug.
>>
>> Full logs and the Flink conf are attached.
>>
>> Details:
>> Exception:
>> 2024-10-09 08:37:58,657 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: events[4] -> Calc[5] -> Sink: web_hits[6] (1/1)#12 (7f77085ad8b8d671723bf6bf5a8f6493_cbc357ccb763df2852fee8c4fc7d55f2_0_12) switched from RUNNING to FAILED with failure cause: java.io.IOException: Failed to deserialize consumer record due to
>> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
>> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
>> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
>> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>> at java.lang.Thread.run(Thread.java:750)
>> Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = events, partition = 0, leaderEpoch = 2, offset = 90, CreateTime = 1728452261616, serialized key size = -1, serialized value size = 201, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@574ffd26).
>> at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
>> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
>> ... 14 more
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:96)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>> at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
>> at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
>> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
>> at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
>> at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>> at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
>> ... 15 more
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
>> at StreamExecCalc$71.processElement_trueFilter13_split15(Unknown Source)
>> at StreamExecCalc$71.processElement_trueFilter13(Unknown Source)
>> at StreamExecCalc$71.processElement(Unknown Source)
>> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
>> ... 23 more
>>
>> flinksql script:
>> CREATE TABLE IF NOT EXISTS events (
>>   message STRING
>> ) WITH (
>>   'connector' = 'kafka',
>>   'topic' = 'events',
>>   'properties.bootstrap.servers' = 'localhost:19092',
>>   'properties.group.id' = 'flink-kafka-to-ch-connector',
>>   'properties.auto.offset.reset' = 'latest', -- if the group is used for the first time, consumption starts from the latest offset
>>   'format' = 'raw'
>> );
>>
>> CREATE CATALOG clickhouse WITH (
>>   'type' = 'clickhouse',
>>   'url' = 'clickhouse://localhost:8123',
>>   'username' = 'default',
>>   'password' = 'secret',
>>   'database-name' = 'default',
>>   'use-local' = 'false',
>>   'sink.max-retries' = '-1'
>> );
>>
>> INSERT INTO web_hits
>> SELECT
>>   JSON_VALUE(message, '$.id' RETURNING STRING) as id
>>   , JSON_VALUE(message, '$.type' RETURNING STRING) as type
>>   , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
>>   , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
>>   , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version -- this column fails to deserialize due to the corrupted message
>>   , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
>>   , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
>> FROM default_catalog.default_database.events
>> WHERE JSON_VALUE(message, '$.platform') = 'web';
>>
>> Corrupted message:
>> {"id":"c7ba28aa-73f2-4e29-bfd5-5a9b1fbe20c0","type":"view","platform":"web","payload":{"browser_name":"safari_1","browser_version":"abracadabra","fp_score":0.92},"created_at":"2024-09-30 11:07:10.332"}
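>>
>> For contrast, a well-formed message (hypothetical values) carries an integer
>> browser_version, e.g.:
>>
>> {"id":"c7ba28aa-73f2-4e29-bfd5-5a9b1fbe20c0","type":"view","platform":"web","payload":{"browser_name":"safari_1","browser_version":17,"fp_score":0.92},"created_at":"2024-09-30 11:07:10.332"}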
