Mohammed,

The first thing you need to do is making sure to set a key for this stream. This can be accomplished either in the creation statement or creating a new stream and using the *PARTITION BY* clause. For the sake of simplicity; the example below uses the creation statement strategy:

```

CREATE STREAM events(event_type STRING)

   WITH (KAFKA_TOPIC='events', *KEY='event_type'*, VALUE_FORMAT='DELIMITED');

```

This will make sure that each record in the topic will have a key associated. Then, you will need to create two tables:

### One to aggregate the sum of all event types

```

CREATE TABLE number_of_events AS

   SELECT COUNT(event_type) AS number_of_events

   FROM EVENTS GROUP BY 'number_of_events';

```

That you can easily query the result using a pull query:

```

SELECT number_of_events

FROM NUMBER_OF_EVENTS

WHERE ROWKEY = 'number_of_events';

```

### One to aggregate the sum of all event types per event

```

CREATE TABLE events_per_type AS

   SELECT event_type as event_type, COUNT(event_type) AS total

   FROM EVENTS GROUP BY event_type;

```

That you can query using a push query:

```

SELECT * FROM events_per_type EMIT CHANGES;

```

Thanks,

-- Ricardo

On 6/4/20 8:48 PM, Mohammed Ait Haddou wrote:
I have a stream with an event_type field, possible values are (view, cart,
purchase).

CREATE STREAM events(event_type STRING)
WITH (KAFKA_TOPIC='events', VALUE_FORMAT='DELIMITED');

I want to count the total number of all events and the number of events for
each event_type into a single table.

Reply via email to