Hi Nix,
I tried the python approach and successfully ran it with command in docker,
and was able to save data in InfluxDB cloud.

*Running python code, working*
python3 main.py

*Below not working,*
/opt/flink/bin/flink run -py /opt/flink/flink_job/main.py
error:
\Traceback (most recent call last):
  File "/opt/flink/flink_job/main.py", line 73, in <module>
    main()
  File "/opt/flink/flink_job/main.py", line 70, in main
    env.execute("Kafka to InfluxDB Stream")
  File
"/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py",
line 824, in execute
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py",
line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line
146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line
326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o8.execute.
: java.net.MalformedURLException: no protocol:
['file:/opt/flink/opt/flink-python-1.20.0.jar']


*Tried, ant not working,*
/opt/flink/bin/flink run -py /opt/flink/flink_job/main.py -j
file:///opt/flink/opt/flink-python-1.20.0.jar
error:
Could not get job jar and dependencies from JAR file: JAR file does not
exist: file:/opt/flink/opt/flink-python-1.20.0.jar


*even though the file is in location,*
# ls /opt/flink/opt/
flink-azure-fs-hadoop-1.20.0.jar  flink-python-1.20.0.jar
flink-shaded-netty-tcnative-dynamic-2.0.59.Final-17.0.jar
 flink-table-planner_2.12-1.20.0.jar


Please help.


Thanking you,
Siva Rama Krishna





On Fri, Jan 31, 2025 at 2:16 PM Nikola Milutinovic <n.milutino...@levi9.com>
wrote:

> Hi Siva.
>
>
>
> In this case, extensive logging is in order. Now, you should be aware that
> if you want to track progress of data in your pipeline, you need to log
> statements inside your processors, sinks,… On our project we made a small
> LogProcessingFunction, which was just outputting log statement we set on
> its constructor.
>
>
>
> So, log, log and log.
>
>
>
> Nix.
>
>
>
> *From: *Siva Krishna <sivasoe...@gmail.com>
> *Date: *Friday, January 31, 2025 at 4:41 AM
> *To: *Nikola Milutinovic <n.milutino...@levi9.com>
> *Cc: *user@flink.apache.org <user@flink.apache.org>
> *Subject: *Re: Please help with Flink + InfluxDB
>
> Yes Nix.
> I can confirm that I am able to connect to InfluxDB from
> Flink(taskmanager).
> URL="http://influxdb:8086";
> And also, I tried with below from inside Flink(taskmanager) container
> and that also worked.
> curl influxdb:8086
>
> Siva.
>
> On Thu, Jan 30, 2025 at 4:50 PM Nikola Milutinovic
> <n.milutino...@levi9.com> wrote:
> >
> > Hi.
> >
> >
> >
> > First of all, can you verify your Docker compose InfluxDB is working, is
> it accepting connections?
> >
> >
> >
> > Can you connect to it from outside Docker Compose?
> >
> >
> >
> > When connecting from Flink, I hope you are referencing the service name
> as a host, because that is how Docker Compose works. It is a common mistake
> to target “localhost”, which stays inside the container.
> >
> >
> >
> > Nix.
> >
> >
> >
> > From: Siva Krishna <sivasoe...@gmail.com>
> > Date: Thursday, January 30, 2025 at 11:07 AM
> > To: Nikola Milutinovic <n.milutino...@levi9.com>
> > Cc: user@flink.apache.org <user@flink.apache.org>
> > Subject: Re: Please help with Flink + InfluxDB
> >
> > Hi Nix,
> > Thank you for your reply.
> > I have small update. For trial/error, I have signed in InfluxDB cloud.
> > This same code is working and data is being save in the cloud.
> > So, I am guessing there might issue with my 'docker compose', which is
> > why the data is not being saved in local(docker InfluxDB).
> > Maybe I need to correct my volumes paths or something?
> >
> > Warm regards,
> > Siva Rama Krishna.
> >
> > On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic
> > <n.milutino...@levi9.com> wrote:
> > >
> > > Hi Siva.
> > >
> > >
> > >
> > > What is the InfluxDB URL you are using? I assume it is something like
> “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a
> problem.
> > >
> > >
> > >
> > > Do you see a connection on InfluxDB? Anything in the logs? Anything in
> the logs of Job Mnager?
> > >
> > >
> > >
> > > Nix,
> > >
> > >
> > >
> > > From: Siva Krishna <sivasoe...@gmail.com>
> > > Date: Thursday, January 30, 2025 at 5:12 AM
> > > To: user@flink.apache.org <user@flink.apache.org>
> > > Subject: Please help with Flink + InfluxDB
> > >
> > > Hi, I am trying to create a data streaming pipeline for real-time
> analytics.
> > >
> > > Kafka -> Flink -> InfluDB -> Visualize
> > >
> > > I am facing an issue writing the data to InfluxDB. I can see logs in
> > > 'taskmanager' after submitting the Job. But, the data is not being
> > > written to InfluxDB for some reason.
> > > Please help, I am attaching sample docker setup and basic code.
> > >
> > > import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
> > > import com.influxdb.client.InfluxDBClient
> > > import com.influxdb.client.InfluxDBClientFactory
> > > import com.influxdb.client.InfluxDBClientOptions
> > > import com.influxdb.client.domain.WritePrecision
> > > import com.influxdb.client.write.Point
> > > import org.apache.flink.api.common.eventtime.Watermark
> > > import org.apache.flink.api.connector.sink2.Sink
> > > import org.apache.flink.api.connector.sink2.SinkWriter
> > > import org.apache.flink.api.connector.sink2.WriterInitContext
> > > import java.time.Instant
> > >
> > > class InfluxDBSink : Sink<String> {
> > >     override fun createWriter(p0: WriterInitContext?):
> SinkWriter<String> {
> > >         return InfluxDBSinkWriter()
> > >     }
> > > }
> > >
> > > // Implement the SinkWriter that writes EventData to InfluxDB
> > > class InfluxDBSinkWriter : SinkWriter<String> {
> > >     private val mapper = jacksonObjectMapper()
> > >
> > >     // Initialize the InfluxDB client
> > >     private val influxDBClient = initializeInfluxDBClient()
> > > //    private val writeApi = influxDBClient.makeWriteApi()
> > >
> > >     // Write data to InfluxDB
> > >     override fun write(element: String, context: SinkWriter.Context) {
> > >
> > >         val event = mapper.readValue(element, EventData::class.java)
> > >         println("eventData: $event")
> > >
> > >         try {
> > >             val connectable = influxDBClient.ping()
> > >             val ready = influxDBClient.ready()
> > >             val influxStr = influxDBClient.toString()
> > >             println("influxStr: $connectable -> $ready -> $influxStr")
> > >         } catch (e: Exception) {
> > >             println(e.message)
> > >         }
> > >
> > >         val point = Point("dummy_events")
> > >             .addTag("source", "flink")
> > >             .addField("timestamp", event.timestamp)
> > >             .addField("value", event.value)
> > >             .time(Instant.now().toEpochMilli(), WritePrecision.MS)
> > > //        println("Writing point to InfluxDB: $point,
> > > ${point.toLineProtocol()}")
> > >
> > > //        writeApi.writePoint(point)
> > > //        writeApi.flush()
> > >
> > >         influxDBClient.writeApiBlocking.writePoint(point)
> > > //        println("after influxDBClient writeApiBlocking")
> > >     }
> > >
> > >     // This method will flush any pending writes (can be empty if no
> > > additional batching needed)
> > >     override fun flush(endOfInput: Boolean) {
> > > //        if (endOfInput) writeApi.flush()
> > >     }
> > >
> > >     // Handle any watermarks if required (optional, can be skipped if
> > > not needed)
> > >     override fun writeWatermark(watermark: Watermark) {
> > >     }
> > >
> > >     // Close the writer when done
> > >     override fun close() {
> > >         influxDBClient.close()
> > >     }
> > > }
> > >
> > > fun initializeInfluxDBClient(): InfluxDBClient {
> > >     // Build the InfluxDBClientOptions with all required parameters
> > >     val influxDBOptions = InfluxDBClientOptions.builder()
> > >         .url(influxDBUrl)  // InfluxDB URL
> > >         .authenticate(influxUserName, influxPassword.toCharArray())
> > > // Username and password
> > >         .org(influxOrg)  // Organization name
> > >         .bucket(influxBucket)  // Bucket name
> > >         .authenticateToken(influxToken.toCharArray())  // Token for
> > > authentication
> > >         .build()
> > >
> > >     // Create the InfluxDB client using the built options
> > >     return InfluxDBClientFactory.create(influxDBOptions)
> > > }
> > >
> > >
> > > Main.kt
> > > fun main() {
> > >    // Create KafkaSource with properties
> > >     val kafkaSource = KafkaSource.builder<String>()
> > >         .setBootstrapServers("kafka:9092")
> > >         .setTopics("dummy-events") // Kafka topic
> > >         .setGroupId("flink-group")
> > >         .setStartingOffsets(OffsetsInitializer.earliest())  // Start
> > > from the latest message
> > >         .setValueOnlyDeserializer(SimpleStringSchema())  //
> > > Deserialize string values
> > >         .setProperties(Properties().apply {
> > >             setProperty("auto.offset.reset", "earliest")
> > >         })
> > >         .build()
> > >
> > >     // Create a WatermarkStrategy for event time processing
> > >     val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()
> > >
> > >     // Set up the execution environment
> > >     val env = StreamExecutionEnvironment.getExecutionEnvironment()
> > >
> > >     // Add KafkaSource as the input stream, provide watermark strategy
> > > and a source name
> > >     val stream = env.fromSource(
> > >         kafkaSource,
> > >         watermarkStrategy,
> > >         "KafkaSource"
> > >     )
> > >
> > >     stream.sinkTo(InfluxDBSink())
> > >
> > >     // Execute the job
> > >     env.execute("Flink Kafka Integration")
> > > }
> > >
> > > But below curl is working.
> > > curl --request POST
> > > "
> http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms
> "
> > > \
> > >   --header "Authorization: Token
> > > wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg=="
> > > \
> > >   --header "Content-Type: text/plain; charset=utf-8" \
> > >   --data-binary "dummy_events,source=flink value=42.0 1738142927000"
> > >
> > >
> > >
> > > Please check and reply is I am doing anything wrong.
> > >
> > > Thank you.
>

Reply via email to