+1 for dropping support for Scala 2.10
On Tue, Sep 19, 2017 at 3:29 AM, Sean Owen wrote:
> For the curious, here's the overall task in Spark:
>
> https://issues.apache.org/jira/browse/SPARK-14220
>
> and most of the code-related changes:
>
> https://github.com/apache/spark/pull/18645
>
> and where it's stuck at the moment:
> How did you kill the TaskManagers? I assume you didn't kill the JVM
process because otherwise you wouldn't see the finalizer objects piling up.
Till, I configured Chaos Monkey to always kill the newest/same TaskManager.
So the other N-1 TaskManagers stayed up during the whole process. Each of them
exp
Hi Till,
Thanks for your answer. It worked when I used StandaloneMiniCluster, but
another problem is that I can't find a way to cancel
a running Flink job without shutting down the cluster. For LocalFlinkMiniCluster
I can do it with the code below:
for (job <- cluster.getCurrentlyRunningJo
I just did the same test as you had with SocketWindowWordCount, and the
counter showed up all right.
You should probably connect JConsole to localhost:28781 (or whatever port
you have your JMX server listening on).
That's how I set up the environment; perhaps there are better ways to do it.
On Wed, Se
Still got stuck, here are my steps (on my laptop)
for example:
Step1:
public class MetricsTest extends RichMapFunction {
private static final long serialVersionUID = 1L;
private org.apache.flink.metrics.Meter meter;
private Counter counter;
@Override
public void open(Configurat
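(For context, a minimal sketch of how such a function typically registers these two metrics in open(), assuming the standard RichFunction metric-group API; the metric names and the String-to-String signature are hypothetical, not taken from the original code.)

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

public class MetricsSketch extends RichMapFunction<String, String> {
    private static final long serialVersionUID = 1L;

    private transient Counter counter;
    private transient Meter meter;

    @Override
    public void open(Configuration config) {
        // Register the metrics on the operator's metric group.
        this.counter = getRuntimeContext().getMetricGroup().counter("myCounter");
        this.meter = getRuntimeContext().getMetricGroup()
                .meter("myMeter", new MeterView(counter, 60)); // rate over the last 60 seconds
    }

    @Override
    public String map(String value) {
        meter.markEvent(); // increments the underlying counter and updates the per-second rate
        return value;
    }
}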
Copied from my earlier response to some similar question:
"Here is a short description for how it works: there are totally 3 threads
working together, one for reading, one for sorting partial data in memory,
and the last one is responsible for spilling. Flink will first figure out
how many memory
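(Not Flink's actual UnilateralSortMerger, but a toy sketch of the three-role pipeline described above: a reading thread fills fixed-size buffers, a sorting thread sorts each buffer in memory, and a spilling thread writes out the sorted runs. The integer input, buffer size, and "spilling" via println are all made up for illustration.)

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreeThreadSortSketch {

    // Poison pill used to signal end-of-input between the stages.
    private static final List<Integer> END = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        BlockingQueue<List<Integer>> toSort = new LinkedBlockingQueue<>();
        BlockingQueue<List<Integer>> toSpill = new LinkedBlockingQueue<>();

        // 1) Reading thread: fills fixed-size in-memory buffers from the input.
        Thread reader = new Thread(() -> {
            try {
                List<Integer> buffer = new ArrayList<>();
                for (int i = 1000; i > 0; i--) {      // stand-in for the real input
                    buffer.add(i);
                    if (buffer.size() == 100) {       // "memory budget" per buffer
                        toSort.put(buffer);
                        buffer = new ArrayList<>();
                    }
                }
                if (!buffer.isEmpty()) toSort.put(buffer);
                toSort.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 2) Sorting thread: sorts each partial buffer in memory.
        Thread sorter = new Thread(() -> {
            try {
                List<Integer> buffer;
                while ((buffer = toSort.take()) != END) {
                    Collections.sort(buffer);
                    toSpill.put(buffer);
                }
                toSpill.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 3) Spilling thread: writes each sorted run out (here it just reports the size).
        Thread spiller = new Thread(() -> {
            try {
                List<Integer> run;
                while ((run = toSpill.take()) != END) {
                    System.out.println("spilled sorted run of " + run.size() + " records");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        reader.start(); sorter.start(); spiller.start();
        reader.join(); sorter.join(); spiller.join();
    }
}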
Thanks for your response!
Recommendation to decrease allotted memory? Which allotted memory should be
decreased?
I tried decreasing taskmanager.memory.fraction to give more memory to
user-managed operations, but that doesn't work beyond a point. I also tried increasing
containerized.heap-cutoff-ratio,
Thanks Eron & Fabian.
The issue was hitting a YARN proxy URL vs. the node itself. For example, this
worked:
http://{ip}:37716/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink
But this did not:
http://{ip}:20888/proxy/application_1504649135200_00
Good news, it can be done if you carefully encode the target directory with
percent-encoding, as per:
https://tools.ietf.org/html/rfc3986#section-2.1
For example, given the directory `s3:///savepoint-bucket/my-awesome-job`,
which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I was
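(A small sketch of producing such an encoded target directory in Java. The host placeholder and job id are just the values from the example above; note that java.net.URLEncoder happens to match the desired RFC 3986 encoding here only because the path contains no spaces, which it would encode as '+' rather than '%20'.)

import java.net.URLEncoder;

public class SavepointPathEncoding {
    public static void main(String[] args) throws Exception {
        String targetDirectory = "s3:///savepoint-bucket/my-awesome-job";

        // Percent-encode the directory so it can be embedded as a single path segment.
        String encoded = URLEncoder.encode(targetDirectory, "UTF-8");
        // -> s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job

        String jobId = "1a0fd176ec8aabb9b8464fa481f755f0";
        String url = "http://{ip}:37716/jobs/" + jobId
                + "/cancel-with-savepoint/target-directory/" + encoded;
        System.out.println(url);
    }
}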
Hi Emily,
thanks for reaching out.
I'm not familiar with the details of the Rest API but Ufuk (in CC) might be
able to help you.
Best, Fabian
2017-09-19 10:23 GMT+02:00 Emily McMahon :
> I've tried every combination I can think of to pass an s3 path as the
> target directory (url encode, includ
Hello, the current behavior is that Flink holds onto received offers for up
to two minutes while it attempts to provision the TMs. Flink can combine
small offers to form a single TM, to combat fragmentation that develops
over time in a Mesos cluster. Are you saying that unused offers aren't
bei
Fabian,
It looks like hive instantiates both input and output formats when doing
either. I use hive 1.2.1, and you can see in HCatUtil.getStorageHandler
where it tries to load both. It looks like it's happening after the writes
complete and flink is in the finish/finalize stage. When I watch the
Hi Tovi,
your code looks OK to me. Maybe Gordon (in CC) has an idea what is going
wrong.
Just a side note: you don't need to set the parallelism to 2 to read from
two partitions. A single consumer instance can read from multiple
partitions.
Best,
Fabian
2017-09-19 17:02 GMT+02:00 Sofer, To
The number of timers is about 400 per second. We have observed that onTimer
calls are delayed only when the number of scheduled timers starts
increasing from a minimum. It would be great if you could share pointers to
code I can look at to understand it better. :)
Narendra Joshi
On 14 Sep 2017 16:04,
Stephan, agree that it is not a real memory leak. I haven't found it
affecting the system, so it is just something odd for now.
But if it is not really necessary, why do we want to defer memory release
with unpredictable behavior? Can the StreamTask stop() method take care of the
cleanup work and not need to r
Hi Kostas,
Thank you for the quick reply and the tips. I will check them out !
I would like to start by understanding the way secondary storage is used in
batch processing.
If you guys have additional pointers on that, it would certainly help me a lot.
Thanks again,
Florin
Hi Florin,
Unfortunately, there is no design document.
The UnilateralSortMerger.java is used in the batch processing mode (not in
streaming) and,
in fact, the code dates back some years. I also CC Fabian as he may have more
things to say on this.
Now for the streaming side, Flink uses 3 state
Hello everyone,
In our group at EPFL we're doing research on understanding and potentially
improving the performance of data-parallel frameworks that use secondary
storage.
I was looking at the Flink code to understand how spilling to disk actually
works.
So far I got to the UnilateralSortMe
Hello guys,
We have a Flink 1.3.2 session deployed from a Marathon JSON to Mesos with some of
the following parameters as environment variables:
"flink_mesos.initial-tasks": "8",
"flink_mesos.resourcemanager.tasks.mem": "4096",
And other environment variables including zookeeper, etc.
The mesos
If I apply slot sharing as in the example:
DataStream LTzAccStream = env
        .addSource(new FlinkKafkaConsumer010<>("topic", new CustomDeserializer(), properties))
        .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
Till,
Using 1.3.2 and like Ufuk mentioned, using S3 for checkpointing.
On Tue, Sep 19, 2017 at 4:28 AM, Till Rohrmann wrote:
> Hi Elias,
>
> which version of Flink and which state backend are you running? I tried to
> reproduce it and wasn't successful so far.
>
> We recently changed a bit how
Hi Fabian,
This is good advice, but I had already tried adding a random value to my
data and it did not seem very useful.
The key set of my data is small, around 10 ~ 20. If the range of the random
number is small, the distribution might not be better; it could even be worse. I think
the reason is that KeyedStream
Hi Garrett,
Flink distinguishes between two classloaders: 1) the system classloader,
which is the main classloader of the process and loads all
JARs in the ./lib folder, and 2) the user classloader, which loads the job
jar.
AFAIK, the different operators do not have distinct classloader
Hi,
I am trying to set up a FlinkKafkaConsumer that reads from two partitions in
local mode, using setParallelism(2).
The producer writes to two partitions (as shown in the metrics report).
But the consumer always seems to read from one partition only.
Am I missing something in the partition configura
Hi Tony,
operator state can only be kept on the heap.
One thing you could try is to add a random value to your data and keyBy on a
composite key that consists of your original key and the random value.
It is important, though, that you actually add the random value to your data
to ensure that the e
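(A minimal sketch of that idea, assuming a hypothetical (key, value) record type and 16 salt values per key; the important part is that the salt is added to the record itself before keying on the composite key.)

import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SaltedKeyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical input: (key, value) pairs with a very small key space.
        DataStream<Tuple2<String, Long>> events =
                env.fromElements(Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

        // Add the random salt to the record itself, then key by (key, salt).
        DataStream<Tuple3<String, Integer, Long>> salted = events
                .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Integer, Long>>() {
                    @Override
                    public Tuple3<String, Integer, Long> map(Tuple2<String, Long> e) {
                        int salt = ThreadLocalRandom.current().nextInt(16); // 16 sub-keys per key
                        return Tuple3.of(e.f0, salt, e.f1);
                    }
                });

        // Composite key = original key (field 0) + salt (field 1).
        KeyedStream<Tuple3<String, Integer, Long>, Tuple> keyed = salted.keyBy(0, 1);

        keyed.print();
        env.execute("salted keyBy sketch");
    }
}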
The UUIDs are assigned.
As far as I can see (inspecting the metrics and how the task behaves) the
mergeElements apply function receives all the elements (the main element and
the other elements that it expects) so it seems that the correlation is
correct. Also, nothing indicates that there are el
PS: To answer the question. No, I think there is no reason for this
and it shouldn't happen. :-(
On Tue, Sep 19, 2017 at 2:44 AM, Elias Levy wrote:
> Is there a particular reason that GlobalConfiguration is so noisy?
>
> The task manager log is full of "Loading configuration property" messages
>
I saw this too recently when using HadoopFileSystem for checkpoints
(HDFS or S3). I thought I had opened an issue for this, but I didn't.
Here it is: https://issues.apache.org/jira/browse/FLINK-7643
On Tue, Sep 19, 2017 at 1:28 PM, Till Rohrmann wrote:
> Hi Elias,
>
> which version of Flink and
Hi,
I have a basic streaming job that continuously persists data from Kafka to
S3.
The data would be grouped by some dimensions and limited in amount.
Originally, I used 'keyBy' and keyed state to fulfill the requirement.
However, because the data is extremely skewed, I turned to use a map function
t
Hi Elias,
which version of Flink and which state backend are you running? I tried to
reproduce it and wasn't successful so far.
We recently changed a bit how we load the GlobalConfiguration in
combination with dynamic properties [1]. Maybe this has affected what
you've reported as well.
[1] http
For the curious, here's the overall task in Spark:
https://issues.apache.org/jira/browse/SPARK-14220
and most of the code-related changes:
https://github.com/apache/spark/pull/18645
and where it's stuck at the moment:
https://mail-archives.apache.org/mod_mbox/spark-dev/201709.mbox/%3CCAMAsSdK
Hi,
Are the UUIDs randomly generated when calling .uuid, or are they assigned so
that .uuid returns the same UUID when called multiple times? The former
would be problematic because we would not correctly assign state.
Best,
Aljoscha
> On 19. Sep 2017, at 11:41, Fabian Hueske wrote:
>
>
There is no notion of "full" in Flink except that one slot will run at most
one subtask of each operator.
The scheduling depends on the structure of the job, the parallelism of the
operators, and the number of slots per TM.
It's hard to tell without knowing the details.
2017-09-19 11:57 GMT+02:00
Hi Aljoscha,
I am in favor of the change. No concerns on my side, just one remark: I
talked to Sean last week (CC'd) and he mentioned that he faced some
technical issues while driving the transition from 2.10 to 2.12 for Spark.
It had to do with changes in the scope of implicits. You m
Given that the last maintenance release Scala 2.10.6 is from about 2 years
ago, I would also be in favour of dropping Scala 2.10 support from Flink.
This will make maintenance easier for us and allow us to drop artifacts
like Flakka.
Cheers,
Till
On Tue, Sep 19, 2017 at 11:56 AM, Aljoscha Krettek
Thank you, unfortunately it had no effect.
As I add more load to the computation, the "taskmanager killed" error appears
on the node in use, without other nodes being called in to sustain the computation.
I also increased
akka.watch.heartbeat.interval
akka.watch.heartbeat.pause
akka.transport.heartbeat.i
Hi!
From my understanding, overriding finalize() still has some use cases and
is valid if done correctly (although PhantomReference gives more control
over the cleanup process). finalize() is still used in JDK classes as well.
Whenever one overrides finalize(), the object cannot be immediately ga
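(A toy illustration of that point, unrelated to Flink's code: an object that overrides finalize() is queued for the JVM's finalizer thread after becoming unreachable and survives at least one extra GC cycle before its memory can be reclaimed.)

public class FinalizeDemo {

    static class WithFinalizer {
        private final byte[] payload = new byte[1024 * 1024]; // 1 MB, to make retention visible

        @Override
        protected void finalize() throws Throwable {
            // Runs on the finalizer thread some time after the object becomes
            // unreachable; only afterwards can the payload be reclaimed.
            super.finalize();
        }
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100; i++) {
            new WithFinalizer(); // unreachable right away, but not immediately collectable
        }
        System.gc();        // unreachable objects are discovered and enqueued for finalization
        Thread.sleep(1000); // give the finalizer thread time to drain the queue
    }
}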
So Flink uses the other nodes only if one is completely "full"?
Hi Le Xu,
the reason why all the different SocketTextStreamFunction sources are scheduled
to the same machine is slot sharing. Slot sharing allows Flink
to schedule tasks belonging to different operators into the same slot. This
makes it possible, for example, to achieve better colocation between task
Hi,
Talking to some people I get the impression that Scala 2.10 is quite outdated
by now. I would like to drop support for Scala 2.10 and my main motivation is
that this would allow us to drop our custom Flakka build of Akka that we use
because newer Akka versions only support Scala 2.11/2.12 a
If this were the case, it would be a bug in Flink.
As I said before, your implementation looked good to me.
All state of the window and trigger should be wiped if the trigger returns
FIRE_AND_PURGE (or PURGE) and its clear() method is correctly implemented.
I'll CC Aljoscha again for his opinio
createInstance(Object[] fields) at TupleSerializerBase seems not to be part
of TypeSerializer API.
Will I be losing any functionality? In what cases do you use this instead
of createInstance()?
// We use this in the Aggregate and Distinct Operators to create instances
// of immutable Tuples (i.e.
Thanks Fabian, I'll take a look to these improvements.
I was wondering if the increasing state size could be due to the UUIDs
used in the keyBy being randomly generated. Maybe even if I correctly delete
all the state related to a given key, there is still some metadata related to
the key wanderin
Hi,
Flink's scheduling aims to co-locate tasks to reduce network communication
and to ease reasoning about resource/slot consumption.
A slot can execute one subtask of each operator of a program, i.e., a
parallel slice of the program.
You can control the scheduling of tasks by specifying resourc
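(A minimal sketch of steering scheduling via slot sharing groups; the group names are made up, and by default an operator stays in the same group as its inputs.)

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingGroupExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source subtasks go into slots of the (hypothetical) group "sources".
        DataStream<String> source = env
                .socketTextStream("localhost", 9999)
                .slotSharingGroup("sources");

        // The map subtasks are put into a different group, so they cannot
        // share slots with the source subtasks.
        source
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .slotSharingGroup("processing")
                .print();

        env.execute("slot sharing group sketch");
    }
}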
I've tried every combination I can think of to pass an s3 path as the
target directory (url encode, include trailing slash, etc)
I can successfully pass a local path as the target directory (ie
/jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't think
there's a problem with the job
Thanks for the correction and the pointers Eron!
Cheers, Fabian
2017-09-18 18:34 GMT+02:00 Eron Wright :
> Unfortunately Flink does not yet support SSL mutual authentication nor any
> form of client authentication. There is an ongoing discussion about it:
> http://apache-flink-mailing-list-arc
Hi Gerard,
I had a look at your Trigger implementation but did not spot something
suspicious that would cause the state size to grow.
However, I noticed a few things that can be improved:
- use ctx.getCurrentProcessingTime instead of System.currentTimeMillis to
make the Trigger easier to test (th
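(To illustrate that first point, a bare-bones processing-time timeout trigger that uses the TriggerContext clock instead of System.currentTimeMillis(); the 30 second timeout is arbitrary and this is not the original trigger from the thread.)

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class TimeoutTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;
    private static final long TIMEOUT_MS = 30_000L;

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        // Use the context's clock; this makes the trigger testable with a mocked context.
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + TIMEOUT_MS);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        // The timeout fired: emit the window contents and purge the window state.
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // No custom state in this sketch; pending processing-time timers could be
        // deleted here via ctx.deleteProcessingTimeTimer(...) if we tracked them.
    }
}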
Have a look at the TupleTypeInfo class. It has a constructor that
accepts an array of TypeInformation,
and supports automatically generating a serializer from them.
On 18.09.2017 18:28, nragon wrote:
One other thing :). Can I set the tuple generic type dynamically?
Meaning, build a tuple of N arity
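(A small sketch of that approach; the two field types below are just stand-ins for types chosen at runtime.)

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class DynamicTupleTypeInfo {
    public static void main(String[] args) {
        // Field types decided at runtime, e.g. derived from some schema.
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO
        };

        TupleTypeInfo<Tuple2<String, Integer>> tupleInfo = new TupleTypeInfo<>(fieldTypes);

        // A serializer for the tuple is generated automatically from the field types.
        TypeSerializer<Tuple2<String, Integer>> serializer =
                tupleInfo.createSerializer(new ExecutionConfig());

        System.out.println(tupleInfo + " -> " + serializer);
    }
}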