Read Json deserialization schema from file

2022-01-28 Thread Hussein El Ghoul
Hello, how do I specify the deserialization schema for multiple Kafka topics using Flink (Python)? I want to read from multiple Kafka topics with a JSON schema using FlinkKafkaConsumer, and I assume that I need to use JsonRowDeserializationSchema to deserialize the data. The schema of the topics is v
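
A minimal PyFlink sketch of one way this could look, assuming (purely for illustration) that every topic's JSON payload has the same two fields `id` and `name`; loading the schema from a file, as the subject suggests, is not shown:

```
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# Row type describing the JSON payload; field names and types are illustrative only.
row_type = Types.ROW_NAMED(['id', 'name'], [Types.STRING(), Types.STRING()])

schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=row_type) \
    .build()

consumer = FlinkKafkaConsumer(
    topics=['topic-a', 'topic-b'],  # several topics can share one deserialization schema
    deserialization_schema=schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'demo'})

stream = env.add_source(consumer)
```

Topics whose schemas differ would need separate consumers or a custom DeserializationSchema.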

Re: Stack Overflow Question - Deserialization schema for multiple topics

2022-01-28 Thread David Anderson
For questions like this one, please address them to either Stack Overflow or the user mailing list, but not both at once. Those two forums are appropriate places to get help with using Flink's APIs. And once you've asked a question, please allow some days for folks to respond before trying again.

Re: Determinism of interval joins

2022-01-28 Thread Robert Metzger
Instead of using `reinterpretAsKeyedStream`, can you use `keyBy` and see if the behavior becomes deterministic? On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa < alexis.sarda-espin...@microfocus.com> wrote: > I'm not sure if the issue in [1] is relevant since it mentions the Table > API, but it

Re: Inaccurate checkpoint trigger time

2022-01-28 Thread Robert Metzger
Hi Paul, where are you storing your checkpoints, and what's their size? IIRC, Flink won't trigger a new checkpoint before the old ones have been cleaned up, and if your checkpoints are large and stored on S3, it can take a while to clean them up (especially with the Hadoop S3 plugin, using pre

Re: Duplicate job submission error

2022-01-28 Thread Robert Metzger
Hi Parag, it seems that you are submitting a job with the same job id multiple times. An easy fix would be to generate a new job id each time you submit the job. To debug this, check the Flink jobmanager logs; there are log messages for every job submission. On Thu, Jan 27, 2022 at 9

Re: IllegalArgumentException: URI is not hierarchical error when initializating jobmanager in cluster

2022-01-28 Thread Robert Metzger
Hi Javier, I suspect that TwitterServer is using some classloading / dependency injection / service loading "magic" that is causing this. I would try to find out, either by attaching a remote debugger (should be possible when executing in cluster mode locally) or by adding log statements in the co

Re: Upgrade to 1.14.3

2022-01-28 Thread Robert Metzger
Hi Sweta, yes, you cannot run a Flink job compiled against Flink 1.13 against a 1.14 cluster. But if you are only using stable APIs of Flink, you should be able to compile your job with the 1.14 dependencies without touching the code. See also: https://nightlies.apache.org/flink/flink-docs-rele

Re: Example for Jackson JsonNode Kafka serialization schema

2022-01-28 Thread Robert Metzger
Hi Oran, as you've already suggested, you could just use a (flat)map function that takes an ObjectNode and outputs a string. In the mapper, you can do whatever you want in case of an invalid object: logging about it, discarding it, writing an "error json string", writing to a side output stream, .
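
The reply above concerns Jackson's ObjectNode in the Java API; as a rough PyFlink analogue of the same pattern (a flat_map that serializes each record to a JSON string and decides what to do with invalid ones), one might sketch the following, where `stream` is an assumed pre-existing DataStream of dict-like records:

```
import json
import logging

from pyflink.common.typeinfo import Types

def to_json_string(record):
    # Serialize one record to a JSON string; log and drop records that fail.
    try:
        yield json.dumps(record)
    except (TypeError, ValueError) as err:
        # Invalid object: log and discard; one could instead emit an "error JSON"
        # string or route the record to a side output.
        logging.warning("Skipping unserializable record: %s", err)

json_strings = stream.flat_map(to_json_string, output_type=Types.STRING())
```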

Socket stream source in Python?

2022-01-28 Thread Philippe Rigaux
Hi there, I would like to use a socket stream as input for my Flink workflow in Python. This works in Scala with the socketTextStream() method, for instance: val stream = senv.socketTextStream("localhost", 9000, '\n'). I cannot find an equivalent in PyFlink, although it is briefly mentioned in the

Re: Resolving a CatalogTable

2022-01-28 Thread Timo Walther
Hi Balazs, you are right, the new APIs only allow the serialization of resolved instances. This ensures that only validated, correct instances are put into the persistent storage such as a database. The framework will always provide resolved instances and call the corresponding methods with t

Flink test late elements

2022-01-28 Thread Dario Heinisch
Hey there, hope everyone is well! I have a question: ``` StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream dataStream = env.addSource(new CustomSource()); OutputTag outputTag = new OutputTag("late"

Re: Unbounded streaming with table API and large json as one of the columns

2022-01-28 Thread HG
Thanks On Fri, Jan 28, 2022, 07:47 Caizhi Weng wrote: > Hi! > > This job will work as long as your SQL statement is valid. Did you meet > some difficulties? Or what is your concern? A record of 100K is sort of > large, but I've seen quite a lot of jobs with such record size so it is OK. > > HG

MAP data type (PyFlink)

2022-01-28 Thread Philippe Rigaux
Hello, I want to send and receive Python dict values. According to the PyFlink doc, this is specified with Types.MAP(). Unfortunately I found no example of the required arguments, and I am stuck with the following error: TypeError: MAP() missing 2 required positional arguments: 'key_type_info'
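
For reference, Types.MAP takes the key and value type information as its two positional arguments; a minimal sketch (the collection contents are made up):

```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

# A Python dict maps to Types.MAP(key_type_info, value_type_info).
dict_type = Types.MAP(Types.STRING(), Types.INT())

ds = env.from_collection(
    collection=[{'a': 1, 'b': 2}, {'c': 3}],
    type_info=dict_type)
```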

Re: Is Scala the best language for Flink?

2022-01-28 Thread Nicolás Ferrario
Hi Seb. In my team we are migrating things to Kotlin because we find it much easier to deal with. It's like the best of both worlds, but you do give up on Flink Scala serializers, since the only way to get Kotlin Data Classes working is by making them a POJO (or implementing your own TypeInfo). Yo

Re: Is Scala the best language for Flink?

2022-01-28 Thread sri hari kali charan Tummala
Yes Scala is the best. On Fri, Jan 28, 2022, 9:57 AM Nicolás Ferrario wrote: > Hi Seb. > > In my team we are migrating things to Kotlin because we find it much > easier to deal with. It's like the best of both worlds, but you do give up > on Flink Scala serializers, since the only way to get Kot

EOS from checkpoints doesn't seem to work

2022-01-28 Thread Cristian Constantinescu
Hi everyone, From the mailing list, I see this question asked a lot, but I can't seem to find a solution to my problem. I would appreciate some help. The requirement for our project is that we do not lose data and do not produce duplicate records. Our pipelines are written with Apache Beam (2.35.0

Log4j2 Issues

2022-01-28 Thread Clayton Wohl
When I run my Flink app via the Google Flink Operator, log4j2 logging doesn't show up in the logs. System.out.println messages work. Everything should be a very plain, vanilla, standard setup. I have a log4j2.xml config file in my classpath, included in my application .jar file. I'm building a custom Docke

Re: Log4j2 Issues

2022-01-28 Thread Clayton Wohl
Sorry, never mind. I see that the Flink operator overrides the logging configuration via JVM arguments. JVM Options: -Dlog.file=/opt/flink/log/flink--client-4b57ba2b2597.log -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties

How to put external data in EmbeddedRocksDB

2022-01-28 Thread Surendra Lalwani
Hi Team, I am working on a solution where we need to perform some lookups from a Flink job. Earlier we were using Redis and calling it via a UDF from the Flink job, but now we are planning to remove the external Redis dependency from Flink and want to use the embedded RocksDB as a lookup store, doe
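
One possible direction (not necessarily what the thread settled on) is to keep the lookup data in keyed state on the RocksDB state backend; a hedged PyFlink sketch, assuming records arrive as (key, payload) tuples and that the lookup state is populated elsewhere, for example from a connected reference-data stream:

```
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, EmbeddedRocksDBStateBackend
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor

env = StreamExecutionEnvironment.get_execution_environment()
env.set_state_backend(EmbeddedRocksDBStateBackend())  # keyed state lives in RocksDB

class LookupEnricher(KeyedProcessFunction):
    """Enriches records from keyed MapState instead of an external Redis lookup."""

    def open(self, runtime_context: RuntimeContext):
        descriptor = MapStateDescriptor('lookup', Types.STRING(), Types.STRING())
        self.lookup = runtime_context.get_map_state(descriptor)

    def process_element(self, value, ctx):
        # value is assumed to be a (key, payload) tuple; look up the cached entry.
        yield (value[0], value[1], self.lookup.get(value[0]))
```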