Sending key with the event

2024-01-23 Thread Oscar Perez via user
Hi Flink experts!

I have a question regarding Apache Flink. We want to send an event to a
certain topic, but for some reason we fail to send a proper key with the
event.

The event is published to the topic correctly, but its key is null. The
only method I see for publishing the event is out.collect(event).

What should we do in order to see the key in the published message?

Thanks and regards,
Oscar


Re: Sending key with the event

2024-01-23 Thread Yaroslav Tkachenko
Hi Oscar,

The only way to define the Kafka message key is to provide a custom
KafkaRecordSerializationSchema to your KafkaSink via the builder's
setRecordSerializer method.
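Here is a minimal sketch of that wiring. It assumes the KafkaSink API from flink-connector-kafka and its built-in KafkaRecordSerializationSchema.builder(), which accepts a separate key serializer. OrderEvent, its id field, the topic name and the bootstrap address are hypothetical placeholders, not taken from this thread. The upstream out.collect(event) call does not change, because the key is produced by the sink's serializer and not by the collector.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KeyedSinkFactory {

    /** Hypothetical event type; replace with your own POJO. */
    public static class OrderEvent {
        public String id;
        public String payload;

        public OrderEvent() {}

        public OrderEvent(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    public static KafkaSink<OrderEvent> build(String bootstrapServers, String topic) {
        // Produces the Kafka message key: the (hypothetical) event id as UTF-8 bytes.
        SerializationSchema<OrderEvent> keySchema =
                event -> event.id.getBytes(StandardCharsets.UTF_8);
        // Produces the Kafka message value: the raw payload string as UTF-8 bytes.
        SerializationSchema<OrderEvent> valueSchema =
                event -> event.payload.getBytes(StandardCharsets.UTF_8);

        return KafkaSink.<OrderEvent>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<OrderEvent>builder()
                                .setTopic(topic)
                                // Without a key serializer, every record is sent with a null key.
                                .setKeySerializationSchema(keySchema)
                                .setValueSerializationSchema(valueSchema)
                                .build())
                .build();
    }
}
```

The sink is then attached to the stream that receives the collected events, e.g. stream.sinkTo(KeyedSinkFactory.build("broker:9092", "orders")).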

The KafkaRecordSerializationSchema.serialize method returns a ProducerRecord,
so you can set things like the message key, message headers, etc.
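A hand-written schema could look like the following sketch. It assumes the KafkaRecordSerializationSchema interface from flink-connector-kafka. OrderEvent, the "source" header and the topic name are hypothetical. The partition is left null so that the Kafka producer's default partitioner derives it from the key.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Builds the full ProducerRecord (topic, key, value, headers) for each element. */
public class OrderEventRecordSerializer
        implements KafkaRecordSerializationSchema<OrderEventRecordSerializer.OrderEvent> {

    /** Hypothetical event type; replace with your own class. */
    public static class OrderEvent {
        public final String id;
        public final String payload;

        public OrderEvent(String id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    private final String topic;

    public OrderEventRecordSerializer(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            OrderEvent event, KafkaSinkContext context, Long timestamp) {
        byte[] key = event.id.getBytes(StandardCharsets.UTF_8);
        byte[] value = event.payload.getBytes(StandardCharsets.UTF_8);
        // Partition is null: the producer's default partitioner hashes the key instead.
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(topic, null, timestamp, key, value);
        // Headers can be attached to the same record (hypothetical header name).
        record.headers().add("source", "flink".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}
```

It would be passed to the sink with KafkaSink.<OrderEventRecordSerializer.OrderEvent>builder().setBootstrapServers(...).setRecordSerializer(new OrderEventRecordSerializer("orders")).build(). Writing the class by hand, rather than using the builder, is useful when the topic, key or headers depend on the event itself.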



Long execution of SQL query to Kafka + Hive (q77 TPC-DS)

2024-01-23 Thread Вова Фролов
Hello,

I am executing a heterogeneous SQL query  (part of the data is in Hive and
part in Kafka. The query utilizes TPC-DS benchmark 100GB data.) in
BatchMode. However, the execution time is excessively long, taking
approximately 11 minutes to complete , although the request to Hive only
(without Kafka) is completed in 12 seconds.

How can I speed up the execution of a heterogeneous SQL query over Kafka and Hive?

*Versions of Components in Use:*

· Apache Flink: 1.17.1

· Kafka: 3.2.3

· Hive: 3.1.2.3.4.5.1-1

*Flink Job Code:*

EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();

TableEnvironment tEnv = TableEnvironment.create(settings);



*Hive Catalog*

HiveCatalog catalog = new HiveCatalog("hive", DEFAULT_DATABASE,
PATH_TO_CONF, HiveVersionInfo.getVersion());

tEnv.registerCatalog("hive", catalog);

tEnv.useCatalog("hive");





Creating tables with the Kafka connector:



public static final String CREATE_STORE_SALES = "CREATE TEMPORARY TABLE store_sales_kafka(\n" +
"  ss_sold_date_sk INTEGER,\n" +
// ... the remaining 21 columns are omitted here
"  ss_net_profit DOUBLE\n" +
") WITH (\n" +
"   'connector' = 'kafka',\n" +
"   'key.format' = 'avro',\n" +
"   'key.fields' = 'ss_item_sk;ss_ticket_number',\n" +
"   'properties.group.id' = 'store_sales_group',\n" +
"   'scan.startup.mode' = 'earliest-offset',\n" +
"   'scan.bounded.mode' = 'latest-offset',\n" +
"   'properties.bootstrap.servers' = 'xyz1:9092, xyz2:9092, xyz3:9092, xyz4:9092, xyz5:9092',\n" +
"   'topic' = 'storeSales100',\n" +
"   'value.format' = 'avro',\n" +
"   'value.fields-include' = 'EXCEPT_KEY'\n" +
"   );";



Running Q77 with Flink:

tEnv.executeSql(Tpcds100.CREATE_STORE_SALES);
Table result = tEnv.sqlQuery(Tpcds100.Q77_WITH_KAFKA);
List<Row> res = CollectionUtil.iteratorToList(result.execute().collect());

for (Row row : res) {
    System.out.println(row);
}
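The loop above first buffers every row in a list. The sketch below instead streams rows through try-with-resources and reports wall-clock time, which may help when comparing the Hive-only and Kafka + Hive runs. QueryTimer and runAndTime are hypothetical helper names, and the VALUES query in main is only a stand-in for the real queries.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class QueryTimer {

    /** Runs a query, prints each row as it arrives, and returns the row count. */
    public static long runAndTime(TableEnvironment tEnv, String query) throws Exception {
        long start = System.nanoTime();
        TableResult tableResult = tEnv.sqlQuery(query).execute();
        long rows = 0;
        // TableResult.collect() returns a CloseableIterator; closing it releases
        // the job's resources even if not all results were consumed.
        try (CloseableIterator<Row> it = tableResult.collect()) {
            while (it.hasNext()) {
                System.out.println(it.next());
                rows++;
            }
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("%d rows in %d ms%n", rows, elapsedMs);
        return rows;
    }

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Stand-in query; replace with the Hive-only query and Q77_WITH_KAFKA to compare.
        runAndTime(tEnv, "SELECT x FROM (VALUES (1), (2), (3)) AS t(x)");
    }
}
```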



Kafka settings (the Kafka cluster has 6 topics, one per table, each with
512 partitions and a replication factor of 3):

· num.network.threads=12

· num.io.threads=10

· socket.send.buffer.bytes=2097152

· socket.request.max.bytes=1073741824

The cluster consists of 5 machines, each with:

· 2 x86-64 CPUs with 20 cores / 40 threads each, 2200 MHz base frequency and
3600 MHz max turbo frequency (40 cores / 80 threads per machine)

· 768 GB RAM, of which up to 640 GB is available to Flink

· 2 network cards, 10 Gbit/s each

· 10 HDDs, 5.5 TB

Kind regards,

Vladimir