Hi, I am trying to create a data streaming pipeline for real-time analytics.
Kafka -> Flink -> InfluDB -> Visualize
I am facing an issue writing the data to InfluxDB. I can see logs in
'taskmanager' after submitting the Job. But, the data is not being
written to InfluxDB for some reason.
Please help, I am attaching sample docker setup and basic code.
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.influxdb.client.InfluxDBClient
import com.influxdb.client.InfluxDBClientFactory
import com.influxdb.client.InfluxDBClientOptions
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.write.Point
import org.apache.flink.api.common.eventtime.Watermark
import org.apache.flink.api.connector.sink2.Sink
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.api.connector.sink2.WriterInitContext
import java.time.Instant
class InfluxDBSink : Sink<String> {
override fun createWriter(p0: WriterInitContext?): SinkWriter<String> {
return InfluxDBSinkWriter()
}
}
// Implement the SinkWriter that writes EventData to InfluxDB
class InfluxDBSinkWriter : SinkWriter<String> {
private val mapper = jacksonObjectMapper()
// Initialize the InfluxDB client
private val influxDBClient = initializeInfluxDBClient()
// private val writeApi = influxDBClient.makeWriteApi()
// Write data to InfluxDB
override fun write(element: String, context: SinkWriter.Context) {
val event = mapper.readValue(element, EventData::class.java)
println("eventData: $event")
try {
val connectable = influxDBClient.ping()
val ready = influxDBClient.ready()
val influxStr = influxDBClient.toString()
println("influxStr: $connectable -> $ready -> $influxStr")
} catch (e: Exception) {
println(e.message)
}
val point = Point("dummy_events")
.addTag("source", "flink")
.addField("timestamp", event.timestamp)
.addField("value", event.value)
.time(Instant.now().toEpochMilli(), WritePrecision.MS)
// println("Writing point to InfluxDB: $point,
${point.toLineProtocol()}")
// writeApi.writePoint(point)
// writeApi.flush()
influxDBClient.writeApiBlocking.writePoint(point)
// println("after influxDBClient writeApiBlocking")
}
// This method will flush any pending writes (can be empty if no
additional batching needed)
override fun flush(endOfInput: Boolean) {
// if (endOfInput) writeApi.flush()
}
// Handle any watermarks if required (optional, can be skipped if
not needed)
override fun writeWatermark(watermark: Watermark) {
}
// Close the writer when done
override fun close() {
influxDBClient.close()
}
}
fun initializeInfluxDBClient(): InfluxDBClient {
// Build the InfluxDBClientOptions with all required parameters
val influxDBOptions = InfluxDBClientOptions.builder()
.url(influxDBUrl) // InfluxDB URL
.authenticate(influxUserName, influxPassword.toCharArray())
// Username and password
.org(influxOrg) // Organization name
.bucket(influxBucket) // Bucket name
.authenticateToken(influxToken.toCharArray()) // Token for
authentication
.build()
// Create the InfluxDB client using the built options
return InfluxDBClientFactory.create(influxDBOptions)
}
Main.kt
fun main() {
// Create KafkaSource with properties
val kafkaSource = KafkaSource.builder<String>()
.setBootstrapServers("kafka:9092")
.setTopics("dummy-events") // Kafka topic
.setGroupId("flink-group")
.setStartingOffsets(OffsetsInitializer.earliest()) // Start
from the latest message
.setValueOnlyDeserializer(SimpleStringSchema()) //
Deserialize string values
.setProperties(Properties().apply {
setProperty("auto.offset.reset", "earliest")
})
.build()
// Create a WatermarkStrategy for event time processing
val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()
// Set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment()
// Add KafkaSource as the input stream, provide watermark strategy
and a source name
val stream = env.fromSource(
kafkaSource,
watermarkStrategy,
"KafkaSource"
)
stream.sinkTo(InfluxDBSink())
// Execute the job
env.execute("Flink Kafka Integration")
}
But below curl is working.
curl --request POST
"http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms"
\
--header "Authorization: Token
wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg=="
\
--header "Content-Type: text/plain; charset=utf-8" \
--data-binary "dummy_events,source=flink value=42.0 1738142927000"
Please check and reply is I am doing anything wrong.
Thank you.
docker-compose copy.yml
Description: application/yaml
