Hi:
I am using RocksDbSessionBytesStoreSupplier in my kafka streaming application
for an aggregation like this:
var materialized =
Materialized.<String, List<CDCRecord>>as(
new
RocksDbSessionBytesStoreSupplier(env.getProperty("messages.cdc.pft.topic",
"NASHCM.PAYROLL.PFT.FILENUMBER"),
Duration.parse(env.getProperty("pft.duration", "P7D")).toMillis()))
.withKeySerde(stringSerde)
.withValueSerde(listSerde);
stream.windowedBy(SessionWindows
.with(Duration.parse(env.getProperty("pft.gap", "PT0.1S")))
.grace(Duration.parse(env.getProperty("pft.duration",
"PT0.05S")))
)
.aggregate(ArrayList::new,
(k, v, list)->{list.add(v); return list;},
(k, list1, list2)->{list1.addAll(list2); return list1;},
materialized)
.toStream().foreach((key, value) -> {
//sometimes value is null, but this should never happened – and we do see some
messages not processed.
}
The application runs on Kubernetes, should we not use
RocksDbSessionBytesStoreSupplier?
Thanks
Andrew
This message and any attachments are intended only for the use of the addressee
and may contain information that is privileged and confidential. If the reader
of the message is not the intended recipient or an authorized representative of
the intended recipient, you are hereby notified that any dissemination of this
communication is strictly prohibited. If you have received this communication
in error, notify the sender immediately by return email and delete the message
and any attachments from your system.