Can you please provide how you are submitting the Python job via the JM?

Siva.
On Thu, Feb 13, 2025 at 3:44 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote:
> OK, you are running into some Flink weirdness here. And it is with PyFlink and how it runs.
>
> What machine are you trying to run the job from? We were, initially, launching our jobs from the Job Manager. It is not recommended for various reasons, but never mind. Later, we were using a custom Docker image for deployments.
>
> What you need to understand is that there are 2 stages to job running: the Flink client and the Flink Job Manager. The Flink client will execute the code for building the job graph and submit the graph to the JM. The JM will figure out how many Task Managers it needs and deploy tasks to them.
>
> The error you are seeing is complaining about the URI targeting the PyFlink JAR. Not sure what is wrong with it (yes, I know it should be file:///opt/flink..., but I’ve seen both variants). In any case, it is not your fault. At least, not directly.
>
> Anyway, I can confirm that we have been successfully launching PyFlink jobs from the JM, in Docker Compose.
>
> Nix.
>
> From: Siva Krishna <sivasoe...@gmail.com>
> Date: Thursday, February 13, 2025 at 5:20 AM
> To: Nikola Milutinovic <n.milutino...@levi9.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: Please help with Flink + InfluxDB
>
> Hi Nix,
> I tried the Python approach and successfully ran it with the command in Docker, and was able to save data in InfluxDB Cloud.
>
> Running the Python code directly works:
> python3 main.py
>
> The following is not working:
> /opt/flink/bin/flink run -py /opt/flink/flink_job/main.py
> error:
> Traceback (most recent call last):
>   File "/opt/flink/flink_job/main.py", line 73, in <module>
>     main()
>   File "/opt/flink/flink_job/main.py", line 70, in main
>     env.execute("Kafka to InfluxDB Stream")
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/stream_execution_environment.py", line 824, in execute
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
>   File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
>   File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o8.execute.
> : java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-1.20.0.jar']
>
> Also tried, and not working:
> /opt/flink/bin/flink run -py /opt/flink/flink_job/main.py -j file:///opt/flink/opt/flink-python-1.20.0.jar
> error:
> Could not get job jar and dependencies from JAR file: JAR file does not exist: file:/opt/flink/opt/flink-python-1.20.0.jar
>
> even though the file is at that location:
> # ls /opt/flink/opt/
> flink-azure-fs-hadoop-1.20.0.jar  flink-python-1.20.0.jar  flink-shaded-netty-tcnative-dynamic-2.0.59.Final-17.0.jar  flink-table-planner_2.12-1.20.0.jar
>
> Please help.
>
> Thanking you,
> Siva Rama Krishna
>
> On Fri, Jan 31, 2025 at 2:16 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote:
>
> Hi Siva.
>
> In this case, extensive logging is in order. Now, you should be aware that if you want to track the progress of data in your pipeline, you need to log statements inside your processors, sinks, … On our project we made a small LogProcessingFunction, which was just outputting a log statement we set in its constructor.
>
> So, log, log and log.
>
> Nix.
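(The LogProcessingFunction mentioned above is not included in the thread. A minimal Kotlin sketch of such a pass-through logging step, assuming a plain MapFunction and SLF4J rather than the original project's code, could look like the following.)

import org.apache.flink.api.common.functions.MapFunction
import org.slf4j.LoggerFactory

// Hypothetical pass-through step: logs a label set in the constructor plus each
// element it sees, then forwards the element unchanged.
class LogProcessingFunction(private val label: String) : MapFunction<String, String> {
    override fun map(value: String): String {
        LOG.info("{}: {}", label, value)
        return value
    }

    companion object {
        // Kept static (companion object) so the function itself stays serializable.
        private val LOG = LoggerFactory.getLogger(LogProcessingFunction::class.java)
    }
}

// Usage between operators, e.g.:
// stream.map(LogProcessingFunction("before InfluxDB sink")).sinkTo(InfluxDBSink())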
> From: Siva Krishna <sivasoe...@gmail.com>
> Date: Friday, January 31, 2025 at 4:41 AM
> To: Nikola Milutinovic <n.milutino...@levi9.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: Please help with Flink + InfluxDB
>
> Yes Nix.
> I can confirm that I am able to connect to InfluxDB from Flink (taskmanager).
> URL="http://influxdb:8086"
> And also, I tried the below from inside the Flink (taskmanager) container and that also worked.
> curl influxdb:8086
>
> Siva.
>
> On Thu, Jan 30, 2025 at 4:50 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote:
> >
> > Hi.
> >
> > First of all, can you verify your Docker Compose InfluxDB is working, is it accepting connections?
> >
> > Can you connect to it from outside Docker Compose?
> >
> > When connecting from Flink, I hope you are referencing the service name as the host, because that is how Docker Compose works. It is a common mistake to target “localhost”, which stays inside the container.
> >
> > Nix.
> >
> > From: Siva Krishna <sivasoe...@gmail.com>
> > Date: Thursday, January 30, 2025 at 11:07 AM
> > To: Nikola Milutinovic <n.milutino...@levi9.com>
> > Cc: user@flink.apache.org <user@flink.apache.org>
> > Subject: Re: Please help with Flink + InfluxDB
> >
> > Hi Nix,
> > Thank you for your reply.
> > I have a small update. For trial/error, I have signed up for InfluxDB Cloud.
> > The same code is working and data is being saved in the cloud.
> > So, I am guessing there might be an issue with my 'docker compose', which is why the data is not being saved locally (Docker InfluxDB).
> > Maybe I need to correct my volume paths or something?
> >
> > Warm regards,
> > Siva Rama Krishna.
> >
> > On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic <n.milutino...@levi9.com> wrote:
> > >
> > > Hi Siva.
> > >
> > > What is the InfluxDB URL you are using? I assume it is something like “influxdb://influxdb:8086/…”. If it is “localhost:8086…” that would be a problem.
> > >
> > > Do you see a connection on InfluxDB? Anything in the logs? Anything in the logs of the Job Manager?
> > >
> > > Nix,
> > >
> > > From: Siva Krishna <sivasoe...@gmail.com>
> > > Date: Thursday, January 30, 2025 at 5:12 AM
> > > To: user@flink.apache.org <user@flink.apache.org>
> > > Subject: Please help with Flink + InfluxDB
> > >
> > > Hi, I am trying to create a data streaming pipeline for real-time analytics.
> > >
> > > Kafka -> Flink -> InfluxDB -> Visualize
> > >
> > > I am facing an issue writing the data to InfluxDB. I can see logs in 'taskmanager' after submitting the job, but the data is not being written to InfluxDB for some reason.
> > > Please help, I am attaching a sample Docker setup and basic code.
> > > import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
> > > import com.influxdb.client.InfluxDBClient
> > > import com.influxdb.client.InfluxDBClientFactory
> > > import com.influxdb.client.InfluxDBClientOptions
> > > import com.influxdb.client.domain.WritePrecision
> > > import com.influxdb.client.write.Point
> > > import org.apache.flink.api.common.eventtime.Watermark
> > > import org.apache.flink.api.connector.sink2.Sink
> > > import org.apache.flink.api.connector.sink2.SinkWriter
> > > import org.apache.flink.api.connector.sink2.WriterInitContext
> > > import java.time.Instant
> > >
> > > class InfluxDBSink : Sink<String> {
> > >     override fun createWriter(p0: WriterInitContext?): SinkWriter<String> {
> > >         return InfluxDBSinkWriter()
> > >     }
> > > }
> > >
> > > // Implement the SinkWriter that writes EventData to InfluxDB
> > > class InfluxDBSinkWriter : SinkWriter<String> {
> > >     private val mapper = jacksonObjectMapper()
> > >
> > >     // Initialize the InfluxDB client
> > >     private val influxDBClient = initializeInfluxDBClient()
> > >     // private val writeApi = influxDBClient.makeWriteApi()
> > >
> > >     // Write data to InfluxDB
> > >     override fun write(element: String, context: SinkWriter.Context) {
> > >         val event = mapper.readValue(element, EventData::class.java)
> > >         println("eventData: $event")
> > >
> > >         try {
> > >             val connectable = influxDBClient.ping()
> > >             val ready = influxDBClient.ready()
> > >             val influxStr = influxDBClient.toString()
> > >             println("influxStr: $connectable -> $ready -> $influxStr")
> > >         } catch (e: Exception) {
> > >             println(e.message)
> > >         }
> > >
> > >         val point = Point("dummy_events")
> > >             .addTag("source", "flink")
> > >             .addField("timestamp", event.timestamp)
> > >             .addField("value", event.value)
> > >             .time(Instant.now().toEpochMilli(), WritePrecision.MS)
> > >         // println("Writing point to InfluxDB: $point, ${point.toLineProtocol()}")
> > >
> > >         // writeApi.writePoint(point)
> > >         // writeApi.flush()
> > >
> > >         influxDBClient.writeApiBlocking.writePoint(point)
> > >         // println("after influxDBClient writeApiBlocking")
> > >     }
> > >
> > >     // This method will flush any pending writes (can be empty if no additional batching is needed)
> > >     override fun flush(endOfInput: Boolean) {
> > >         // if (endOfInput) writeApi.flush()
> > >     }
> > >
> > >     // Handle any watermarks if required (optional, can be skipped if not needed)
> > >     override fun writeWatermark(watermark: Watermark) {
> > >     }
> > >
> > >     // Close the writer when done
> > >     override fun close() {
> > >         influxDBClient.close()
> > >     }
> > > }
> > >
> > > fun initializeInfluxDBClient(): InfluxDBClient {
> > >     // Build the InfluxDBClientOptions with all required parameters
> > >     val influxDBOptions = InfluxDBClientOptions.builder()
> > >         .url(influxDBUrl) // InfluxDB URL
> > >         .authenticate(influxUserName, influxPassword.toCharArray()) // Username and password
> > >         .org(influxOrg) // Organization name
> > >         .bucket(influxBucket) // Bucket name
> > >         .authenticateToken(influxToken.toCharArray()) // Token for authentication
> > >         .build()
> > >
> > >     // Create the InfluxDB client using the built options
> > >     return InfluxDBClientFactory.create(influxDBOptions)
> > > }
> > >
> > > Main.kt
> > > fun main() {
> > >     // Create KafkaSource with properties
> > >     val kafkaSource = KafkaSource.builder<String>()
> > >         .setBootstrapServers("kafka:9092")
> > >         .setTopics("dummy-events") // Kafka topic
> > >         .setGroupId("flink-group")
> > >         .setStartingOffsets(OffsetsInitializer.earliest()) // Start from the earliest message
> > >         .setValueOnlyDeserializer(SimpleStringSchema()) // Deserialize string values
> > >         .setProperties(Properties().apply {
> > >             setProperty("auto.offset.reset", "earliest")
> > >         })
> > >         .build()
> > >
> > >     // Create a WatermarkStrategy for event time processing
> > >     val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()
> > >
> > >     // Set up the execution environment
> > >     val env = StreamExecutionEnvironment.getExecutionEnvironment()
> > >
> > >     // Add KafkaSource as the input stream, provide a watermark strategy and a source name
> > >     val stream = env.fromSource(
> > >         kafkaSource,
> > >         watermarkStrategy,
> > >         "KafkaSource"
> > >     )
> > >
> > >     stream.sinkTo(InfluxDBSink())
> > >
> > >     // Execute the job
> > >     env.execute("Flink Kafka Integration")
> > > }
> > >
> > > But the below curl is working:
> > > curl --request POST \
> > >   "http://localhost:8086/api/v2/write?org=org&bucket=sample_data&precision=ms" \
> > >   --header "Authorization: Token wvd-8VqKyz6YodOSyVPU17oSOobKqwJDHf0GtFzd9-1DuzFyF3l1xlDGt2R062CI8Jpg==" \
> > >   --header "Content-Type: text/plain; charset=utf-8" \
> > >   --data-binary "dummy_events,source=flink value=42.0 1738142927000"
> > >
> > > Please check and reply if I am doing anything wrong.
> > >
> > > Thank you.
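To help isolate whether the problem is the sink or the connection, the same write that the curl above performs can be reproduced from a tiny standalone Kotlin program using the blocking write API. This is only a sketch: the URL, token, org and bucket below are placeholders, and inside Docker Compose the host should be the Compose service name (e.g. influxdb) rather than localhost.

import com.influxdb.client.InfluxDBClientFactory
import com.influxdb.client.domain.WritePrecision

fun main() {
    // Placeholders: substitute the real URL, token, org and bucket from docker-compose.yml.
    val client = InfluxDBClientFactory.create(
        "http://influxdb:8086",     // use the Compose service name, not localhost
        "my-token".toCharArray(),
        "org",
        "sample_data"
    )

    // Same line-protocol record as the curl example, written with millisecond precision.
    client.writeApiBlocking.writeRecord(
        WritePrecision.MS,
        "dummy_events,source=flink value=42.0 1738142927000"
    )

    client.close()
}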