OK, you are running into some Flink weirdness here, specifically with PyFlink 
and how it runs.

What machine are you trying to run the job from? Initially, we were launching 
our jobs from the Job Manager. That is not recommended, for various reasons, 
but never mind. Later, we moved to a custom Docker image for deployments.

What you need to understand is that there are two stages to running a job: the 
Flink client and the Flink Job Manager. The Flink client executes the code that 
builds the job graph and submits that graph to the JM. The JM then figures out 
how many Task Managers it needs and deploys tasks to them.
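
To make that concrete, a tiny sketch (illustrative only, not your job):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Everything up to execute() runs in the Flink client and only builds the job graph.
env.from_collection(["a", "b", "c"]).print()
# execute() submits the graph to the JM, which then schedules tasks onto the TMs.
env.execute("client vs JM demo")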

The error you are seeing is complaining about the URI used to locate the 
PyFlink JAR. I am not sure what is wrong with it (yes, I know it should be 
file:///opt/flink..., but I have seen both variants). In any case, it is not 
your fault. At least, not directly.
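
One thing you could try (a sketch, not a guaranteed fix: the JAR path is taken 
from your listing, and add_jars() is PyFlink's standard way to declare extra 
JAR dependencies from the job code itself):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Note the file:// scheme with three slashes, which Java's URL parser expects.
env.add_jars("file:///opt/flink/opt/flink-python-1.20.0.jar")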

Anyway, I can confirm that we have been successfully launching PyFlink jobs 
from JM, in Docker Compose.

Nix.

From: Siva Krishna <sivasoe...@gmail.com>
Date: Thursday, February 13, 2025 at 5:20 AM
To: Nikola Milutinovic <n.milutino...@levi9.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Please help with Flink + InfluxDB
Hi Nix,
I tried the Python approach, successfully ran it with a command inside Docker, 
and was able to save data to InfluxDB Cloud.

Running the Python code directly works:
python3 main.py

The following is not working:
/opt/flink/bin/flink run -py /opt/flink/flink_job/main.py
error:
Traceback (most recent call last):
  File "/opt/flink/flink_job/main.py", line 73, in <module>
    main()
  File "/opt/flink/flink_job/main.py", line 70, in main
    env.execute("Kafka to InfluxDB Stream")
  File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 824, in execute
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o8.execute.
: java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.20.0.jar']


I also tried the following, which is not working either:
/opt/flink/bin/flink run -py /opt/flink/flink_job/main.py -j file:///opt/flink/opt/flink-python-1.20.0.jar
error:
Could not get job jar and dependencies from JAR file: JAR file does not exist: file:/opt/flink/opt/flink-python-1.20.0.jar


even though the file is in that location:
# ls /opt/flink/opt/
flink-azure-fs-hadoop-1.20.0.jar
flink-python-1.20.0.jar
flink-shaded-netty-tcnative-dynamic-2.0.59.Final-17.0.jar
flink-table-planner_2.12-1.20.0.jar


Please help.


Thanking you,
Siva Rama Krishna




On Fri, Jan 31, 2025 at 2:16 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote:
Hi Siva.

In this case, extensive logging is in order. Be aware that if you want to track 
the progress of data through your pipeline, you need to add log statements 
inside your processors, sinks, etc. On our project we made a small 
LogProcessingFunction, which just output the log statement we set in its 
constructor.
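
If it helps, a minimal sketch of that idea in PyFlink (the class and names here 
are illustrative, not our actual code):

from pyflink.datastream.functions import ProcessFunction

class LogProcessingFunction(ProcessFunction):
    """Pass-through operator that logs each element with a fixed prefix."""

    def __init__(self, label):
        self.label = label  # the log statement set via the constructor

    def process_element(self, value, ctx):
        print(f"{self.label}: {value}")  # ends up in the Task Manager logs
        yield value  # forward the element unchanged

You would drop it between stages, e.g. stream.process(LogProcessingFunction("after source")).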

So, log, log and log.

Nix.

From: Siva Krishna <sivasoe...@gmail.com>
Date: Friday, January 31, 2025 at 4:41 AM
To: Nikola Milutinovic <n.milutino...@levi9.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Please help with Flink + InfluxDB
Yes, Nix.
I can confirm that I am able to connect to InfluxDB from Flink (taskmanager):
URL = "http://influxdb:8086"
I also tried the following from inside the Flink (taskmanager) container, and 
that worked too:
curl influxdb:8086

Siva.

On Thu, Jan 30, 2025 at 4:50 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote:
>
> Hi.
>
>
>
> First of all, can you verify that the InfluxDB in your Docker Compose setup is 
> working? Is it accepting connections?
>
>
>
> Can you connect to it from outside Docker Compose?
>
>
>
> When connecting from Flink, I hope you are referencing the service name as a 
> host, because that is how Docker Compose works. It is a common mistake to 
> target “localhost”, which stays inside the container.
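>
> A sketch of the difference (the service name "influxdb" is an assumption 
> about your compose file):
>
> # Inside any container on the same Compose network:
> INFLUXDB_URL = "http://influxdb:8086"     # service name, resolved by Docker DNS
> # INFLUXDB_URL = "http://localhost:8086"  # wrong: stays inside this container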
>
>
>
> Nix.
>
>
>
> From: Siva Krishna <sivasoe...@gmail.com>
> Date: Thursday, January 30, 2025 at 11:07 AM
> To: Nikola Milutinovic <n.milutino...@levi9.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: Please help with Flink + InfluxDB
>
> Hi Nix,
> Thank you for your reply.
> I have a small update. For trial and error, I signed up for InfluxDB Cloud.
> The same code is working there, and data is being saved in the cloud.
> So I am guessing there might be an issue with my 'docker compose', which is
> why the data is not being saved locally (the Docker InfluxDB).
> Maybe I need to correct my volume paths or something?
>
> Warm regards,
> Siva Rama Krishna.
>
> On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote:
> >
> > Hi Siva.
> >
> >
> >
> > What is the InfluxDB URL you are using? I assume it is something like 
> > “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a 
> > problem.
> >
> >
> >
> > Do you see a connection on InfluxDB? Anything in the logs? Anything in the 
> > logs of the Job Manager?
> >
> >
> >
> > Nix.
> >
> >
> >
> > From: Siva Krishna <sivasoe...@gmail.com>
> > Date: Thursday, January 30, 2025 at 5:12 AM
> > To: user@flink.apache.org <user@flink.apache.org>
> > Subject: Please help with Flink + InfluxDB
> >
> > Hi, I am trying to create a data streaming pipeline for real-time analytics.
> >
> > Kafka -> Flink -> InfluxDB -> Visualize
> >
> > I am facing an issue writing the data to InfluxDB. I can see logs in
> > 'taskmanager' after submitting the job, but the data is not being
> > written to InfluxDB for some reason.
> > Please help; I am attaching a sample Docker setup and basic code.
> >
> > import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
> > import com.influxdb.client.InfluxDBClient
> > import com.influxdb.client.InfluxDBClientFactory
> > import com.influxdb.client.InfluxDBClientOptions
> > import com.influxdb.client.domain.WritePrecision
> > import com.influxdb.client.write.Point
> > import org.apache.flink.api.common.eventtime.Watermark
> > import org.apache.flink.api.connector.sink2.Sink
> > import org.apache.flink.api.connector.sink2.SinkWriter
> > import org.apache.flink.api.connector.sink2.WriterInitContext
> > import java.time.Instant
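> >
> > // Assumes an EventData class and the connection values (influxDBUrl,
> > // influxUserName, influxPassword, influxOrg, influxBucket, influxToken)
> > // are defined elsewhere in the project.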
> >
> > class InfluxDBSink : Sink<String> {
> >     override fun createWriter(p0: WriterInitContext?): SinkWriter<String> {
> >         return InfluxDBSinkWriter()
> >     }
> > }
> >
> > // Implement the SinkWriter that writes EventData to InfluxDB
> > class InfluxDBSinkWriter : SinkWriter<String> {
> >     private val mapper = jacksonObjectMapper()
> >
> >     // Initialize the InfluxDB client
> >     private val influxDBClient = initializeInfluxDBClient()
> > //    private val writeApi = influxDBClient.makeWriteApi()
> >
> >     // Write data to InfluxDB
> >     override fun write(element: String, context: SinkWriter.Context) {
> >
> >         val event = mapper.readValue(element, EventData::class.java)
> >         println("eventData: $event")
> >
> >         try {
> >             val connectable = influxDBClient.ping()
> >             val ready = influxDBClient.ready()
> >             val influxStr = influxDBClient.toString()
> >             println("influxStr: $connectable -> $ready -> $influxStr")
> >         } catch (e: Exception) {
> >             println(e.message)
> >         }
> >
> >         val point = Point("dummy_events")
> >             .addTag("source", "flink")
> >             .addField("timestamp", event.timestamp)
> >             .addField("value", event.value)
> >             .time(Instant.now().toEpochMilli(), WritePrecision.MS)
> > //        println("Writing point to InfluxDB: $point, ${point.toLineProtocol()}")
> >
> > //        writeApi.writePoint(point)
> > //        writeApi.flush()
> >
> >         influxDBClient.writeApiBlocking.writePoint(point)
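> >         // Note: a blocking write per element is fine for debugging, but the
> >         // batching write API (makeWriteApi) scales better under real load.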
> > //        println("after influxDBClient writeApiBlocking")
> >     }
> >
> >     // This method will flush any pending writes (can be empty if no additional batching is needed)
> >     override fun flush(endOfInput: Boolean) {
> > //        if (endOfInput) writeApi.flush()
> >     }
> >
> >     // Handle any watermarks if required (optional, can be skipped if not needed)
> >     override fun writeWatermark(watermark: Watermark) {
> >     }
> >
> >     // Close the writer when done
> >     override fun close() {
> >         influxDBClient.close()
> >     }
> > }
> >
> > fun initializeInfluxDBClient(): InfluxDBClient {
> >     // Build the InfluxDBClientOptions with all required parameters
> >     val influxDBOptions = InfluxDBClientOptions.builder()
> >         .url(influxDBUrl)  // InfluxDB URL
> >         .authenticate(influxUserName, influxPassword.toCharArray())  // Username and password
> >         .org(influxOrg)  // Organization name
> >         .bucket(influxBucket)  // Bucket name
> >         .authenticateToken(influxToken.toCharArray())  // Token for authentication
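> >         // Note: one auth scheme is usually enough; with InfluxDB 2.x the
> >         // token alone typically suffices, so authenticate(...) above may be redundant.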
> >         .build()
> >
> >     // Create the InfluxDB client using the built options
> >     return InfluxDBClientFactory.create(influxDBOptions)
> > }
> >
> >
> > Main.kt
> > import org.apache.flink.api.common.eventtime.WatermarkStrategy
> > import org.apache.flink.api.common.serialization.SimpleStringSchema
> > import org.apache.flink.connector.kafka.source.KafkaSource
> > import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> > import java.util.Properties
> >
> > fun main() {
> >    // Create KafkaSource with properties
> >     val kafkaSource = KafkaSource.builder<String>()
> >         .setBootstrapServers("kafka:9092")
> >         .setTopics("dummy-events") // Kafka topic
> >         .setGroupId("flink-group")
> >         .setStartingOffsets(OffsetsInitializer.earliest())  // Start from the earliest offset
> >         .setValueOnlyDeserializer(SimpleStringSchema())  // Deserialize string values
> >         .setProperties(Properties().apply {
> >             setProperty("auto.offset.reset", "earliest")
> >         })
> >         .build()
> >
> >     // Create a WatermarkStrategy for event time processing
> >     val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()
> >
> >     // Set up the execution environment
> >     val env = StreamExecutionEnvironment.getExecutionEnvironment()
> >
> >     // Add KafkaSource as the input stream, with a watermark strategy and a source name
> >     val stream = env.fromSource(
> >         kafkaSource,
> >         watermarkStrategy,
> >         "KafkaSource"
> >     )
> >
> >     stream.sinkTo(InfluxDBSink())
> >
> >     // Execute the job
> >     env.execute("Flink Kafka Integration")
> > }
> >
> > But the curl below is working:
> > curl --request POST "http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms" \
> >   --header "Authorization: Token wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg==" \
> >   --header "Content-Type: text/plain; charset=utf-8" \
> >   --data-binary "dummy_events,source=flink value=42.0 1738142927000"
> >
> >
> >
> > Please check and reply if I am doing anything wrong.
> >
> > Thank you.
