Mads Chr. Olesen created FLINK-13059:
----------------------------------------

             Summary: Cassandra Connector leaks Semaphore on Exception; hangs on close
                 Key: FLINK-13059
                 URL: https://issues.apache.org/jira/browse/FLINK-13059
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Cassandra
    Affects Versions: 1.8.0
            Reporter: Mads Chr. Olesen


In CassandraSinkBase the following code is present (comments are mine):

 
{code:java}
public void invoke(IN value) throws Exception {
   checkAsyncErrors();
   tryAcquire();
   //Semaphore held here

   final ListenableFuture<V> result = send(value);

   Futures.addCallback(result, callback); //Callback releases semaphore
}
{code}
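Any exception thrown inside send(value) leaves the semaphore permit unreleased, because Futures.addCallback() is never reached and so the callback that would release the permit is never registered. Such exceptions do occur, e.g.: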
{code:java}
com.datastax.driver.core.exceptions.InvalidQueryException: Some partition key parts are missing: hest
at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:50)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:98)
at com.datastax.driver.mapping.Mapper.getPreparedQuery(Mapper.java:118)
at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:201)
at com.datastax.driver.mapping.Mapper.saveQuery(Mapper.java:163)
at org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.send(CassandraPojoSink.java:128)
at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.invoke(CassandraSinkBase.java:131)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
{code}
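One possible way to close the leak, sketched under assumptions: release the permit whenever send(value) throws synchronously, since in that case no callback is ever attached to return it. The names below (Sender, GuardedSink, onRequestComplete, the flush timeout) are hypothetical simplifications of CassandraSinkBase, not Flink's actual code or fix; the asynchronous callback is modeled as an immediate release.

{code:java}
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for send(value); like CassandraPojoSink.send, it may throw synchronously.
interface Sender<IN> {
    void send(IN value) throws Exception;
}

// Hypothetical, simplified model of CassandraSinkBase's permit handling (not Flink's code).
class GuardedSink<IN> {
    private final int maxConcurrentRequests;
    private final Semaphore semaphore;
    private final Sender<IN> sender;

    GuardedSink(int maxConcurrentRequests, Sender<IN> sender) {
        this.maxConcurrentRequests = maxConcurrentRequests;
        this.semaphore = new Semaphore(maxConcurrentRequests);
        this.sender = sender;
    }

    void invoke(IN value) throws Exception {
        semaphore.acquire(); // permit held from here on
        try {
            sender.send(value);
        } catch (Exception e) {
            // send() failed before a callback could be attached, so nothing
            // else will ever return this permit: release it before rethrowing.
            semaphore.release();
            throw e;
        }
        // In the real sink an async callback releases the permit when the
        // request completes; this model completes the request immediately.
        onRequestComplete();
    }

    private void onRequestComplete() {
        semaphore.release();
    }

    // Loosely mirrors flush(): wait until every permit is back. The timeout is
    // an assumption of this sketch so that a leaked permit shows up as 'false'.
    boolean flush(long timeoutMillis) throws InterruptedException {
        if (semaphore.tryAcquire(maxConcurrentRequests, timeoutMillis, TimeUnit.MILLISECONDS)) {
            semaphore.release(maxConcurrentRequests);
            return true;
        }
        return false;
    }

    int availablePermits() {
        return semaphore.availablePermits();
    }
}
{code}

With this guard, a failing send(value) still propagates its exception to the job, but the permit count returns to maxConcurrentRequests, so flush() can complete while the job closes.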
Because the permit is never released, the following happens: when the exception propagates and causes the job to close, CassandraSinkBase.flush() is eventually called. flush() then blocks forever trying to acquire config.getMaxConcurrentRequests() permits from the semaphore, which only has config.getMaxConcurrentRequests() - 1 available.
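The permit arithmetic behind the hang can be reproduced in isolation with a bare java.util.concurrent.Semaphore. This is a minimal sketch; LeakDemo, unguardedInvoke and the flush timeout are hypothetical names and choices, not Flink code. Per the report, the real flush() waits without a timeout, which is where the hang comes from; the timeout here only lets the demo terminate.

{code:java}
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical reproduction of the leak; names are illustrative, not taken from Flink.
class LeakDemo {
    // Models the original invoke(): the permit is acquired, and only a successful
    // send reaches the point where a callback would release it.
    static void unguardedInvoke(Semaphore semaphore, boolean sendThrows) throws InterruptedException {
        semaphore.acquire();
        if (sendThrows) {
            throw new IllegalStateException("simulated InvalidQueryException from send()");
        }
        semaphore.release(); // stands in for the callback firing on completion
    }

    // Models flush(): it needs every permit back before it can proceed.
    static boolean flush(Semaphore semaphore, int maxConcurrentRequests, long timeoutMillis)
            throws InterruptedException {
        boolean acquired = semaphore.tryAcquire(maxConcurrentRequests, timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            semaphore.release(maxConcurrentRequests);
        }
        return acquired;
    }

    public static void main(String[] args) throws InterruptedException {
        int maxConcurrentRequests = 4;
        Semaphore semaphore = new Semaphore(maxConcurrentRequests);
        unguardedInvoke(semaphore, false); // succeeds, permit returned
        try {
            unguardedInvoke(semaphore, true); // throws, permit is never returned
        } catch (IllegalStateException expected) {
            // The exception escapes, as it does when the job starts closing.
        }
        System.out.println("available permits: " + semaphore.availablePermits()); // prints 3
        System.out.println("flush succeeded: " + flush(semaphore, maxConcurrentRequests, 200)); // prints false
    }
}
{code}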

The Flink job is thus left half-closed but still marked as "RUNNING". Checkpointing, however, fails with:
{noformat}
INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 201325 of job XXX.
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException: Task Source: XXX (3/4) was not running
{noformat}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
