Hi Juho,
I tried multiple times to follow the simple code you provided, but still can't
reproduce the bug you met. There's one more question I'd like to confirm with
you: is stateRetentionMillis a fixed (final) field, or might it be changed under
some condition?
Best, Sihua
On 05/19/2018 08:19, sihua zhou wrote:
Sorry for the incorrect information, that's not the case.
Best, Sihua
On 05/19/2018 07:58, sihua zhou wrote:
Hi Juho & Stefan,
just a quick summary: the good news is that I think I found the bug, the bad news
is that we are currently at RC4...
The bug is here:
try (RocksIteratorWrapper iterator = getRocksIterator(restoreDb, columnFamilyHandle)) {
    int startKeyGroup = stateBackend.getKeyGroupRange().getS
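To make the context of that snippet a bit more concrete, here is a minimal sketch of the kind of loop such a restore runs. This is illustrative only, not the actual Flink code; the key-group prefix length, the target DB and the range bounds are assumed parameters. The idea: iterate the restored column family and keep only entries whose key-group prefix falls into the rescaled subtask's range.

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

// Illustrative helper, not Flink's restore code: copy the entries of one column
// family whose key group (assumed to be encoded big-endian in the first
// keyGroupPrefixBytes of each key) lies inside [startKeyGroup, endKeyGroup].
class KeyGroupRangeCopy {
    static void copyRange(RocksDB restoreDb, ColumnFamilyHandle sourceHandle,
                          RocksDB targetDb, ColumnFamilyHandle targetHandle,
                          int keyGroupPrefixBytes, int startKeyGroup, int endKeyGroup)
            throws RocksDBException {
        try (RocksIterator iterator = restoreDb.newIterator(sourceHandle)) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                byte[] key = iterator.key();
                int keyGroup = 0;
                for (int i = 0; i < keyGroupPrefixBytes; i++) {
                    keyGroup = (keyGroup << 8) | (key[i] & 0xFF);
                }
                if (keyGroup >= startKeyGroup && keyGroup <= endKeyGroup) {
                    targetDb.put(targetHandle, key, iterator.value());
                }
            }
            iterator.status(); // throws if iteration stopped because of an internal error
        }
    }
}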
Hi Juho,
thanks for trying this out. I'm running out of ideas myself now... Let's do a
brief summary.
- have we reached a consensus that the map state is lost when restoring? (because
you can restore successfully without rescaling)
- the timer state is correctly restored, because for timers, when restoring
Tested with
http://people.apache.org/~trohrmann/flink-1.5.0-rc4/flink-1.5.0-bin-hadoop27-scala_2.11.tgz
.
It hits the same problem.
Btw, why is this error logged on INFO level?
2018-05-18 09:03:52,595 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - PlatformIDsProcessFunction
Thanks Sihua, I'll give that RC a try.
On Fri, May 18, 2018 at 10:58 AM, sihua zhou wrote:
> Hi Juho,
> would you like to try out the latest RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/)
> to rescale the job from the "problematic" checkpoint? The latest RC includes a
> fix for the
> If you say that you can reproduce the problem, does that mean reproduce
> from the single existing checkpoint
Yes.
I haven't tried creating another checkpoint and rescaling from it. I can
try that.
> We are including rescaling in some end-to-end tests now and then let’s
> see what happens.
If I u
Hi Juho,
would you like to try out the latest RC (http://people.apache.org/~trohrmann/flink-1.5.0-rc4/)
to rescale the job from the "problematic" checkpoint? The latest RC includes a fix
for a potential silent data loss. If that's the reason, you will see a different
exception when you try
Hi,
I had a look at the logs from the restoring job and couldn’t find anything
suspicious in them. Everything looks as expected and the state files are
properly found and transferred from S3. We are including rescaling in some
end-to-end tests now and then let’s see what happens.
If you say th
I see. I appreciate keeping this option available even if it's "beta". The
current situation could be documented better, though.
As long as rescaling from checkpoint is not officially supported, I would
put it behind a flag similar to --allowNonRestoredState. The flag could be
called --allowRescal
Hi,
>
> This raises a couple of questions:
> - Is it a bug though, that the state restoring goes wrong like it does for my
> job? Based on my experience it seems like rescaling sometimes works, but then
> you can have these random errors.
If there is a problem, I would still consider it a bug
Hi Juho,
I agree with you that sometimes it seems like a nightmare to create a
savepoint for a state-heavy job; that's also the reason we use the
checkpoint to recover the job. In fact, in our case, it often takes more than
10 minutes to take a savepoint successfully... Even so, we didn't me
Thanks Sihua.
Stefan wrote: "we do not want that user rely on their checkpoints to be
rescalable" (
https://github.com/apache/flink/pull/5490#issuecomment-365887734)
This raises a couple of questions:
- Is it a bug though, that the state restoring goes wrong like it does for
my job? Based on my e
Hi Juho,
> If restoring + rescaling a checkpoint is not supported properly, I don't
> understand why Flink doesn't entirely refuse to restore in that case?
I think you're asking the question I asked in
https://github.com/apache/flink/pull/5490; you can refer to it and find the
comments
Yes, I'm rescaling from a checkpoint.
> that behavior is not guaranteed yet
If restoring + rescaling a checkpoint is not supported properly, I don't
understand why Flink doesn't entirely refuse to restore in that case?
Note that I'm using a rather new 1.5-SNAPSHOT, not release 1.4. To be
exact,
Hi,
I think that is very good to know and fix. It feels like a not-so-nice
API design in RocksDB that iterators require checking two methods, and the
documentation of this is also newer than most of our RocksDB code, so an
update there clearly makes sense.
@Sihua: if you want to f
Hi,
I have had a brief look through the code related to restoring an incremental
checkpoint; no obvious bug could be found. But there is a suspicious loophole
that may lead to data loss. The suspicious code is pasted below.
while (iterator.isValid()) {
    int keyGroup = 0;
    for (int j = 0; j <
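The loophole described above is that the loop only consults isValid(), while a RocksDB iterator can also stop being valid because of an internal error that only status() reports. A wrapper along the lines below (illustrative only, not the actual RocksIteratorWrapper) would make such errors fail loudly instead of silently ending the restore loop early:

import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

// Illustrative wrapper: forwards isValid(), but re-checks status() whenever the
// delegate reports it is no longer valid, so an internal iterator error turns
// into an exception rather than a silently shortened iteration.
class StatusCheckingIterator implements AutoCloseable {
    private final RocksIterator delegate;

    StatusCheckingIterator(RocksIterator delegate) {
        this.delegate = delegate;
    }

    boolean isValid() throws RocksDBException {
        boolean valid = delegate.isValid();
        if (!valid) {
            delegate.status(); // throws RocksDBException if iteration failed
        }
        return valid;
    }

    void seekToFirst() { delegate.seekToFirst(); }
    void next() { delegate.next(); }
    byte[] key() { return delegate.key(); }
    byte[] value() { return delegate.value(); }

    @Override
    public void close() { delegate.close(); }
}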
Hi Juho,
if I'm not misunderstanding, you said you're rescaling the job from the
checkpoint? If yes, I think that behavior is not guaranteed yet; you can find
this in the doc
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints.
So, I no
I was able to reproduce this error.
I just happened to notice an important detail about the original failure:
- checkpoint was created with a 1-node cluster (parallelism=8)
- restored on a 2-node cluster (parallelism=16), which caused that null exception
I tried restoring again from the problematic che
Hi,
> On 15.05.2018 at 10:34, Juho Autio wrote:
>
> Ok, that should be possible to provide. Are there any specific packages to
> set on trace level? Maybe just go with org.apache.flink.* on TRACE?
The following packages would be helpful:
org.apache.flink.contrib.streaming.state.*
org.apache.
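For reference, raising those loggers to TRACE in Flink's log4j.properties could look roughly like the fragment below. Only the first package name appears in the thread; the second one is an assumed example, and the truncated list above may contain different packages:

# conf/log4j.properties (log4j 1.x, as shipped with Flink 1.5)
log4j.logger.org.apache.flink.contrib.streaming.state=TRACE
# assumed additional package, the original list is cut off above
log4j.logger.org.apache.flink.runtime.state=TRACE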
> Btw having a trace level log of a restart from a problematic checkpoint
> could actually be helpful
Ok, that should be possible to provide. Are there any specific packages to
set on trace level? Maybe just go with org.apache.flink.* on TRACE?
> did the „too many open files“ problem only happen wi
Btw having a trace level log of a restart from a problematic checkpoint could
actually be helpful if we cannot find the problem from the previous points.
This can give a more detailed view of what checkpoint files are mapped to which
operator.
I have one more question: did the „too many op
What I would like to see from the logs is (also depending a bit on your log
level):
- all exceptions.
- in which context exactly the „too many open files“ problem occurred, because
I think for checkpoint consistency it should not matter as a checkpoint with
such a problem should never succeed.
Thanks all. I'll have to see about sharing the logs & configuration..
Is there something special that you'd like to see from the logs? It may be
easier for me to get specific lines and obfuscate sensitive information
instead of trying to do that for the full logs.
We basically have: RocksDBStateB
Hi,
I agree, this looks like a bug. Can you tell us your exact configuration of the
state backend, e.g. whether you are using incremental checkpoints or not? Are you
using the local recovery feature? Are you restarting the job from a checkpoint
or a savepoint? Can you provide logs for both the job t
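To illustrate the kind of configuration being asked about, a job might set up the RocksDB state backend with incremental checkpoints roughly as below. The checkpoint URI and interval are invented placeholders, not values from the thread; the second constructor argument is what turns incremental checkpoints on:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder checkpoint URI; 'true' enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));

        // Placeholder interval: take a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000L);

        // Trivial pipeline so the example actually runs.
        env.fromElements("a", "b", "c").print();
        env.execute("state backend example");
    }
}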
Hi Juho,
As Sihua said, this shouldn't happen and indicates a bug. Did you only
encounter this once or can you easily reproduce the problem?
Best,
Aljoscha
> On 15. May 2018, at 05:57, sihua zhou wrote:
>
> Hi Juho,
> in fact, from your code I can't see any possible that the MapState could be
Hi Juho,
in fact, from your code I can't see any possibility that the MapState could be
inconsistent with the timer; it looks like a bug to me, because once the
checkpoint is complete, and you haven't queried the state asynchronously in a
custom thread, then the result of the checkpoint should be consistent
Hi Juho,
You are right, there's no transactional guarantee on timers and state in
processElement(). They may end up inconsistent if your job was
cancelled in the middle of processing an element.
To avoid this situation, the best programming practice is to always check if
the state you're try
We have a Flink streaming job (1.5-SNAPSHOT) that uses timers to clear old
state. After restoring state from a checkpoint, it seems like a timer had
been restored, but not the data that was expected to be in the related
MapState when such a timer had been added.
The way I see this is that there's a bug,
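For readers following along, a minimal sketch of the kind of job described here (names, types and the retention period are made up, not taken from the actual code) might look like the following: elements are recorded in a MapState, a processing-time timer is registered to clear old state, and, following the practice suggested earlier in the thread, onTimer checks that the state actually exists before acting on it.

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Illustrative only: collects values in a MapState and uses a timer to clear
// state that is older than an assumed retention period.
public class TimerCleanupFunction extends ProcessFunction<String, String> {

    private static final long RETENTION_MILLIS = 24 * 60 * 60 * 1000L; // assumed value

    private transient MapState<String, Long> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen", String.class, Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long now = ctx.timerService().currentProcessingTime();
        seen.put(value, now);
        // Schedule cleanup of this key's state after the retention period.
        ctx.timerService().registerProcessingTimeTimer(now + RETENTION_MILLIS);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // Check that the state is really there before acting on the timer,
        // instead of assuming the timer and the MapState were restored together.
        if (seen.iterator().hasNext()) {
            seen.clear();
        }
    }
}

On a keyed stream this would be applied as stream.keyBy(...).process(new TimerCleanupFunction()).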