Hello kafka community, The users_table below does not have any records (its topic has no messages starting offset 0), although source topic input_topic has messages:
CREATE STREAM USERS_STREAM( accountid STRING, migrationDate BIGINT, metadata STRING) WITH (KAFKA_TOPIC='input_topic', VALUE_FORMAT='AVRO'); CREATE TABLE USERS_TABLE AS SELECT accountid, min(migrationDate) AS firstMigrationDate, min(CAST(extractjsonfield(metadata, 'createdAt') AS BIGINT)) AS firstCreatedAt FROM USERS_STREAM GROUP BY accountid emit changes; My intention is to have a continuously aggregating query from users_stream into users_table, that only keeps the oldest values for each key, and emits a record or message every time a new key arrives and every time an older value for a key arrives (which should almost never happen); How would you do such a query? Thanks a lot, -- Thank you, Nicolae Marasoiu Scala Engineer Orion, OVO Group