Hi Siva.

In this case, extensive logging is in order. Be aware that if you want to
track the progress of data through your pipeline, you need to log statements
inside your processors, sinks, and so on. On our project we made a small
LogProcessingFunction, which simply output a log statement that we set in its
constructor.
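
Something along these lines (a minimal sketch; the names here are
illustrative, not our exact code):

import org.apache.flink.api.common.functions.MapFunction
import org.slf4j.LoggerFactory

// Pass-through function that logs each element with a label
// given in the constructor, so you can see where data flows.
class LogProcessingFunction(private val label: String) : MapFunction<String, String> {

    override fun map(value: String): String {
        LOG.info("{}: {}", label, value)
        return value  // forward the element unchanged
    }

    companion object {
        private val LOG = LoggerFactory.getLogger(LogProcessingFunction::class.java)
    }
}

You can then drop it between any two stages, e.g.
stream.map(LogProcessingFunction("before sink")), and watch the taskmanager logs.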

So, log, log and log.

Nix.

From: Siva Krishna <sivasoe...@gmail.com>
Date: Friday, January 31, 2025 at 4:41 AM
To: Nikola Milutinovic <n.milutino...@levi9.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Please help with Flink + InfluxDB
Yes, Nix.
I can confirm that I am able to connect to InfluxDB from Flink (taskmanager).
URL="http://influxdb:8086"
I also tried the following from inside the Flink (taskmanager) container,
and that worked as well:
curl influxdb:8086

Siva.

On Thu, Jan 30, 2025 at 4:50 PM Nikola Milutinovic
<n.milutino...@levi9.com> wrote:
>
> Hi.
>
>
>
> First of all, can you verify that your Docker Compose InfluxDB is working and 
> accepting connections?
>
>
>
> Can you connect to it from outside Docker Compose?
>
>
>
> When connecting from Flink, I hope you are referencing the service name as the 
> host, because that is how Docker Compose networking works. It is a common 
> mistake to target “localhost”, which stays inside the calling container.
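> 
> For example (just a sketch, assuming your Compose service is named "influxdb"):
> 
> // Inside the Compose network, the service name is the hostname.
> val influxDBUrl = "http://influxdb:8086"  // reachable from other containers
> // "http://localhost:8086" resolves to the calling container itself,
> // not to the InfluxDB container.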
>
>
>
> Nix.
>
>
>
> From: Siva Krishna <sivasoe...@gmail.com>
> Date: Thursday, January 30, 2025 at 11:07 AM
> To: Nikola Milutinovic <n.milutino...@levi9.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: Please help with Flink + InfluxDB
>
> Hi Nix,
> Thank you for your reply.
> I have a small update. As a trial, I signed up for InfluxDB Cloud.
> The same code works there and the data is being saved in the cloud.
> So I am guessing there might be an issue with my 'docker compose', which is
> why the data is not being saved locally (Docker InfluxDB).
> Maybe I need to correct my volume paths or something?
>
> Warm regards,
> Siva Rama Krishna.
>
> On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic
> <n.milutino...@levi9.com> wrote:
> >
> > Hi Siva.
> >
> >
> >
> > What is the InfluxDB URL you are using? I assume it is something like 
> > “http://influxdb:8086/…”. If it is “localhost:8086…”, that would be a 
> > problem.
> >
> >
> >
> > Do you see a connection on InfluxDB? Anything in the logs? Anything in the 
> > logs of the JobManager?
> >
> >
> >
> > Nix,
> >
> >
> >
> > From: Siva Krishna <sivasoe...@gmail.com>
> > Date: Thursday, January 30, 2025 at 5:12 AM
> > To: user@flink.apache.org <user@flink.apache.org>
> > Subject: Please help with Flink + InfluxDB
> >
> > Hi, I am trying to create a data streaming pipeline for real-time analytics.
> >
> > Kafka -> Flink -> InfluxDB -> Visualize
> >
> > I am facing an issue writing the data to InfluxDB. I can see logs in
> > 'taskmanager' after submitting the job, but the data is not being
> > written to InfluxDB for some reason.
> > Please help; I am attaching a sample Docker setup and the basic code.
> >
> > import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
> > import com.influxdb.client.InfluxDBClient
> > import com.influxdb.client.InfluxDBClientFactory
> > import com.influxdb.client.InfluxDBClientOptions
> > import com.influxdb.client.domain.WritePrecision
> > import com.influxdb.client.write.Point
> > import org.apache.flink.api.common.eventtime.Watermark
> > import org.apache.flink.api.connector.sink2.Sink
> > import org.apache.flink.api.connector.sink2.SinkWriter
> > import org.apache.flink.api.connector.sink2.WriterInitContext
> > import java.time.Instant
> >
> > class InfluxDBSink : Sink<String> {
> >     override fun createWriter(context: WriterInitContext): SinkWriter<String> {
> >         return InfluxDBSinkWriter()
> >     }
> > }
> >
> > // Implement the SinkWriter that writes EventData to InfluxDB
> > class InfluxDBSinkWriter : SinkWriter<String> {
> >     private val mapper = jacksonObjectMapper()
> >
> >     // Initialize the InfluxDB client
> >     private val influxDBClient = initializeInfluxDBClient()
> > //    private val writeApi = influxDBClient.makeWriteApi()
> >
> >     // Write data to InfluxDB
> >     override fun write(element: String, context: SinkWriter.Context) {
> >
> >         val event = mapper.readValue(element, EventData::class.java)
> >         println("eventData: $event")
> >
> >         try {
> >             val connectable = influxDBClient.ping()
> >             val ready = influxDBClient.ready()
> >             val influxStr = influxDBClient.toString()
> >             println("influxStr: $connectable -> $ready -> $influxStr")
> >         } catch (e: Exception) {
> >             println(e.message)
> >         }
> >
> >         val point = Point("dummy_events")
> >             .addTag("source", "flink")
> >             .addField("timestamp", event.timestamp)
> >             .addField("value", event.value)
> >             .time(Instant.now().toEpochMilli(), WritePrecision.MS)
> > //        println("Writing point to InfluxDB: $point, ${point.toLineProtocol()}")
> >
> > //        writeApi.writePoint(point)
> > //        writeApi.flush()
> >
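> >         // Note: writeApiBlocking throws an InfluxException on failure,
> >         // so wrapping the call below in try/catch and logging the error
> >         // would make a failed write visible in the taskmanager logs.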
> >         influxDBClient.writeApiBlocking.writePoint(point)
> > //        println("after influxDBClient writeApiBlocking")
> >     }
> >
> >     // This method will flush any pending writes (can be empty if no additional batching is needed)
> >     override fun flush(endOfInput: Boolean) {
> > //        if (endOfInput) writeApi.flush()
> >     }
> >
> >     // Handle any watermarks if required (optional, can be skipped if not needed)
> >     override fun writeWatermark(watermark: Watermark) {
> >     }
> >
> >     // Close the writer when done
> >     override fun close() {
> >         influxDBClient.close()
> >     }
> > }
> >
> > fun initializeInfluxDBClient(): InfluxDBClient {
> >     // Build the InfluxDBClientOptions with all required parameters
> >     val influxDBOptions = InfluxDBClientOptions.builder()
> >         .url(influxDBUrl)  // InfluxDB URL
> >         .authenticate(influxUserName, influxPassword.toCharArray())  // Username and password
> >         .org(influxOrg)  // Organization name
> >         .bucket(influxBucket)  // Bucket name
> >         .authenticateToken(influxToken.toCharArray())  // Token for authentication
> >         .build()
> >
> >     // Create the InfluxDB client using the built options
> >     return InfluxDBClientFactory.create(influxDBOptions)
> > }
> >
> >
> > Main.kt
> >
> > import org.apache.flink.api.common.eventtime.WatermarkStrategy
> > import org.apache.flink.api.common.serialization.SimpleStringSchema
> > import org.apache.flink.connector.kafka.source.KafkaSource
> > import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> > import java.util.Properties
> >
> > fun main() {
> >     // Create KafkaSource with properties
> >     val kafkaSource = KafkaSource.builder<String>()
> >         .setBootstrapServers("kafka:9092")
> >         .setTopics("dummy-events") // Kafka topic
> >         .setGroupId("flink-group")
> >         .setStartingOffsets(OffsetsInitializer.earliest())  // Start from the earliest offset
> >         .setValueOnlyDeserializer(SimpleStringSchema())  // Deserialize string values
> >         .setProperties(Properties().apply {
> >             setProperty("auto.offset.reset", "earliest")
> >         })
> >         .build()
> >
> >     // Create a WatermarkStrategy for event time processing
> >     val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()
> >
> >     // Set up the execution environment
> >     val env = StreamExecutionEnvironment.getExecutionEnvironment()
> >
> >     // Add KafkaSource as the input stream, provide watermark strategy
> > and a source name
> >     val stream = env.fromSource(
> >         kafkaSource,
> >         watermarkStrategy,
> >         "KafkaSource"
> >     )
> >
> >     stream.sinkTo(InfluxDBSink())
> >
> >     // Execute the job
> >     env.execute("Flink Kafka Integration")
> > }
> >
> > But the below curl works:
> > curl --request POST \
> >   "http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms" \
> >   --header "Authorization: Token wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg==" \
> >   --header "Content-Type: text/plain; charset=utf-8" \
> >   --data-binary "dummy_events,source=flink value=42.0 1738142927000"
> >
> >
> >
> > Please check and reply if I am doing anything wrong.
> >
> > Thank you.
