Dropping messages based on timestamp.

2020-05-28 Thread Joe Malt
Hi, I'm working on a custom TimestampAssigner which will do different things depending on the value of the extracted timestamp. One of the actions I want to take is to drop messages entirely if their timestamp meets certain criteria. Of course there's no direct way to do this in the TimestampAssi
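
One way to picture the dropping step is as a filter that runs before timestamp assignment. In Flink this would typically be a `filter()` applied ahead of `assignTimestampsAndWatermarks()`. The sketch below is plain Python, not Flink code. The message layout (a dict with a `timestamp_ms` field), the `MAX_FUTURE_SKEW_MS` threshold, and the function names are all hypothetical, and the drop criteria are only an example of "certain criteria".

```python
# Hypothetical threshold: reject timestamps more than one hour ahead of the reference clock.
MAX_FUTURE_SKEW_MS = 60 * 60 * 1000


def extract_timestamp(message):
    """Return the event timestamp (epoch millis) carried by a message dict (assumed layout)."""
    return message["timestamp_ms"]


def should_drop(message, now_ms):
    """Example criteria: drop negative timestamps and timestamps too far in the future."""
    ts = extract_timestamp(message)
    return ts < 0 or ts > now_ms + MAX_FUTURE_SKEW_MS


def filter_messages(messages, now_ms):
    """Keep only the messages that pass the criteria; timestamps would be assigned afterwards."""
    return [m for m in messages if not should_drop(m, now_ms)]
```

Keeping the drop decision in its own step leaves the timestamp assigner responsible only for extracting timestamps.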

Performance impact of many open windows at the same time

2020-05-21 Thread Joe Malt
Hi all, I'm looking into what happens when messages are ingested with timestamps far into the future (e.g. due to corruption or a wrong clock at the sender). I'm aware of the effect on watermarking, but another thing I'm concerned about is the performance impact of the extra windows this will cre
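
To make the concern concrete, here is a small plain-Python sketch (not Flink code) of how a far-future timestamp leaves an extra tumbling window open. The window-start arithmetic mirrors the formula Flink uses for tumbling windows. The helper name `open_windows` and the sample values are hypothetical, and the sketch assumes a window fires once the watermark reaches its end minus one millisecond.

```python
def window_start(timestamp_ms, size_ms, offset_ms=0):
    """Start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms - offset_ms + size_ms) % size_ms


def open_windows(timestamps_ms, size_ms, watermark_ms):
    """Starts of windows that have received data but not yet fired.

    Assumption: a window fires when the watermark reaches (window end - 1).
    """
    starts = {window_start(ts, size_ms) for ts in timestamps_ms}
    return sorted(s for s in starts if s + size_ms - 1 > watermark_ms)


# One corrupted timestamp far in the future keeps its window open
# until the watermark eventually catches up with it.
timestamps = [10_000, 20_000, 70_000, 10_000_000_000]
still_open = open_windows(timestamps, size_ms=60_000, watermark_ms=65_000)
```

Each distinct far-future timestamp range adds one such window per key, which is where the extra state comes from.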

Re: [External] Re: Setting a custom Kryo serializer in Flink-Python

2018-09-18 Thread Joe Malt
v = f.get(python_execution_env) > java_execution_env.addDefaultKryoSerializer(Message, > MessageKryoSerializer) > > With or without these lines, the job crashes with a KryoException (full > stack trace at https://pastebin.com/zxxzCqH0), it doesn't appear that > addDefaultKry

LocalEnvironment and Python streaming

2018-09-14 Thread Joe Malt
Hi, Is there any way to execute a job using the LocalEnvironment when using the Python streaming API? This would make it much easier to debug jobs. At the moment I'm not aware of any way of running them except firing up a local cluster and submitting the job with pyflink-stream.sh. Thanks,

Setting a custom Kryo serializer in Flink-Python

2018-09-14 Thread Joe Malt
es, the job crashes with a KryoException (full stack trace at https://pastebin.com/zxxzCqH0), it doesn't appear that addDefaultKryoSerializer is doing anything. Is there an officially supported way to set custom serializers in Python? Thanks, Joe Malt Engineering Intern, Stream Processing Yelp

Re: [External] Re: How to do test in Flink?

2018-08-24 Thread Joe Malt
h for fc:org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder Best, Joe Malt Engineering Intern, Stream Processing Yelp Inc. On Fri, Aug 24, 2018 at 4:50 AM, Chang Liu wrote: > No worries, I found it here: > > > org.apache.flink > flink-runtime_${scala.binary.version} > ${flink.

ClassCastException when writing to a Kafka stream with Flink + Python

2018-08-14 Thread Joe Malt
ang.String, which should happen automatically in Jython. I've tried adding a MapFunction that maps each input to String(input), where String is the constructor for java.lang.String. This made no difference; I get the same error. Any ideas? Thanks, Joe Malt Software Engineering Intern Yelp

Re: VerifyError when running Python streaming job

2018-08-08 Thread Joe Malt
ample, it works fine to me. > > An example of an official document may not guarantee your success due to > maintenance issues. > > cc @Chesnay > > [1]: https://github.com/apache/flink/blob/master/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/

VerifyError when running Python streaming job

2018-08-07 Thread Joe Malt
would be much appreciated. Thanks, Joe Malt Software Engineer Intern Yelp

Using a custom DeserializationSchema with Kafka and Python

2018-08-06 Thread Joe Malt
ms that the internals of Flink can't find the class for some reason. The command I'm using to run the pipeline: ./pyflink-stream.sh /Users/jmalt/flink-python/KafkaRead.py /Users/jmalt/flink-python/MyCustomKafkaDeserializer.py - --local How can I make Flink see the custom deserializer?

Running a Python streaming job with Java dependencies

2018-07-25 Thread Joe Malt
he.flink.streaming.python.api.PythonStreamBinder -v "$FLINK_ROOT_DIR"/opt/flink-streaming-python*.jar "$@" Thanks, Joe Malt Engineering Intern, Stream Processing Yelp