Hello Kafka community,
The USERS_TABLE below does not contain any records (its topic has no messages
from offset 0 onwards), although the source topic input_topic does have messages:

CREATE STREAM USERS_STREAM(
    accountid STRING,
    migrationDate BIGINT,
    metadata STRING)
WITH (KAFKA_TOPIC='input_topic', VALUE_FORMAT='AVRO');

CREATE TABLE USERS_TABLE
AS SELECT
    accountid,
    min(migrationDate) AS firstMigrationDate,
    min(CAST(extractjsonfield(metadata, 'createdAt') AS BIGINT)) AS firstCreatedAt
FROM USERS_STREAM
GROUP BY accountid
emit changes;

My intention is to have a continuously aggregating query from users_stream
into users_table that keeps only the oldest (minimum) values for each key and
emits a record whenever a new key arrives or an older value arrives for an
existing key (which should almost never happen).
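
To make the intended semantics concrete, here is a minimal sketch in plain Python (not ksqlDB). The function name oldest_per_key and the sample records are hypothetical, and it assumes metadata is a JSON object with a top-level createdAt field. It only models the behaviour I am after and says nothing about how ksqlDB actually materializes the table.

```python
import json
from typing import Dict, Iterable, Iterator, Optional, Tuple

# (accountid, migrationDate, metadata as a JSON string) -- mirrors USERS_STREAM
Record = Tuple[str, int, str]
# (firstMigrationDate, firstCreatedAt) -- mirrors the USERS_TABLE value columns
Aggregate = Tuple[int, Optional[int]]


def _min_optional(a: Optional[int], b: Optional[int]) -> Optional[int]:
    """Minimum of two values, ignoring None (a missing createdAt)."""
    if a is None:
        return b
    if b is None:
        return a
    return min(a, b)


def oldest_per_key(records: Iterable[Record]) -> Iterator[Tuple[str, int, Optional[int]]]:
    """Hypothetical model: keep the minimum per key, emit only when it changes."""
    state: Dict[str, Aggregate] = {}
    for accountid, migration_date, metadata in records:
        # Assumption: createdAt sits at the top level of the metadata JSON.
        raw = json.loads(metadata).get("createdAt")
        created_at = int(raw) if raw is not None else None

        previous = state.get(accountid)
        if previous is None:
            current = (migration_date, created_at)
        else:
            current = (
                min(previous[0], migration_date),
                _min_optional(previous[1], created_at),
            )

        # Emit for a new key, or when an older value lowered one of the minima.
        if current != previous:
            state[accountid] = current
            yield accountid, current[0], current[1]


sample = [
    ("a", 20, '{"createdAt": 5}'),  # new key: emitted
    ("a", 30, '{"createdAt": 9}'),  # newer values: nothing emitted
    ("b", 7, '{"createdAt": 1}'),   # new key: emitted
    ("a", 10, '{"createdAt": 9}'),  # older migrationDate: emitted
]
for row in oldest_per_key(sample):
    print(row)
```

In this model the second record for key "a" produces no output because neither minimum changes, which is the "emit only on a new key or an older value" behaviour described above.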

How would you do such a query?
Thanks a lot,
-- 
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group
