Hi Siva. What is the InfluxDB URL you are using? I assume it is something like “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a problem.
Do you see a connection on InfluxDB? Anything in the logs? Anything in the logs of Job Manager? Nix, From: Siva Krishna <sivasoe...@gmail.com> Date: Thursday, January 30, 2025 at 5:12 AM To: user@flink.apache.org <user@flink.apache.org> Subject: Please help with Flink + InfluxDB Hi, I am trying to create a data streaming pipeline for real-time analytics. Kafka -> Flink -> InfluxDB -> Visualize I am facing an issue writing the data to InfluxDB. I can see logs in 'taskmanager' after submitting the Job. But, the data is not being written to InfluxDB for some reason. Please help, I am attaching sample docker setup and basic code. import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import com.influxdb.client.InfluxDBClient import com.influxdb.client.InfluxDBClientFactory import com.influxdb.client.InfluxDBClientOptions import com.influxdb.client.domain.WritePrecision import com.influxdb.client.write.Point import org.apache.flink.api.common.eventtime.Watermark import org.apache.flink.api.connector.sink2.Sink import org.apache.flink.api.connector.sink2.SinkWriter import org.apache.flink.api.connector.sink2.WriterInitContext import java.time.Instant class InfluxDBSink : Sink<String> { override fun createWriter(p0: WriterInitContext?): SinkWriter<String> { return InfluxDBSinkWriter() } } // Implement the SinkWriter that writes EventData to InfluxDB class InfluxDBSinkWriter : SinkWriter<String> { private val mapper = jacksonObjectMapper() // Initialize the InfluxDB client private val influxDBClient = initializeInfluxDBClient() // private val writeApi = influxDBClient.makeWriteApi() // Write data to InfluxDB override fun write(element: String, context: SinkWriter.Context) { val event = mapper.readValue(element, EventData::class.java) println("eventData: $event") try { val connectable = influxDBClient.ping() val ready = influxDBClient.ready() val influxStr = influxDBClient.toString() println("influxStr: $connectable -> $ready -> $influxStr") } catch (e: Exception) { 
println(e.message) } val point = Point("dummy_events") .addTag("source", "flink") .addField("timestamp", event.timestamp) .addField("value", event.value) .time(Instant.now().toEpochMilli(), WritePrecision.MS) // println("Writing point to InfluxDB: $point, ${point.toLineProtocol()}") // writeApi.writePoint(point) // writeApi.flush() influxDBClient.writeApiBlocking.writePoint(point) // println("after influxDBClient writeApiBlocking") } // This method will flush any pending writes (can be empty if no additional batching needed) override fun flush(endOfInput: Boolean) { // if (endOfInput) writeApi.flush() } // Handle any watermarks if required (optional, can be skipped if not needed) override fun writeWatermark(watermark: Watermark) { } // Close the writer when done override fun close() { influxDBClient.close() } } fun initializeInfluxDBClient(): InfluxDBClient { // Build the InfluxDBClientOptions with all required parameters val influxDBOptions = InfluxDBClientOptions.builder() .url(influxDBUrl) // InfluxDB URL .authenticate(influxUserName, influxPassword.toCharArray()) // Username and password .org(influxOrg) // Organization name .bucket(influxBucket) // Bucket name .authenticateToken(influxToken.toCharArray()) // Token for authentication .build() // Create the InfluxDB client using the built options return InfluxDBClientFactory.create(influxDBOptions) } Main.kt fun main() { // Create KafkaSource with properties val kafkaSource = KafkaSource.builder<String>() .setBootstrapServers("kafka:9092") .setTopics("dummy-events") // Kafka topic .setGroupId("flink-group") .setStartingOffsets(OffsetsInitializer.earliest()) // Start from the earliest message .setValueOnlyDeserializer(SimpleStringSchema()) // Deserialize string values .setProperties(Properties().apply { setProperty("auto.offset.reset", "earliest") }) .build() // Create a WatermarkStrategy for event time processing val watermarkStrategy = WatermarkStrategy.noWatermarks<String>() // Set up the execution environment 
val env = StreamExecutionEnvironment.getExecutionEnvironment() // Add KafkaSource as the input stream, provide watermark strategy and a source name val stream = env.fromSource( kafkaSource, watermarkStrategy, "KafkaSource" ) stream.sinkTo(InfluxDBSink()) // Execute the job env.execute("Flink Kafka Integration") } But the curl command below is working. curl --request POST "http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms" \ --header "Authorization: Token wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg==" \ --header "Content-Type: text/plain; charset=utf-8" \ --data-binary "dummy_events,source=flink value=42.0 1738142927000" Please check and reply if I am doing anything wrong. Thank you.