Hi,

I used the flink-connector-kinesis (4.0.2-1.18) to consume from Kinesis.
The job can start but will fail within 1 hour. Detailed error log
is attached.

When I changed the version of the flink-connector-kinesis to `1.15.2` ,
everything settled.

Any idea to fix it ?
create table kafka_event_v1 (
  `timestamp` bigint,
  serverTimestamp bigint,

  name string,
  data string,
  app string,
  `user` string,
  context string,
  browser row<
    url string,
    referrer string,
    userAgent string,
    `language` string,
    title string,
    viewportWidth int,
    viewportHeight int,
    contentWidth int,
    contentHeight int,
    cookies map<string, string>,
    name string,
    version string,
    device row<
      model string,
      type string,
      vendor string
    >,
    engine row<
      name string,
      version string
    >,
    os row<
      name string,
      version string
    >
  >,
  abtests map<string, string>,
  apikey string,
  lifecycleId string,
  sessionId string,
  instanceId string,
  requestId string,
  eventId string,
  `trigger` string,

  virtualId string,
  accountId string,
  ip string,

  serverTimestampLtz as to_timestamp(from_unixtime(serverTimestamp / 1000)),
  watermark for serverTimestampLtz as serverTimestampLtz - interval '5' second
) with (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'shared.kafka.smartnews.internal:9093',
  'topic' = 'shared-cluster-sn-pixel-event-v1-dev',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true',

  'properties.group.id' = 'event_v2',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SNTOKEN',
  'properties.sasl.login.class' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenLogin',
  'properties.sasl.login.callback.handler.class' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenCallbackHandler',
  'properties.sasl.client.callback.handler.class' = 
'com.smartnews.dp.kafka.security.sn.sasl.SnTokenSaslClientCallbackHandler',
  'properties.sasl.jaas.config' = 
'com.smartnews.dp.kafka.security.sn.auth.SnTokenLoginModule required 
username="sn-pixel" password="aQXJcNUsCuIZpICHO9bQ" env="prd";',
  'properties.ssl.truststore.type' = 'PEM'
);

create catalog iceberg_dev with (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://dev-hive-metastore.smartnews.internal:9083',
  'warehouse'='s3a://smartnews-dmp/warehouse/development'
);

insert into iceberg_dev.pixel.event_v2 /*+ options(
  'partition.time-extractor.timestamp-pattern'='$dt 00:00:00',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'auto-compaction'='true'
) */
select
  `timestamp`,
  serverTimestamp,
  data,
  app,
  `user`,
  context,
  browser,
  abtests,
  lifecycleId,
  sessionId,
  instanceId,
  requestId,
  eventId,
  `trigger`,

  virtualId,
  accountId,
  ip,

  date_format(serverTimestampLtz, 'yyyy-MM-dd') dt,
  apikey,
  name
from
  default_catalog.default_database.kafka_event_v1
;



Reply via email to