Folks,
I am using the CEP library to create ComplexEvents. My question is, should
the ComplexEvents be serializable?
Thanks Ziyad. That was a cut and paste error. Anyway, I figured out a
solution to the issue. All of my Flink dependancies were pointing at 1.3.1.
Pointing at 1.3.0 resolved the issue.
On Wed, Jul 12, 2017 at 2:17 AM, Ziyad Muhammed wrote:
> Hi Sridhar
>
> Are you using *ParameterTool *to set the
Hi Sridhar
Are you using *ParameterTool *to set the properties? I couldn't see it in
your code, but you use it in the below line:
FlinkKafkaConsumer010 flinkConsumer =
new FlinkKafkaConsumer010(
Arrays.asList(parameterTool.getRequired("topic").split(",")), new
LogDeserializationS
Let us assume that I want to perform some kind of aggregation in specified
time windows (e.g. tumbling window of 1 minute) and my aggregation operation
is associative. Wouldn't it be possible to represent windowAll in runtime as
/parallelism + 1/ operator instances where /parallelism/ number of ope
Dear Ziyad,
could you kindly share some additional info about your environment
(local/cluster, nodes, machines' configuration)?
What does exactly you mean by "indefinitely"? How much time the job is
hanging?
Hope to help you, then.
Cheers,
Andrea
--
View this message in context:
http://apach
(back to list)
state.checkpoints.dir is a configuration parameter which you set in the flink
configuration itself (see [1]). This will be used for checkpoint metadata only
(for RocksDB and Fs) while the checkpoints themselves are stored in the given
directory.
Nico
[1] https://ci.apache.org/
Hi all,
I am trying to use S3 backend with custom endpoint. However, it is not
supported in hadoop-aws@2.7.3, I need to use at least 2.8.0 version. The
underyling reason is that the requests are being sent as following
DEBUG [main] (AmazonHttpClient.java:337) - Sending Request: HEAD
http://mustaf
I am pretty sure I am doing something wrong here. Just that I do not
understand why?
I wrote a small program that reads messages from Kafka and prints it out.
public class Main {
private static final int CHECKPOINT_INTERVAL = 10;
private static Properties getpropsFromEnv() {