Hi Dev, I hope this email finds you well. I am using Flink 1.20.1 with this environment:
> java 11.0.26 2025-01-21 LTS
> Java(TM) SE Runtime Environment 18.9 (build 11.0.26+7-LTS-187)
> Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.26+7-LTS-187, mixed mode)
> OS: Windows 11
> IDE: VSCode

I send the data from a Python producer, and the Flink consumer uses the following logic.

*Read the Kafka stock topic with the DataStream API:* This is how I read the Kafka topic (createKafkaSource is sketched in the P.S. below):

> KafkaSource<Stock> kafkaSource = createKafkaSource(env, inputProperties,
>         new JsonDeserializationSchema<>(Stock.class));
> DataStream<Stock> stockNoWatermark = env.fromSource(kafkaSource,
>         WatermarkStrategy.noWatermarks(), "Kafka source");

*After that, I convert the stream to a Table because I prefer to manipulate the data using SQL* (the Stock POJO is sketched in the P.P.S. below):

> Table stockTableWatermark = tableEnv.fromDataStream(
>         stockNoWatermark,
>         Schema.newBuilder()
>                 .column("event_type", DataTypes.STRING())
>                 .column("exchange", DataTypes.INT())
>                 .column("id", DataTypes.BIGINT())
>                 .column("price", DataTypes.FLOAT())
>                 .column("sequence_number", DataTypes.BIGINT())
>                 .column("size", DataTypes.INT())
>                 .column("symbol", DataTypes.STRING())
>                 .column("tape", DataTypes.STRING())
>                 .column("timestamp", DataTypes.TIMESTAMP_LTZ(3))
>                 .column("trf_id", DataTypes.STRING())
>                 .column("trf_timestamp", DataTypes.STRING())
>                 .column("actual_timestamp", DataTypes.STRING())
>                 .watermark("timestamp", "`timestamp` - INTERVAL '1' SECOND")
>                 .build());

*The result from stockTableWatermark looks like this:*

> 5> +I[T, 2015, 4991, 158.85, 282034, 95, GOOG, null, +57465-02-11T05:36:48Z, null, null, 2025-06-30 00:10:53.808]
> 5> +I[T, 4231, 4642, 181.31, 751310, 35, NVDA, null, +57465-02-11T05:36:51Z, null, null, 2025-06-30 00:10:53.811]
> 5> +I[T, 2692, 2536, 236.31, 435106, 50, AAPL, null, +57465-02-11T05:36:58Z, null, null, 2025-06-30 00:10:53.818]
> 5> +I[T, 3531, 1780, 137.95, 879217, 15, NVDA, null, +57465-02-11T05:37:31Z, null, null, 2025-06-30 00:10:53.851]
> 5> +I[T, 2046, 2779, 340.58, 658954, 24, NVDA, null, +57465-02-11T05:37:37Z, null, null, 2025-06-30 00:10:53.857]

*I aggregate with a tumbling window on the timestamp column:*

> Table resultTable = stockTableWatermark
>         .window(Tumble.over(lit(1).minutes()).on($("timestamp")).as("window")) // define window
>         .groupBy($("symbol"), $("window"))                                     // group by key and window
>         .select(
>                 $("symbol").as("ticker"),
>                 $("window").start(),
>                 $("window").end(),
>                 $("sequence_number").count().as("trades")
>         );

*But why is the output of resultTable empty when I print it?*

*These are the last log messages:*

> 14:48:34,317 INFO  org.apache.flink.connector.base.source.reader.SourceReaderBase [] - Adding split(s) to reader: [[Partition: dev-stock-topic-0, StartingOffset: -2, StoppingOffset: -9223372036854775808]]
> 14:48:34,340 INFO  org.apache.kafka.clients.consumer.ConsumerConfig [] - ConsumerConfig values:
>     allow.auto.create.topics = true
>     auto.commit.interval.ms = 5000
>     auto.include.jmx.reporter = true
>     auto.offset.reset = earliest
>     bootstrap.servers = [localhost:9092]
>     check.crcs = true
>     client.dns.lookup = use_all_dns_ips
>     client.id = flink-dev-group-stock-consumer-4
>     client.rack =
>     connections.max.idle.ms = 540000
>     default.api.timeout.ms = 60000
>     enable.auto.commit = false
>     exclude.internal.topics = true
>     fetch.max.bytes = 52428800
>     fetch.max.wait.ms = 500
>     fetch.min.bytes = 1
>     group.id = flink-dev-group-stock-consumer
>     group.instance.id = null
>     heartbeat.interval.ms = 3000
>     interceptor.classes = []
>     internal.leave.group.on.close = true
>     internal.throw.on.fetch.stable.offset.unsupported = false
>     isolation.level = read_uncommitted
>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     max.partition.fetch.bytes = 1048576
>     max.poll.interval.ms = 300000
>     max.poll.records = 500
>     metadata.max.age.ms = 300000
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 30000
>     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
>     receive.buffer.bytes = 65536
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 30000
>     retry.backoff.ms = 100
>     sasl.client.callback.handler.class = null
>     sasl.jaas.config = null
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 60000
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.login.callback.handler.class = null
>     sasl.login.class = null
>     sasl.login.connect.timeout.ms = null
>     sasl.login.read.timeout.ms = null
>     sasl.login.refresh.buffer.seconds = 300
>     sasl.login.refresh.min.period.seconds = 60
>     sasl.login.refresh.window.factor = 0.8
>     sasl.login.refresh.window.jitter = 0.05
>     sasl.login.retry.backoff.max.ms = 10000
>     sasl.login.retry.backoff.ms = 100
>     sasl.mechanism = GSSAPI
>     sasl.oauthbearer.clock.skew.seconds = 30
>     sasl.oauthbearer.expected.audience = null
>     sasl.oauthbearer.expected.issuer = null
>     sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
>     sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
>     sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
>     sasl.oauthbearer.jwks.endpoint.url = null
>     sasl.oauthbearer.scope.claim.name = scope
>     sasl.oauthbearer.sub.claim.name = sub
>     sasl.oauthbearer.token.endpoint.url = null
>     security.protocol = PLAINTEXT
>     security.providers = null
>     send.buffer.bytes = 131072
>     session.timeout.ms = 45000
>     socket.connection.setup.timeout.max.ms = 30000
>     socket.connection.setup.timeout.ms = 10000
>     ssl.cipher.suites = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>     ssl.endpoint.identification.algorithm = https
>     ssl.engine.factory.class = null
>     ssl.key.password = null
>     ssl.keymanager.algorithm = SunX509
>     ssl.keystore.certificate.chain = null
>     ssl.keystore.key = null
>     ssl.keystore.location = null
>     ssl.keystore.password = null
>     ssl.keystore.type = JKS
>     ssl.protocol = TLSv1.3
>     ssl.provider = null
>     ssl.secure.random.implementation = null
>     ssl.trustmanager.algorithm = PKIX
>     ssl.truststore.certificates = null
>     ssl.truststore.location = null
>     ssl.truststore.password = null
>     ssl.truststore.type = JKS
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> 14:48:34,413 WARN  org.apache.kafka.clients.consumer.ConsumerConfig [] - These configurations '[client.id.prefix, partition.discovery.interval.ms, aws.secret.username, aws.database.username, environment, kafka.bootstrap.servers, aws.database.password, aws.database.database, aws.database.hostname, kafka.topic, aws.secret.password]' were supplied but are not used yet.
> 14:48:34,413 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 3.4.0
> 14:48:34,414 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: 2e1947d240607d53
> 14:48:34,414 INFO  org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1751269714413
> 14:48:34,427 INFO  org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
> 14:48:34,436 INFO  org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Assigned to partition(s): dev-stock-topic-0
> 14:48:34,444 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Seeking to earliest offset of partition dev-stock-topic-0
> 14:48:34,470 INFO  org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Resetting the last seen epoch of partition dev-stock-topic-0 to 0 since the associated topicId changed from null to jKk4sUaiRfSsg8h4GfqpbQ
> 14:48:34,471 INFO  org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Cluster ID: MkU3OEVBNTcwNTJENDM2Qk
> 14:48:34,491 INFO  org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Resetting offset for partition dev-stock-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.

*Could you help me address this issue?* I have assigned the event-time attribute and the watermark, and when I print the stream the timestamps are there, but the aggregation grouped by window start and window end never emits any rows.

Dmitri Yanno Mahayana
Mail: dmitri.mahay...@gmail.com
Phone: +6281314435818
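P.S. In case it helps, createKafkaSource is just a thin wrapper around the standard KafkaSource builder in my job class. This is a simplified sketch, not the exact code; the property keys I read here (e.g. kafka.topic) come from my inputProperties file and may differ:

> import java.util.Properties;
>
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> // Simplified sketch of the helper used above; lives in the main job class.
> public static <T> KafkaSource<T> createKafkaSource(
>         StreamExecutionEnvironment env, Properties inputProperties,
>         DeserializationSchema<T> deserializer) {
>     // env is not used here; the source is attached later via env.fromSource(...)
>     return KafkaSource.<T>builder()
>             .setBootstrapServers(inputProperties.getProperty("bootstrap.servers"))
>             .setTopics(inputProperties.getProperty("kafka.topic"))
>             .setGroupId(inputProperties.getProperty("group.id"))
>             // matches the "Seeking to earliest offset" line in the log above
>             .setStartingOffsets(OffsetsInitializer.earliest())
>             .setValueOnlyDeserializer(deserializer)
>             .setProperties(inputProperties)
>             .build();
> }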
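P.P.S. The Stock class is a flat POJO that mirrors the schema I declared in fromDataStream. Roughly like this (simplified; the real class may use getters/setters, and the field types follow the DataTypes above):

> import java.time.Instant;
>
> // Simplified sketch of the Stock POJO deserialized from the Kafka JSON messages.
> public class Stock {
>     public String event_type;
>     public int exchange;
>     public long id;
>     public float price;
>     public long sequence_number;
>     public int size;
>     public String symbol;
>     public String tape;
>     public Instant timestamp;        // surfaces as TIMESTAMP_LTZ(3) in the Table schema
>     public String trf_id;
>     public String trf_timestamp;
>     public String actual_timestamp;
> }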