Hi,
I used the flink-connector-kinesis (4.0.2-1.18) to consume from Kinesis.
The job starts successfully but fails within an hour; the detailed error log
is attached.
When I downgraded the flink-connector-kinesis version to `1.15.2`,
everything worked as expected.
Any ideas on how to fix this?
-- Source table: raw pixel events consumed from Kafka as JSON.
-- serverTimestampLtz is a computed event-time column (epoch millis -> TIMESTAMP)
-- with a 5-second out-of-orderness watermark.
CREATE TABLE kafka_event_v1 (
    `timestamp` BIGINT,
    serverTimestamp BIGINT,
    name STRING,
    data STRING,
    app STRING,
    `user` STRING,
    context STRING,
    browser ROW<
        url STRING,
        referrer STRING,
        userAgent STRING,
        `language` STRING,
        title STRING,
        viewportWidth INT,
        viewportHeight INT,
        contentWidth INT,
        contentHeight INT,
        cookies MAP<STRING, STRING>,
        name STRING,
        version STRING,
        device ROW<
            model STRING,
            type STRING,
            vendor STRING
        >,
        engine ROW<
            name STRING,
            version STRING
        >,
        os ROW<
            name STRING,
            version STRING
        >
    >,
    abtests MAP<STRING, STRING>,
    apikey STRING,
    lifecycleId STRING,
    sessionId STRING,
    instanceId STRING,
    requestId STRING,
    eventId STRING,
    `trigger` STRING,
    virtualId STRING,
    accountId STRING,
    ip STRING,
    -- serverTimestamp is epoch milliseconds; convert to a local TIMESTAMP.
    serverTimestampLtz AS TO_TIMESTAMP(FROM_UNIXTIME(serverTimestamp / 1000)),
    WATERMARK FOR serverTimestampLtz AS serverTimestampLtz - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'shared.kafka.smartnews.internal:9093',
    'topic' = 'shared-cluster-sn-pixel-event-v1-dev',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    -- Malformed JSON records are silently skipped rather than failing the job.
    'json.ignore-parse-errors' = 'true',
    'properties.group.id' = 'event_v2',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'SNTOKEN',
    'properties.sasl.login.class' =
        'com.smartnews.dp.kafka.security.sn.auth.SnTokenLogin',
    'properties.sasl.login.callback.handler.class' =
        'com.smartnews.dp.kafka.security.sn.auth.SnTokenCallbackHandler',
    'properties.sasl.client.callback.handler.class' =
        'com.smartnews.dp.kafka.security.sn.sasl.SnTokenSaslClientCallbackHandler',
    -- SECURITY: a live credential is hard-coded below. Rotate this password and
    -- inject it from a secrets store / deployment config instead of committing it.
    'properties.sasl.jaas.config' =
        'com.smartnews.dp.kafka.security.sn.auth.SnTokenLoginModule required
username="sn-pixel" password="aQXJcNUsCuIZpICHO9bQ" env="prd";',
    'properties.ssl.truststore.type' = 'PEM'
);
-- Register the development Iceberg catalog, backed by the dev Hive Metastore
-- and an S3 warehouse location.
CREATE CATALOG iceberg_dev WITH (
    'type' = 'iceberg',
    'catalog-type' = 'hive',
    'uri' = 'thrift://dev-hive-metastore.smartnews.internal:9083',
    'warehouse' = 's3a://smartnews-dmp/warehouse/development'
);
-- Continuously sink Kafka pixel events into the Iceberg table, partitioned by
-- the dt date column derived from the event time.
-- NOTE(review): 'partition.time-extractor.timestamp-pattern' and
-- 'sink.partition-commit.policy.kind' look like filesystem-connector options;
-- confirm they are honored by the Iceberg sink and not silently ignored.
INSERT INTO iceberg_dev.pixel.event_v2 /*+ options(
    'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',
    'sink.partition-commit.policy.kind'='metastore,success-file',
    'auto-compaction'='true'
) */
SELECT
    `timestamp`,
    serverTimestamp,
    data,
    app,
    `user`,
    context,
    browser,
    abtests,
    lifecycleId,
    sessionId,
    instanceId,
    requestId,
    eventId,
    `trigger`,
    virtualId,
    accountId,
    ip,
    -- Partition column: calendar date of the event's server-side timestamp.
    DATE_FORMAT(serverTimestampLtz, 'yyyy-MM-dd') AS dt,
    apikey,
    name
FROM default_catalog.default_database.kafka_event_v1;