During this morning's debugging I found that if I comment out one of the two INSERT
statements and submit the SQL, only one job is created in the Flink
cluster. If a corrupted message causes that job to fail, Flink
behaves correctly: the checkpoint does not complete and the offset is not committed!
The modified SQL is:
CREATE TABLE IF NOT EXISTS events (
message STRING
) WITH (
'connector' = 'kafka',
'topic' = 'events',
'properties.bootstrap.servers' = 'localhost:19092',
'properties.group.id' = 'flink-kafka-to-ch-connector',
'properties.auto.offset.reset' = 'latest', -- if the group is used for the first time, consumption starts from the latest offset
'format' = 'raw'
);

CREATE CATALOG clickhouse WITH (
'type' = 'clickhouse',
'url' = 'clickhouse://localhost:8123',
'username' = 'default',
'password' = 'secret',
'database-name' = 'default',
'use-local' = 'false',
'sink.max-retries' = '-1'
);

USE CATALOG clickhouse;

-- write data into the clickhouse `mobile` table
-- INSERT INTO mobile_hits_medium
-- SELECT
-- JSON_VALUE(message, '$.id' RETURNING STRING)
-- , JSON_VALUE(message, '$.type' RETURNING STRING)
-- , JSON_VALUE(message, '$.platform' RETURNING STRING)
-- , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
-- FROM default_catalog.default_database.events
-- where JSON_VALUE(message, '$.platform') = 'mobile';

-- write data into the clickhouse `web` table
INSERT INTO web_hits
SELECT
JSON_VALUE(message, '$.id' RETURNING STRING) as id
, JSON_VALUE(message, '$.type' RETURNING STRING) as type
, JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
, JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
, JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version
, JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
, TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
FROM default_catalog.default_database.events
where JSON_VALUE(message, '$.platform') = 'web';


My current guess is that because the two INSERTs (which use the same
Kafka table as their source) run simultaneously, and one of them does not
fail, that job successfully commits the offset. This causes the buggy
situation.
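
If that guess is right, one thing I may try is submitting both INSERTs as a
single job with a statement set, so that they share the same Kafka source and
the same checkpoint, and a failure in either branch should hold back the
offset commit. A minimal, untested sketch, assuming the same script as above
(including USE CATALOG clickhouse) and the same sink tables:

EXECUTE STATEMENT SET
BEGIN
    -- both branches share one source and one checkpoint, so a
    -- deserialization failure in either branch should block the commit
    INSERT INTO mobile_hits_medium
    SELECT
        JSON_VALUE(message, '$.id' RETURNING STRING)
        , JSON_VALUE(message, '$.type' RETURNING STRING)
        , JSON_VALUE(message, '$.platform' RETURNING STRING)
        , JSON_VALUE(message, '$.cnt' RETURNING INTEGER)
    FROM default_catalog.default_database.events
    WHERE JSON_VALUE(message, '$.platform') = 'mobile';

    INSERT INTO web_hits
    SELECT
        JSON_VALUE(message, '$.id' RETURNING STRING)
        , JSON_VALUE(message, '$.type' RETURNING STRING)
        , JSON_VALUE(message, '$.platform' RETURNING STRING)
        , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING)
        , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER)
        , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE)
        , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX')
    FROM default_catalog.default_database.events
    WHERE JSON_VALUE(message, '$.platform') = 'web';
END;

I haven't verified this yet; it is only meant to avoid two separate jobs
committing offsets for the same consumer group.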

Wed, 9 Oct 2024 at 10:06, Ilya Karpov <idkf...@gmail.com>:

> Hi,
> I have a local flink-1.20.0 setup where I am testing the clickhouse connector
> <https://github.com/itinycheng/flink-connector-clickhouse>. The problem
> occurs in one of the test cases: when I push a corrupted message to kafka
> (the json field `browser_version` is expected to be an integer but is actually a
> string), the task fails with an exception and restarts over and over
> (which is ok and expected). But once some time passes and a checkpoint
> occurs, the offset value is incremented and committed to kafka (this is *not
> expected*!); the task then restarts, reads the updated offset value and is
> ready to handle new data - in effect it has simply skipped the corrupted
> message. What I expect is: the checkpoint is stuck/skipped because one of the
> tasks is restarting, the offset value is NOT committed to kafka, and the task
> keeps restarting until I manually advance the offset or change the flinksql script.
> Checkpointing is left at the defaults (hashmap state backend, one checkpoint
> every 3 min), number of slots = 10 - nothing else is changed in the default flink conf.
>
> Please help me figure out where the problem is: in my expectations, in the
> configuration, or whether it's a bug.
>
> Full logs and flink conf attached.
>
> Details:
> Exception:
> 2024-10-09 08:37:58,657 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: events[4] -> Calc[5] -> Sink: web_hits[6] (1/1)#12 (7f77085ad8b8d671723bf6bf5a8f6493_cbc357ccb763df2852fee8c4fc7d55f2_0_12) switched from RUNNING to FAILED with failure cause: java.io.IOException: Failed to deserialize consumer record due to
> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.lang.Thread.run(Thread.java:750)
> Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = events, partition = 0, leaderEpoch = 2, offset = 90, CreateTime = 1728452261616, serialized key size = -1, serialized value size = 201, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@574ffd26).
> at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
> ... 14 more
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:96)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:313)
> at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
> at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
> at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
> at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
> ... 15 more
> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
> at StreamExecCalc$71.processElement_trueFilter13_split15(Unknown Source)
> at StreamExecCalc$71.processElement_trueFilter13(Unknown Source)
> at StreamExecCalc$71.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
> ... 23 more
>
> flinksql script:
> CREATE TABLE IF NOT EXISTS events (
> message STRING
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'events',
> 'properties.bootstrap.servers' = 'localhost:19092',
> 'properties.group.id' = 'flink-kafka-to-ch-connector',
> 'properties.auto.offset.reset' = 'latest', -- if the group is used for the first time, consumption starts from the latest offset
> 'format' = 'raw'
> );
>
> CREATE CATALOG clickhouse WITH (
> 'type' = 'clickhouse',
> 'url' = 'clickhouse://localhost:8123',
> 'username' = 'default',
> 'password' = 'secret',
> 'database-name' = 'default',
> 'use-local' = 'false',
> 'sink.max-retries' = '-1'
> );
>
> INSERT INTO web_hits
> SELECT
> JSON_VALUE(message, '$.id' RETURNING STRING) as id
> , JSON_VALUE(message, '$.type' RETURNING STRING) as type
> , JSON_VALUE(message, '$.platform' RETURNING STRING) as platform
> , JSON_VALUE(message, '$.payload.browser_name' RETURNING STRING) as payload_browser_name
> , JSON_VALUE(message, '$.payload.browser_version' RETURNING INTEGER) as payload_browser_version -- this column fails to deserialize due to the corrupted msg
> , JSON_VALUE(message, '$.payload.fp_score' RETURNING DOUBLE) as payload_fp_score
> , TO_TIMESTAMP(JSON_VALUE(message, '$.created_at' RETURNING STRING), 'yyyy-MM-dd HH:mm:ss.SSSX') as created_at
> FROM default_catalog.default_database.events
> where JSON_VALUE(message, '$.platform') = 'web';
>
> Corrupted message:
> {"id":"c7ba28aa-73f2-4e29-bfd5-5a9b1fbe20c0","type":"view","platform":"web","payload":{"browser_name":"safari_1","browser_version":"abracadabra","fp_score":0.92},"created_at":"2024-09-30
> 11:07:10.332"}
>
