Re: Please help with Flink + InfluxDB
Yes Nix. I can confirm that I am able to connect to InfluxDB from Flink (taskmanager).

URL = "http://influxdb:8086"

I also tried the following from inside the Flink (taskmanager) container, and that worked as well:

curl influxdb:8086

Siva.

On Thu, Jan 30, 2025 at 4:50 PM Nikola Milutinovic wrote:
>
> Hi.
>
> First of all, can you verify that the InfluxDB in your Docker Compose setup is working, is it accepting connections?
>
> Can you connect to it from outside Docker Compose?
>
> When connecting from Flink, I hope you are referencing the service name as the host, because that is how Docker Compose networking works. It is a common mistake to target “localhost”, which stays inside the container.
>
> Nix.
>
> From: Siva Krishna
> Date: Thursday, January 30, 2025 at 11:07 AM
> To: Nikola Milutinovic
> Cc: user@flink.apache.org
> Subject: Re: Please help with Flink + InfluxDB
>
> Hi Nix,
> Thank you for your reply.
> I have a small update. For trial and error, I signed up for InfluxDB Cloud.
> The same code works there and the data is being saved in the cloud.
> So I am guessing there might be an issue with my docker-compose setup, which is
> why the data is not being saved locally (Docker InfluxDB).
> Maybe I need to correct my volume paths or something?
>
> Warm regards,
> Siva Rama Krishna.
>
> On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic wrote:
> >
> > Hi Siva.
> >
> > What is the InfluxDB URL you are using? I assume it is something like
> > “influxdb://influxdb:8086/…”. If it is “localhost:8086…”, that would be a
> > problem.
> >
> > Do you see a connection on InfluxDB? Anything in the logs? Anything in the
> > logs of the Job Manager?
> >
> > Nix,
> >
> > From: Siva Krishna
> > Date: Thursday, January 30, 2025 at 5:12 AM
> > To: user@flink.apache.org
> > Subject: Please help with Flink + InfluxDB
> >
> > Hi, I am trying to create a data streaming pipeline for real-time analytics.
> >
> > Kafka -> Flink -> InfluxDB -> Visualize
> >
> > I am facing an issue writing the data to InfluxDB. I can see logs in the
> > 'taskmanager' after submitting the job, but the data is not being
> > written to InfluxDB for some reason.
> > Please help, I am attaching a sample Docker setup and basic code.
> >
> > [quoted code omitted; it is identical to the listing in the original message, quoted in full later in this digest]
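Since plain connectivity from the taskmanager to influxdb:8086 is confirmed above, the usual remaining suspects are the token, org and bucket the client is built with. A minimal standalone smoke test along these lines, run with the exact same settings as the job, can separate an auth/bucket problem from a networking one (all values below are placeholders, not taken from this thread):

```kotlin
import com.influxdb.client.InfluxDBClientFactory
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.write.Point
import java.time.Instant

fun main() {
    // Placeholders -- use the same values the Flink job is configured with
    // (typically the DOCKER_INFLUXDB_INIT_* variables of the compose file).
    val url = "http://influxdb:8086"   // service name, already confirmed reachable
    val token = "my-token"
    val org = "my-org"
    val bucket = "my-bucket"

    val client = InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket)
    try {
        println("ping: ${client.ping()}")
        val point = Point("dummy_events")
            .addTag("source", "smoke-test")
            .addField("value", 1.0)
            .time(Instant.now().toEpochMilli(), WritePrecision.MS)
        // Throws an exception (e.g. HTTP 401/404) if the token, org or bucket is wrong,
        // which would explain "connectivity OK but nothing gets written".
        client.writeApiBlocking.writePoint(point)
        println("write accepted by bucket '$bucket'")
    } finally {
        client.close()
    }
}
```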
Re: Please help with Flink + InfluxDB
Hi.

First of all, can you verify that the InfluxDB in your Docker Compose setup is working, is it accepting connections?

Can you connect to it from outside Docker Compose?

When connecting from Flink, I hope you are referencing the service name as the host, because that is how Docker Compose networking works. It is a common mistake to target “localhost”, which stays inside the container.

Nix.

From: Siva Krishna
Date: Thursday, January 30, 2025 at 11:07 AM
To: Nikola Milutinovic
Cc: user@flink.apache.org
Subject: Re: Please help with Flink + InfluxDB

Hi Nix,
Thank you for your reply.
I have a small update. For trial and error, I signed up for InfluxDB Cloud.
The same code works there and the data is being saved in the cloud.
So I am guessing there might be an issue with my docker-compose setup, which is why the data is not being saved locally (Docker InfluxDB).
Maybe I need to correct my volume paths or something?

Warm regards,
Siva Rama Krishna.

On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic wrote:
>
> Hi Siva.
>
> What is the InfluxDB URL you are using? I assume it is something like
> “influxdb://influxdb:8086/…”. If it is “localhost:8086…”, that would be a
> problem.
>
> Do you see a connection on InfluxDB? Anything in the logs? Anything in the
> logs of the Job Manager?
>
> Nix,
>
> From: Siva Krishna
> Date: Thursday, January 30, 2025 at 5:12 AM
> To: user@flink.apache.org
> Subject: Please help with Flink + InfluxDB
>
> Hi, I am trying to create a data streaming pipeline for real-time analytics.
>
> Kafka -> Flink -> InfluxDB -> Visualize
>
> I am facing an issue writing the data to InfluxDB. I can see logs in the
> 'taskmanager' after submitting the job, but the data is not being
> written to InfluxDB for some reason.
> Please help, I am attaching a sample Docker setup and basic code.
>
> [quoted code omitted; it is identical to the listing in the original message, quoted in full later in this digest]
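To make the service-name point concrete, here is a small sketch of the two URL forms discussed in this thread; only the first one works from inside another Compose container such as the Flink taskmanager (the second assumes the compose file publishes port 8086 to the host):

```kotlin
// Resolvable from other services in the same docker-compose network (e.g. the
// Flink jobmanager/taskmanager containers): Compose provides DNS for service names.
const val INFLUXDB_URL_IN_COMPOSE = "http://influxdb:8086"

// Only meaningful from the host machine (assuming port 8086 is published);
// inside a container, "localhost" is that container itself.
const val INFLUXDB_URL_FROM_HOST = "http://localhost:8086"
```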
Re: Dynamic Kafka Properties
By looking at https://github.com/apache/flink-connector-kafka/pull/20, where the "setRackIdSupplier" setter for the Kafka source builder was introduced, I guess one could take a more general approach and instead consider overrides for:

"setProperty"
"setProperties"

which receive a supplier for the property/properties (this could be done for sinks, too).

These new methods could be used for any property that needs to be inferred/evaluated on the taskmanagers, for what it's worth, not just the rack id. I guess as of now the rack id is probably the only property requiring lazy evaluation, so to say, but in the future, who knows...

Regards,

Salva

On Mon, Jan 27, 2025 at 6:11 PM Salva Alcántara wrote:

> I've recently started to enable rack awareness in my sources following this:
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#rackid
>
> E.g.,
>
> ```java
> .setRackIdSupplier(() -> System.getenv("TM_NODE_AZ"))
> ```
>
> This approach allows decoupling the AZ between the jobmanager and the taskmanagers.
>
> There are indeed some Kafka-compatible solutions like WarpStream which support rack awareness for producers, too. E.g.,
>
> https://docs.warpstream.com/warpstream/byoc/configure-kafka-client/configuring-kafka-client-id-features#warpstream_az
>
> The problem is that they pass the rack id using a normal producer property. Because the Kafka sink builder gets the properties and serializes them on the jobmanager, all the taskmanagers will use the same AZ as the jobmanager.
>
> Is there a way to easily pass "dynamic" properties, so to say?
>
> Would it make sense to consider a supplier-based override for the property setters of the Kafka builders?
>
> Regards,
>
> Salva
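For what it's worth, a purely hypothetical sketch of what such supplier-based setters could look like on the Kafka builders; none of this exists in the connector today, and the type and method names below are illustrative only, mirroring the existing setRackIdSupplier:

```kotlin
import java.io.Serializable
import java.util.Properties

// Illustrative only: a serializable supplier, so the value is evaluated on each
// taskmanager at runtime instead of being captured on the jobmanager at build time.
fun interface SerializableSupplier<T> : Serializable {
    fun get(): T
}

// Hypothetical additions to KafkaSourceBuilder / KafkaSinkBuilder:
interface DynamicKafkaPropertySetters<SELF> {
    fun setProperty(key: String, value: SerializableSupplier<String>): SELF
    fun setProperties(values: SerializableSupplier<Properties>): SELF
}

// Intended usage for a per-AZ producer property such as WarpStream's
// warpstream_az, evaluated on the taskmanagers rather than the jobmanager:
//
//   sinkBuilder.setProperty("warpstream_az") { System.getenv("TM_NODE_AZ") }
```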
Re: Please help with Flink + InfluxDB
Hi Siva.

What is the InfluxDB URL you are using? I assume it is something like “influxdb://influxdb:8086/…”. If it is “localhost:8086…”, that would be a problem.

Do you see a connection on InfluxDB? Anything in the logs? Anything in the logs of the Job Manager?

Nix,

From: Siva Krishna
Date: Thursday, January 30, 2025 at 5:12 AM
To: user@flink.apache.org
Subject: Please help with Flink + InfluxDB

Hi, I am trying to create a data streaming pipeline for real-time analytics.

Kafka -> Flink -> InfluxDB -> Visualize

I am facing an issue writing the data to InfluxDB. I can see logs in the 'taskmanager' after submitting the job, but the data is not being written to InfluxDB for some reason.
Please help, I am attaching a sample Docker setup and basic code.

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.influxdb.client.InfluxDBClient
import com.influxdb.client.InfluxDBClientFactory
import com.influxdb.client.InfluxDBClientOptions
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.write.Point
import org.apache.flink.api.common.eventtime.Watermark
import org.apache.flink.api.connector.sink2.Sink
import org.apache.flink.api.connector.sink2.SinkWriter
import org.apache.flink.api.connector.sink2.WriterInitContext
import java.time.Instant

class InfluxDBSink : Sink<String> {
    override fun createWriter(p0: WriterInitContext?): SinkWriter<String> {
        return InfluxDBSinkWriter()
    }
}

// Implement the SinkWriter that writes EventData to InfluxDB
class InfluxDBSinkWriter : SinkWriter<String> {
    private val mapper = jacksonObjectMapper()

    // Initialize the InfluxDB client
    private val influxDBClient = initializeInfluxDBClient()
    //private val writeApi = influxDBClient.makeWriteApi()

    // Write data to InfluxDB
    override fun write(element: String, context: SinkWriter.Context) {
        val event = mapper.readValue(element, EventData::class.java)
        println("eventData: $event")

        try {
            val connectable = influxDBClient.ping()
            val ready = influxDBClient.ready()
            val influxStr = influxDBClient.toString()
            println("influxStr: $connectable -> $ready -> $influxStr")
        } catch (e: Exception) {
            println(e.message)
        }

        val point = Point("dummy_events")
            .addTag("source", "flink")
            .addField("timestamp", event.timestamp)
            .addField("value", event.value)
            .time(Instant.now().toEpochMilli(), WritePrecision.MS)
        //println("Writing point to InfluxDB: $point, ${point.toLineProtocol()}")

        //writeApi.writePoint(point)
        //writeApi.flush()

        influxDBClient.writeApiBlocking.writePoint(point)
        //println("after influxDBClient writeApiBlocking")
    }

    // This method will flush any pending writes (can be empty if no additional batching needed)
    override fun flush(endOfInput: Boolean) {
        //if (endOfInput) writeApi.flush()
    }

    // Handle any watermarks if required (optional, can be skipped if not needed)
    override fun writeWatermark(watermark: Watermark) {
    }

    // Close the writer when done
    override fun close() {
        influxDBClient.close()
    }
}

fun initializeInfluxDBClient(): InfluxDBClient {
    // Build the InfluxDBClientOptions with all required parameters
    val influxDBOptions = InfluxDBClientOptions.builder()
        .url(influxDBUrl) // InfluxDB URL
        .authenticate(influxUserName, influxPassword.toCharArray()) // Username and password
        .org(influxOrg) // Organization name
        .bucket(influxBucket) // Bucket name
        .authenticateToken(influxToken.toCharArray()) // Token for authentication
        .build()

    // Create the InfluxDB client using the built options
    return InfluxDBClientFactory.create(influxDBOptions)
}

Main.kt

fun main() {
    // Create KafkaSource with properties
    val kafkaSource = KafkaSource.builder<String>()
        .setBootstrapServers("kafka:9092")
        .setTopics("dummy-events") // Kafka topic
        .setGroupId("flink-group")
        .setStartingOffsets(OffsetsInitializer.earliest()) // Start from the earliest message
        .setValueOnlyDeserializer(SimpleStringSchema()) // Deserialize string values
        .setProperties(Properties().apply {
            setProperty("auto.offset.reset", "earliest")
        })
        .build()

    // Create a WatermarkStrategy for event time processing
    val watermarkStrategy = WatermarkStrategy.noWatermarks<String>()

    // Set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment()

    // Add KafkaSource as the input stream, provide the watermark strategy and a source name
    val stream = env.fromSource(
        kafkaSource,
        watermarkStrategy,
        "KafkaSource"
    )

    stream.sinkTo(InfluxDBSink())

    // Execute the job
    env.execute("Flink Kafka Integration")
}

But the curl below is working:

curl --request POST "http://localhost:8086/api/v2/w
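For readers trying to compile the listing above: it references an EventData type and several influx* configuration values that are not shown in the post. A minimal sketch of what they might look like (the field types, env-var names and fallback values are assumptions, not taken from the original message):

```kotlin
// Assumed shape of the Kafka payload: Jackson needs a type with `timestamp`
// and `value` properties matching the fields written in the sink above.
data class EventData(
    val timestamp: Long = 0L,
    val value: Double = 0.0,
)

// Assumed configuration; in the Docker Compose setup the host must be the
// service name ("influxdb"), not "localhost". Env-var names are illustrative.
val influxDBUrl: String = System.getenv("INFLUXDB_URL") ?: "http://influxdb:8086"
val influxUserName: String = System.getenv("INFLUXDB_USER") ?: "admin"
val influxPassword: String = System.getenv("INFLUXDB_PASSWORD") ?: "changeme"
val influxOrg: String = System.getenv("INFLUXDB_ORG") ?: "my-org"
val influxBucket: String = System.getenv("INFLUXDB_BUCKET") ?: "my-bucket"
val influxToken: String = System.getenv("INFLUXDB_TOKEN") ?: "my-token"
```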
Re: Performance problem with FlinkSQL
> The average output rate needed to avoid lag after filtering messages should be around 60K messages per second. I’ve been testing different configurations of parallelism, slots and pods (everything runs on Kubernetes), but I’m far from achieving those numbers.

How are you partitioning your query? Do you see any backpressure happening on the Flink UI?

> In the latest configuration, I used 20 pods, a parallelism of 120, with 4 slots per taskmanager.

Were all tasks working properly, or did you have idleness?

> Additionally, setting parallelism to 120 creates hundreds of subtasks for the smaller topics, which don’t do much but still consume minimal resources even if idle.

On the Table API I'm not sure if you can choose parallelism per "task"; with the DataStream API I'm sure you can do it.

Att,
Pedro Mázala
+31 (06) 3819 3814
Be awesome

On Wed, 29 Jan 2025 at 22:06, Guillermo Ortiz Fernández <guillermo.ortiz.f...@gmail.com> wrote:

> After the latest check, each pod uses about 200-400 millicores and 2.2 GB.
>
> On Wed, Jan 29, 2025 at 9:41 PM, Guillermo Ortiz Fernández (<guillermo.ortiz.f...@gmail.com>) wrote:
>
>> I have a job entirely written in Flink SQL. The first part of the program
>> processes 10 input topics and generates one output topic with normalized
>> messages and some filtering applied (really simple: some WHERE conditions on
>> fields and substrings). Nine of the topics produce between hundreds and
>> thousands of messages per second, with an average of 4–10 partitions each.
>> The other topic produces 150K messages per second and has 500 partitions.
>> They are unioned into the output topic.
>>
>> The average output rate needed to avoid lag after filtering messages
>> should be around 60K messages per second. I’ve been testing different
>> configurations of parallelism, slots and pods (everything runs on
>> Kubernetes), but I’m far from achieving those numbers.
>>
>> In the latest configuration, I used 20 pods, a parallelism of 120, with 4
>> slots per taskmanager. With this setup, I achieve approximately 20K
>> messages per second, but I’m unable to consume the largest topic at the
>> rate messages are being produced. Additionally, setting parallelism to 120
>> creates hundreds of subtasks for the smaller topics, which don’t do much
>> but still consume minimal resources even if idle.
>>
>> I started trying with parallelism 12 and got about 1000-3000 messages per
>> second.
>>
>> When I check the CPU and memory usage of the pods, I don't see any
>> problem and they are far from the limit; each taskmanager has 4 GB and
>> 2 CPUs, and they never come close to using the CPU.
>>
>> It’s a requirement to do everything 100% in SQL. How can I improve the
>> throughput rate? Should I be concerned about the hundreds of subtasks
>> created for the smaller topics?
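On the Table API parallelism point above: with a pure-SQL job there is no per-operator setting, but there is a job-wide default for SQL operators, and some connectors take a per-table option. A rough sketch, not from the thread (topic, columns and bootstrap servers are placeholders; 'sink.parallelism' is a Kafka sink option, while 'scan.parallelism' support depends on the connector and Flink version):

```kotlin
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment

fun main() {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)

    // Default parallelism for all SQL operators of this job; -1 (the default)
    // falls back to the environment's parallelism.
    tableEnv.config.configuration.setString("table.exec.resource.default-parallelism", "120")

    // A per-table override on the sink side, so the big output topic can be
    // written with high parallelism without inflating every small source.
    tableEnv.executeSql(
        """
        CREATE TABLE normalized_events (
          msg STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'normalized-events',
          'properties.bootstrap.servers' = 'kafka:9092',
          'format' = 'json',
          'sink.parallelism' = '120'
        )
        """.trimIndent()
    )
}
```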
Re: Resolving Table using Java API
In almost every case, the “table should be resolved” error from FlinkCatalog.createTable() means that the table identifier you are passing is either invalid, incomplete, or the catalog can’t figure out where the table is supposed to live. Most commonly, it’s because:

- The database (namespace) is empty or invalid, e.g., an empty catalogNamespace.
- The table name is empty or null, e.g., a missing catalogTable.
- A required configuration (e.g., warehouse) or property is not present, causing Iceberg to fail to resolve the path.

I'd add a log every time you do dotenv.get (for both the catalog namespace and the catalog table) and make sure those configs are coming through properly and are not null/empty.

Att,
Pedro Mázala
+31 (06) 3819 3814
Be awesome

On Wed, 29 Jan 2025 at 17:11, Kamesh Sampath wrote:

> I am on Apache Flink 1.20, trying to use the pure Java SQL API to create a
> catalog and a table in it. When I try to do so I am getting the following
> error:
>
> ```
> 21:31:41,939 ERROR dev.kameshs.demos.flink.iceberg [] - Error running job
> java.lang.IllegalArgumentException: table should be resolved
>     at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:143) ~[iceberg-flink-runtime-1.20-1.7.1.jar:?]
>     at org.apache.iceberg.flink.FlinkCatalog.createTable(FlinkCatalog.java:394) ~[iceberg-flink-runtime-1.20-1.7.1.jar:?]
>     at dev.kameshs.demos.flink.iceberg.VehicleTelemetryProcessor.lambda$main$0(VehicleTelemetryProcessor.java:167) ~[main/:?]
>     at java.util.Optional.ifPresent(Optional.java:178) ~[?:?]
>     at dev.kameshs.demos.flink.iceberg.VehicleTelemetryProcessor.main(VehicleTelemetryProcessor.java:106) [main/:?]
> ```
>
> The sample code is
> https://gist.github.com/kameshsampath/0c4ad07b236bd38e999316598141b93e#file-vehicletelemetryprocessor-java-L167
>
> Any clue/pointers to move forward.
>
> Thank you.
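A small sketch of the check suggested above, run before the identifier reaches FlinkCatalog.createTable(); the function and parameter names are illustrative, and the values would come from the dotenv lookups in the gist:

```kotlin
// `namespace` and `table` would be the results of dotenv.get(...) in the gist;
// the key names used there are not reproduced here.
fun requireResolvedIdentifier(namespace: String?, table: String?): Pair<String, String> {
    // Log exactly what came out of the environment before using it.
    println("catalog namespace='$namespace', table='$table'")

    require(!namespace.isNullOrBlank()) { "Catalog namespace is empty or null -- check the .env/environment" }
    require(!table.isNullOrBlank()) { "Catalog table name is empty or null -- check the .env/environment" }

    return namespace to table
}
```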
Re: Please help with Flink + InfluxDB
Hi Nix,
Thank you for your reply.
I have a small update. For trial and error, I signed up for InfluxDB Cloud.
The same code works there and the data is being saved in the cloud.
So I am guessing there might be an issue with my docker-compose setup, which is
why the data is not being saved locally (Docker InfluxDB).
Maybe I need to correct my volume paths or something?

Warm regards,
Siva Rama Krishna.

On Thu, Jan 30, 2025 at 2:05 PM Nikola Milutinovic wrote:
>
> Hi Siva.
>
> What is the InfluxDB URL you are using? I assume it is something like
> “influxdb://influxdb:8086/…”. If it is “localhost:8086…”, that would be a
> problem.
>
> Do you see a connection on InfluxDB? Anything in the logs? Anything in the
> logs of the Job Manager?
>
> Nix,
>
> From: Siva Krishna
> Date: Thursday, January 30, 2025 at 5:12 AM
> To: user@flink.apache.org
> Subject: Please help with Flink + InfluxDB
>
> Hi, I am trying to create a data streaming pipeline for real-time analytics.
>
> Kafka -> Flink -> InfluxDB -> Visualize
>
> I am facing an issue writing the data to InfluxDB. I can see logs in the
> 'taskmanager' after submitting the job, but the data is not being
> written to InfluxDB for some reason.
> Please help, I am attaching a sample Docker setup and basic code.
>
> [quoted code omitted; it is identical to the listing in the original message, quoted in full earlier in this digest]