Hi, I am using flink-connector-kinesis (4.0.2-1.18) to consume from Kinesis. The job starts successfully but fails within one hour. The detailed error log is attached.
When I changed the flink-connector-kinesis version to `1.15.2`, the job ran without problems. Any idea how to fix this?

create table kafka_event_v1 (
  `timestamp` bigint,
  serverTimestamp bigint,
  name string,
  data string,
  app string,
  `user` string,
  context string,
  browser row<
    url string,
    referrer string,
    userAgent string,
    `language` string,
    title string,
    viewportWidth int,
    viewportHeight int,
    contentWidth int,
    contentHeight int,
    cookies map<string, string>,
    name string,
    version string,
    device row<model string, type string, vendor string>,
    engine row<name string, version string>,
    os row<name string, version string>
  >,
  abtests map<string, string>,
  apikey string,
  lifecycleId string,
  sessionId string,
  instanceId string,
  requestId string,
  eventId string,
  `trigger` string,
  virtualId string,
  accountId string,
  ip string,
  serverTimestampLtz as to_timestamp(from_unixtime(serverTimestamp / 1000)),
  watermark for serverTimestampLtz as serverTimestampLtz - interval '5' second
) with (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'shared.kafka.smartnews.internal:9093',
  'topic' = 'shared-cluster-sn-pixel-event-v1-dev',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',
  'properties.group.id' = 'event_v2',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SNTOKEN',
  'properties.sasl.login.class' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenLogin',
  'properties.sasl.login.callback.handler.class' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenCallbackHandler',
  'properties.sasl.client.callback.handler.class' = 'com.smartnews.dp.kafka.security.sn.sasl.SnTokenSaslClientCallbackHandler',
  'properties.sasl.jaas.config' = 'com.smartnews.dp.kafka.security.sn.auth.SnTokenLoginModule required username="sn-pixel" password="aQXJcNUsCuIZpICHO9bQ" env="prd";',
  'properties.ssl.truststore.type' = 'PEM'
);

create catalog iceberg_dev with (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://dev-hive-metastore.smartnews.internal:9083',
  'warehouse'='s3a://smartnews-dmp/warehouse/development'
);

insert into iceberg_dev.pixel.event_v2 /*+ options(
  'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true'
) */
select
  `timestamp`,
  serverTimestamp,
  data,
  app,
  `user`,
  context,
  browser,
  abtests,
  lifecycleId,
  sessionId,
  instanceId,
  requestId,
  eventId,
  `trigger`,
  virtualId,
  accountId,
  ip,
  date_format(serverTimestampLtz, 'yyyy-MM-dd') dt,
  apikey,
  name
from default_catalog.default_database.kafka_event_v1;