docs fixes
In the section https://kafka.apache.org/quickstart#quickstart_kafkaconnect, you should change the plugin path from lib/connect-file-3.2.0.ja to libs/connect-file-3.2.0.ja
Have a nice day!
What does Apache Kafka do?
Subject: What does Apache Kafka do?

Good day from Singapore,

I notice my company/organization is using Apache Kafka. What does it do? I'm just curious.

Regards,

Mr. Turritopsis Dohrnii Teo En Ming
Targeted Individual in Singapore
18 May 2022 Wed
Re: What does Apache Kafka do?
Hi,
Good evening from India 😃
I have been using Kafka (Confluent Platform) for a long time. In simple terms, it is a message broker that can persist data on disk for a limited period, typically days or weeks. It acts as a buffer between message producers and message consumers, which reduces the load on consumers in real-time data-processing applications. In short, Kafka lets you achieve loose coupling between producers and consumers (asynchronous communication).
There are also many other features, such as REST Proxy, connectors, and Schema Registry. It is worth investing time in reading about them; it pays off in the long term 😀
I hope it helps.
Cheers,
Sunil.
On Wed, 18 May 2022 at 8:01 PM, Turritopsis Dohrnii Teo En Ming <ceo.teo.en.m...@gmail.com> wrote:
> Subject: What does Apache Kafka do?
>
> Good day from Singapore,
>
> I notice my company/organization is using Apache Kafka. What does it do?
>
> Just being curious.
>
> Regards,
>
> Mr. Turritopsis Dohrnii Teo En Ming
> Targeted Individual in Singapore
> 18 May 2022 Wed
>
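Sunil's description (Kafka as a buffer that persists messages so producers and consumers stay loosely coupled) can be pictured with a toy model. This is a minimal in-memory sketch, not Kafka's client API or storage engine: the names `ToyLog`, `append`, and `poll` are hypothetical, and partitions, replication, and retention are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy analogy only (not the Kafka client API): a producer appends to a log and
// two independent consumer groups read it at their own pace.
class ToyLogDemo {
    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        for (int i = 0; i < 5; i++) {
            log.append("event-" + i); // the producer never waits for any consumer
        }
        System.out.println(log.poll("fast-consumer", 5)); // reads all five events
        System.out.println(log.poll("slow-consumer", 2)); // reads event-0, event-1
        System.out.println(log.poll("slow-consumer", 2)); // resumes at event-2
    }
}

// Hypothetical stand-in for one topic partition: an append-only list of records
// plus a committed read position (offset) per consumer group.
class ToyLog {
    private final List<String> records = new ArrayList<>();
    private final Map<String, Integer> committedOffsets = new HashMap<>();

    // Appends a record and returns its offset (its index in the log).
    int append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // Returns up to maxRecords records starting at the group's committed offset,
    // then advances that offset. Reading does not delete anything from the log.
    List<String> poll(String group, int maxRecords) {
        int from = committedOffsets.getOrDefault(group, 0);
        int to = Math.min(records.size(), from + maxRecords);
        List<String> batch = new ArrayList<>(records.subList(from, to));
        committedOffsets.put(group, to);
        return batch;
    }
}
```

Because the log keeps records after they are read, a slow consumer simply falls behind instead of slowing the producer down. In real Kafka, how long records are kept is governed by topic retention settings such as `retention.ms`.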
Re: kafka stream - sliding window - getting unexpected output
Emitting intermediate results is by design. If you don't want to get intermediate results, you can add `suppress()` after the aggregation and configure it to only "emit on window close".
-Matthias
On 5/17/22 3:20 AM, Shankar Mane wrote:
Hi All,
Our use case requires a sliding window. For example, whenever a user performs an action at time t1, we would like to see their activity in the last 24 hours before t1, i.e. [t1 - 24 hours, t1], and use it to show the user some recommendations.
-- I have the code ready and it runs without any errors.
-- Aggregations happen as expected.
-- But the output is unexpected. As the window slides, I get mixed output: intermediate aggregated records come out together with the final aggregated outputs.
Could someone please help me here? What can I do to get ONLY the final aggregated output?
Code snippet:
builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
    .filter((k, v) -> v != null)
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
    .aggregate(OutputPojo::new, (k, tr, out) -> {
        out.setUserId(tr.getUserId());
        out.setCount(out.getCount() + 1);
        out.setSum(out.getSum() + tr.getInt4());
        out.setUuid(tr.getUuid());
        out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
        waitForMs(200); // added delay just for analysing output
        return out;
    }, Materialized.with(stringSerde, outputSerde))
    .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
    .toStream()
    .map((Windowed<String> key, OutputPojo out) -> {
        return new KeyValue<>(key.key(), out);
    })
    .print(Printed.toSysOut());
    //.to(aveTempOutputTopic, Produced.with(stringSerde, outputSerde));
Input data:
for i in {1..10}; do sleep 1s; python3 del.py 1001 10; sleep 1s; done
{'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid': '64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
{'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid': 'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
{'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid': '48d2b4ea-052d-42fa-a998-0216d928c034'}
{'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid': '55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
{'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid': 'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
{'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid': '135dc5cd-50cb-467b-9e63-300fdeedaf75'}
{'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid': '66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
{'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid': 'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
{'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid': '7baa4254-b9da-43dc-bbb7-4caede578aeb'}
{'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid': '16541989-f3ba-49f6-bd31-bf8a75ba8eac'}
Output (*unexpected*): the output below is captured at each sliding window of 1s duration (but the input data is published at an interval of 2s):
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34) <- seems to be an older UUID
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001,
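Matthias's suggestion can be illustrated without a running cluster. Below is a simplified, hypothetical model in plain Java (not Kafka Streams' suppression buffer) that contrasts emitting on every update with buffering the latest value per window and emitting it only once stream time has passed the window end plus the grace period. The class names `EmitOnCloseBuffer` and `SuppressDemo`, the toy timestamps, and the use of tumbling windows instead of sliding windows are assumptions made to keep the example short. In the real DSL the matching call is `suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))`, whereas `Suppressed.untilTimeLimit(...)` in the snippet above only rate-limits updates per key and can still forward non-final results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo: the same updates, emitted eagerly vs. only on window close.
class SuppressDemo {
    public static void main(String[] args) {
        long[] timestamps = {1, 4, 7, 12, 15, 21}; // toy input, in ms
        long windowSize = 10;                        // toy tumbling windows [0,10), [10,20), ...
        Map<Long, Long> counts = new TreeMap<>();    // windowEnd -> running count
        List<String> eager = new ArrayList<>();      // emit-on-every-update output
        EmitOnCloseBuffer buffer = new EmitOnCloseBuffer(0); // grace = 0 ms

        for (long ts : timestamps) {
            long windowEnd = (ts / windowSize + 1) * windowSize;
            long count = counts.merge(windowEnd, 1L, Long::sum);
            eager.add("windowEnd=" + windowEnd + " count=" + count);
            buffer.update(windowEnd, count, ts);
        }
        System.out.println("every update: " + eager);
        System.out.println("on close:     " + buffer.emitted());
    }
}

// Simplified model of "emit on window close": windows are identified by their end time.
class EmitOnCloseBuffer {
    private final long graceMs;
    private final TreeMap<Long, Long> pending = new TreeMap<>(); // windowEnd -> latest count
    private final List<String> emitted = new ArrayList<>();
    private long streamTime = Long.MIN_VALUE; // highest record timestamp seen so far

    EmitOnCloseBuffer(long graceMs) {
        this.graceMs = graceMs;
    }

    // Receives every intermediate update and overwrites the buffered value for that window.
    void update(long windowEnd, long count, long recordTime) {
        pending.put(windowEnd, count);
        streamTime = Math.max(streamTime, recordTime);
        // A window is final once stream time has reached its end plus the grace period.
        while (!pending.isEmpty() && pending.firstKey() + graceMs <= streamTime) {
            Map.Entry<Long, Long> closed = pending.pollFirstEntry();
            emitted.add("windowEnd=" + closed.getKey() + " count=" + closed.getValue());
        }
    }

    List<String> emitted() {
        return emitted;
    }
}
```

With this toy input, the eager path prints six updates, while the buffer emits only `windowEnd=10 count=3` and `windowEnd=20 count=2`; the window ending at 30 stays buffered because stream time (21) has not reached its end.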
Re: docs fixes
Thanks for reporting this. Are you interested in submitting a PR to fix it?
Guozhang
On Wed, May 18, 2022 at 7:26 AM Владимир Савостин wrote:
> In section https://kafka.apache.org/quickstart#quickstart_kafkaconnect
> you should change plugin path from lib/connect-file-3.2.0.ja to
> libs/connect-file-3.2.0.ja
> Have a nice day!
>
--
-- Guozhang
Re: kafka stream - sliding window - getting unexpected output
@Matthias J. Sax / All
I have added the line below:
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
Here is the output. For uuid *2cbef750-325b-4a2f-ac39-b2c23fa0313f* I expect a single output, but that is not the case here. Which of those two rows for the same uuid is the final output?
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-19 11:48:08.128, uuid=fb6bea5f-8fd0-4c03-8df3-aaf392f04a5a)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-19 11:48:10.328, uuid=b4ab837f-b10a-452d-a663-719215d2992f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-19 11:48:12.527, uuid=8fa1b621-c967-4770-9f85-9fd84999c97c)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:14.726, uuid=1fc21253-7859-45ef-969e-82ed596c4fa0)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:16.925, uuid=*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:16.925, uuid=*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:25.721, uuid=fbe62fa4-e7c4-437f-b976-0bb7ae0c4390)
On Wed, May 18, 2022 at 10:21 PM Matthias J. Sax wrote:
> Emitting intermediate result is by-design.
>
> If you don't want to get intermediate result, you can add `suppress()`
> after the aggregation and configure it to only "emit on window close".
>
> -Matthias
>
> On 5/17/22 3:20 AM, Shankar Mane wrote:
> > Hi All,
> >
> > Our use case is to use sliding window. (for e.g. at any point, whenever
> > user performs any actions at time [ t1 ], we would like to see his activity
> > in [ t1 - last 24 hours]. Using this, to show the user some recommendations.
> >
> > -- I have code ready and it works without any errors.
> > -- aggregations happen as expected.
> > -- but the output generated is unexpected. As windows gets slides, i am
> > getting mixed output which includes intermediate aggregated records also
> > coming with final aggregated outputs.
> >
> > Could someone please help me here ? what can I do here to get ONLY final
> > aggregated output.
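In the follow-up output, each uuid appears twice (count=5, then count=4). One possible explanation, not confirmed against this application: sliding windows overlap, each window produces its own final result, and the `.map(...)` step keeps only `key.key()`, so the window bounds disappear from the printed rows. The sketch below is a simplified model in plain Java (not Kafka Streams code), assuming the documented sliding-window shape where each record at time t has a "left" window [t - d, t] and a "right" window [t + 1, t + 1 + d], both bounds inclusive. The class name `SlidingWindowModel`, the timestamps, and d = 25 are hypothetical; grace, early records (t < d), and out-of-order data are ignored. "latest" stands in for the uuid/strTime fields, which the aggregator overwrites with each record it adds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model (not Kafka Streams code) of sliding-window final results.
// Assumption: left window [t - d, t] and right window [t + 1, t + 1 + d] per record,
// inclusive bounds, and a window is identified by its start time.
class SlidingWindowModel {
    static List<String> finalResults(long[] timestamps, long d) {
        TreeMap<Long, Long> windows = new TreeMap<>(); // start -> end
        for (long t : timestamps) {
            windows.put(t - d, t);         // left window ending at the record
            windows.put(t + 1, t + 1 + d); // right window starting just after it
        }
        List<String> results = new ArrayList<>();
        for (Map.Entry<Long, Long> w : windows.entrySet()) {
            long start = w.getKey();
            long end = w.getValue();
            long count = 0;
            long latest = Long.MIN_VALUE;
            for (long t : timestamps) {
                if (t >= start && t <= end) {
                    count++;
                    latest = Math.max(latest, t); // in-order input: last record added
                }
            }
            if (count > 0) { // windows without records produce no result
                results.add("[" + start + "," + end + "] count=" + count + " latest=" + latest);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        long[] timestamps = {100, 110, 120, 130, 140}; // hypothetical, evenly spaced
        for (String result : finalResults(timestamps, 25)) {
            System.out.println(result);
        }
    }
}
```

With these toy inputs, the record at 120 is the latest record of two final windows, `[95,120]` with count 3 and `[101,126]` with count 2, a pattern similar to the count=5/count=4 pairs above. In the real topology, keeping `key.window().start()` and `key.window().end()` in the mapped output, rather than only `key.key()`, would show whether the two rows per uuid come from different windows.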