Hi.

First of all, can you verify your Docker compose InfluxDB is working, is it 
accepting connections?
Can you connect to it from outside Docker Compose?

When connecting from Flink, I hope you are referencing the service name as a 
host, because that is how Docker Compose works. It is a common mistake to 
target “localhost”, which stays inside the container.

Nix.

From: Siva Krishna <sivasoe...@gmail.com>
Date: Thursday, January 30, 2025 at 11:07 AM
To: Nikola Milutinovic <n.milutino...@levi9.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Please help with Flink + InfluxDB
Hi Nix,
Thank you for your reply.
I have small update. For trial/error, I have signed in InfluxDB cloud.
This same code is working and data is being save in the cloud.
So, I am guessing there might issue with my 'docker compose', which is
why the data is not being saved in local(docker InfluxDB).
Maybe I need to correct my volumes paths or something?

Warm regards,
Siva Rama Krishna.

On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic
<n.milutino...@levi9.com> wrote:
>
> Hi Siva.
>
>
>
> What is the InfluxDB URL you are using? I assume it is something like 
> “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a 
> problem.
>
>
>
> Do you see a connection on InfluxDB? Anything in the logs? Anything in the 
> logs of Job Mnager?
>
>
>
> Nix,
>
>
>
> From: Siva Krishna <sivasoe...@gmail.com>
> Date: Thursday, January 30, 2025 at 5:12 AM
> To: user@flink.apache.org <user@flink.apache.org>
> Subject: Please help with Flink + InfluxDB
>
> Hi, I am trying to create a data streaming pipeline for real-time analytics.
>
> Kafka -> Flink -> InfluDB -> Visualize
>
> I am facing an issue writing the data to InfluxDB. I can see logs in
> 'taskmanager' after submitting the Job. But, the data is not being
> written to InfluxDB for some reason.
> Please help, I am attaching sample docker setup and basic code.
>
> import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
> import com.influxdb.client.InfluxDBClient
> import com.influxdb.client.InfluxDBClientFactory
> import com.influxdb.client.InfluxDBClientOptions
> import com.influxdb.client.domain.WritePrecision
> import com.influxdb.client.write.Point
> import org.apache.flink.api.common.eventtime.Watermark
> import org.apache.flink.api.connector.sink2.Sink
> import org.apache.flink.api.connector.sink2.SinkWriter
> import org.apache.flink.api.connector.sink2.WriterInitContext
> import java.time.Instant
>
> class InfluxDBSink : Sink<String> {
>     override fun createWriter(p0: WriterInitContext?): SinkWriter<String> {
>         return InfluxDBSinkWriter()
>     }
> }
>
> // Implement the SinkWriter that writes EventData to InfluxDB
> class InfluxDBSinkWriter : SinkWriter<String> {
>     private val mapper = jacksonObjectMapper()
>
>     // Initialize the InfluxDB client
>     private val influxDBClient = initializeInfluxDBClient()
> //    private val writeApi = influxDBClient.makeWriteApi()
>
>     // Write data to InfluxDB
>     override fun write(element: String, context: SinkWriter.Context) {
>
>         val event = mapper.readValue(element, EventData::class.java)
>         println("eventData: $event")
>
>         try {
>             val connectable = influxDBClient.ping()
>             val ready = influxDBClient.ready()
>             val influxStr = influxDBClient.toString()
>             println("influxStr: $connectable -> $ready -> $influxStr")
>         } catch (e: Exception) {
>             println(e.message)
>         }
>
>         val point = Point("dummy_events")
>             .addTag("source", "flink")
>             .addField("timestamp", event.timestamp)
>             .addField("value", event.value)
>             .time(Instant.now().toEpochMilli(), WritePrecision.MS)
> //        println("Writing point to InfluxDB: $point,
> ${point.toLineProtocol()}")
>
> //        writeApi.writePoint(point)
> //        writeApi.flush()
>
>         influxDBClient.writeApiBlocking.writePoint(point)
> //        println("after influxDBClient writeApiBlocking")
>     }
>
>     // This method will flush any pending writes (can be empty if no
> additional batching needed)
>     override fun flush(endOfInput: Boolean) {
> //        if (endOfInput) writeApi.flush()
>     }
>
>     // Handle any watermarks if required (optional, can be skipped if
> not needed)
>     override fun writeWatermark(watermark: Watermark) {
>     }
>
>     // Close the writer when done
>     override fun close() {
>         influxDBClient.close()
>     }
> }
>
> fun initializeInfluxDBClient(): InfluxDBClient {
>     // Build the InfluxDBClientOptions with all required parameters
>     val influxDBOptions = InfluxDBClientOptions.builder()
>         .url(influxDBUrl)  // InfluxDB URL
>         .authenticate(influxUserName, influxPassword.toCharArray())
> // Username and password
>         .org(influxOrg)  // Organization name
>         .bucket(influxBucket)  // Bucket name
>         .authenticateToken(influxToken.toCharArray())  // Token for
> authentication
>         .build()
>
>     // Create the InfluxDB client using the built options
>     return InfluxDBClientFactory.create(influxDBOptions)
> }
>
>
> Main.kt
> fun main() {
>    // Create KafkaSource with properties
>     val kafkaSource = KafkaSource.builder<String>()
>         .setBootstrapServers("kafka:9092")
>         .setTopics("dummy-events") // Kafka topic
>         .setGroupId("flink-group")
>         .setStartingOffsets(OffsetsInitializer.earliest())  // Start
> from the latest message
>         .setValueOnlyDeserializer(SimpleStringSchema())  //
> Deserialize string values
>         .setProperties(Properties().apply {
>             setProperty("auto.offset.reset", "earliest")
>         })
>         .build()
>
>     // Create a WatermarkStrategy for event time processing
>     val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()
>
>     // Set up the execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment()
>
>     // Add KafkaSource as the input stream, provide watermark strategy
> and a source name
>     val stream = env.fromSource(
>         kafkaSource,
>         watermarkStrategy,
>         "KafkaSource"
>     )
>
>     stream.sinkTo(InfluxDBSink())
>
>     // Execute the job
>     env.execute("Flink Kafka Integration")
> }
>
> But below curl is working.
> curl --request POST
> "http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms";
> \
>   --header "Authorization: Token
> wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg=="
> \
>   --header "Content-Type: text/plain; charset=utf-8" \
>   --data-binary "dummy_events,source=flink value=42.0 1738142927000"
>
>
>
> Please check and reply is I am doing anything wrong.
>
> Thank you.

Reply via email to