Hello,
How to specify the deserialization schema for multiple Kafka topics
using Flink (Python)
I want to read from multiple Kafka topics with JSON schema using
FlinkKafkaConsumer, and I assume that I need to use
JsonRowDeserializationSchema to deserialize the data. The schema of the
topics is v
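A minimal PyFlink sketch of what such a consumer could look like, assuming two topics that share the same JSON schema (topic names, field names, and the bootstrap server are made up, and the Kafka connector jar is assumed to be available). Note that one FlinkKafkaConsumer applies a single deserialization schema to every topic in its list, so topics with different schemas would need separate consumers:

```python
from pyflink.common import Types
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()

# Row type describing the JSON payload (field names are illustrative)
row_type = Types.ROW_NAMED(["id", "name"], [Types.LONG(), Types.STRING()])
schema = JsonRowDeserializationSchema.builder().type_info(row_type).build()

# One consumer can read several topics, all deserialized with the same schema
consumer = FlinkKafkaConsumer(
    ["topic-a", "topic-b"],
    schema,
    {"bootstrap.servers": "localhost:9092", "group.id": "demo"},
)

stream = env.add_source(consumer)
stream.print()
env.execute("multi-topic-json")
```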
Please direct questions like this one to either Stack Overflow
or the user mailing list, but not both at once. Those two forums are
appropriate places to get help with using Flink's APIs. Once you've
asked a question, please allow a few days for folks to respond before trying
again.
Instead of using `reinterpretAsKeyedStream`, can you use `keyBy` and see if
the behavior becomes deterministic?
On Thu, Jan 27, 2022 at 9:49 PM Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:
> I'm not sure if the issue in [1] is relevant since it mentions the Table
> API, but it
Hi Paul,
where are you storing your checkpoints, and what's their size?
IIRC, Flink won't trigger a new checkpoint before the old ones have been
cleaned up, and if your checkpoints are large and stored on S3, it can take
a while to clean them up (especially with the Hadoop S3 plugin, using
pre
Hi Parag,
it seems that you are submitting a job with the same job id multiple times.
An easy fix would be to generate a new job id each time you submit
the job.
To debug this, check the Flink jobmanager logs; there are log messages
for every job submission.
On Thu, Jan 27, 2022 at 9
Hi Javier,
I suspect that TwitterServer is using some classloading / dependency
injection / service loading "magic" that is causing this.
I would try to find out, either by attaching a remote debugger (should be
possible when executing in cluster mode locally) or by adding log
statements in the co
Hi Sweta,
yes, you cannot run a Flink job compiled against Flink 1.13 against a
1.14 cluster. But if you are only using stable APIs of Flink, you should be
able to compile your job with the 1.14 dependencies without touching the
code.
See also:
https://nightlies.apache.org/flink/flink-docs-rele
Hi Oran,
as you've already suggested, you could just use a (flat)map function that
takes an ObjectNode and outputs a string.
In the mapper, you can do whatever you want in case of an invalid object:
log it, discard it, write an "error json string", write it to
a side output stream, ...
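A PyFlink analogue of that idea, sketched with plain JSON strings instead of Jackson's ObjectNode (the class name, error-marker format, and field handling here are made up):

```python
import json

from pyflink.datastream.functions import FlatMapFunction


class ToJsonString(FlatMapFunction):
    """Turns each incoming element into a JSON string, deciding per element
    what to do when the object is invalid."""

    def flat_map(self, value):
        try:
            yield json.dumps(value)
        except (TypeError, ValueError):
            # invalid object: log it, drop it, or emit an error marker instead
            yield json.dumps({"error": "unserializable record"})


# usage (sketch): stream.flat_map(ToJsonString(), output_type=Types.STRING())
```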
Hi there
I would like to use a socket stream as input for my Flink workflow in Python.
This works in Scala with the socketTextStream() method, for instance:
val stream = senv.socketTextStream("localhost", 9000, '\n')
I cannot find an equivalent in PyFlink, although it is briefly mentioned in the
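If your PyFlink version really has no public equivalent, one workaround sometimes used is to call the underlying Java environment through PyFlink's internal wrappers. This is unsupported and may break between releases; a rough sketch:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.data_stream import DataStream

env = StreamExecutionEnvironment.get_execution_environment()

# _j_stream_execution_environment is an internal attribute; this bypasses the
# public PyFlink API and is not guaranteed to keep working
j_stream = env._j_stream_execution_environment.socketTextStream("localhost", 9000, "\n")
stream = DataStream(j_stream)

stream.print()
env.execute("socket-stream")
```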
Hi Balazs,
you are right, the new APIs only allow the serialization of resolved
instances. This ensures that only validated, correct instances are put
into persistent storage such as a database. The framework will
always provide resolved instances and call the corresponding methods
with t
Hey there,
Hope everyone is well!
I have a question:
```
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream dataStream = env.addSource(new CustomSource());
OutputTag outputTag = new OutputTag("late"
Thanks
On Fri, Jan 28, 2022, 07:47 Caizhi Weng wrote:
> Hi!
>
> This job will work as long as your SQL statement is valid. Did you run into
> some difficulties? Or what is your concern? A record of 100K is rather
> large, but I've seen quite a lot of jobs with such record sizes, so it is OK.
>
> HG
Hello
I want to send and receive dict Python values. According to the PyFlink doc,
this is specified with Types.MAP(). Unfortunately I
found no example of the required arguments, and I am stuck with the following
error:
TypeError: MAP() missing 2 required positional arguments: 'key_type_info'
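For reference, Types.MAP expects the key and value type information as its two positional arguments, so a dict with string keys and integer values would be declared along these lines (the types chosen here are illustrative):

```python
from pyflink.common import Types

# dict[str, int] maps to MAP(key_type_info, value_type_info)
map_type = Types.MAP(Types.STRING(), Types.INT())

# e.g. when declaring an output type:
# stream.map(lambda d: d, output_type=Types.MAP(Types.STRING(), Types.INT()))
```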
Hi Seb.
In my team we are migrating things to Kotlin because we find it much easier
to deal with. It's like the best of both worlds, but you do give up on
Flink Scala serializers, since the only way to get Kotlin Data Classes
working is by making them a POJO (or implementing your own TypeInfo). Yo
Yes, Scala is the best.
On Fri, Jan 28, 2022, 9:57 AM Nicolás Ferrario
wrote:
> Hi Seb.
>
> In my team we are migrating things to Kotlin because we find it much
> easier to deal with. It's like the best of both worlds, but you do give up
> on Flink Scala serializers, since the only way to get Kot
Hi everyone,
From the mailing list, I see this question asked a lot. But I can't seem to
find a solution to my problem. I would appreciate some help.
The requirement for our project is that we do not lose data and do not
produce duplicate records. Our pipelines are written with Apache Beam
(2.35.0
When I run my Flink app via the Google Flink Operator, log4j2 logging
doesn't show up in logs. System.out.println messages work.
Everything should be very plain-vanilla-standard setup. I have a log4j2.xml
config file in my classpath included in my application .jar file. I'm
building a custom Docke
Sorry, never mind. I see the Flink operator overrides it with JVM argument
configuration.
JVM Options:
-Dlog.file=/opt/flink/log/flink--client-4b57ba2b2597.log
-Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
Hi Team,
I am working on a solution where we need to perform some lookups from a Flink
job. Earlier we were using Redis and calling it via a UDF from the Flink
job, but now we are planning to remove the external Redis dependency from
Flink and want to use the embedded RocksDB as a lookup store, doe
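One way to read that requirement is to keep the lookup table in Flink keyed state, which lives in the embedded RocksDB instance when the RocksDB state backend is configured, instead of calling out to Redis. A rough PyFlink sketch under that assumption (the record shape and field meanings are made up):

```python
from pyflink.common import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor


class StateLookup(KeyedProcessFunction):
    """Keeps a per-key lookup table in keyed MapState; with the RocksDB state
    backend this state is stored in the embedded RocksDB instance."""

    def open(self, runtime_context: RuntimeContext):
        descriptor = MapStateDescriptor("lookup", Types.STRING(), Types.STRING())
        self.lookup = runtime_context.get_map_state(descriptor)

    def process_element(self, value, ctx):
        # hypothetical record shape: (key, lookup_field, payload)
        key, field, payload = value
        if self.lookup.contains(field):
            # enrich the record from state instead of an external Redis call
            yield key, self.lookup.get(field)
        else:
            self.lookup.put(field, payload)


# usage (sketch): stream.key_by(lambda r: r[0]).process(StateLookup())
```

The stream needs a key_by(...) in front of the function so that each lookup key is consistently routed to the same parallel instance and its state.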