Hello again,
We recently upgraded from Flink 1.12.3 to 1.14.0, hoping it would solve our
issue with checkpointing once data sources have finished. We need checkpointing
to work in order to trigger Flink's GenericWriteAheadSink class.
Firstly, as far as we can see, the constant mentioned in FLIP-147 that enables
the feature (ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH) isn't available. It's not in
ConfigConstants or CheckpointConfig, for example. So instead we enabled it with
the following:
Configuration conf = new Configuration();
conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);
StreamExecutionEnvironment env = StreamExecutionEnvironment
        .createLocalEnvironmentWithWebUI(conf);
env.enableCheckpointing(30 * 1000); // checkpoint every 30 seconds
...
Searching online, we can find the constant in 1.15 but not in the version we
were expecting (1.14.0).
Previously we had to add long Thread.sleep(x) calls to keep the sources alive
while checkpoints were taken. When we enabled this feature using the explicit
string and removed these hacks, we started seeing these errors:
INFO [flink-akka.actor.default-dispatcher-7] o.a.f.r.e.ExecutionGraph Source: Order JDBC Source (1/1) (e015c4f0910fb27e15fec063616ab785) switched from RUNNING to FINISHED.
[some lines removed for brevity]
INFO [flink-akka.actor.default-dispatcher-7] o.a.f.r.c.CheckpointCoordinator Triggering Checkpoint 5 for job 53d42ae669fad6cc8df2fe8f5706b48d failed due to {}
org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a checkpoint request for unknown task e015c4f0910fb27e15fec063616ab785. Failure reason: Task local checkpoint failure.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:966) ~[flink-runtime-1.14.0.jar:1.14.0]
    at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[na:na]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[na:na]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[na:na]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[na:na]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[na:na]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[na:na]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[na:na]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.12.jar:na]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[scala-library-2.11.12.jar:na]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[na:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[scala-library-2.11.12.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[scala-library-2.11.12.jar:na]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[na:na]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[na:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[na:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[na:na]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_91]
FYI, if we don't enable this feature we see a different error consistent with
the older version of Flink:
INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Failed to trigger checkpoint for job 532b20d72d407bf82570c86f672ccf2c because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.
Can anyone advise whether this feature is indeed available and working in
1.14.0, and how to enable it correctly?
Thanks,
James.
________________________________
From: Austin Cawley-Edwards
Sent: 04 November 2021 18:46
To: James Sandys-Lumsdaine
Cc: [email protected]
Subject: Re: GenericWriteAheadSink, declined checkpoint for a finished source
Hi James,
You are correct that since Flink 1.14 [1] (which included FLIP-147 [2]) there
is support for checkpointing after some tasks have finished, which sounds like
it will solve this use case.
You may also want to look at the JDBC sink [3], which also supports batching,
as well as some other nice things like retries and batch intervals.
Hope that helps,
Austin
[1]: https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[3]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine wrote:
Hello,
I have a Flink workflow where I need to upload the output data into a legacy
SQL Server database and so I have read the section in the Flink book about data
sinks and utilizing the GenericWriteAheadSink base class. I am currently using
Flink 1.12.3 although we plan to upgrade to 1.14 shortly.
Firstly, given that I will be generating a large amount of data, I think it best
to use the GenericWriteAheadSink base class so I can bulk copy all the data into
my SQL Server database rather than attempt row-by-row insertion, which would be
too slow. Is this a good use case for this class, or is there now a better
approach?
Secondly, one thing I noticed is that my JDBC source emits ~50,000 rows, but the
program actually exits before a final checkpoint is taken, so I miss many of the
final rows. I have to put in a Thread.sleep(5000) before allowing the JDBC
source to exit. This might be related to FLINK-21215, as I see the following
error:
org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
With the extra Thread.sleep(5000) I see all the rows handled by the
sendValues() method.
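One alternative to a fixed Thread.sleep(5000) is to let the source wait until a checkpoint taken after its last record has completed. The plain-Java sketch below models only that handshake; FinalCheckpointGate and its methods are hypothetical. It assumes a real source would call onCheckpointTaken from its snapshotState (CheckpointedFunction) and onCheckpointComplete from notifyCheckpointComplete (CheckpointListener), and would wait without holding the checkpoint lock. It only guarantees that the source has seen the completion; the sink receives its own notification separately, so this narrows the race rather than removing it.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical gate a bounded source could wait on, instead of Thread.sleep(5000),
 * until a checkpoint taken after its last record has completed.
 */
final class FinalCheckpointGate {
    private boolean allEmitted;            // set once the source has emitted its last record
    private long coveringCheckpoint = -1;  // first checkpoint taken after that point
    private boolean released;

    /** Source thread: call right after emitting the last record. */
    synchronized void markAllEmitted() {
        allEmitted = true;
    }

    /** Assumed to be called from the source's snapshotState(...). */
    synchronized void onCheckpointTaken(long checkpointId) {
        if (allEmitted && coveringCheckpoint < 0) {
            coveringCheckpoint = checkpointId;
        }
    }

    /** Assumed to be called from the source's notifyCheckpointComplete(...). */
    synchronized void onCheckpointComplete(long checkpointId) {
        if (coveringCheckpoint >= 0 && checkpointId >= coveringCheckpoint) {
            released = true;
            notifyAll(); // wake the waiting source thread
        }
    }

    /**
     * Source thread: waits up to timeoutMillis for release. Returns false on timeout
     * or interruption (for example when the job is cancelled).
     */
    synchronized boolean awaitRelease(long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (!released) { // loop guards against spurious wake-ups
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A checkpoint that was taken before markAllEmitted() does not count, even if it completes afterwards, because it cannot cover the final rows.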
I have included the test code below which just logs the "insertions" for now
(and doesn't do real db access) but demonstrates the problem:
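import java.util.UUID;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
// MyObj, MyJDBCSource, FileCheckpointCommitter, logger and the connection/date
// fields come from the rest of the test class and are not shown here.

private void checkpointTest() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(1);
    env.enableCheckpointing(500); // checkpoint every 500 ms

    MyJDBCSource myJDBCSource = new MyJDBCSource(tradesDBConnectionParams,
            fromDttm, toDttm, asOf);
    DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

    // GenericWriteAheadSink is a OneInputStreamOperator, so it is attached with transform()
    jdbcStreamIn.transform("SqlServerSink",
            TypeInformation.of(MyObj.class),
            new SqlServerBulkCopySink(
                    new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
                    TypeExtractor.createTypeInfo(MyObj.class)
                            .createSerializer(new ExecutionConfig()),
                    UUID.randomUUID().toString()));
    env.execute();
}

private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
    public SqlServerBulkCopySink(CheckpointCommitter committer,
            TypeSerializer<MyObj> serializer, String jobID) throws Exception {
        super(committer, serializer, jobID);
    }

    // Called when a checkpoint completes, with the records buffered for that checkpoint
    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId,
            long timestamp) {
        logger.info("Sending {},{}-----------------------------------------------",
                checkpointId, timestamp);
        for (MyObj myObj : objects) {
            // this will eventually be a bulk copy insert into the SQL Server database
            logger.info(" {},{}: {}", checkpointId, timestamp, myObj);
        }
        return true; // true = values written, so the checkpoint can be committed
    }
}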
Am I right in thinking that the latest versions of Flink will not suffer from
this problem, or am I hitting something else? To be clear, I am expecting Flink
to trigger a checkpoint that covers all the data I want to insert into my DB;
how else would I do the final bulk copy if my sendValues() is never called?
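To illustrate why the final rows can go missing, here is a simplified, single-threaded model of the write-ahead pattern GenericWriteAheadSink follows: records are buffered, sealed under a checkpoint id when a checkpoint is taken, and handed to sendValues() only when that checkpoint is reported complete. This is not Flink code; WriteAheadModel is hypothetical, and it leaves out the persisted operator state and the CheckpointCommitter check that the real class performs.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of the write-ahead pattern; not Flink's implementation. */
class WriteAheadModel<T> {
    // Records received since the last checkpoint was taken.
    private final List<T> pending = new ArrayList<>();
    // Records sealed per checkpoint id, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<T>> sealed = new TreeMap<>();
    // Everything that sendValues() has "written" so far.
    final List<T> sent = new ArrayList<>();

    void processElement(T value) {
        pending.add(value);
    }

    /** A checkpoint is taken: seal the buffered records under its id. */
    void snapshotState(long checkpointId) {
        sealed.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    /** A checkpoint completed: send every sealed batch with an id up to and including it. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<T>>> it =
                sealed.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<T>> entry = it.next();
            if (sendValues(entry.getValue(), entry.getKey())) {
                it.remove(); // only forget a batch once it was written successfully
            }
        }
    }

    /** Stand-in for the user's bulk insert; always succeeds in this model. */
    private boolean sendValues(Iterable<T> values, long checkpointId) {
        for (T value : values) {
            sent.add(value);
        }
        return true;
    }

    /** Records that would never be sent if the job stopped right now. */
    int unsentCount() {
        int count = pending.size();
        for (List<T> batch : sealed.values()) {
            count += batch.size();
        }
        return count;
    }
}
```

In the model, a record that arrives after checkpoint 1 is taken stays unsent until a later checkpoint is both taken and completed; if the job ends before that, it is never passed to sendValues(), which matches the missing final rows.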
I have more questions about my data sink but I will wait to hear your answers.
Many thanks in advance,
James.