Hi Siva.

What is the InfluxDB URL you are using? I assume it is something like 
“influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a 
problem.

Do you see a connection on InfluxDB? Anything in the logs? Anything in the logs 
of the Job Manager?

Nix,

From: Siva Krishna <sivasoe...@gmail.com>
Date: Thursday, January 30, 2025 at 5:12 AM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Please help with Flink + InfluxDB
Hi, I am trying to create a data streaming pipeline for real-time analytics.

Kafka -> Flink -> InfluxDB -> Visualize

I am facing an issue writing the data to InfluxDB. I can see logs in
'taskmanager' after submitting the Job. But, the data is not being
written to InfluxDB for some reason.
Please help, I am attaching sample docker setup and basic code.

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.influxdb.client.InfluxDBClient
import com.influxdb.client.InfluxDBClientFactory
import com.influxdb.client.InfluxDBClientOptions
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.write.Point
import org.apache.flink.api.common.eventtime.Watermark
import org.apache.flink.api.connector.sink2.Sink
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.api.connector.sink2.WriterInitContext
import java.time.Instant

/**
 * Sink for String records; every writer it hands out is an [InfluxDBSinkWriter].
 */
class InfluxDBSink : Sink<String> {
    // The init context is not consulted: the writer is constructed without arguments.
    override fun createWriter(p0: WriterInitContext?): SinkWriter<String> = InfluxDBSinkWriter()
}

// Implement the SinkWriter that writes EventData to InfluxDB
/**
 * Sink writer that parses each incoming JSON string into an [EventData] and
 * writes it to InfluxDB as one point of the `dummy_events` measurement.
 *
 * The client is created once per writer through [initializeInfluxDBClient] and
 * released in [close]. Records are sent one at a time through the blocking
 * write API, so this writer keeps no buffer of its own.
 */
class InfluxDBSinkWriter : SinkWriter<String> {
    private val mapper = jacksonObjectMapper()

    // Initialize the InfluxDB client
    private val influxDBClient = initializeInfluxDBClient()

    // Fetched once and reused for every record rather than requested from the client on each write.
    private val blockingWriteApi = influxDBClient.writeApiBlocking

    // Ensures the connectivity diagnostic runs for the first record only.
    private var connectivityLogged = false

    /**
     * Parses [element] as JSON and writes it to InfluxDB.
     *
     * The point is tagged `source=flink`, carries the event's `timestamp` and
     * `value` as fields, and is stamped with the current wall-clock time in
     * milliseconds (not the event's own timestamp).
     *
     * Exceptions from JSON parsing or from the InfluxDB write are not caught here
     * and propagate to the caller.
     */
    override fun write(element: String, context: SinkWriter.Context) {
        val event = mapper.readValue(element, EventData::class.java)
        println("eventData: $event")

        logConnectivityOnce()

        val point = Point("dummy_events")
            .addTag("source", "flink")
            .addField("timestamp", event.timestamp)
            .addField("value", event.value)
            .time(Instant.now().toEpochMilli(), WritePrecision.MS)
//        println("Writing point to InfluxDB: $point, ${point.toLineProtocol()}")

        blockingWriteApi.writePoint(point)
    }

    // Nothing to flush: write() sends every record through the blocking write API.
    override fun flush(endOfInput: Boolean) {
    }

    // Watermarks are ignored by this writer.
    override fun writeWatermark(watermark: Watermark) {
    }

    // Close the writer when done
    override fun close() {
        influxDBClient.close()
    }

    /**
     * Prints the result of the client's `ping()` and `ready()` checks the first
     * time it is called; later calls do nothing. A failing check is reported but
     * does not stop the write that follows.
     */
    private fun logConnectivityOnce() {
        if (connectivityLogged) return
        connectivityLogged = true
        try {
            val connectable = influxDBClient.ping()
            val ready = influxDBClient.ready()
            println("influxStr: $connectable -> $ready -> $influxDBClient")
        } catch (e: Exception) {
            // Print the exception itself: e.message alone can be null and hides the exception type.
            println("InfluxDB connectivity check failed: $e")
        }
    }
}

/**
 * Builds an [InfluxDBClient] from the connection settings `influxDBUrl`,
 * `influxUserName`, `influxPassword`, `influxOrg`, `influxBucket` and
 * `influxToken`, which are declared outside this function.
 *
 * NOTE(review): if the job runs in a container, `influxDBUrl` presumably has to
 * name a host reachable from that container (e.g. the InfluxDB service name)
 * rather than `localhost` — confirm against the Docker setup.
 *
 * @return a client created from the assembled options; the caller is responsible for closing it.
 */
fun initializeInfluxDBClient(): InfluxDBClient {
    // Build the InfluxDBClientOptions with all required parameters
    val influxDBOptions = InfluxDBClientOptions.builder()
        .url(influxDBUrl) // InfluxDB URL
        .authenticate(influxUserName, influxPassword.toCharArray()) // Username and password
        .org(influxOrg) // Organization name
        .bucket(influxBucket) // Bucket name
        // NOTE(review): both username/password and token credentials are supplied;
        // the token call comes last and presumably decides the scheme used — confirm.
        .authenticateToken(influxToken.toCharArray()) // Token for authentication
        .build()

    // Create the InfluxDB client using the built options
    return InfluxDBClientFactory.create(influxDBOptions)
}


Main.kt
/**
 * Entry point of the job: reads string records from the `dummy-events` Kafka
 * topic and hands each one to [InfluxDBSink].
 */
fun main() {
    // Create KafkaSource with properties
    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("kafka:9092")
        .setTopics("dummy-events") // Kafka topic
        .setGroupId("flink-group")
        .setStartingOffsets(OffsetsInitializer.earliest()) // Start from the earliest available offset
        .setValueOnlyDeserializer(SimpleStringSchema()) // Deserialize string values
        .setProperties(Properties().apply {
            setProperty("auto.offset.reset", "earliest")
        })
        .build()

    // No watermarks are generated for this source
    val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()

    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // Add KafkaSource as the input stream, provide watermark strategy and a source name
    val stream = env.fromSource(
        kafkaSource,
        watermarkStrategy,
        "KafkaSource"
    )

    stream.sinkTo(InfluxDBSink())

    // Execute the job
    env.execute("Flink Kafka Integration")
}

However, the curl command below works:
curl --request POST \
  "http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms" \
  --header "Authorization: Token wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg==" \
  --header "Content-Type: text/plain; charset=utf-8" \
  --data-binary "dummy_events,source=flink value=42.0 1738142927000"



Please check and reply if I am doing anything wrong.

Thank you.

Reply via email to