Re: Is there any way to get the ExecutionConfigurations in Dynamic factory class

2022-04-07 Thread Qingsheng Ren
Hi Anitha, AFAIK DynamicTableSourceFactory doesn’t expose an interface for getting the parallelism. Could you elaborate on why you need the parallelism in the table factory? Maybe we could find other ways to fulfill your requirement. Best regards, Qingsheng > On Apr 7, 2022, at 16:11, Anitha Thankappan

Re: FlinkKafkaProducer - Avro - Schema Registry

2022-04-07 Thread Dan Serb
Hello Qingsheng, Removing KafkaAvroSerializer from the producer properties did indeed work. I validated this by using a FlinkKafkaConsumer with ConfluentRegistryAvroDeserializationSchema, so it's working properly. The problem I'm still having is that I will have to use schema registry where I

Re: Official Flink operator additional class paths

2022-04-07 Thread Francis Conroy
Hi Yang, thanks a lot for your help. It ended up being the case that my command in the initContainer was specified incorrectly. On Thu, 7 Apr 2022 at 18:41, Yang Wang wrote: > It seems that you have a typo when specifying the pipeline classpath. > "file:///flink-jar/flink-connector-rabbitmq_2.

Re: FlinkKafkaProducer - Avro - Schema Registry

2022-04-07 Thread Qingsheng Ren
Hi Dan, In FlinkKafkaProducer, records are serialized by the SerializationSchema specified in the constructor, which is the “schema” (ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class)) in your case, instead of the serializer specified in producer properties. The default se

Re: Production : Flink 1.14.4 : Kafka reader threads blocked

2022-04-07 Thread yu'an huang
Hi Vignesh, I think you can check the following things: 1. Check the CPU usage of the workers. Is it close to zero or almost full? 2. Is there any back pressure in downstream tasks? 3. Are full GCs significant? Best, Yuan > On 7 Apr 2022, at 12:33 PM, Vignesh Ramesh wrote: > > H

Re: flink pipeline handles very small amount of messages in a second (only 500)

2022-04-07 Thread yu'an huang
Hi Sigalit, In your setup, I guess each job has only one slot (a parallelism of one). Could that be too much input for a job with a parallelism of only one? An easy way to confirm is to double your slots and job parallelism and then see whether the QPS increases. Hope this would

Re: flink pipeline handles very small amount of messages in a second (only 500)

2022-04-07 Thread Yun Tang
Hi Sigalit, First of all, did you ensure that each source operator uses a different consumer id for the Kafka source? Does each Flink job share the same data or consume the data independently? Moreover, is your job back pressured? You might need to break the operator chain to see w

flink pipeline handles very small amount of messages in a second (only 500)

2022-04-07 Thread Sigalit Eliazov
Hi all, I would appreciate some help understanding the pipeline behaviour... We deployed a standalone Flink cluster. The pipelines are deployed via the JM REST API. We have 5 task managers with 1 slot each. In total I am deploying 5 pipelines which mainly read from Kafka, a simple object conversi

Re: Missing metrics in Flink v 1.15.0 rc-0

2022-04-07 Thread Jing Ge
Hi, Flink 1.15 introduces a new feature to support different sink pre- and post-topologies[1]. New metrics, e.g. NumRecordsSend, have been added to measure records sent to the external system. Metrics like "Bytes Sent" and "Records Sent" measure records sent to the next task. So, in your cas

RE: python table api

2022-04-07 Thread ivan.ros...@agilent.com
Hello Dian, Indeed. Thank you very much. Now getting +I[2022-01-01T10:00:20, 2022-01-01T10:00:25, 2] +I[2022-01-01T10:00:25, 2022-01-01T10:00:30, 5] +I[2022-01-01T10:00:30, 2022-01-01T10:00:35, 1] from pyflink.table import EnvironmentSettings, TableEnvironment t_env = TableEnvironment.create

Re: Unbalanced distribution of keyed stream to downstream parallel operators

2022-04-07 Thread Isidoros Ioannou
Hello Arvid, thank you for your reply. Actually, using a window to aggregate the events for a time period is not applicable to my case, since I need the records to be processed immediately. Even if I could, I still cannot understand how I could forward the aggregated events to, let's say, 2 parallel o

Re: BIGDECIMAL data handling

2022-04-07 Thread Francesco Guardiani
Is there any reason for not using DECIMAL provided by Flink SQL? On Tue, Apr 5, 2022 at 4:06 PM Anitha Thankappan < anitha.thankap...@quantiphi.com> wrote: > Hi Martijn, > > I am using flink version 1.11.0. > The flink application code snippet is like: > > [image: image.png] > > The Error I am re

Re: HOP_PROCTIME is returning null

2022-04-07 Thread Francesco Guardiani
Maybe the reason is that HOP_PROCTIME gets the name of the column? Can you share the query and plan? On Mon, Apr 4, 2022 at 3:41 PM Surendra Lalwani wrote: > Hi Team, > > HOP_PROCTIME in flink version 1.13.6 is returning null while in previous > versions it used to output a time attribute, an

Re: python table api

2022-04-07 Thread Francesco Guardiani
As Dian said, your INSERT INTO query just selects records from the source and prints them, passing them through without any computation whatsoever. Please check out [1] and [2] to learn how to develop queries that perform aggregations over windows. Note that the second method (window TVF) is preferred and recomm

Re: Official Flink operator additional class paths

2022-04-07 Thread Yang Wang
It seems that you have a typo when specifying the pipeline classpath. "file:///flink-jar/flink-connector-rabbitmq_2.12-1.14.4.jar" -> "file:///flink-jars/flink-connector-rabbitmq_2.12-1.14.4.jar" If this is not the root cause, maybe you could try downloading the connector jars to /opt/

Is there any way to get the ExecutionConfigurations in Dynamic factory class

2022-04-07 Thread Anitha Thankappan
Hi, I have developed a BigQuery Flink connector by implementing DynamicTableSourceFactory. I have a requirement to either get the configured parallelism value of StreamExecutionEnvironment in the factory class, or set the parallelism a

Official Flink operator additional class paths

2022-04-07 Thread Francis Conroy
Hi all, thanks in advance for any tips. I've been trying to specify some additional classpaths in my Kubernetes YAML file when using the official Flink operator, and nothing seems to work. I know the technique for getting my job jar works fine since it's finding the class OK, but I cannot get the