How can I mark the end of the stream with an end message while using the new KafkaSource?

2021-11-09 Thread LIU Xiao
With the legacy FlinkKafkaConsumer, overriding the isEndOfStream method of DeserializationSchema can solve the problem. But the new KafkaSource ignores that method (it is never called), and it seems the setUnbounded method only accepts offsets or timestamps.
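Since `isEndOfStream` is ignored, the built-in way to bound the new `KafkaSource` is an `OffsetsInitializer`. A minimal sketch of that builder fragment (broker, topic, and group id are placeholders, not from the thread):

```java
// Bounded KafkaSource: reads up to the latest offsets seen at startup,
// then finishes. There is no hook for an in-band "end marker" record.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .setBounded(OffsetsInitializer.latest())
        .build();
```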

Re: How to express the datatype of sparksql collect_list(named_struct(...))in flinksql?

2021-11-09 Thread JING ZHANG
Hi vtygoss, I'm a little confused. The UDF could already work well without defining the `DataTypeHint` annotation. Why do you define the `DataTypeHint` annotation before the input parameter of the `eval` method? Best, JING ZHANG vtygoss wrote on Tue, Nov 9, 2021 at 8:17 PM: > Hi, JING ZHANG! > > Thanks for your many times o

Re: Getting mini-cluster logs when debugging pyflink from IDE

2021-11-09 Thread Dian Fu
The logging directory could be configured via one of the following ways: - environment variable: FLINK_LOG_DIR - configuration 'env.log.dir' in $PYTHON_INSTALLATION_DIR/site-packages/pyflink/conf/flink-conf.yaml You could refer to [1] for more details. PS: This is only available for 1.13.3+ and 1.14.
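For example (the directory path below is only an illustration):

```shell
# Point the PyFlink mini-cluster logs at a directory of your choice.
export FLINK_LOG_DIR=/tmp/pyflink-logs
# Alternatively, set the equivalent key in
# $PYTHON_INSTALLATION_DIR/site-packages/pyflink/conf/flink-conf.yaml:
#   env.log.dir: /tmp/pyflink-logs
```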

Re: select records using JDBC with parameters

2021-11-09 Thread Caizhi Weng
Hi! It is very likely that the versions of your Flink client and Flink standalone cluster do not match. SubtaskStateMapper.DISCARD_EXTRA_STATE was removed in Flink 1.14, so please make sure that your Flink client version is also 1.14. Sigalit Eliazov wrote on Wed, Nov 10, 2021 at 5:46 AM: > Hello > > i am creat

Re: Pyflink PyPi build - scala 2.12 compatibility

2021-11-09 Thread Dian Fu
Hi Kamil, You are right that it comes with JAR packages of scala 2.11 in the PyFlink package as it has to select one version of JARs to bundle, either 2.11 or 2.12. Whether it works with scala 2.12 depends on how you submit your job. - If you execute the job locally, then it will use the JARs bund

Re: Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Caizhi Weng
Hi! You can't get GLOBAL_JOB_PARAMETERS from DynamicTableSourceFactory#createDynamicTableSource as far as I know, because configuration in that context is from table config, which only contains configuration on table API level. Could you tell us more about your use case? There are many other ways

Re: Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Krzysztof Chmielewski
Hi, Well no, because ReadableContext does not have a getConfiguration method ;) So like I wrote, I would like to get the GLOBAL_JOB_PARAMETERS from DynamicTableSourceFactory interface implementations. I'm talking about the DynamicTableSourceFactory::createDynamicTableSource method. It seems that I h

select records using JDBC with parameters

2021-11-09 Thread Sigalit Eliazov
Hello, I am creating a new pipeline which 1. receives info from Kafka (mainly the key) 2. with this key selects information from a D 3. writes the results to Kafka. Flink is running as a standalone cluster. I am failing on the pipeline deployment when activating step 2 with the following

Pyflink PyPi build - scala 2.12 compatibility

2021-11-09 Thread Kamil ty
Hello, Just wanted to verify if the default build of pyflink available from PyPi is compatible with flink - scala version 2.12. I have noticed that the PyPi pyflink version comes with apache-flink-libraries targeted for scala 2.11 only and I was wondering if this might be the cause of some issues

Re: Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Francesco Guardiani
Have you tried this? context.getConfiguration().get(org.apache.flink.configuration.PipelineOptions.GLOBAL_JOB_PARAMETERS) On Tue, Nov 9, 2021 at 3:59 PM Krzysztof Chmielewski < krzysiek.chmielew...@gmail.com> wrote: > Hi, > Is there a way to access GlobalJobParameters registered as > env.getCo

JVM cluster not firing event time window

2021-11-09 Thread Carlos Downey
Hello, Recently, I've decided to migrate one of my toy projects to Flink 1.14.0 and I realized that JVM cluster behaviour changed. It no longer respects event time. On Flink 1.12.5 I didn't experience any issues. I tried some debugging and it seems that InternalTimerServiceImpl's watermark is not

Re: Providing files while application mode deployment

2021-11-09 Thread Piotr Nowojski
Hi Vasily, Unfortunately no, I don't think there is such an option in your case. With per job mode, you could try to use the Distributed Cache, it should be working in streaming as well [1], but this doesn't work in the application mode, as in that case no code is executed on the JobMaster [2] Tw
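For reference, the Distributed Cache mentioned above is used roughly like this in per-job mode (the path and cache name are placeholders, not from the thread):

```java
// Register a file with the Distributed Cache when building the job graph.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.registerCachedFile("hdfs:///configs/app.conf", "app-conf");

// Later, inside any RichFunction running on a TaskManager:
// File conf = getRuntimeContext().getDistributedCache().getFile("app-conf");
```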

Access to GlobalJobParameters From DynamicTableSourceFactory

2021-11-09 Thread Krzysztof Chmielewski
Hi, Is there a way to access GlobalJobParameters registered as env.getConfig().setGlobalJobParameters(parameters); from DynamicTableSourceFactory implementation? To be more specific from DynamicTableSourceFactory::createDynamicTableSource method. The Context parameter of createDynamicTableSource
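For context, the parameters in question are typically registered on the environment like this (a common sketch using `ParameterTool`; the setup is illustrative, not the poster's code):

```java
// Register global job parameters so runtime functions can read them via
// getRuntimeContext().getExecutionConfig().getGlobalJobParameters().
ParameterTool params = ParameterTool.fromArgs(args);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
```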

Azure blob storage credential configuration in flink on ververica is giving java.lang.ClassNotFoundException

2021-11-09 Thread Samir Vasani
I am working on accessing azure blob storage through flink pipeline. As per flink documentation https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/ there are two approaches to implement this. 1)fs.azure.accou

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Fabian, Can you maybe share more about the setup and how you use the AsyncFunction with the Kafka client? Oops, I didn't mention in the reply to David that the kafka producer has nothing to do with the AsyncFunction! I interact with Redis and a Spring boot app. in the AsyncFunction, not Ka

Providing files while application mode deployment

2021-11-09 Thread Vasily Melnik
Hi all. While running Flink jobs in application mode on YARN and Kubernetes, we need to provide some configuration files to the main class. Is there any option on the Flink CLI to copy local files to the cluster without manually copying to DFS or into the docker image, something like the *--files* option in spark-submit?
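On YARN, one option worth checking is the `yarn.ship-files` configuration, which ships local files alongside the application (the paths below are placeholders, and I have not verified an equivalent for the Kubernetes side):

```shell
# Ship a local config file with an application-mode deployment on YARN.
./bin/flink run-application -t yarn-application \
    -Dyarn.ship-files=/local/path/app.conf \
    ./my-job.jar
```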

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Dongwon Kim
Hi Jun, Did you override AsyncFunction#timeout()? If so, did you call resultFuture.complete()/completeExceptionally() in your override? Not calling them can result in checkpoint timeout. No, I've only called resultFuture.complete() in AsyncFunction.asyncInvoke() and didn't know much about

Re: How to express the datatype of sparksql collect_list(named_struct(...))in flinksql?

2021-11-09 Thread vtygoss
Hi, JING ZHANG! Thanks for your many times of help. I already try to use COLLECT(ROW(id, name)) and store the result with type String(for POC test). So I try to define an UDF, and the annotation of function eval must be defined as "MULTISET>" as below, otherwise exception "..RAW/MAP expecte
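The exact type string is truncated in the archive; an illustrative reconstruction of such a UDF follows (field names and types are guesses, not the poster's code):

```java
// MULTISET<...> bridges to java.util.Map<element, multiplicity> by default,
// so the hint lets the planner type-check the COLLECT(ROW(id, name)) input.
public class MultisetToString extends ScalarFunction {
    public String eval(
            @DataTypeHint("MULTISET<ROW<id INT, name STRING>>") Map<Row, Integer> input) {
        return String.valueOf(input);
    }
}
```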

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Piotr Nowojski
Hi All, to me it looks like something deadlocked, maybe due to this OOM error from Kafka, preventing a Task from making any progress. To confirm, Dongwon, you could collect stack traces while the job is in such a blocked state. Deadlocked Kafka could easily explain those symptoms and it would be vi

Re: Upgrade from 1.13.1 to 1.13.2/1.13.3 failing

2021-11-09 Thread Dawid Wysakowicz
Hey Sweta, Sorry I did not get back to you earlier. Could you explain how you do the upgrade? Do you try to upgrade your cluster through HA services (e.g. zookeeper)? Meaning you bring the 1.13.1 cluster down and start a 1.13.2/3 cluster which you intend to pick up the job automatically a

Re: A savepoint was created but the corresponding job didn't terminate successfully.

2021-11-09 Thread Piotr Nowojski
Hi Dongwon, Thanks for reporting the issue, I've created a ticket for it [1] and we will analyse and try to fix it soon. In the meantime it should be safe for you to ignore this problem. If this failure happens only rarely, you can always retry stop-with-savepoint command and there should be no vi

Re: Beginner: guidance on long term event stream persistence and replaying

2021-11-09 Thread Piotr Nowojski
Hi Simon, From the top of my head I do not see a reason why this shouldn't work in Flink. I'm not sure what your question is here. For reading both from the FileSource and Kafka at the same time you might want to take a look at the Hybrid Source [1]. Apart from that there are FileSource/FileSink

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Fabian Paul
Hi Dongwan, Can you maybe share more about the setup and how you use the AsyncFunction with the Kafka client? As David already pointed out it could be indeed a Kafka bug but it could also mean that your defined async function leaks direct memory by not freeing some resources. We can definitely i

Re: Dependency injection for TypeSerializer?

2021-11-09 Thread Krzysztof Chmielewski
Hi, In my past project I was able to use Spring as a DI provider for Flink Jobs. It actually saves me a lot of hassle while writing/composing jobs and process functions. I was able to use all Spring's Bean annotations along with properties files managed by Spring as it would be a "normal" spring ap

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread Jun Qin
Hi Dongwon Did you override AsyncFunction#timeout()? If so, did you call resultFuture.complete()/completeExceptionally() in your override? Not calling them can result in checkpoint timeout. Thanks Jun > On Nov 9, 2021, at 7:37 AM, Dongwon Kim wrote: > > Hi David, > > There are currently
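A minimal sketch of the override being asked about (the fallback behaviour shown is illustrative):

```java
// Completing the future in timeout() releases the in-flight request slot;
// leaving it open can stall checkpoints indefinitely.
public class LookupFunction extends RichAsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // ... issue the async call, eventually resultFuture.complete(...) ...
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        resultFuture.completeExceptionally(
                new TimeoutException("async lookup timed out for: " + input));
    }
}
```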

Re: how to expose the current in-flight async i/o requests as metrics?

2021-11-09 Thread David Morávek
This is definitely a bug on Kafka side, because they're not handling uncaught exceptions properly [1]. I don't think there is much we can do on the Flink side here, because we're not able to override factory for the Kafka IO thread :/ [1] https://issues.apache.org/jira/browse/KAFKA-4228 On Tue, N