Hi. First of all, can you verify your Docker compose InfluxDB is working, is it accepting connections?
Can you connect to it from outside Docker Compose? When connecting from Flink, I hope you are referencing the service name as a host, because that is how Docker Compose works. It is a common mistake to target “localhost”, which stays inside the container. Nix. From: Siva Krishna <sivasoe...@gmail.com> Date: Thursday, January 30, 2025 at 11:07 AM To: Nikola Milutinovic <n.milutino...@levi9.com> Cc: user@flink.apache.org <user@flink.apache.org> Subject: Re: Please help with Flink + InfluxDB Hi Nix, Thank you for your reply. I have small update. For trial/error, I have signed in InfluxDB cloud. This same code is working and data is being save in the cloud. So, I am guessing there might issue with my 'docker compose', which is why the data is not being saved in local(docker InfluxDB). Maybe I need to correct my volumes paths or something? Warm regards, Siva Rama Krishna. On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote: > > Hi Siva. > > > > What is the InfluxDB URL you are using? I assume it is something like > “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a > problem. > > > > Do you see a connection on InfluxDB? Anything in the logs? Anything in the > logs of Job Mnager? > > > > Nix, > > > > From: Siva Krishna <sivasoe...@gmail.com> > Date: Thursday, January 30, 2025 at 5:12 AM > To: user@flink.apache.org <user@flink.apache.org> > Subject: Please help with Flink + InfluxDB > > Hi, I am trying to create a data streaming pipeline for real-time analytics. > > Kafka -> Flink -> InfluDB -> Visualize > > I am facing an issue writing the data to InfluxDB. I can see logs in > 'taskmanager' after submitting the Job. But, the data is not being > written to InfluxDB for some reason. > Please help, I am attaching sample docker setup and basic code. > > import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper > import com.influxdb.client.InfluxDBClient > import com.influxdb.client.InfluxDBClientFactory > import com.influxdb.client.InfluxDBClientOptions > import com.influxdb.client.domain.WritePrecision > import com.influxdb.client.write.Point > import org.apache.flink.api.common.eventtime.Watermark > import org.apache.flink.api.connector.sink2.Sink > import org.apache.flink.api.connector.sink2.SinkWriter > import org.apache.flink.api.connector.sink2.WriterInitContext > import java.time.Instant > > class InfluxDBSink : Sink<String> { > override fun createWriter(p0: WriterInitContext?): SinkWriter<String> { > return InfluxDBSinkWriter() > } > } > > // Implement the SinkWriter that writes EventData to InfluxDB > class InfluxDBSinkWriter : SinkWriter<String> { > private val mapper = jacksonObjectMapper() > > // Initialize the InfluxDB client > private val influxDBClient = initializeInfluxDBClient() > // private val writeApi = influxDBClient.makeWriteApi() > > // Write data to InfluxDB > override fun write(element: String, context: SinkWriter.Context) { > > val event = mapper.readValue(element, EventData::class.java) > println("eventData: $event") > > try { > val connectable = influxDBClient.ping() > val ready = influxDBClient.ready() > val influxStr = influxDBClient.toString() > println("influxStr: $connectable -> $ready -> $influxStr") > } catch (e: Exception) { > println(e.message) > } > > val point = Point("dummy_events") > .addTag("source", "flink") > .addField("timestamp", event.timestamp) > .addField("value", event.value) > .time(Instant.now().toEpochMilli(), WritePrecision.MS) > // println("Writing point to InfluxDB: $point, > ${point.toLineProtocol()}") > > // writeApi.writePoint(point) > // writeApi.flush() > > influxDBClient.writeApiBlocking.writePoint(point) > // println("after influxDBClient writeApiBlocking") > } > > // This method will flush any pending writes (can be empty if no > additional batching needed) > override fun flush(endOfInput: Boolean) { > // if (endOfInput) writeApi.flush() > } > > // Handle any watermarks if required (optional, can be skipped if > not needed) > override fun writeWatermark(watermark: Watermark) { > } > > // Close the writer when done > override fun close() { > influxDBClient.close() > } > } > > fun initializeInfluxDBClient(): InfluxDBClient { > // Build the InfluxDBClientOptions with all required parameters > val influxDBOptions = InfluxDBClientOptions.builder() > .url(influxDBUrl) // InfluxDB URL > .authenticate(influxUserName, influxPassword.toCharArray()) > // Username and password > .org(influxOrg) // Organization name > .bucket(influxBucket) // Bucket name > .authenticateToken(influxToken.toCharArray()) // Token for > authentication > .build() > > // Create the InfluxDB client using the built options > return InfluxDBClientFactory.create(influxDBOptions) > } > > > Main.kt > fun main() { > // Create KafkaSource with properties > val kafkaSource = KafkaSource.builder<String>() > .setBootstrapServers("kafka:9092") > .setTopics("dummy-events") // Kafka topic > .setGroupId("flink-group") > .setStartingOffsets(OffsetsInitializer.earliest()) // Start > from the latest message > .setValueOnlyDeserializer(SimpleStringSchema()) // > Deserialize string values > .setProperties(Properties().apply { > setProperty("auto.offset.reset", "earliest") > }) > .build() > > // Create a WatermarkStrategy for event time processing > val watermarkStrategy = WatermarkStrategy.noWatermarks<String>() > > // Set up the execution environment > val env = StreamExecutionEnvironment.getExecutionEnvironment() > > // Add KafkaSource as the input stream, provide watermark strategy > and a source name > val stream = env.fromSource( > kafkaSource, > watermarkStrategy, > "KafkaSource" > ) > > stream.sinkTo(InfluxDBSink()) > > // Execute the job > env.execute("Flink Kafka Integration") > } > > But below curl is working. > curl --request POST > "http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms"; > \ > --header "Authorization: Token > wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg==" > \ > --header "Content-Type: text/plain; charset=utf-8" \ > --data-binary "dummy_events,source=flink value=42.0 1738142927000" > > > > Please check and reply is I am doing anything wrong. > > Thank you.