docs fixes

2022-05-18 Thread Владимир Савостин
In section https://kafka.apache.org/quickstart#quickstart_kafkaconnect
you should change plugin path from lib/connect-file-3.2.0.ja to
libs/connect-file-3.2.0.ja
Have a nice day!


What does Apache Kafka do?

2022-05-18 Thread Turritopsis Dohrnii Teo En Ming
Subject: What does Apache Kafka do?

Good day from Singapore,

I notice my company/organization is using Apache Kafka. What does it do?

Just being curious.

Regards,

Mr. Turritopsis Dohrnii Teo En Ming
Targeted Individual in Singapore
18 May 2022 Wed


Re: What does Apache Kafka do?

2022-05-18 Thread sunil chaudhari
Hi,
Good evening from India😃
I have been using Kafka (Confluent Platform) for a long time.
In simple terms, it is a message broker that can persist data on disk
for a short time, on the order of days or weeks.
It acts as a buffer between message producers and message consumers.
In real-time data processing apps, this buffering reduces the load on
consumers.
In short, Kafka lets you achieve loose coupling between producers and
consumers (asynchronous communication).
There are also a lot of other features, such as REST Proxy, connectors,
and Schema Registry. You need to invest time in reading about them for
good returns in the long term 😀
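
The buffering idea in this reply can be illustrated with a toy model. The
sketch below is not Kafka code and uses no Kafka API. `Log` and `Reader` are
hypothetical names standing in for a topic partition and a consumer. It only
shows that a producer can append without waiting for anyone, while each
consumer keeps its own read position and catches up at its own pace.

```java
import java.util.ArrayList;
import java.util.List;

public class LogBufferSketch {
    /** Hypothetical stand-in for a topic partition: an append-only list of messages. */
    static final class Log {
        private final List<String> records = new ArrayList<>();

        /** Producer side: append a message and return its offset. */
        long append(String value) {
            records.add(value);
            return records.size() - 1;
        }

        /** Consumer side: read up to max messages starting at fromOffset. */
        List<String> read(long fromOffset, int max) {
            int start = (int) Math.min(fromOffset, records.size());
            int end = Math.min(records.size(), start + max);
            return new ArrayList<>(records.subList(start, end));
        }
    }

    /** Hypothetical consumer that remembers how far it has read. */
    static final class Reader {
        private long offset = 0;

        List<String> poll(Log log, int max) {
            List<String> batch = log.read(offset, max);
            offset += batch.size();
            return batch;
        }
    }

    public static void main(String[] args) {
        Log log = new Log();
        for (int i = 1; i <= 5; i++) {
            log.append("event-" + i); // the producer never waits for a consumer
        }
        Reader fast = new Reader();
        Reader slow = new Reader();
        System.out.println("fast: " + fast.poll(log, 5)); // reads everything at once
        System.out.println("slow: " + slow.poll(log, 2)); // reads a small batch
        System.out.println("slow: " + slow.poll(log, 2)); // resumes where it left off
    }
}
```

Because the messages stay in the log after being read, the slow reader does
not hold up the fast one, and neither of them holds up the producer.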

I hope it helps.

Cheers,
Sunil.



On Wed, 18 May 2022 at 8:01 PM, Turritopsis Dohrnii Teo En Ming <
ceo.teo.en.m...@gmail.com> wrote:

> Subject: What does Apache Kafka do?
>
> Good day from Singapore,
>
> I notice my company/organization is using Apache Kafka. What does it do?
>
> Just being curious.
>
> Regards,
>
> Mr. Turritopsis Dohrnii Teo En Ming
> Targeted Individual in Singapore
> 18 May 2022 Wed
>


Re: kafka stream - sliding window - getting unexpected output

2022-05-18 Thread Matthias J. Sax

Emitting intermediate results is by design.

If you don't want intermediate results, you can add `suppress()`
after the aggregation and configure it to only "emit on window close".
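
The difference between the two emission modes can be sketched with a
simplified stdlib-only model. This is not the Kafka Streams implementation:
it uses fixed 5-second windows instead of sliding windows, a grace period of
zero, and hypothetical method names (`emitEveryUpdate`, `emitOnWindowClose`).
It assumes a window counts as closed once stream time (the highest timestamp
seen so far) reaches the window end plus grace.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SuppressSketch {
    static final long WINDOW_MS = 5_000; // assumed window size
    static final long GRACE_MS = 0;      // assumed grace period

    /** Counts records per window and emits a row for every single update. */
    static List<String> emitEveryUpdate(long[] timestamps) {
        Map<Long, Integer> counts = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (long ts : timestamps) {
            long start = ts - (ts % WINDOW_MS);
            int count = counts.merge(start, 1, Integer::sum);
            out.add("window@" + start + " count=" + count);
        }
        return out;
    }

    /** Same aggregation, but each window's latest value is held back until the window closes. */
    static List<String> emitOnWindowClose(long[] timestamps) {
        Map<Long, Integer> buffer = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % WINDOW_MS);
            if (start + WINDOW_MS + GRACE_MS <= streamTime) {
                continue; // record is too late: its window has already closed
            }
            buffer.merge(start, 1, Integer::sum);
            // Flush every buffered window that stream time has moved past.
            Iterator<Map.Entry<Long, Integer>> it = buffer.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> e = it.next();
                if (e.getKey() + WINDOW_MS + GRACE_MS <= streamTime) {
                    out.add("window@" + e.getKey() + " count=" + e.getValue());
                    it.remove();
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1000, 2000, 6000, 7000, 11000};
        System.out.println(emitEveryUpdate(ts));   // five rows, one per input record
        System.out.println(emitOnWindowClose(ts)); // one row per closed window
    }
}
```

In this model the last window (starting at 10000) is never printed by
`emitOnWindowClose`, because no later record arrives to advance stream time
past its end. Final results appear only after newer input moves time forward.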


-Matthias

On 5/17/22 3:20 AM, Shankar Mane wrote:

Hi All,

Our use case is a sliding window. For example, whenever a user performs
an action at time [ t1 ], we would like to see their activity in
[ t1 - last 24 hours ] and use it to show the user some recommendations.

-- I have the code ready and it runs without any errors.
-- Aggregations happen as expected.
-- But the generated output is unexpected. As the window slides, I get
mixed output: intermediate aggregated records arrive along with the
final aggregated outputs.

Could someone please help me here? What can I do to get ONLY the final
aggregated output?


Code snippet:

builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
    .filter((k, v) -> v != null)
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
    .aggregate(OutputPojo::new, (k, tr, out) -> {
        out.setUserId(tr.getUserId());
        out.setCount(out.getCount() + 1);
        out.setSum(out.getSum() + tr.getInt4());
        out.setUuid(tr.getUuid());
        out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
        waitForMs(200); // added delay just for analysing output
        return out;
    }, Materialized.with(stringSerde, outputSerde))
    .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
    .toStream()
    .map((Windowed<String> key, OutputPojo out) -> {
        return new KeyValue<>(key.key(), out);
    })
    .print(Printed.toSysOut());
    //.to(aveTempOutputTopic, Produced.with(stringSerde, outputSerde))






Input data:

for i in {1..10}; do sleep 1s;python3 del.py 1001 10;sleep 1s; done

{'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid': '64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
{'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid': 'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
{'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid': '48d2b4ea-052d-42fa-a998-0216d928c034'}
{'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid': '55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
{'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid': 'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
{'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid': '135dc5cd-50cb-467b-9e63-300fdeedaf75'}
{'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid': '66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
{'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid': 'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
{'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid': '7baa4254-b9da-43dc-bbb7-4caede578aeb'}
{'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid': '16541989-f3ba-49f6-bd31-bf8a75ba8eac'}






Output (*Unexpected*): the output below was captured at each sliding window
of 1s duration (although the input data is published at 2s intervals):

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)  > seems older UUID
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, 

Re: docs fixes

2022-05-18 Thread Guozhang Wang
Thanks for reporting this.

Are you interested in submitting a PR to fix it?

Guozhang

On Wed, May 18, 2022 at 7:26 AM Владимир Савостин 
wrote:

> In section https://kafka.apache.org/quickstart#quickstart_kafkaconnect
> you should change plugin path from lib/connect-file-3.2.0.ja to
> libs/connect-file-3.2.0.ja
> Have a nice day!
>


-- 
-- Guozhang


Re: kafka stream - sliding window - getting unexpected output

2022-05-18 Thread Shankar Mane
@Matthias J. Sax / All

I have added the line below:

    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))

Here is the output. For uuid *2cbef750-325b-4a2f-ac39-b2c23fa0313f* I was
expecting a single output row, but that is not the case here. Which one of
those two rows for the same uuid is the final output?

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0, strTime=2022-05-19 11:48:08.128, uuid=fb6bea5f-8fd0-4c03-8df3-aaf392f04a5a)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0, strTime=2022-05-19 11:48:10.328, uuid=b4ab837f-b10a-452d-a663-719215d2992f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0, strTime=2022-05-19 11:48:12.527, uuid=8fa1b621-c967-4770-9f85-9fd84999c97c)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:14.726, uuid=1fc21253-7859-45ef-969e-82ed596c4fa0)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:16.925, uuid=*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:16.925, uuid=*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0, strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0, strTime=2022-05-19 11:48:25.721, uuid=fbe62fa4-e7c4-437f-b976-0bb7ae0c4390)
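
One possible reading of the paired rows, not confirmed anywhere in this
thread, is that each row is the final result of a different window. The
posted topology maps `Windowed` keys back to the plain user id, so the window
bounds are not printed, and the uuid only identifies the last record that
went into the aggregate. The stdlib-only sketch below models sliding windows
under two assumptions: every record defines one window ending at its
timestamp and one window starting 1 ms after it (kept only if it contains a
record), and the time difference is 10 000 ms. That value is picked to
reproduce the 5/4 pattern and is not taken from the thread. All names are
hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    /** Number of records with a timestamp in [start, end], both bounds inclusive. */
    static int count(long[] ts, long start, long end) {
        int n = 0;
        for (long t : ts) {
            if (t >= start && t <= end) {
                n++;
            }
        }
        return n;
    }

    /** Timestamp of the latest record in [start, end]; only called for non-empty windows. */
    static long latest(long[] ts, long start, long end) {
        long last = Long.MIN_VALUE;
        for (long t : ts) {
            if (t >= start && t <= end) {
                last = Math.max(last, t);
            }
        }
        return last;
    }

    static String row(long[] ts, long start, long end) {
        return "[" + start + "," + end + "] count=" + count(ts, start, end)
                + " last=" + latest(ts, start, end);
    }

    /** Lists the assumed windows: one ending at each record, one starting 1 ms after it. */
    static List<String> windows(long[] ts, long diffMs) {
        List<String> out = new ArrayList<>();
        for (long t : ts) {
            out.add(row(ts, t - diffMs, t));
            if (count(ts, t + 1, t + 1 + diffMs) > 0) {
                out.add(row(ts, t + 1, t + 1 + diffMs));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical records spaced 2.2 s apart, similar to the spacing in the posted input.
        long[] ts = {100000, 102200, 104400, 106600, 108800, 111000};
        for (String r : windows(ts, 10_000)) {
            System.out.println(r);
        }
    }
}
```

In this model the record at 108800 is the latest record of two distinct
windows: `[98800,108800]` with count=5 and `[100001,110001]` with count=4.
If that is what happens in the real topology, printing `key.window()` next to
`key.key()` in the final `map` would show which window each row belongs to.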


On Wed, May 18, 2022 at 10:21 PM Matthias J. Sax wrote:

> Emitting intermediate results is by design.
>
> If you don't want intermediate results, you can add `suppress()`
> after the aggregation and configure it to only "emit on window close".
>
> -Matthias