Re: Please help with Flink + InfluxDB

2025-01-30 Thread Siva Krishna
Yes, Nix.
I can confirm that I am able to connect to InfluxDB from Flink (taskmanager).
URL="http://influxdb:8086";
I also tried the following from inside the Flink (taskmanager) container,
and that also worked.
curl influxdb:8086
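
For completeness, the same check can also be done with the InfluxDB client itself. This is only a minimal sketch; the token/org/bucket values are placeholders for whatever the compose file provisions.

```kotlin
import com.influxdb.client.InfluxDBClientFactory

fun main() {
    // "influxdb" is the Docker Compose service name, reachable from the taskmanager container.
    val client = InfluxDBClientFactory.create(
        "http://influxdb:8086", "my-token".toCharArray(), "my-org", "my-bucket"
    )
    println("ping: " + client.ping())  // true when the server answers
    client.close()
}
```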

Siva.

On Thu, Jan 30, 2025 at 4:50 PM Nikola Milutinovic
 wrote:
>
> Hi.
>
>
>
> First of all, can you verify your Docker compose InfluxDB is working, is it 
> accepting connections?
>
>
>
> Can you connect to it from outside Docker Compose?
>
>
>
> When connecting from Flink, I hope you are referencing the service name as a 
> host, because that is how Docker Compose works. It is a common mistake to 
> target “localhost”, which stays inside the container.
>
>
>
> Nix.
>
>
>
> From: Siva Krishna 
> Date: Thursday, January 30, 2025 at 11:07 AM
> To: Nikola Milutinovic 
> Cc: user@flink.apache.org 
> Subject: Re: Please help with Flink + InfluxDB
>
> Hi Nix,
> Thank you for your reply.
> I have a small update. For trial/error, I signed up for InfluxDB Cloud.
> This same code is working and data is being saved in the cloud.
> So, I am guessing there might be an issue with my 'docker compose', which is
> why the data is not being saved locally (Docker InfluxDB).
> Maybe I need to correct my volumes paths or something?
>
> Warm regards,
> Siva Rama Krishna.
>
> On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic
>  wrote:
> >
> > Hi Siva.
> >
> >
> >
> > What is the InfluxDB URL you are using? I assume it is something like 
> > “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a 
> > problem.
> >
> >
> >
> > Do you see a connection on InfluxDB? Anything in the logs? Anything in the 
> > logs of the Job Manager?
> >
> >
> >
> > Nix,
> >
> >
> >
> > From: Siva Krishna 
> > Date: Thursday, January 30, 2025 at 5:12 AM
> > To: user@flink.apache.org 
> > Subject: Please help with Flink + InfluxDB
> >
> > Hi, I am trying to create a data streaming pipeline for real-time analytics.
> >
> > Kafka -> Flink -> InfluxDB -> Visualize
> >
> > I am facing an issue writing the data to InfluxDB. I can see logs in
> > 'taskmanager' after submitting the Job. But, the data is not being
> > written to InfluxDB for some reason.
> > Please help, I am attaching a sample docker setup and basic code.
> >
> > import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
> > import com.influxdb.client.InfluxDBClient
> > import com.influxdb.client.InfluxDBClientFactory
> > import com.influxdb.client.InfluxDBClientOptions
> > import com.influxdb.client.domain.WritePrecision
> > import com.influxdb.client.write.Point
> > import org.apache.flink.api.common.eventtime.Watermark
> > import org.apache.flink.api.connector.sink2.Sink
> > import org.apache.flink.api.connector.sink2.SinkWriter
> > import org.apache.flink.api.connector.sink2.WriterInitContext
> > import java.time.Instant
> >
> > class InfluxDBSink : Sink {
> > override fun createWriter(p0: WriterInitContext?): SinkWriter {
> > return InfluxDBSinkWriter()
> > }
> > }
> >
> > // Implement the SinkWriter that writes EventData to InfluxDB
> > class InfluxDBSinkWriter : SinkWriter {
> > private val mapper = jacksonObjectMapper()
> >
> > // Initialize the InfluxDB client
> > private val influxDBClient = initializeInfluxDBClient()
> > //private val writeApi = influxDBClient.makeWriteApi()
> >
> > // Write data to InfluxDB
> > override fun write(element: String, context: SinkWriter.Context) {
> >
> > val event = mapper.readValue(element, EventData::class.java)
> > println("eventData: $event")
> >
> > try {
> > val connectable = influxDBClient.ping()
> > val ready = influxDBClient.ready()
> > val influxStr = influxDBClient.toString()
> > println("influxStr: $connectable -> $ready -> $influxStr")
> > } catch (e: Exception) {
> > println(e.message)
> > }
> >
> > val point = Point("dummy_events")
> > .addTag("source", "flink")
> > .addField("timestamp", event.timestamp)
> > .addField("value", event.value)
> > .time(Instant.now().toEpochMilli(), WritePrecision.MS)
> > //println("Writing point to InfluxDB: $point,
> > ${point.toLineProtocol()}")
> >
> > //writeApi.writePoint(point)
> > //writeApi.flush()
> >
> > influxDBClient.writeApiBlocking.writePoint(point)
> > //println("after influxDBClient writeApiBlocking")
> > }
> >
> > // This method will flush any pending writes (can be empty if no
> > additional batching needed)
> > override fun flush(endOfInput: Boolean) {
> > //if (endOfInput) writeApi.flush()
> > }
> >
> > // Handle any watermarks if required (optional, can be skipped if
> > not needed)
> > override fun writeWatermark(watermark: Watermark) {
> > }
> >
> > // Close the writer when done
> > override fun close() {
> > influxDBClient.close()
> > }
> > }
> >
> > fun initiali

Unsubscribe

2025-01-30 Thread Muazim Wani
Unsubscribe


Re: Please help with Flink + InfluxDB

2025-01-30 Thread Nikola Milutinovic
Hi.

First of all, can you verify your Docker compose InfluxDB is working, is it 
accepting connections?

Can you connect to it from outside Docker Compose?

When connecting from Flink, I hope you are referencing the service name as a 
host, because that is how Docker Compose works. It is a common mistake to 
target “localhost”, which stays inside the container.
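
In code that usually comes down to something like this (just a sketch; the constant name mirrors the one used later in this thread):

```kotlin
// Inside the Flink containers, "localhost" is the Flink container itself, not InfluxDB.
// The Docker Compose service name is what resolves to the InfluxDB container.
const val influxDBUrl = "http://influxdb:8086"    // works from the jobmanager/taskmanager containers
// const val influxDBUrl = "http://localhost:8086" // only works from the host machine, not from Flink
```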

Nix.

From: Siva Krishna 
Date: Thursday, January 30, 2025 at 11:07 AM
To: Nikola Milutinovic 
Cc: user@flink.apache.org 
Subject: Re: Please help with Flink + InfluxDB
Hi Nix,
Thank you for your reply.
I have a small update. For trial/error, I signed up for InfluxDB Cloud.
This same code is working and data is being saved in the cloud.
So, I am guessing there might be an issue with my 'docker compose', which is
why the data is not being saved locally (Docker InfluxDB).
Maybe I need to correct my volumes paths or something?

Warm regards,
Siva Rama Krishna.

On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic
 wrote:
>
> Hi Siva.
>
>
>
> What is the InfluxDB URL you are using? I assume it is something like 
> “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a 
> problem.
>
>
>
> Do you see a connection on InfluxDB? Anything in the logs? Anything in the 
> logs of the Job Manager?
>
>
>
> Nix,
>
>
>
> From: Siva Krishna 
> Date: Thursday, January 30, 2025 at 5:12 AM
> To: user@flink.apache.org 
> Subject: Please help with Flink + InfluxDB
>
> Hi, I am trying to create a data streaming pipeline for real-time analytics.
>
> Kafka -> Flink -> InfluxDB -> Visualize
>
> I am facing an issue writing the data to InfluxDB. I can see logs in
> 'taskmanager' after submitting the Job. But, the data is not being
> written to InfluxDB for some reason.
> Please help, I am attaching a sample docker setup and basic code.
>
> import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
> import com.influxdb.client.InfluxDBClient
> import com.influxdb.client.InfluxDBClientFactory
> import com.influxdb.client.InfluxDBClientOptions
> import com.influxdb.client.domain.WritePrecision
> import com.influxdb.client.write.Point
> import org.apache.flink.api.common.eventtime.Watermark
> import org.apache.flink.api.connector.sink2.Sink
> import org.apache.flink.api.connector.sink2.SinkWriter
> import org.apache.flink.api.connector.sink2.WriterInitContext
> import java.time.Instant
>
> class InfluxDBSink : Sink {
> override fun createWriter(p0: WriterInitContext?): SinkWriter {
> return InfluxDBSinkWriter()
> }
> }
>
> // Implement the SinkWriter that writes EventData to InfluxDB
> class InfluxDBSinkWriter : SinkWriter {
> private val mapper = jacksonObjectMapper()
>
> // Initialize the InfluxDB client
> private val influxDBClient = initializeInfluxDBClient()
> //private val writeApi = influxDBClient.makeWriteApi()
>
> // Write data to InfluxDB
> override fun write(element: String, context: SinkWriter.Context) {
>
> val event = mapper.readValue(element, EventData::class.java)
> println("eventData: $event")
>
> try {
> val connectable = influxDBClient.ping()
> val ready = influxDBClient.ready()
> val influxStr = influxDBClient.toString()
> println("influxStr: $connectable -> $ready -> $influxStr")
> } catch (e: Exception) {
> println(e.message)
> }
>
> val point = Point("dummy_events")
> .addTag("source", "flink")
> .addField("timestamp", event.timestamp)
> .addField("value", event.value)
> .time(Instant.now().toEpochMilli(), WritePrecision.MS)
> //println("Writing point to InfluxDB: $point,
> ${point.toLineProtocol()}")
>
> //writeApi.writePoint(point)
> //writeApi.flush()
>
> influxDBClient.writeApiBlocking.writePoint(point)
> //println("after influxDBClient writeApiBlocking")
> }
>
> // This method will flush any pending writes (can be empty if no
> additional batching needed)
> override fun flush(endOfInput: Boolean) {
> //if (endOfInput) writeApi.flush()
> }
>
> // Handle any watermarks if required (optional, can be skipped if
> not needed)
> override fun writeWatermark(watermark: Watermark) {
> }
>
> // Close the writer when done
> override fun close() {
> influxDBClient.close()
> }
> }
>
> fun initializeInfluxDBClient(): InfluxDBClient {
> // Build the InfluxDBClientOptions with all required parameters
> val influxDBOptions = InfluxDBClientOptions.builder()
> .url(influxDBUrl)  // InfluxDB URL
> .authenticate(influxUserName, influxPassword.toCharArray())
> // Username and password
> .org(influxOrg)  // Organization name
> .bucket(influxBucket)  // Bucket name
> .authenticateToken(influxToken.toCharArray())  // Token for
> authentication
> .build()
>
> // Create the InfluxDB client using the built options
> return I

Re: Dynamic Kafka Properties

2025-01-30 Thread Salva Alcántara
By looking at:

https://github.com/apache/flink-connector-kafka/pull/20

where the "setRackIdSupplier" setter for the kafka source builder was
introduced, I guess one could take a more general approach and instead
consider overrides for:

"setProperty"
"setProperties"

which receive a supplier for the property/properties (this could be done
for sinks, too).

These new methods could be used for any property that needs to be
inferred/resolved in the taskmanagers, not just the rack id one.

I guess as of now the rack id is probably the only property requiring lazy
evaluation, so to speak, but in the future who knows...
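
To make the idea concrete, here is a rough sketch of what such an override could look like. This is purely hypothetical, nothing like it exists in flink-connector-kafka today, and all names are made up:

```kotlin
import java.io.Serializable
import java.util.function.Supplier

// A supplier that can be shipped to the taskmanagers, mirroring how setRackIdSupplier works.
interface SerializableSupplier<T> : Supplier<T>, Serializable

// Hypothetical builder sketch: the supplier is registered on the jobmanager but only
// invoked on the taskmanager side, so each taskmanager resolves its own value (e.g. its AZ).
class DynamicPropertiesBuilder {
    private val suppliers = mutableMapOf<String, SerializableSupplier<String>>()

    fun setPropertySupplier(key: String, value: SerializableSupplier<String>): DynamicPropertiesBuilder {
        suppliers[key] = value
        return this
    }

    // Called in the reader/writer, where System.getenv("TM_NODE_AZ") and friends are meaningful.
    fun resolve(): Map<String, String> = suppliers.mapValues { (_, supplier) -> supplier.get() }
}
```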

Regards,

Salva

On Mon, Jan 27, 2025 at 6:11 PM Salva Alcántara 
wrote:

> I've recently started to enable rack awareness in my sources following
> this:
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#rackid
>
> E.g.,
>
> ```java
>
> .setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"))
>
> ```
>
> This approach allows decoupling the AZ between the jobmanager & taskmanagers.
>
> There are indeed some Kafka-compatible solutions like WarpStream which
> support rack awareness for producers, too. E.g.,
>
>
> https://docs.warpstream.com/warpstream/byoc/configure-kafka-client/configuring-kafka-client-id-features#warpstream_az
>
> The problem is that they pass the rack id using a normal producer
> property. Because the Kafka sink builder gets the properties and serializes
> them in the jobmanager, all the taskmanagers will use the same AZ as that
> of the jobmanager.
>
> Is there a way to easily pass "dynamic" properties, so to speak?
>
> Would it make sense to consider a supplier-based override for the property
> setters of the Kafka builders?
>
> Regards,
>
> Salva
>
>


Re: Please help with Flink + InfluxDB

2025-01-30 Thread Nikola Milutinovic
Hi Siva.

What is the InfluxDB URL you are using? I assume it is something like 
“influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a 
problem.

Do you see a connection on InfluxDB? Anything in the logs? Anything in the logs 
of the Job Manager?

Nix,

From: Siva Krishna 
Date: Thursday, January 30, 2025 at 5:12 AM
To: user@flink.apache.org 
Subject: Please help with Flink + InfluxDB
Hi, I am trying to create a data streaming pipeline for real-time analytics.

Kafka -> Flink -> InfluxDB -> Visualize

I am facing an issue writing the data to InfluxDB. I can see logs in
'taskmanager' after submitting the Job. But, the data is not being
written to InfluxDB for some reason.
Please help, I am attaching a sample docker setup and basic code.

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.influxdb.client.InfluxDBClient
import com.influxdb.client.InfluxDBClientFactory
import com.influxdb.client.InfluxDBClientOptions
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.write.Point
import org.apache.flink.api.common.eventtime.Watermark
import org.apache.flink.api.connector.sink2.Sink
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.api.connector.sink2.WriterInitContext
import java.time.Instant

class InfluxDBSink : Sink<String> {
    override fun createWriter(p0: WriterInitContext?): SinkWriter<String> {
        return InfluxDBSinkWriter()
    }
}

// Implement the SinkWriter that writes EventData to InfluxDB
class InfluxDBSinkWriter : SinkWriter<String> {
    private val mapper = jacksonObjectMapper()

    // Initialize the InfluxDB client
    private val influxDBClient = initializeInfluxDBClient()
    //private val writeApi = influxDBClient.makeWriteApi()

    // Write data to InfluxDB
    override fun write(element: String, context: SinkWriter.Context) {
        val event = mapper.readValue(element, EventData::class.java)
        println("eventData: $event")

        try {
            val connectable = influxDBClient.ping()
            val ready = influxDBClient.ready()
            val influxStr = influxDBClient.toString()
            println("influxStr: $connectable -> $ready -> $influxStr")
        } catch (e: Exception) {
            println(e.message)
        }

        val point = Point("dummy_events")
            .addTag("source", "flink")
            .addField("timestamp", event.timestamp)
            .addField("value", event.value)
            .time(Instant.now().toEpochMilli(), WritePrecision.MS)
        //println("Writing point to InfluxDB: $point, ${point.toLineProtocol()}")

        //writeApi.writePoint(point)
        //writeApi.flush()

        influxDBClient.writeApiBlocking.writePoint(point)
        //println("after influxDBClient writeApiBlocking")
    }

    // This method will flush any pending writes (can be empty if no additional batching is needed)
    override fun flush(endOfInput: Boolean) {
        //if (endOfInput) writeApi.flush()
    }

    // Handle any watermarks if required (optional, can be skipped if not needed)
    override fun writeWatermark(watermark: Watermark) {
    }

    // Close the writer when done
    override fun close() {
        influxDBClient.close()
    }
}

fun initializeInfluxDBClient(): InfluxDBClient {
    // Build the InfluxDBClientOptions with all required parameters
    val influxDBOptions = InfluxDBClientOptions.builder()
        .url(influxDBUrl)  // InfluxDB URL
        .authenticate(influxUserName, influxPassword.toCharArray())  // Username and password
        .org(influxOrg)  // Organization name
        .bucket(influxBucket)  // Bucket name
        .authenticateToken(influxToken.toCharArray())  // Token for authentication
        .build()

    // Create the InfluxDB client using the built options
    return InfluxDBClientFactory.create(influxDBOptions)
}
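
(Side note: the options builder above configures both username/password and token authentication. On InfluxDB 2.x the token-only factory overload is the more usual setup; a minimal sketch of that variant, reusing the same placeholder values:)

```kotlin
// Sketch: token-only client creation; influxDBUrl/influxToken/influxOrg/influxBucket
// are the same configuration values used above.
val client = InfluxDBClientFactory.create(influxDBUrl, influxToken.toCharArray(), influxOrg, influxBucket)
```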


Main.kt
fun main() {
    // Create KafkaSource with properties
    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("kafka:9092")
        .setTopics("dummy-events") // Kafka topic
        .setGroupId("flink-group")
        .setStartingOffsets(OffsetsInitializer.earliest())  // Start from the earliest message
        .setValueOnlyDeserializer(SimpleStringSchema())  // Deserialize string values
        .setProperties(Properties().apply {
            setProperty("auto.offset.reset", "earliest")
        })
        .build()

    // Create a WatermarkStrategy for event time processing
    val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()

    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // Add KafkaSource as the input stream, provide a watermark strategy and a source name
    val stream = env.fromSource(
        kafkaSource,
        watermarkStrategy,
        "KafkaSource"
    )

    stream.sinkTo(InfluxDBSink())

    // Execute the job
    env.execute("Flink Kafka Integration")
}

But the below curl is working.
curl --request POST
"http://localhost:8086/api/v2/w

Re: Performance problem with FlinkSQL

2025-01-30 Thread Pedro Mázala
> The average output rate needed to avoid lag after filtering messages
should be around 60K messages per second. I’ve been testing different
configurations of parallelism, slots and pods (everything runs on
Kubernetes), but I’m far from achieving those numbers.

How are you partitioning your query? Do you see any backpressure happening
on the Flink UI?

> In the latest configuration, I used 20 pods, a parallelism of 120, with 4
slots per taskmanager.

Were all tasks working properly, or did you have idleness?

> Additionally, setting parallelism to 120 creates hundreds of subtasks for
the smaller topics, which don’t do much but still consume minimal resources
even if idle.

On the Table API I'm not sure if you can choose parallelism per "task";
with the DataStream API I'm sure you can do it (see the sketch below).
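
For reference, on DataStream that is just a per-operator setParallelism. A minimal sketch, where the sources and sink are parameters purely to keep the example self-contained and the numbers are illustrative:

```kotlin
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.connector.sink2.Sink
import org.apache.flink.api.connector.source.Source
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

// Per-operator parallelism: high parallelism only where the load is (the 500-partition topic),
// low parallelism for the small topics, so they don't spawn hundreds of idle subtasks.
fun <T> buildPipeline(
    env: StreamExecutionEnvironment,
    bigTopicSource: Source<T, *, *>,
    smallTopicsSource: Source<T, *, *>,
    sink: Sink<T>
) {
    val big = env.fromSource(bigTopicSource, WatermarkStrategy.noWatermarks<T>(), "big-topic")
        .setParallelism(120)
    val small = env.fromSource(smallTopicsSource, WatermarkStrategy.noWatermarks<T>(), "small-topics")
        .setParallelism(8)

    big.union(small)
        .sinkTo(sink)
        .setParallelism(60)
}
```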



Att,
Pedro Mázala
+31 (06) 3819 3814
Be awesome


On Wed, 29 Jan 2025 at 22:06, Guillermo Ortiz Fernández <
guillermo.ortiz.f...@gmail.com> wrote:

> After the last check, it uses about 200-400 millicores and 2.2 GB per pod.
>
> El mié, 29 ene 2025 a las 21:41, Guillermo Ortiz Fernández (<
> guillermo.ortiz.f...@gmail.com>) escribió:
>
>> I have a job entirely written in Flink SQL. The first part of the program
>> processes 10 input topics and generates one output topic with normalized
>> messages and some filtering applied (really easy, some where by fields and
>> substring). Nine of the topics produce between hundreds and thousands of
>> messages per second, with an average of 4–10 partitions each. The other
>> topic produces 150K messages per second and has 500 partitions. They are
>> unioned to the output topic.
>>
>> The average output rate needed to avoid lag after filtering messages
>> should be around 60K messages per second. I’ve been testing different
>> configurations of parallelism, slots and pods (everything runs on
>> Kubernetes), but I’m far from achieving those numbers.
>>
>> In the latest configuration, I used 20 pods, a parallelism of 120, with 4
>> slots per taskmanager. With this setup, I achieve approximately 20K
>> messages per second, but I’m unable to consume the largest topic at the
>> rate messages are being produced. Additionally, setting parallelism to 120
>> creates hundreds of subtasks for the smaller topics, which don’t do much
>> but still consume minimal resources even if idle.
>>
>> I started trying with parallelism 12 and got about 1000-3000 messages per
>> second.
>>
>> When I check the CPU and memory usage of the pods, I don't see any
>> problem and they are far from the limit; each taskmanager has 4 GB and
>> 2 CPUs and they are never close to using the CPU.
>>
>> It’s a requirement to do everything 100% in SQL. How can I improve the
>> throughput rate? Should I be concerned about the hundreds of subtasks
>> created for the smaller topics?
>>
>


Re: Resolving Table using Java API

2025-01-30 Thread Pedro Mázala
In almost every case, the “table should be resolved” error from
FlinkCatalog.createTable() means that the table identifier you are passing
is either invalid, incomplete, or the catalog can’t figure out where the
table is supposed to live. Most commonly, it’s because:
- The database (namespace) is empty or invalid, e.g., an empty catalogNamespace.
- The table name is empty or null, e.g., a missing catalogTable.
- A required configuration (e.g., warehouse) or property is not present,
  causing Iceberg to fail resolving the path.



I'd add a log every time you do dotenv.get (for both the catalog namespace and
the catalog table) and make sure those configs are coming through properly and
are not null/empty.
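
Something like this before the createTable call usually surfaces the problem right away. It is only a sketch (in Kotlin, since that's the language used elsewhere in this digest), and the env-variable names are assumptions; use whatever your .env actually defines:

```kotlin
import org.apache.flink.table.catalog.ObjectPath

// Fail fast with readable messages instead of Iceberg's generic "table should be resolved".
fun resolveTablePath(lookup: (String) -> String?): ObjectPath {
    val namespace = requireNotNull(lookup("CATALOG_NAMESPACE")) { "CATALOG_NAMESPACE is missing" }
    val table = requireNotNull(lookup("CATALOG_TABLE")) { "CATALOG_TABLE is missing" }
    require(namespace.isNotBlank() && table.isNotBlank()) { "catalog namespace/table must not be blank" }
    println("Creating table $namespace.$table")   // or use your logger
    return ObjectPath(namespace, table)
}

// Usage (hypothetical): resolveTablePath { key -> dotenv.get(key) }
```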


Att,
Pedro Mázala
+31 (06) 3819 3814
Be awesome


On Wed, 29 Jan 2025 at 17:11, Kamesh Sampath 
wrote:

> I am on Apache Flink 1.20, trying to use pure Java SQL API to create a
> catalog and table in it. When I tried to do so I am getting the following
> error:
>
> ```
> 21:31:41,939 ERROR dev.kameshs.demos.flink.iceberg
>  [] - Error running job
> java.lang.IllegalArgumentException: table should be resolved
> at
> org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:143)
> ~[iceberg-flink-runtime-1.20-1.7.1.jar:?]
> at
> org.apache.iceberg.flink.FlinkCatalog.createTable(FlinkCatalog.java:394)
> ~[iceberg-flink-runtime-1.20-1.7.1.jar:?]
> at
> dev.kameshs.demos.flink.iceberg.VehicleTelemetryProcessor.lambda$main$0(VehicleTelemetryProcessor.java:167)
> ~[main/:?]
> at java.util.Optional.ifPresent(Optional.java:178) ~[?:?]
> at
> dev.kameshs.demos.flink.iceberg.VehicleTelemetryProcessor.main(VehicleTelemetryProcessor.java:106)
> [main/:?]
> ```
>
> The sample code is
> https://gist.github.com/kameshsampath/0c4ad07b236bd38e999316598141b93e#file-vehicletelemetryprocessor-java-L167
>
> Any clue/pointers to move forward.
>
> Thank you.
>


Re: Please help with Flink + InfluxDB

2025-01-30 Thread Siva Krishna
Hi Nix,
Thank you for your reply.
I have a small update. For trial/error, I signed up for InfluxDB Cloud.
This same code is working and data is being saved in the cloud.
So, I am guessing there might be an issue with my 'docker compose', which is
why the data is not being saved locally (Docker InfluxDB).
Maybe I need to correct my volumes paths or something?

Warm regards,
Siva Rama Krishna.

On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic
 wrote:
>
> Hi Siva.
>
>
>
> What is the InfluxDB URL you are using? I assume it is something like 
> “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a 
> problem.
>
>
>
> Do you see a connection on InfluxDB? Anything in the logs? Anything in the 
> logs of the Job Manager?
>
>
>
> Nix,
>
>
>
> From: Siva Krishna 
> Date: Thursday, January 30, 2025 at 5:12 AM
> To: user@flink.apache.org 
> Subject: Please help with Flink + InfluxDB
>
> Hi, I am trying to create a data streaming pipeline for real-time analytics.
>
> Kafka -> Flink -> InfluxDB -> Visualize
>
> I am facing an issue writing the data to InfluxDB. I can see logs in
> 'taskmanager' after submitting the Job. But, the data is not being
> written to InfluxDB for some reason.
> Please help, I am attaching a sample docker setup and basic code.
>
> import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
> import com.influxdb.client.InfluxDBClient
> import com.influxdb.client.InfluxDBClientFactory
> import com.influxdb.client.InfluxDBClientOptions
> import com.influxdb.client.domain.WritePrecision
> import com.influxdb.client.write.Point
> import org.apache.flink.api.common.eventtime.Watermark
> import org.apache.flink.api.connector.sink2.Sink
> import org.apache.flink.api.connector.sink2.SinkWriter
> import org.apache.flink.api.connector.sink2.WriterInitContext
> import java.time.Instant
>
> class InfluxDBSink : Sink {
> override fun createWriter(p0: WriterInitContext?): SinkWriter {
> return InfluxDBSinkWriter()
> }
> }
>
> // Implement the SinkWriter that writes EventData to InfluxDB
> class InfluxDBSinkWriter : SinkWriter {
> private val mapper = jacksonObjectMapper()
>
> // Initialize the InfluxDB client
> private val influxDBClient = initializeInfluxDBClient()
> //private val writeApi = influxDBClient.makeWriteApi()
>
> // Write data to InfluxDB
> override fun write(element: String, context: SinkWriter.Context) {
>
> val event = mapper.readValue(element, EventData::class.java)
> println("eventData: $event")
>
> try {
> val connectable = influxDBClient.ping()
> val ready = influxDBClient.ready()
> val influxStr = influxDBClient.toString()
> println("influxStr: $connectable -> $ready -> $influxStr")
> } catch (e: Exception) {
> println(e.message)
> }
>
> val point = Point("dummy_events")
> .addTag("source", "flink")
> .addField("timestamp", event.timestamp)
> .addField("value", event.value)
> .time(Instant.now().toEpochMilli(), WritePrecision.MS)
> //println("Writing point to InfluxDB: $point,
> ${point.toLineProtocol()}")
>
> //writeApi.writePoint(point)
> //writeApi.flush()
>
> influxDBClient.writeApiBlocking.writePoint(point)
> //println("after influxDBClient writeApiBlocking")
> }
>
> // This method will flush any pending writes (can be empty if no
> additional batching needed)
> override fun flush(endOfInput: Boolean) {
> //if (endOfInput) writeApi.flush()
> }
>
> // Handle any watermarks if required (optional, can be skipped if
> not needed)
> override fun writeWatermark(watermark: Watermark) {
> }
>
> // Close the writer when done
> override fun close() {
> influxDBClient.close()
> }
> }
>
> fun initializeInfluxDBClient(): InfluxDBClient {
> // Build the InfluxDBClientOptions with all required parameters
> val influxDBOptions = InfluxDBClientOptions.builder()
> .url(influxDBUrl)  // InfluxDB URL
> .authenticate(influxUserName, influxPassword.toCharArray())
> // Username and password
> .org(influxOrg)  // Organization name
> .bucket(influxBucket)  // Bucket name
> .authenticateToken(influxToken.toCharArray())  // Token for
> authentication
> .build()
>
> // Create the InfluxDB client using the built options
> return InfluxDBClientFactory.create(influxDBOptions)
> }
>
>
> Main.kt
> fun main() {
>// Create KafkaSource with properties
> val kafkaSource = KafkaSource.builder()
> .setBootstrapServers("kafka:9092")
> .setTopics("dummy-events") // Kafka topic
> .setGroupId("flink-group")
> .setStartingOffsets(OffsetsInitializer.earliest())  // Start
> from the latest message
> .setValueOnlyDeserializer(SimpleStringSchema())  //
> Deserialize string values
> .setProperties(Properties().apply {
>