Hi!
https://github.com/apache/flink/pull/2425 is on my list of “to reviews”.
However, at the moment I can not guarantee whether it’ll make it in time into
the 1.3 release.
If you would like to, please feel free to review and comment on the pull
request also!
We always appreciate extra pairs of eyes!
Thanks for clarifying.
From the looks of your exception:
Caused by: java.io.NotSerializableException:
com.sy.flink.test.Tuple2Serializerr$1
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStre
But it is not an inner class.
On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai
wrote:
> Since I don’t have your complete code, I’m guessing this is the problem:
> Is your `Tuple2Serializer` an inner class? If yes, you should be able to
> solve the problem by declaring `Tuple2Serializer` to
Since I don’t have your complete code, I’m guessing this is the problem:
Is your `Tuple2Serializer` an inner class? If yes, you should be able to solve
the problem by declaring `Tuple2Serializer` to be `static`.
This is more of a Java problem -
it isn’t serializable if it isn’t static, because a non-static inner class
holds an implicit reference to its enclosing instance, which would then also
have to be serializable.
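As an illustration, a minimal sketch (the class names here are made up, not taken from your code):

public class MyJob {

    // Non-static inner class: the compiler adds a hidden reference to the
    // enclosing MyJob instance. Serializing an instance of this class also
    // tries to serialize the MyJob instance, which fails with
    // java.io.NotSerializableException if MyJob is not Serializable.
    public class InnerTuple2Serializer implements java.io.Serializable {
    }

    // Static nested class: no hidden reference to the outer instance, so it
    // can be serialized and shipped to the cluster on its own.
    public static class StaticTuple2Serializer implements java.io.Serializable {
    }
}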
Hi Mohit,
I don’t completely understand your question, but I’m assuming that you know the
type of records your custom sink will be receiving, but you don’t know how to
extract values from the records.
Assume that the type of the incoming records will be `Tuple2`.
When writing your custom sink, you can access the fields of each incoming
`Tuple2` directly, e.g. `value.f0` and `value.f1`.
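A minimal sketch (the class name and the write logic are placeholders; it assumes `Tuple2<Integer, Integer>` records):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyTuple2Sink extends RichSinkFunction<Tuple2<Integer, Integer>> {

    @Override
    public void invoke(Tuple2<Integer, Integer> record) throws Exception {
        // Extract the individual fields of the incoming record.
        Integer key = record.f0;
        Integer value = record.f1;
        // Write them to the external system of your choice; printing is only a stand-in.
        System.out.println(key + " -> " + value);
    }
}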
This is, at a high level, what I am doing:
Serialize:
String s = tuple.f0 + "," + tuple.f1;
return s.getBytes();
Deserialize:
String s = new String(message);
String[] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));
return tuple;
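Filled out, that approach could look roughly like the following sketch. This is only an assumption of how the pieces fit together, using Flink's SerializationSchema/DeserializationSchema interfaces (the Kafka producer also accepts a KeyedSerializationSchema); the class must be top-level or a static nested class so it is serializable on its own, and package names may differ slightly between Flink versions:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class Tuple2Schema implements
        SerializationSchema<Tuple2<Integer, Integer>>,
        DeserializationSchema<Tuple2<Integer, Integer>> {

    @Override
    public byte[] serialize(Tuple2<Integer, Integer> tuple) {
        // Encode the tuple as an "f0,f1" string.
        return (tuple.f0 + "," + tuple.f1).getBytes();
    }

    @Override
    public Tuple2<Integer, Integer> deserialize(byte[] message) {
        String[] parts = new String(message).split(",");
        return new Tuple2<>(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]));
    }

    @Override
    public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
    }
}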
Hi Mohit,
As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer`
contains fields that are not serializable, so `Tuple2Serializer` itself is not
serializable.
Could you perhaps share your `Tuple2Serializer` implementation with us so we
can pinpoint the problem?
A snippet of the implementation would already help.
I am using a String inside it to convert into bytes.
On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 wrote:
> Hi Mohit
> As you did not give the whole code of Tuple2Serializerr, I guess the
> reason is that some fields of Tuple2Serializerr do not implement Serializable.
>
> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia :
Hi Mohit
As you did not give the whole code of Tuple2Serializerr, I guess the
reason is that some fields of Tuple2Serializerr do not implement Serializable.
2017-02-24 9:07 GMT+08:00 Mohit Anchlia :
> I wrote a key serialization class to write to kafka however I am getting
> this error. Not sure why
Currently, OutputFormat is used for the DataSet API, and SinkFunction is used for the
DataStream API. Maybe I misunderstand your problem. It would be better if you
could give more details.
2017-02-24 5:21 GMT+08:00 Mohit Anchlia :
> This works for Kafka but for the other types of sink am I supposed to use
> some type of
I wrote a key serialization class to write to kafka however I am getting
this error. Not sure why as I've already implemented the interfaces.
Caused by: java.io.NotSerializableException:
com.sy.flink.test.Tuple2Serializerr$1
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.jav
This works for Kafka but for the other types of sink am I supposed to use
some type of OutputFormat?
On Tue, Feb 21, 2017 at 7:13 PM, 刘彪 wrote:
> Hi
> I think there is a good way in FlinkKafkaProducerBase.java to deal with
> this situation. There is a KeyedSerializationSchema that the user has to implement.
Mh. The user jar is put into every classpath. So the jobmanager /
taskmanagers are potentially affected by this as well.
Probably the data transfer between the TMs doesn't call the same methods as
the UI on the JobManager :)
The simplest solution is to shade your netty in the user jar into a
different package.
Hi Patrick,
as Robert said, partitionBy() shuffles the data such that all records with
the same key end up in the same partition. That's all it does.
groupBy() also prepares the data in each partition to be processed per key.
For example, if you run a groupReduce after a groupBy(), the data is first
sorted by the key within each partition, so that the GroupReduceFunction can be
called once per key group.
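To make the difference concrete, a small DataSet API sketch (types and the per-partition logic are illustrative):

import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// inside your main() / job setup:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, Integer>> data = env.fromElements(
    new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3));

// groupBy(): records are partitioned by key AND organized into per-key groups,
// so key-wise operations like sum() or reduceGroup() can follow.
DataSet<Tuple2<String, Integer>> sums = data.groupBy(0).sum(1);

// partitionByHash(): records with the same key land in the same partition,
// but there is no per-key grouping; a following function sees whole partitions.
DataSet<Integer> partitionSizes = data
    .partitionByHash(0)
    .mapPartition(new MapPartitionFunction<Tuple2<String, Integer>, Integer>() {
        @Override
        public void mapPartition(Iterable<Tuple2<String, Integer>> records,
                                 Collector<Integer> out) {
            int count = 0;
            for (Tuple2<String, Integer> ignored : records) {
                count++;
            }
            out.collect(count);
        }
    });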
Hi Dmitry,
Cool! Looks like you've taken the right approach to analyze the performance
issues!
Often the deserialization of the input is already a performance killer :)
What is this one operator that is the bottleneck doing?
Does it have a lot of state? Is it CPU intensive, or talking to an exter
Hi Robert,
It definitely explains the behaviour.
This only applies to the frontend right?
If so what is the rationale behind it, and how should I handle the
dependency conflict?
Thanks,
Gyula
Robert Metzger wrote (on Thu, Feb 23, 2017, 21:44):
> Hi,
> Since Flink 1.2 "per job yarn a
Hi Patrick,
I think (but I'm not 100% sure) it's not a difference in what the engine
does in the end, it's more of an API thing. When you are grouping, you can
perform operations such as reducing afterwards.
On a partitioned dataset, you can do stuff like processing each partition
in parallel, or so
Miguel and Vasia,
My thought is to change the example drivers to "print" verbose strings to
the console, for example:
Vertex ID: 0, vertex degree: 42, triangle count: 7, local clustering
coefficient: 0.00406504
Output to CSV will still be the compact tuple representations which do not
include der
Maybe .. I will try to log in to the machine directly and see ..
regards.
On Fri, Feb 24, 2017 at 2:05 AM, Robert Metzger wrote:
> Hi,
>
> It is possible that the stdout file is not properly available in the
> taskmanager UI.
> I guess if you log into the machine directly to get the stdout file
Hi,
Since Flink 1.2 "per job yarn applications" (when you do "-m yarn-cluster")
include the job jar into the classpath as well.
Does this change explain the behavior?
On Thu, Feb 23, 2017 at 4:59 PM, Gyula Fóra wrote:
> Hi,
>
> I have a problem that the frontend somehow seems to have the user ja
Hi,
It is possible that the stdout file is not properly available in the
taskmanager UI.
I guess if you log into the machine directly to get the stdout file, you'll
find the output.
On Thu, Feb 23, 2017 at 9:24 PM, Debasish Ghosh
wrote:
> Yes .. I was running Flink on a DC/OS cluster.
>
> AFAIR
Hi Shai,
I think we don't have so many users running Flink on Azure. Maybe you are
the first to put some heavy load onto that infrastructure using Flink.
I would guess that your problems are caused by the same root cause, just
the way the job is being cancelled is a bit different based on what is
Yes .. I was running Flink on a DC/OS cluster.
AFAIR I checked the taskmanager log from the Flink UI in Mesos. It said
stdout was not available. But this may be due to the fact that Flink on
DC/OS is not yet very stable ..
regards.
On Fri, Feb 24, 2017 at 1:41 AM, Robert Metzger wrote:
> Hi De
Hi Vinay!
If you see that the memory usage is different when you checkpoint, it can
be two things:
(1) RocksDB needs to hold onto some snapshot state internally while the
async background snapshot happens. That requires some memory.
(2) There is often data buffered during the alignment of checkpo
The FsStateBackend uses the heap to keep the data; only the state snapshots
are stored in the file system.
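For reference, a minimal sketch of that setup (the checkpoint URI and interval are placeholders):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// inside main(String[] args) throws Exception:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Working state lives on the TaskManager heap; only checkpoint snapshots are
// written to this file system path.
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60_000); // checkpoint every 60 seconds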
On Thu, Feb 23, 2017 at 6:13 PM, vinay patil
wrote:
> Hi,
>
> When I disabled checkpointing the memory usage is similar for all nodes,
> this means that for checkpointing enabled case the
Hi Debasish,
did you execute Flink in a distributed setting? print() will output the
stream contents on stdout on the respective worker node (taskmanager), not
on the machine that submitted the job.
On Thu, Feb 23, 2017 at 5:41 PM, Debasish Ghosh
wrote:
> I was facing a similar problem yesterd
Hi Gwen,
I would recommend looking into a data structure called an R-tree that is designed
specifically for this use case, i.e. matching a point to a region.
Thanks
Ankit
From: Fabian Hueske
Date: Wednesday, February 22, 2017 at 2:41 PM
To:
Subject: Re: Cross operation on two huge datasets
Hi Gwen,
Hi Robert,
In dev environment I load data via zipped csv files from s3.
Data is parsed into case classes.
It's quite fast, I'm able to get ~80k/sec with only source and "dev/null"
sink.
Checkpointing is enabled with 1 hour intervals.
Yes, one of the operators is a bottleneck and it backpressures
Hi,
When I disabled checkpointing the memory usage is similar for all nodes,
this means that for checkpointing enabled case the data is first flushed
to memory of CORE nodes (DataNode daemon is running here in case of EMR ) .
I am going to run with FSStatebackend on a high end cluster with 122G
What is the basic difference between partitioning datasets by key or
grouping them by key ?
Does it make a difference in terms of parallelism?
Thx
I was facing a similar problem yesterday. In my case print() was not
working. Try adding a Sink and write the output to another Kafka topic.
Something like
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Wri
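A rough sketch of that (broker address, topic name, and the Kafka 0.9 connector are assumptions, not taken from the original code):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// inside your job setup:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> result = env.fromElements("a", "b", "c"); // stand-in for the real result stream

// Write the stream to a separate Kafka topic instead of relying on print().
result.addSink(new FlinkKafkaProducer09<String>(
    "localhost:9092",   // broker list (placeholder)
    "debug-output",     // target topic (placeholder)
    new SimpleStringSchema()));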
Hi Vijay,
Regarding your second question: First of all, the example jobs of Flink
need to pass.
Secondly, I would recommend implementing a test job that uses a lot of
state, different state backends (file system and rocks) and some artificial
failures.
We at data Artisans have some testing jobs in
I think Till is referring to regular windows. The *All variants combine the
data into one task.
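A quick sketch of the difference (types and window size are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

// inside your job setup:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> input = env.fromElements(
    new Tuple2<>("a", 1), new Tuple2<>("b", 2));

// Keyed window: one window per key, evaluated by parallel tasks.
DataStream<Tuple2<String, Integer>> perKey =
    input.keyBy(0).timeWindow(Time.minutes(5)).sum(1);

// *All window: a single window over the whole stream, evaluated by one task
// (parallelism 1), so all elements of a window pass through one operator instance.
DataStream<Tuple2<String, Integer>> overall =
    input.timeWindowAll(Time.minutes(5)).sum(1);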
On Fri, Feb 17, 2017 at 4:14 PM, Sonex wrote:
> Hi Till,
>
> when you say parallel windows, what do you mean? Do you mean the use of
> timeWindowAll which has all the elements of a window in a single
Absolutely agreed. I have such a task on my todo list and I hope to find
time to address this soon.
On Mon, Feb 20, 2017 at 8:08 PM, Stephan Ewen wrote:
> How about adding this to the "logging" docs - a section on how to run
> log4j2
>
> On Mon, Feb 20, 2017 at 8:50 AM, Robert Metzger
> wrote:
Hi Dmitry,
sorry for the late response.
Where are you reading the data from?
Did you check if one operator is causing backpressure?
Are you using checkpointing?
Serialization is often the cause of slow processing. However, it's very
hard to diagnose potential other causes without any details on
Hi Mohit,
is there new data being produced into the topic?
The properties.setProperty("auto.offset.reset", "earliest"); setting only
applies if you haven't consumed anything in this consumer group.
So if you have read all the data in the topic before, you won't see
anything new showing up.
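For reference, the setting being discussed is roughly this kind of setup (broker address, topic, group id, and the Kafka 0.9 connector are placeholder assumptions):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// inside your job setup:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
props.setProperty("group.id", "my-consumer-group");       // placeholder
// Only takes effect if this consumer group has no committed offsets yet.
props.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer09<String> consumer =
    new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);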
On Sat
Hi,
exactly. You have to make sure that you can write data for the same ID
multiple times.
Exactly once in Flink is only guaranteed for registered state. So if you
have a flatMap() with a "counter" variable, that is held in a "ValueState",
this counter will always be in sync with the number of ele
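As a sketch of what registered state looks like here (names are illustrative; the function must run on a keyed stream, i.e. after a keyBy()):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountingFlatMap extends RichFlatMapFunction<String, Long> {

    // Keyed state registered with Flink: it is checkpointed and restored,
    // so after a failure it stays consistent with the processed elements.
    private transient ValueState<Long> counter;

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(
            new ValueStateDescriptor<>("counter", Long.class, 0L));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        long updated = counter.value() + 1; // default 0L is returned before the first update
        counter.update(updated);
        out.collect(updated);
    }
}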
Hi,
This looks like a shading issue. Can you post the classpath the JobManager
/ AppMaster is logging on startup on the mailing list?
It seems that Hadoop loads an unshaded version of the SecurityProtos.
Maybe there is some hadoop version mixup.
Are you using a Hadoop distribution (like CDH or HD
Hi,
I have a problem that the frontend somehow seems to have the user jar on
the classpath and it leads to a netty conflict:
https://gist.github.com/gyfora/4ec2c8a8a6b33adb80d411460432ce8d
So in the jobmanager logs I can see that my job started (running on YARN),
but can't access the frontend, i
Please ignore these messages.
I'll talk to the ASF infra team about how we can resolve the issue.
On Thu, Feb 23, 2017 at 4:54 PM, Robert Metzger wrote:
> I'm testing what happens if I'm sending an email to the user@flink list
> without being subscribed.
>
> On the dev@ list, moderators get an email in th
I'm testing what happens if I'm sending an email to the user@flink list
without being subscribed.
On the dev@ list, moderators get an email in that case.
I have the suspicion that you can post on the user@ list without
subscribing first. We often have users who ask a question, we give an
initial
I am working on a program that uses a complex window and have run into some
issues. It is a 1 hour window with 7 days allowed lateness including a custom
trigger that gives us intermediate results every 5 minutes of processing time
until the end of 7 days event time when a final fire is triggere
Hi Stephan,
Anyways the Async exception is gone.
I have increased my instance type to r3.2xlarge having 60GB of memory.
But what I have observed here is that for two task managers the memory usage
is close to 30GB but for other two it goes up to 55GB, the load is equally
distributed among all TM'
For reasons I cannot grasp, I am unable to move ahead.
Here's the code:
-
import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.
Hi,
@Gwen, sorry that I missed the cross function and showed you the wrong way.
@Fabian's answers are what I mean.
Considering that the cross function is so expensive, can we find a way to
restrict the broadcast? That is, if the groupBy function is a many-to-one
mapping, the cross function is an
Hi,
Yes, I have written a custom JDBC sink function based on the JDBCOutputFormat
for streaming, and it is working and writing records to a Postgres DB or an H2
in-memory DB. However, I am trying to figure out how many times the open method
is called and establishes a database connection, because for my integration
tes
Hi Gwen,
sorry I didn't read your answer, I was still writing mine when you sent
yours ;-)
Regarding your strategy, this is basically what Cross does:
It keeps one input partitioned and broadcasts (replicates) the other one. On
each partition, it combines the records of the partition of the first
Hi,
Flink's batch DataSet API does already support (manual) theta-joins via the
CrossFunction. It combines each pair of records of two input data sets.
This is done by broadcasting (and hence replicating) one of the inputs.
@Xingcan, so I think what you describe is already there.
However, as I sai
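As a rough illustration of the cross-based approach (types and the join predicate are placeholders, not from the actual job):

import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

// inside your main() / job setup:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> first = env.fromElements(1, 2, 3);
DataSet<Long> second = env.fromElements(10L, 20L);

// cross() pairs every record of `first` with every record of `second`;
// the CrossFunction decides what to emit for each pair. An arbitrary (theta)
// join predicate can then be applied as a filter on the result.
DataSet<Tuple2<Integer, Long>> pairs = first
    .cross(second)
    .with(new CrossFunction<Integer, Long, Tuple2<Integer, Long>>() {
        @Override
        public Tuple2<Integer, Long> cross(Integer a, Long b) {
            return new Tuple2<>(a, b);
        }
    })
    .filter(p -> p.f0 * 10 < p.f1); // placeholder theta predicate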
Hi and thanks for your answers !
I’m not sure I can define any index to split the workload since in my case any
point could be in any zone...
I think I’m currently trying to do it the way you call “theta-join”:
1- Trying to split one dataset over the cluster and prepare it for work
against
And now it's happening again
-Original Message-
From: Shai Kaplan [mailto:shai.kap...@microsoft.com]
Sent: Wednesday, February 22, 2017 12:02 PM
To: user@flink.apache.org
Subject: RE: Flink checkpointing gets stuck
I changed the checkpoint interval to 30 minutes, and also switched RocksD