Re: Scatter-Gather Iteration aggregators

2016-05-13 Thread Vasiliki Kalavri
Hi Lydia,

an iteration aggregator combines all aggregates globally once per superstep
and makes them available in the *next* superstep.
Within each scatter-gather iteration, one MessagingFunction (scatter phase)
and one VertexUpdateFunction (gather phase) are executed. Thus, if you set
an aggregate value within one of those, the value will be available in the
next superstep. You can retrieve it by calling the
getPreviousIterationAggregate() method.
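
As a minimal sketch (names are made up, and the Gelly ScatterGatherConfiguration API
discussed in this thread is assumed), registering such an aggregator so that both phases
of the iteration can see it looks roughly like this:

import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;

public class AggregatorSetupSketch {

    // Registers one aggregator under "sumAggregator"; both the MessagingFunction and the
    // VertexUpdateFunction of the same iteration can then call getIterationAggregator()
    // to contribute values, and getPreviousIterationAggregate() to read the value that
    // was combined globally in the previous superstep.
    public static ScatterGatherConfiguration withSumAggregator() {
        ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
        parameters.registerAggregator("sumAggregator", new DoubleSumAggregator());
        return parameters;
    }
}
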
Let me know if that clears things up!

-Vasia.

On 13 May 2016 at 08:57, Lydia Ickler  wrote:

> Hi Vasia,
>
> yes, but only independently within each Function or not?
>
> If I set the aggregator in VertexUpdateFunction then the newly set value
> is not visible in the MessageFunction.
> Or am I doing something wrong? I would like to have a shared aggregator
> to normalize vertices.
>
>
> On 13.05.2016 at 08:04, Vasiliki Kalavri wrote:
>
> Hi Lydia,
>
> registered aggregators through the ScatterGatherConfiguration are
> accessible both in the VertexUpdateFunction and in the MessageFunction.
>
> Cheers,
> -Vasia.
>
> On 12 May 2016 at 20:08, Lydia Ickler  wrote:
>
>> Hi,
>>
>> I have a question regarding the Aggregators of a Scatter-Gather
>> Iteration.
>> Is it possible to have a global aggregator that is accessible in 
>> VertexUpdateFunction()
>> and MessagingFunction() at the same time?
>>
>> Thanks in advance,
>> Lydia
>>
>
>
>


Re: Confusion about multiple use of one ValueState

2016-05-13 Thread Balaji Rajagopalan
Even though there are multiple instances of the map object, the transient
value state is accessible across those instances, so as the stream is flowing
in, the value can be updated based on application logic.

On Fri, May 13, 2016 at 11:26 AM, Balaji Rajagopalan <
balaji.rajagopa...@olacabs.com> wrote:

> I don't think the ValueState defined in one map function is accessible in
> another map function; this is my understanding. Also, you need to be aware
> that an instance of the map function will be created for each of the tuples in
> your stream. I had a similar use case where I had to pass some state
> from one map function to another, and I used Redis for that.
>
> On Fri, May 13, 2016 at 8:58 AM, Nirmalya Sengupta <
> sengupta.nirma...@gmail.com> wrote:
>
>> Hello all,
>>
>> Let's say I want to hold some state value derived during one
>> transformation, and then use that same state value in a subsequent
>> transformation? For example:
>>
>> myStream
>> .keyBy(fieldID) // Some field ID, may be 0
>> .map(new MyStatefulMapper())
>> .map(new MySubsequentMapper())
>> 
>>
>> Now, I define MyStatefulMapper in the usual fashion:
>>
>> public class MyStatefulMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>>
>>     /** The ValueState handle. The first field is the count, the second field a running sum. */
>>     private transient ValueState<Tuple2<Long, Long>> sum;
>>
>>     @Override
>>     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>>         // logic of accessing and updating the ValueState 'sum' above
>>     }
>>
>>     @Override
>>     public void open(Configuration config) {
>>         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>                 new ValueStateDescriptor<>(
>>                         "mySum", // the state name
>>                         TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>                         Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>         sum = getRuntimeContext().getState(descriptor);
>>     }
>> }
>>
>>
>> So, by now, RuntimeContext has registered a State holder named 'mySum'.
>>
>> In the implementation of 'MySubsequentMapper', I need to access this
>> State holder named 'mySum', perhaps thus (my thinking, I may be wrong):
>>
>> public class MySubsequentMapper extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>>
>>     /** The ValueState handle. The first field is the count, the second field a running sum. */
>>     private transient ValueState<Tuple2<Long, Long>> aSubsequentSum;
>>
>>     private transient ValueState<Tuple2<Long, Long>> sum; // defined earlier
>>
>>     @Override
>>     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
>>
>>         // logic of accessing and updating the ValueState 'aSubsequentSum' above
>>
>>         // but this logic depends on the current contents of ValueState 'sum' created earlier
>>     }
>>
>>     @Override
>>     public void open(Configuration config) {
>>         // Logic to create the ValueStateDescriptor for 'aSubsequentSum', which is owned by this operator
>>
>>         // ...
>>
>>         // Question: now, how do I prepare for accessing 'sum', which is a State holder created inside an earlier operator?
>>         sum = getRuntimeContext().getState(descriptor); // how can I pass the name 'mySum' (used in the StateDescriptor)?
>>     }
>> }
>>
>> I have two questions:
>>
>> 1) What I am trying to achieve: is that possible and even, advisable? If
>> not, then what is the alternative?
>> 2) Is there a guarantee that Flink will execute MyStatefulOperator.open()
>> always before MySubsequentOperator.open() because of the lexical order of
>> appearance in the source code?
>>
>> -- Nirmalya
>>
>>
>>
>>
>> --
>> Software Technologist
>> http://www.linkedin.com/in/nirmalyasengupta
>> "If you have built castles in the air, your work need not be lost. That
>> is where they should be.
>> Now put the foundation under them."
>>
>
>


Re: Confusion about multiple use of one ValueState

2016-05-13 Thread Nirmalya Sengupta
Hello Balaji ,

Thanks for your reply. This confirms my earlier assumption that one of the
usual ways to do it is to hold and nurture the application-state in an
external body; in your case: Redis.

So, I am trying to understand how one shares the handle to this
external body amongst partitions: do I create a Connector to a Redis
instance (referring to your case as an example) at the beginning of the Flink
application and share that amongst partitions using the _Broadcast_
mechanism? Obviously, the assumption is that the external body (Redis, in
this case) will have to deal with concurrent access to elements of State,
updates, etc. Operators simply call an API on Redis to store and
retrieve elements from the application-state.

Is my understanding correct?

Yes, I am aware of the fresh creation of a Mapper for every tuple that
comes in. In fact, this was the source of my original doubt before I posted
the question. Thanks again for underscoring that.

-- Nirmalya


-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: Scatter-Gather Iteration aggregators

2016-05-13 Thread Lydia Ickler
Hi Vasia, 

okay I understand now :)
So it works fine if I want to collect the sum of values.
But what if I need to reset the DoubleSumAggregator back to 0 in order to then 
set it to a new value to save the absolute maximum?
Please have a look at the code below.

Any idea why it is not working?
 

public static class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {

    DoubleSumAggregator aggregator = new DoubleSumAggregator();

    public void preSuperstep() {
        // retrieve the Aggregator
        aggregator = getIterationAggregator("sumAggregator");
    }

    public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
        double sum = 0;
        for (double msg : inMessages) {
            sum = sum + msg;
        }

        if (Math.abs(sum) > Math.abs(aggregator.getAggregate().getValue())) {

            aggregator.reset();
            aggregator.aggregate(sum);

        }
        setNewVertexValue(sum);
    }
}



> On 13.05.2016 at 09:25, Vasiliki Kalavri wrote:
> 
> Hi Lydia, 
> 
> an iteration aggregator combines all aggregates globally once per superstep 
> and makes them available in the *next* superstep.
> Within each scatter-gather iteration, one MessagingFunction (scatter phase) 
> and one VertexUpdateFunction (gather phase) are executed. Thus, if you set an 
> aggregate value within one of those, the value will be available in the next 
> superstep. You can retrieve it calling the getPreviousIterationAggregate() 
> method.
> Let me know if that clears things up!
> 
> -Vasia.
> 
> On 13 May 2016 at 08:57, Lydia Ickler  > wrote:
> Hi Vasia, 
> 
> yes, but only independently within each Function or not?
> 
> If I set the aggregator in VertexUpdateFunction then the newly set value is 
> not visible in the MessageFunction.
> Or am I doing something wrong? I would like to have a shared aggregator to 
> normalize vertices.
> 
> 
>> On 13.05.2016 at 08:04, Vasiliki Kalavri wrote:
>> 
>> Hi Lydia,
>> 
>> registered aggregators through the ScatterGatherConfiguration are accessible 
>> both in the VertexUpdateFunction and in the MessageFunction.
>> 
>> Cheers,
>> -Vasia.
>> 
>> On 12 May 2016 at 20:08, Lydia Ickler > > wrote:
>> Hi,
>> 
>> I have a question regarding the Aggregators of a Scatter-Gather Iteration.
>> Is it possible to have a global aggregator that is accessible in 
>> VertexUpdateFunction() and MessagingFunction() at the same time?
>> 
>> Thanks in advance,
>> Lydia
>> 
> 
> 



Re: checkpoints not being removed from HDFS

2016-05-13 Thread Maciek Próchniak

Hi Ufuk,

It seems I messed it up a bit :)
I cannot comment on jira, since it's temporarily locked...

1. org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is non
empty': Directory is not empty - this seems to be expected behaviour, as 
AbstractFileStateHandle.discardState():


// send a call to delete the checkpoint directory containing the file.
// This will fail (and be ignored) when some files still exist
try {
   getFileSystem().delete(filePath.getParent(), false);
} catch (IOException ignored) {}

- so this is working as expected, although it causes a lot of garbage in 
hdfs logs...


2. The problem with not discarded checkpoints seems to be related to 
periods when we don't have any traffic (during night).

At that point many checkpoints "expire before completing":
2016-05-13 00:00:10,585 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Triggering checkpoint 199 @ 1463090410585
2016-05-13 00:10:10,585 [Checkpoint Timer] INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Checkpoint 199 expired before completing.
2016-05-13 00:25:14,650 [flink-akka.actor.default-dispatcher-280300] 
WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Received late message for now expired checkpoint attempt 199


When checkpoints manage to complete, they take very long to do so:
2016-05-13 00:25:19,071 [flink-akka.actor.default-dispatcher-280176] 
INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - 
Completed checkpoint 201 (in 308472 ms)


- this is happening when no new messages arrive (we have simple process 
like kafka->keyBy->custom state aggregation->kafka, with EventTime time 
characteristic)
I think I messed something up with event time & generating watermarks - I'll
have to check it.
With RocksDB I made checkpoints at much larger intervals, so that's probably
why I hadn't noticed the disk getting full.
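
For reference, and assuming the usual StreamExecutionEnvironment checkpoint API (the
values are purely illustrative), a longer checkpoint interval plus a more generous expiry
timeout can be configured roughly like this:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // trigger a checkpoint every 10 minutes (illustrative value)
        env.enableCheckpointing(10 * 60 * 1000);
        // give a checkpoint up to 30 minutes before it is declared expired (illustrative value)
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000);

        // ... sources, operators, sinks and env.execute() would follow
    }
}
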

OTOH - shouldn't expired checkpoints be cleaned up automatically?


Sorry for the confusion and thanks for the help

thanks,
maciek


On 12/05/2016 21:28, Maciek Próchniak wrote:

thanks,
I'll try to reproduce it in some test by myself...

maciek

On 12/05/2016 18:39, Ufuk Celebi wrote:

The issue is here: https://issues.apache.org/jira/browse/FLINK-3902

(My "explanation" before dosn't make sense actually and I don't see a
reason why this should be related to having many state handles.)

On Thu, May 12, 2016 at 3:54 PM, Ufuk Celebi  wrote:

Hey Maciek,

thanks for reporting this. Having files linger around looks like a 
bug to me.


The idea behind having the recursive flag set to false in the
AbstractFileStateHandle.discardState() call is that the
FileStateHandle is actually just a single file and not a directory.
The second call trying to delete the parent directory only succeeds
when all other files in that directory have been deleted as well. I
think this is what sometimes fails with many state handles. For
RocksDB there is only a single state handle, which works well.

I will open an issue for this and try to reproduce it reliably and 
then fix it.


– Ufuk


On Thu, May 12, 2016 at 10:28 AM, Maciek Próchniak  wrote:

Hi,

we have stream job with quite large state (few GB), we're using
FSStateBackend and we're storing checkpoints in hdfs.
What we observe is that v. often old checkpoints are not discarded 
properly.

In hadoop logs I can see:

2016-05-10 12:21:06,559 INFO BlockStateChange: BLOCK* 
addToInvalidates:

blk_1084791727_11053122 10.10.113.10:50010
2016-05-10 12:21:06,559 INFO org.apache.hadoop.ipc.Server: IPC Server
handler 9 on 8020, call
org.apache.hadoop.hdfs.protocol.ClientProtocol.delete from 
10.10.113.9:49233

Call#12337 Retry#0
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException:
`/flink/checkpoints_test/570d6e67d571c109daab468e5678402b/chk-62 is 
non

empty': Directory is not empty
 at
org.apache.hadoop.hdfs.server.namenode.FSDirDeleteOp.delete(FSDirDeleteOp.java:85) 


 at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.delete(FSNamesystem.java:3712) 



While on flink side (jobmanager log) we don't see any problems:
2016-05-10 12:20:22,636 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 62 @ 1462875622636
2016-05-10 12:20:32,507 
[flink-akka.actor.default-dispatcher-240088] INFO

org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 62 (in 9843 ms)
2016-05-10 12:20:52,637 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 63 @ 1462875652637
2016-05-10 12:21:06,563 
[flink-akka.actor.default-dispatcher-240028] INFO

org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 63 (in 13909 ms)
2016-05-10 12:21:22,636 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 64 @ 1462875682636

I see in the code th

Flink performance tuning

2016-05-13 Thread Serhiy Boychenko
Hey,

I have successfully integrated Flink into our very small test cluster (3
machines with 8 cores, 8 GBytes of memory and 2x1TB disks). Basically I
started the session to use YARN as the RM, and the data is being read from HDFS.
/yarn-session.sh -n 21 -s 1 -jm 1024 -tm 1024

My code is very simple: a flatMap is applied to the CSV data to extract the
signal name and value, then I group by signal name and perform a group reduce
on the data in order to calculate the max, min and average of the collected values.
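
For concreteness, a job of this shape would look roughly like the following in the
DataSet API (a sketch only; the CSV layout, field positions and paths are assumptions,
not the actual code):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;

public class SignalStatsSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // hypothetical CSV layout: timestamp,signalName,value
        DataSet<Tuple2<String, Double>> signals = env
                .readTextFile("hdfs:///path/to/input") // placeholder path
                .flatMap(new FlatMapFunction<String, Tuple2<String, Double>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Double>> out) {
                        String[] fields = line.split(",");
                        if (fields.length >= 3) {
                            out.collect(new Tuple2<>(fields[1], Double.parseDouble(fields[2])));
                        }
                    }
                });

        // (signal, max, min, avg) per signal name
        DataSet<Tuple4<String, Double, Double, Double>> stats = signals
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String, Double>, Tuple4<String, Double, Double, Double>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Double>> values,
                                       Collector<Tuple4<String, Double, Double, Double>> out) {
                        String name = null;
                        double max = Double.NEGATIVE_INFINITY;
                        double min = Double.POSITIVE_INFINITY;
                        double sum = 0;
                        long count = 0;
                        for (Tuple2<String, Double> v : values) {
                            name = v.f0;
                            max = Math.max(max, v.f1);
                            min = Math.min(min, v.f1);
                            sum += v.f1;
                            count++;
                        }
                        out.collect(new Tuple4<>(name, max, min, sum / count));
                    }
                });

        stats.print();
    }
}
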

I have observed that on 3 nodes the average processing rate is around
11 Mbytes/second. I have compared the results with an MR execution (without any kind
of tuning) and I am quite surprised, since the performance of Hadoop is
85 Mbytes/second when executing the same query on the same data. I have read a few
reports claiming that Flink is better in comparison to MR and other tools. I am
wondering what is wrong. Any clue?

The processing rate is calculated according to the following formula:
Overall processing rate = sum of total amount of data read per job/sum of total 
time the job was running (including staging periods)

Best regards,
Serhiy.


Re: Confusion about multiple use of one ValueState

2016-05-13 Thread Balaji Rajagopalan
I wrote a simple helper class; the Redis connection is initialized in the
constructor, and there are set and get methods to store and retrieve values
from your map functions. If you find a better way to do this, please
share :). I am using a Redis Scala client.

object RedisHelper {

  val redisHost = GlobalConfiguration.getString("redis.host", "localhost")
  val redisPort = GlobalConfiguration.getInteger("redis.port", 6379)
  LOG.info(s"Using host: [$redisHost] and port : [$redisPort] to connect to redis")

  implicit val executionContext = ExecutionContext.global
  implicit val akkaSystem = akka.actor.ActorSystem("redis-flink-actorsystem")
  val redisClient = RedisClient(host = redisHost, port = redisPort)

  def set(k: String, v: String, exTime: Option[Long]): Unit = {
    redisClient.set(k, v, exTime)
  }

  def set(k: String, v: String): Unit = {
    redisClient.set(k, v)
  }

  def get(k: String): Option[String] = {
    try {
      val f = redisClient.get[String](k)
      Await.result(f, 2.seconds)
    } catch {
      case e: Exception =>
        LOG.error("Exception while getting data from redis " + e.fillInStackTrace())
        None
    }
  }
}


On Fri, May 13, 2016 at 2:09 PM, Nirmalya Sengupta <
sengupta.nirma...@gmail.com> wrote:

>
> Hello Balaji ,
>
> Thanks for your reply. This confirms my earlier assumption that one of
> usual ways to do it is to hold and nurture the application-state in an
> external body; in your case: Redis.
>
> So, I am trying to understand how does one share the handle to this
> external body amongst partitions: do I create a Connector to a Redis
> instance (referring to your case as an example) at the beginning of Flink
> application and share that amongst partitions using the _Broadcast_
> mechanism? Obviously, the assumption is that the external body (Redis, in
> this case) will have to deal with concurrent access of elements of State,
> and updation etc. Operators simply call an API on Redis to store and
> retrieve elements from the application-state.
>
> Is my understanding correct?
>
> Yes, I am aware of the fresh creation of a Mapper for every tuple that
> comes in. In fact, this was the source of my original doubt before I posted
> the question. Thanks again for underscoring that.
>
> -- Nirmalya
>
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>


"Memory ran out" error when running connected components

2016-05-13 Thread Arkay
Hi to all,

I’m aware there are a few threads on this, but I haven’t been able to solve
an issue I am seeing and hoped someone can help.  I’m trying to run the
following:

val connectedNetwork = new org.apache.flink.api.scala.DataSet[Vertex[Long,
Long]](
  Graph.fromTuple2DataSet(inputEdges, vertexInitialiser, env)
.run(new ConnectedComponents[Long, NullValue](100)))

And hitting the error:

java.lang.RuntimeException: Memory ran out. numPartitions: 32 minPartition:
8 maxPartition: 8 number of overflow segments: 122 bucketSize: 206 Overall
memory: 19365888 Partition memory: 8388608
 at
org.apache.flink.runtime.operators.hash.CompactingHashTable.getNextBuffer(CompactingHashTable.java:753)
 at
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertBucketEntryFromStart(CompactingHashTable.java:546)
 at
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:423)
 at
org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
 at
org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
 at
org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
 at
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
 at java.lang.Thread.run(Unknown Source)

I’m running Flink 1.0.3 on windows 10 using start-local.bat.  I have Xmx set
to 6500MB, 8 workers, parallelism 8 and other memory settings left at
default.

The inputEdges dataset contains 141MB of Long,Long pairs (which is around 6
million edges).  ParentID is unique and always negative, ChildID is
non-unique and always positive (simulating a bipartite graph)

An example few rows:
-91498683401,1738
-135344401,5370
-100260517801,7970
-154352186001,12311
-160265532002,12826

The vast majority of the childIds are actually unique, and the most popular
ID only occurs 10 times.

VertexInitialiser just sets the vertex value to the id.

Hopefully this is just a memory setting I’m not seeing for the hashTable as
it dies almost instantly,  I don’t think it gets very far into the dataset. 
I understand that the CompactingHashTable cannot spill, but I’d be surprised
if it needed to at these low volumes.

Many thanks for any help!

Rob 






killing process in Flink cluster

2016-05-13 Thread Ramkumar
Hi All,

I am new to Flink. I am running a wordcount streaming program in a cluster. It
was taking too much time, so I stopped the job manually. But it is still
canceling: there are two subtasks in the cluster, one has canceled successfully
but the other one is still canceling. We tried to kill the process at the command
prompt using grep and the kill command, but couldn't kill that process. Please
help me kill that process.

Best Regards,
Ramkumar L






Re: Confusion about multiple use of one ValueState

2016-05-13 Thread Nirmalya Sengupta
Hello Balaji 

Yes. The State holder 'sum' in my example is actually created outside the
Mapper objects; so it stays where it is. I am creating 'var's inside the
Mapper objects to _refer_ to the same object, irrespective of multiplicity
of the Mappers.  The _open_ function is helping to make that association.
This indeed was the basis of my logic. So, we are on the same page, there.

However, my question was about the order of creation of State holders through
multiple Operators, and about accessing one such State holder from inside an
Operator which did not create it.

I understand from your response that you sidestepped this difficulty by
resorting to Redis. Redis has a global presence and hence any operator
from any partition can access it.

My question was how did you share the Redis handle across partitions? Did
you broadcast it?
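
For what it's worth, one common alternative to broadcasting a handle is to let every
parallel operator instance open its own connection in open() and close it in close().
A rough sketch only, using the Jedis Java client purely for illustration (class name,
host and port are made up):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import redis.clients.jedis.Jedis;

public class RedisBackedMapper extends RichMapFunction<String, String> {

    private transient Jedis jedis;

    @Override
    public void open(Configuration parameters) {
        // one connection per parallel subtask; no handle has to be shared or broadcast
        jedis = new Jedis("localhost", 6379); // placeholder host/port
    }

    @Override
    public String map(String key) throws Exception {
        String previous = jedis.get(key); // read shared state
        jedis.set(key, "seen");           // update shared state
        return previous == null ? "none" : previous;
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}
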

-- Nirmalya







-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."


Re: Confusion about multiple use of one ValueState

2016-05-13 Thread nsengupta
Sorry, Balaji! Somehow, I missed this particular post of yours. Please ignore
my last mail, where I am asking the same question.

--  Nirmalya





Re: Scatter-Gather Iteration aggregators

2016-05-13 Thread Vasiliki Kalavri
Hi Lydia,

aggregators are automatically reset at the beginning of each iteration. As
far as I know, the reset() method is not supposed to be called from user
code. Also, in the code you pasted, you use "aggregator.getAggregate()".
Please, use the "getPreviousIterationAggregate()" method as I wrote above,
otherwise you won't get the correct value.
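
If the goal is really an absolute maximum rather than a sum, one further option (only a
sketch with an invented class name, not something required by the above) is a custom
Aggregator instead of the DoubleSumAggregator:

import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.types.DoubleValue;

public class AbsMaxAggregator implements Aggregator<DoubleValue> {

    private double max = 0.0;

    @Override
    public DoubleValue getAggregate() {
        return new DoubleValue(max);
    }

    @Override
    public void aggregate(DoubleValue element) {
        // keep the value with the largest absolute magnitude seen in this superstep
        if (Math.abs(element.getValue()) > Math.abs(max)) {
            max = element.getValue();
        }
    }

    // convenience overload, mirroring DoubleSumAggregator.aggregate(double)
    public void aggregate(double value) {
        aggregate(new DoubleValue(value));
    }

    @Override
    public void reset() {
        // called by the framework at the beginning of each superstep
        max = 0.0;
    }
}

It can be registered via ScatterGatherConfiguration.registerAggregator() like any other
aggregator, and the value combined in superstep N is then read in superstep N+1 with
getPreviousIterationAggregate().
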

Cheers,
-Vasia.

On 13 May 2016 at 11:03, Lydia Ickler  wrote:

> Hi Vasia,
>
> okay I understand now :)
> So it works fine if I want to collect the sum of values.
> But what if I need to reset the DoubleSumAggregator back to 0 in order to
> then set it to a new value to save the absolute maximum?
> Please have a look at the code above.
>
> Any idea why it is not working?
>
>
> public static class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
>
>     DoubleSumAggregator aggregator = new DoubleSumAggregator();
>
>     public void preSuperstep() {
>         // retrieve the Aggregator
>         aggregator = getIterationAggregator("sumAggregator");
>     }
>
>     public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
>         double sum = 0;
>         for (double msg : inMessages) {
>             sum = sum + msg;
>         }
>
>         if (Math.abs(sum) > Math.abs(aggregator.getAggregate().getValue())) {
>
>             aggregator.reset();
>             aggregator.aggregate(sum);
>
>         }
>         setNewVertexValue(sum);
>     }
> }
>
>
>
>
> On 13.05.2016 at 09:25, Vasiliki Kalavri wrote:
>
> Hi Lydia,
>
> an iteration aggregator combines all aggregates globally once per
> superstep and makes them available in the *next* superstep.
> Within each scatter-gather iteration, one MessagingFunction (scatter
> phase) and one VertexUpdateFunction (gather phase) are executed. Thus, if
> you set an aggregate value within one of those, the value will be available
> in the next superstep. You can retrieve it calling
> the getPreviousIterationAggregate() method.
> Let me know if that clears things up!
>
> -Vasia.
>
> On 13 May 2016 at 08:57, Lydia Ickler  wrote:
>
>> Hi Vasia,
>>
>> yes, but only independently within each Function or not?
>>
>> If I set the aggregator in VertexUpdateFunction then the newly set value
>> is not visible in the MessageFunction.
>> Or am I doing something wrong? I would like to have a shared aggregator
>> to normalize vertices.
>>
>>
>> On 13.05.2016 at 08:04, Vasiliki Kalavri <vasilikikala...@gmail.com> wrote:
>>
>> Hi Lydia,
>>
>> registered aggregators through the ScatterGatherConfiguration are
>> accessible both in the VertexUpdateFunction and in the MessageFunction.
>>
>> Cheers,
>> -Vasia.
>>
>> On 12 May 2016 at 20:08, Lydia Ickler  wrote:
>>
>>> Hi,
>>>
>>> I have a question regarding the Aggregators of a Scatter-Gather
>>> Iteration.
>>> Is it possible to have a global aggregator that is accessible in 
>>> VertexUpdateFunction()
>>> and MessagingFunction() at the same time?
>>>
>>> Thanks in advance,
>>> Lydia
>>>
>>
>>
>>
>
>


Re: Barriers at work

2016-05-13 Thread Matthias J. Sax
I don't think barriers can "expire" as of now. Might be a nice idea
though -- I don't know if this might be a problem in production.

Furthermore, I want to point out that an "expiring checkpoint" would
not break exactly-once processing, as the latest successful checkpoint
can always be used to recover correctly. Only the recovery time would
increase, because if a "barrier expires" and no checkpoint can be
stored, more data has to be replayed using the "old" checkpoint.


-Matthias

On 05/12/2016 09:21 PM, Srikanth wrote:
> Hello,
> 
> I was reading about Flink's checkpoint and wanted to check if I
> correctly understood the usage of barriers for exactly once processing.
>  1) Operator does alignment by buffering records coming after a barrier
> until it receives barrier from all upstream operators instances.
>  2) Barrier is always preceded by a watermark to trigger processing all
> windows that are complete.
>  3) Records in windows that are not triggered are also saved as part of
> checkpoint. These windows are repopulated when restoring from checkpoints. 
> 
> In production setups, were there any cases where alignment during
> checkpointing caused unacceptable latency?
> If so, is there a way to indicate say wait for a MAX 100 ms? That way we
> have exactly-once in most situations but prefer at least once over
> higher latency in corner cases.
> 
> Srikanth





Re: "Memory ran out" error when running connected components

2016-05-13 Thread Vasiliki Kalavri
Hi Rob,


On 13 May 2016 at 11:22, Arkay  wrote:

> Hi to all,
>
> I’m aware there are a few threads on this, but I haven’t been able to solve
> an issue I am seeing and hoped someone can help.  I’m trying to run the
> following:
>
> val connectedNetwork = new org.apache.flink.api.scala.DataSet[Vertex[Long,
> Long]](
>   Graph.fromTuple2DataSet(inputEdges, vertexInitialiser, env)
> .run(new ConnectedComponents[Long, NullValue](100)))
>
> And hitting the error:
>
> java.lang.RuntimeException: Memory ran out. numPartitions: 32 minPartition:
> 8 maxPartition: 8 number of overflow segments: 122 bucketSize: 206 Overall
> memory: 19365888 Partition memory: 8388608
>  at
>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.getNextBuffer(CompactingHashTable.java:753)
>  at
>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertBucketEntryFromStart(CompactingHashTable.java:546)
>  at
>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:423)
>  at
>
> org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325)
>  at
>
> org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212)
>  at
>
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273)
>  at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>  at java.lang.Thread.run(Unknown Source)
>
> I’m running Flink 1.0.3 on windows 10 using start-local.bat.  I have Xmx
> set
> to 6500MB, 8 workers, parallelism 8 and other memory settings left at
> default.
>

The start-local script will start a single JobManager and TaskManager.
What do you mean by 8 workers? Have you set the numberOfTaskSlots to 8? To
give all available memory to your TaskManager, you should set the
"taskmanager.heap.mb" configuration option in flink-conf.yaml. Can you open
the Flink dashboard at http://localhost:8081/ and check the configuration
of your taskmanager?

Cheers,
-Vasia.


> The inputEdges dataset contains 141MB of Long,Long pairs (which is around 6
> million edges).  ParentID is unique and always negative, ChildID is
> non-unique and always positive (simulating a bipartite graph)
>
> An example few rows:
> -91498683401,1738
> -135344401,5370
> -100260517801,7970
> -154352186001,12311
> -160265532002,12826
>
> The vast majority of the childIds are actually unique, and the most popular
> ID only occurs 10 times.
>
> VertexInitialiser just sets the vertex value to the id.
>
> Hopefully this is just a memory setting I’m not seeing for the hashTable as
> it dies almost instantly,  I don’t think it gets very far into the dataset.
> I understand that the CompactingHashTable cannot spill, but I’d be
> surprised
> if it needed to at these low volumes.
>
> Many thanks for any help!
>
> Rob
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-ran-out-error-when-running-connected-components-tp6888.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: "Memory ran out" error when running connected components

2016-05-13 Thread Arkay
Thanks Vasia,

Apologies, yes, by workers I mean I have set taskmanager.numberOfTaskSlots: 8
and parallelism.default: 8 in flink-conf.yaml. I have also set
taskmanager.heap.mb: 6500

In the dashboard it is showing free memory as 5.64GB and Flink Managed
Memory as 3.90GB.

Thanks,
Rob





Re: Barriers at work

2016-05-13 Thread Stephan Ewen
Hi Srikanth!

That is an interesting idea.
I have it on my mind to create a design doc for checkpointing improvements.
That could be added as a proposal there.

I hope I'll be able to start with that design doc next week.

Greetings,
Stephan


On Fri, May 13, 2016 at 1:35 PM, Matthias J. Sax  wrote:

> I don't think barries can "expire" as of now. Might be a nice idea
> thought -- I don't know if this might be a problem in production.
>
> Furthermore, I want to point out, that an "expiring checkpoint" would
> not break exactly-once processing, as the latest successful checkpoint
> can always be used to recover correctly. Only the recovery-time would be
> increase. because if a "barrier expires" and no checkpoint can be
> stored, more data has to be replayed using the "old" checkpoint".
>
>
> -Matthias
>
> On 05/12/2016 09:21 PM, Srikanth wrote:
> > Hello,
> >
> > I was reading about Flink's checkpoint and wanted to check if I
> > correctly understood the usage of barriers for exactly once processing.
> >  1) Operator does alignment by buffering records coming after a barrier
> > until it receives barrier from all upstream operators instances.
> >  2) Barrier is always preceded by a watermark to trigger processing all
> > windows that are complete.
> >  3) Records in windows that are not triggered are also saved as part of
> > checkpoint. These windows are repopulated when restoring from
> checkpoints.
> >
> > In production setups, were there any cases where alignment during
> > checkpointing caused unacceptable latency?
> > If so, is there a way to indicate say wait for a MAX 100 ms? That way we
> > have exactly-once in most situations but prefer at least once over
> > higher latency in corner cases.
> >
> > Srikanth
>
>


flink-kafka-connector offset management

2016-05-13 Thread Arun Balan
Hi, I am trying to use the flink-kafka-connector and I notice that every
time I restart my application it re-reads the last message on the kafka
topic. So if the latest offset on the topic is 10, then when the
application is restarted, kafka-connector will re-read message 10. Why is
this the behavior? I would assume that the last message has already been
read and offset committed. I require that messages that are already
processed from the topic not be reprocessed. Any insight would be helpful.

Thanks
Arun Balan


Re: "Memory ran out" error when running connected components

2016-05-13 Thread Vasiliki Kalavri
Thanks for checking Rob! I don't see any reason for the job to fail with
this configuration and input size.
I have no experience running Flink on windows though, so I might be missing
something. Do you get a similar error with smaller inputs?

-Vasia.

On 13 May 2016 at 13:27, Arkay  wrote:

> Thanks Vasia,
>
> Apologies, yes by workers i mean I have set taskmanager.numberOfTaskSlots:
> 8
> and parallelism.default: 8 in flink-conf.yaml. I have also set
> taskmanager.heap.mb: 6500
>
> In the dashboard it is showing free memory as 5.64GB and Flink Managed
> Memory as 3.90GB.
>
> Thanks,
> Rob
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-ran-out-error-when-running-connected-components-tp6888p6895.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: "Memory ran out" error when running connected components

2016-05-13 Thread Arkay
Hi Vasia,

It seems to work OK up to about 50MB of input, and dies after that point.
If I disable just this connected components step, the rest of my program is
happy with the full 1.5GB test dataset. It seems to be specifically limited
to GraphAlgorithms in my case.

Do you know what the units are when it says Partition memory: 8388608?
If it is bytes then it sounds like it's using around 256MB per hash table of
32 partitions (which is then multiplied by the number of task slots, I guess).
Can this number be configured, do you know? Perhaps the Windows version of
the JVM is defaulting it to a lower value than on Linux?

Thanks,
Rob





Re: Flink performance tuning

2016-05-13 Thread Robert Metzger
Hi,

Can you try running the job with 8 slots, 7 GB (maybe you need to go down
to 6 GB) and only three TaskManagers (-n 3) ?

I'm suggesting this, because you have many small JVMs running on your
machines. On such small machines you can probably get much more use out of
your available memory by running a few big task managers (which can share
all the common management infra).
Another plus of running a few big JVMs is that you are reducing network
overhead, because communication can happen within the process, and less
network transfer is required.

Another big factor for performance are the datatypes used. How do you
represent your data in Flink? (Are you using the TupleX types? or POJOs?)
How do you select the key for the grouping?

Regards,
Robert


On Fri, May 13, 2016 at 11:25 AM, Serhiy Boychenko  wrote:

> Hey,
>
>
>
> I have successfully integrated Flink into our very small test cluster (3
> machines with 8 cores, 8GBytes of memory and 2x1TB disks). Basically I am
> started the session to use YARN as RM and the data is being read from HDFS.
>
> /yarn-session.sh -n 21 -s 1 -jm 1024 -tm 1024
>
>
>
> My code is very simple, flatMap is being done on the CSV data, so I
> extract the signal name and value, I group by signal name and performing
> group reduce on the data in order to calculate max, min and average on the
> collected values.
>
>
>
> I have observed on 3 nodes, the average processing rate is around
> 11Mbytes/second. I have compared the results with MR execution(without any
> kind of tuning) and I am quite surprised, since the performance of Hadoop
> is 85Mybtes/second when executing the same query on the same data. I have
> read few reports claiming that Flink is better in comparison to MR and
> other tools. I am wondering what is wrong? Any clue?
>
>
>
> The processing rate is calculated according to the following formula:
>
> Overall processing rate = sum of total amount of data read per job/sum of
> total time the job was running (including staging periods)
>
>
>
> Best regards,
>
> Serhiy.
>


Re: Flink performance tuning

2016-05-13 Thread Stephan Ewen
One issue may be that the selection of YARN containers is not HDFS locality
aware here.
Hence, Flink may read more splits remotely, where MR reads more splits
locally.

On Fri, May 13, 2016 at 3:25 PM, Robert Metzger  wrote:

> Hi,
>
> Can you try running the job with 8 slots, 7 GB (maybe you need to go down
> to 6 GB) and only three TaskManagers (-n 3) ?
>
> I'm suggesting this, because you have many small JVMs running on your
> machines. On such small machines you can probably get much more use out of
> your available memory by running a few big task managers (which can share
> all the common management infra).
> Another plus of running a few JVMs is that you are deducing network
> overhead, because communication can happen within the process, and less
> network transfer is required.
>
> Another big factor for performance are the datatypes used. How do you
> represent your data in Flink? (Are you using the TupleX types? or POJOs?)
> How do you select the key for the grouping?
>
> Regards,
> Robert
>
>
> On Fri, May 13, 2016 at 11:25 AM, Serhiy Boychenko <
> serhiy.boyche...@cern.ch> wrote:
>
>> Hey,
>>
>>
>>
>> I have successfully integrated Flink into our very small test cluster (3
>> machines with 8 cores, 8GBytes of memory and 2x1TB disks). Basically I am
>> started the session to use YARN as RM and the data is being read from HDFS.
>>
>> /yarn-session.sh -n 21 -s 1 -jm 1024 -tm 1024
>>
>>
>>
>> My code is very simple, flatMap is being done on the CSV data, so I
>> extract the signal name and value, I group by signal name and performing
>> group reduce on the data in order to calculate max, min and average on the
>> collected values.
>>
>>
>>
>> I have observed on 3 nodes, the average processing rate is around
>> 11Mbytes/second. I have compared the results with MR execution(without any
>> kind of tuning) and I am quite surprised, since the performance of Hadoop
>> is 85Mybtes/second when executing the same query on the same data. I have
>> read few reports claiming that Flink is better in comparison to MR and
>> other tools. I am wondering what is wrong? Any clue?
>>
>>
>>
>> The processing rate is calculated according to the following formula:
>>
>> Overall processing rate = sum of total amount of data read per job/sum of
>> total time the job was running (including staging periods)
>>
>>
>>
>> Best regards,
>>
>> Serhiy.
>>
>
>


Re: "Memory ran out" error when running connected components

2016-05-13 Thread Vasiliki Kalavri
On 13 May 2016 at 14:28, Arkay  wrote:

> Hi Vasia,
>
> It seems to work OK up to about 50MB of input, and dies after that point.
> If i disable just this connected components step the rest of my program is
> happy with the full 1.5GB test dataset.  It seems to be specifically
> limited
> to GraphAlgorithms in my case.
>

So your program has other steps before/after the connected components
algorithm?
Could it be that you have some expensive operation that competes for memory
with the hash table?



>
> Do you know what the units are when it is saying Partition memory: 8388608?
> If it is bytes then it sounds like its using around 256MB per hash table of
> 32 partitions (which is then multiplied by number of task slots i guess).
>

Yes, that's bytes.



> Can this number be configured do you know?  Perhaps the windows version of
> the JVM is defaulting it to a lower value than on Linux?
>

By default, the hash table uses Flink's managed memory. That's 3.0GB in
your case (0.7 of the total memory by default).
You can change this fraction by setting the "taskmanager.memory.fraction"
in the configuration. See [1] for other managed memory options.
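
For reference, the corresponding flink-conf.yaml entries look roughly like this (the
values are only examples):

taskmanager.heap.mb: 6500
# fraction of the free memory given to Flink's managed memory (default 0.7)
taskmanager.memory.fraction: 0.7
# allocate Flink's managed memory outside the JVM heap (default false)
taskmanager.memory.off-heap: false
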

Hope this helps!
-Vasia.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#managed-memory



>
> Thanks,
> Rob
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-ran-out-error-when-running-connected-components-tp6888p6899.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Availability of OrderedKeyedDataStream

2016-05-13 Thread nsengupta
Hello Flinksters,

Have you decided to do away with the 'OrderedKeyedDataStream' type
altogether? I didn't find it in the API documents. It is mentioned and
elaborated elsewhere, and I think it can be a good fit for a use case I am
trying to implement.

Could someone please confirm? If it exists but with a different name, or if
the same behaviour can be achieved using other methods, I would like to be
wiser.

-- Nirmalya






Re: "Memory ran out" error when running connected components

2016-05-13 Thread Arkay
Thanks for the link, I had experimented with those options, apart from
taskmanager.memory.off-heap: true. Turns out that allows it to run through
happily! I don't know if that is a peculiarity of a Windows JVM, as I
understand that setting is purely an efficiency improvement?

For your first question, yes, I have a number of steps that get scheduled
around the same time in the job; it's not really avoidable unless there are
optimizer hints to tell the system to only run certain steps on their own.
I will try cutting the rest of the program out as a test, however.

Thanks very much for your help with this, and all your excellent work on
Flink and Gelly :)

Rob





Re: Barriers at work

2016-05-13 Thread Srikanth
Thanks Matthias & Stephan!

Yes, if we choose to fail checkpoint on expiry, we can restore from
previous checkpoint.

Looking forward to read the new design proposal.

Srikanth


On Fri, May 13, 2016 at 8:09 AM, Stephan Ewen  wrote:

> Hi Srikanth!
>
> That is an interesting idea.
> I have it on my mind to create a design doc for checkpointing
> improvements. That could be added as a proposal there.
>
> I hope I'll be able to start with that design doc next week.
>
> Greetings,
> Stephan
>
>
> On Fri, May 13, 2016 at 1:35 PM, Matthias J. Sax  wrote:
>
>> I don't think barries can "expire" as of now. Might be a nice idea
>> thought -- I don't know if this might be a problem in production.
>>
>> Furthermore, I want to point out, that an "expiring checkpoint" would
>> not break exactly-once processing, as the latest successful checkpoint
>> can always be used to recover correctly. Only the recovery-time would be
>> increase. because if a "barrier expires" and no checkpoint can be
>> stored, more data has to be replayed using the "old" checkpoint".
>>
>>
>> -Matthias
>>
>> On 05/12/2016 09:21 PM, Srikanth wrote:
>> > Hello,
>> >
>> > I was reading about Flink's checkpoint and wanted to check if I
>> > correctly understood the usage of barriers for exactly once processing.
>> >  1) Operator does alignment by buffering records coming after a barrier
>> > until it receives barrier from all upstream operators instances.
>> >  2) Barrier is always preceded by a watermark to trigger processing all
>> > windows that are complete.
>> >  3) Records in windows that are not triggered are also saved as part of
>> > checkpoint. These windows are repopulated when restoring from
>> checkpoints.
>> >
>> > In production setups, were there any cases where alignment during
>> > checkpointing caused unacceptable latency?
>> > If so, is there a way to indicate say wait for a MAX 100 ms? That way we
>> > have exactly-once in most situations but prefer at least once over
>> > higher latency in corner cases.
>> >
>> > Srikanth
>>
>>
>


Flink recovery

2016-05-13 Thread Madhire, Naveen
Hi,

We are trying to test the recovery mechanism of Flink with Kafka and HDFS sink 
during failures.

I killed the job after processing some messages and restarted the same job
again. I am seeing that some of the messages are processed more than once,
which does not follow the exactly-once semantics.


Also, I am using the checkpointing mechanism and saving the state checkpoints
into HDFS.
Below is the checkpointing code:


envStream.enableCheckpointing(11);
envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
envStream.getCheckpointConfig().setCheckpointTimeout(6);
envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);

envStream.setStateBackend(new 
FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/"));

One thing I've noticed is that lowering the checkpointing interval actually
lowers the number of messages processed more than once, and 11 ms is the lowest
I can use.

Is there anything else I should try to get exactly-once message processing
functionality?

I am using Flink 1.0.0 and kafka 0.8


Thank you.




Re: Barriers at work

2016-05-13 Thread Srikanth
I have a follow-up. Is there a recommended list of knobs that can be
tuned if an at-least-once guarantee while handling failures is good enough?
For cases like alert generation, a non-idempotent sink, etc., where the system
can live with duplicates or has another mechanism to handle them.

Srikanth

On Fri, May 13, 2016 at 8:09 AM, Stephan Ewen  wrote:

> Hi Srikanth!
>
> That is an interesting idea.
> I have it on my mind to create a design doc for checkpointing
> improvements. That could be added as a proposal there.
>
> I hope I'll be able to start with that design doc next week.
>
> Greetings,
> Stephan
>
>
> On Fri, May 13, 2016 at 1:35 PM, Matthias J. Sax  wrote:
>
>> I don't think barries can "expire" as of now. Might be a nice idea
>> thought -- I don't know if this might be a problem in production.
>>
>> Furthermore, I want to point out, that an "expiring checkpoint" would
>> not break exactly-once processing, as the latest successful checkpoint
>> can always be used to recover correctly. Only the recovery-time would be
>> increase. because if a "barrier expires" and no checkpoint can be
>> stored, more data has to be replayed using the "old" checkpoint".
>>
>>
>> -Matthias
>>
>> On 05/12/2016 09:21 PM, Srikanth wrote:
>> > Hello,
>> >
>> > I was reading about Flink's checkpoint and wanted to check if I
>> > correctly understood the usage of barriers for exactly once processing.
>> >  1) Operator does alignment by buffering records coming after a barrier
>> > until it receives barrier from all upstream operators instances.
>> >  2) Barrier is always preceded by a watermark to trigger processing all
>> > windows that are complete.
>> >  3) Records in windows that are not triggered are also saved as part of
>> > checkpoint. These windows are repopulated when restoring from
>> checkpoints.
>> >
>> > In production setups, were there any cases where alignment during
>> > checkpointing caused unacceptable latency?
>> > If so, is there a way to indicate say wait for a MAX 100 ms? That way we
>> > have exactly-once in most situations but prefer at least once over
>> > higher latency in corner cases.
>> >
>> > Srikanth
>>
>>
>


Re: Barriers at work

2016-05-13 Thread Stephan Ewen
You can set the checkpointing mode to "at least once".
That way, barriers never block.
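
Assuming the usual StreamExecutionEnvironment API, that mode is selected when enabling
checkpointing, roughly like this (the interval value is illustrative):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // AT_LEAST_ONCE skips barrier alignment, so barriers never block
        env.enableCheckpointing(10000L, CheckpointingMode.AT_LEAST_ONCE);
        // ... job definition and env.execute() would follow
    }
}
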

On Fri, May 13, 2016 at 6:05 PM, Srikanth  wrote:

> I have a follow up. Is there a recommendation of list of knobs that can be
> tuned if at least once guarantee while handling failure is good enough?
> For cases like alert generation, non idempotent sink, etc where the system
> can live with duplicates or has other mechanism to handle them.
>
> Srikanth
>
> On Fri, May 13, 2016 at 8:09 AM, Stephan Ewen  wrote:
>
>> Hi Srikanth!
>>
>> That is an interesting idea.
>> I have it on my mind to create a design doc for checkpointing
>> improvements. That could be added as a proposal there.
>>
>> I hope I'll be able to start with that design doc next week.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Fri, May 13, 2016 at 1:35 PM, Matthias J. Sax 
>> wrote:
>>
>>> I don't think barries can "expire" as of now. Might be a nice idea
>>> thought -- I don't know if this might be a problem in production.
>>>
>>> Furthermore, I want to point out, that an "expiring checkpoint" would
>>> not break exactly-once processing, as the latest successful checkpoint
>>> can always be used to recover correctly. Only the recovery-time would be
>>> increase. because if a "barrier expires" and no checkpoint can be
>>> stored, more data has to be replayed using the "old" checkpoint".
>>>
>>>
>>> -Matthias
>>>
>>> On 05/12/2016 09:21 PM, Srikanth wrote:
>>> > Hello,
>>> >
>>> > I was reading about Flink's checkpoint and wanted to check if I
>>> > correctly understood the usage of barriers for exactly once processing.
>>> >  1) Operator does alignment by buffering records coming after a barrier
>>> > until it receives barrier from all upstream operators instances.
>>> >  2) Barrier is always preceded by a watermark to trigger processing all
>>> > windows that are complete.
>>> >  3) Records in windows that are not triggered are also saved as part of
>>> > checkpoint. These windows are repopulated when restoring from
>>> checkpoints.
>>> >
>>> > In production setups, were there any cases where alignment during
>>> > checkpointing caused unacceptable latency?
>>> > If so, is there a way to indicate say wait for a MAX 100 ms? That way
>>> we
>>> > have exactly-once in most situations but prefer at least once over
>>> > higher latency in corner cases.
>>> >
>>> > Srikanth
>>>
>>>
>>
>


Re: Barriers at work

2016-05-13 Thread Srikanth
Thanks. I didn't know we could set that.

On Fri, May 13, 2016 at 12:44 PM, Stephan Ewen  wrote:

> You can use the checkpoint mode to "at least once".
> That way, barriers never block.
>
> On Fri, May 13, 2016 at 6:05 PM, Srikanth  wrote:
>
>> I have a follow up. Is there a recommendation of list of knobs that can
>> be tuned if at least once guarantee while handling failure is good enough?
>> For cases like alert generation, non idempotent sink, etc where the
>> system can live with duplicates or has other mechanism to handle them.
>>
>> Srikanth
>>
>> On Fri, May 13, 2016 at 8:09 AM, Stephan Ewen  wrote:
>>
>>> Hi Srikanth!
>>>
>>> That is an interesting idea.
>>> I have it on my mind to create a design doc for checkpointing
>>> improvements. That could be added as a proposal there.
>>>
>>> I hope I'll be able to start with that design doc next week.
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Fri, May 13, 2016 at 1:35 PM, Matthias J. Sax 
>>> wrote:
>>>
 I don't think barries can "expire" as of now. Might be a nice idea
 thought -- I don't know if this might be a problem in production.

 Furthermore, I want to point out, that an "expiring checkpoint" would
 not break exactly-once processing, as the latest successful checkpoint
 can always be used to recover correctly. Only the recovery-time would be
 increase. because if a "barrier expires" and no checkpoint can be
 stored, more data has to be replayed using the "old" checkpoint".


 -Matthias

 On 05/12/2016 09:21 PM, Srikanth wrote:
 > Hello,
 >
 > I was reading about Flink's checkpoint and wanted to check if I
 > correctly understood the usage of barriers for exactly once
 processing.
 >  1) Operator does alignment by buffering records coming after a
 barrier
 > until it receives barrier from all upstream operators instances.
 >  2) Barrier is always preceded by a watermark to trigger processing
 all
 > windows that are complete.
 >  3) Records in windows that are not triggered are also saved as part
 of
 > checkpoint. These windows are repopulated when restoring from
 checkpoints.
 >
 > In production setups, were there any cases where alignment during
 > checkpointing caused unacceptable latency?
 > If so, is there a way to indicate say wait for a MAX 100 ms? That way
 we
 > have exactly-once in most situations but prefer at least once over
 > higher latency in corner cases.
 >
 > Srikanth


>>>
>>
>


Re: How to measure Flink performance

2016-05-13 Thread Ken Krugler
Hi Dhruv,

> On May 12, 2016, at 11:07pm, Dhruv Gohil  wrote:
> 
> Hi Prateek,
> 
>  
> https://github.com/dataArtisans/yahoo-streaming-benchmark/blob/master/flink-benchmarks/src/main/java/flink/benchmark/utils/ThroughputLogger.java
>  
> 
> https://github.com/dataArtisans/yahoo-streaming-benchmark/blob/master/flink-benchmarks/src/main/java/flink/benchmark/utils/AnalyzeTool.java
>  
> 
> 
> help you measure  throughput and latency both, from within topology.

The AnalyzeTool is processing a file that has lines which match the pattern:

Pattern latencyPattern = Pattern.compile(".*Latency ([0-9]+) ms.*");

This isn’t something created by the ThroughputLogger.

Who generates these files, and how do they calculate the latency?

Thanks,

— Ken



> On Thursday 12 May 2016 11:21 PM, Konstantin Knauf wrote:
>> Hi Prateek,
>> 
>> regarding throughput, what about simply filling the input Kafka topic
>> with some (a lot) of messages and monitor (e.g.
>> http://quantifind.github.io/KafkaOffsetMonitor/ 
>> ) how quickly Flink can
>> work the lag off. The messages should be representative of your use
>> case, of course.
>> 
>> Latency is harder, I think, and I would also be interested in the
>> approaches of others to measure latency in Flink.
>> 
>> To some extend, you could do it by adding some logging inside Flink, but
>> this effects latency and only measure latency whithin Flink (excluding
>> reading from source and writing to sink).
>> 
>> Cheers,
>> 
>> Konstantin
>> 
>> On 12.05.2016 18:57, prateekarora wrote:
>>> Hi
>>> 
>>> How can i measure  throughput and latency  of my application in flink 1.0.2
>>> ?
>>> 
>>> Regards
>>> Prateek
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-measure-Flink-performance-tp6741p6863.html
>>>  
>>> 
>>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>>> at Nabble.com.
>>> 
> 

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Sharing State between Operators

2016-05-13 Thread nsengupta
Hello Flinksters


Alright. So, I had a fruitful exchange of messages with Balaji earlier
today, on this topic. I moved ahead with the understanding derived from the
exchange (thanks, Balaji) at the time. But, now I am back because I think my
approach is unclean, if not incorrect. There probably is a smarter way to
achieve the same but I can't figure it out.

Here's the problem:

A building has 4 walls (0,1,2,3). On each wall, a number of devices have been
installed to capture some physical attribute: let's say the temperature at that
spot. Every device has a unique ID.

A typical tuple looks like this (Reading ==> Temperature as an Integer):
(TupleType,Time,WallID,DeviceID,Reading) 

The system works on the basis of records arriving in a time-window of 60
seconds. We can consider this to be a Tumbling Window. The time (and Window
assignment etc.) is not the issue here. The 'Time' field increases
monotonically.

If TupleType == 0, I need to compute and update my data structures from the
stream

If TupleType == 1, I need to emit the maximum temperature recorded by that
DeviceID out of the last 5 readings.

If TupleType == 2, I need to emit the number of readings that have arrived so far
from that particular wall. Obviously, in this case, we will ignore the values of
the 'DeviceID' and 'Reading' fields in the tuple.

The Application generates output for TupleType 1 and TupleType 2. 

The TupleTypes can arrive in any order. For example, TupleType 1 may arrive
with a DeviceID which the application hasn't seen before (no corresponding
TupleType 0 has arrived earlier with that DeviceID). Let us assume that we
have a fallback value to be emitted for such cases, to keep things simple.

In my mind, the implementation should be along this line:

- Split the incoming Stream into three separate substreams using SplitStream,
based upon TupleType
- For StreamOfTupleType0,
  - KeyBy(DeviceID)
  - Apply a Mapper
     - Update a Map[DeviceID, Tuple2(MaxReadingSoFar, FixedSizeList[Reading])]
somewhere
  - Apply a (next) Mapper
     - Calculate the total count of readings for the Wall so far
     - Update a Map[WallID, Count]

- For StreamOfTupleType1
 - Access the Map created/updated through the first Mapper above
 - Emit

- For StreamOfTupleType2
 - Access the Map created/updated through the second Mapper above
 - Emit
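
To make the first branch a bit more concrete, what I have in mind for the
"Update a Map[DeviceID, ...] somewhere" step is roughly this (placeholder names
and field positions; assuming the Flink 1.0 keyed-state API):

// imports assumed: RichFlatMapFunction, ValueState, ValueStateDescriptor,
// Configuration, Collector, and the Tuple types from the Flink Java API
streamOfTupleType0
    .keyBy(3)   // field 3 = DeviceID
    .flatMap(new RichFlatMapFunction<Tuple5<Integer, Long, Integer, Integer, Integer>, Integer>() {

        // per-device maximum reading; partitioned per DeviceID by the keyBy above
        private transient ValueState<Integer> maxReading;

        @Override
        public void open(Configuration config) {
            maxReading = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("maxReading", Integer.class, Integer.MIN_VALUE));
        }

        @Override
        public void flatMap(Tuple5<Integer, Long, Integer, Integer, Integer> t,
                            Collector<Integer> out) throws Exception {
            // f4 = Reading; only the state update happens here, nothing is emitted
            maxReading.update(Math.max(maxReading.value(), t.f4));
        }
    });

The per-device value would then live in the keyed state of that one Mapper,
partitioned by DeviceID.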

I have hit a wall trying to decide how the live data structures should be created,
updated and accessed, correctly and efficiently, in a situation like the above.
More importantly, how will they be shared between operators, across partitions
(nodes)?

I can't broadcast the Maps because they are not READONLY (i.e., lookup only).

I can't keep the data structures as local fields of a RichMapFunction because
those are not shared between parallel instances (my understanding). Each instance
would be blind to what the others accumulate and would begin with an empty Map.

I have done a bit of exploration and I have found this thread in the forum. I have
understood what Stephano is suggesting ('..State is moved along pipeline ..') but
then failed to figure out how to apply it in my case, if at all possible.

I have been thinking about using an external DB-like datastore, but I want to be
sure that such a step is really unavoidable. If I use a DB, then the focus may
shift to INSERT/SELECT-like queries. My application then becomes more of a
distributed DB application rather than a lean Streaming application. That thought
doesn't make me happy! :-)

Please make me wiser (by pointing out gaps in understanding where they exist). If
any more specific information would help you, please ask me.

My primary aim is to get clarity on the recipe for a use case like this.

-- Nirmalya
  






--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Sharing-State-between-Operators-tp6911.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink recovery

2016-05-13 Thread Madhire, Naveen
I checked the JIRA and it looks like FLINK-2111 should address the issue which I 
am facing. I am canceling the job from the dashboard.

I am using kafka source and HDFS rolling sink.

https://issues.apache.org/jira/browse/FLINK-2111

Is this JIRA part of Flink 1.0.0?



Thanks,
Naveen

From: "Madhire, Venkat Naveen Kumar Reddy" 
mailto:naveen.madh...@capitalone.com>>
Reply-To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Date: Friday, May 13, 2016 at 10:58 AM
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Flink recovery

Hi,

We are trying to test the recovery mechanism of Flink with Kafka and HDFS sink 
during failures.

I’ve killed the job after processing some messages and restarted the same job 
again. Some of the messages I am seeing are processed more than once and not 
following the exactly once semantics.


Also, using the checkpointing mechanism and saving the state checkpoints into 
HDFS.
Below is the checkpoint code,


envStream.enableCheckpointing(11);
envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
envStream.getCheckpointConfig().setCheckpointTimeout(6);
envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);

envStream.setStateBackend(new 
FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/"));

One thing I’ve noticed is that lowering the checkpoint interval actually lowers 
the number of messages processed more than once, and 11ms is the lowest I can use.

Is there anything else I should try in order to get exactly-once message 
processing?

I am using Flink 1.0.0 and kafka 0.8


Thank you.





Re: Flink recovery

2016-05-13 Thread Fabian Hueske
Hi,

Flink's exactly-once semantics do not mean that events are processed
exactly-once but that events will contribute exactly-once to the state of
an operator such as a counter.
Roughly, the mechanism works as follows:
- Flink periodically injects checkpoint markers into the data stream. This
happens synchronously across all sources.
- When an operator receives a checkpoint marker from all its sources, it
checkpoints its state and forwards the marker
- When the marker was received by all sinks, the distributed checkpoint is
noted as successful.

In case of a failure, the state of all operators is reset to the last
successful checkpoint and the sources are reset to the point when the
marker was injected.
Hence, some events are sent a second time to the operators but the state of
the operators was reset as well. So the repeated events contribute exactly
once to the state of an operator.

Note, you need a SinkFunction that supports Flink's checkpointing mechanism
to achieve exactly-once output. Otherwise, it might happen that results are
emitted multiple times.

Cheers, Fabian

2016-05-13 22:58 GMT+02:00 Madhire, Naveen :

> I checked the JIRA and looks like FLINK-2111 should address the issue
> which I am facing. I am canceling the job from dashboard.
>
> I am using kafka source and HDFS rolling sink.
>
> https://issues.apache.org/jira/browse/FLINK-2111
>
> Is this JIRA part of Flink 1.0.0?
>
>
>
> Thanks,
> Naveen
>
> From: "Madhire, Venkat Naveen Kumar Reddy" 
> Reply-To: "user@flink.apache.org" 
> Date: Friday, May 13, 2016 at 10:58 AM
> To: "user@flink.apache.org" 
> Subject: Flink recovery
>
> Hi,
>
> We are trying to test the recovery mechanism of Flink with Kafka and HDFS
> sink during failures.
>
> I’ve killed the job after processing some messages and restarted the same
> job again. Some of the messages I am seeing are processed more than once
> and not following the exactly once semantics.
>
>
> Also, using the checkpointing mechanism and saving the state checkpoints
> into HDFS.
> Below is the checkpoint code,
>
> envStream.enableCheckpointing(11);
> envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> envStream.getCheckpointConfig().setCheckpointTimeout(6);
> envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);
>
> envStream.setStateBackend(new 
> FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/"));
>
>
> One thing I’ve noticed is lowering the time to checkpointing is actually
> lowering the number of messages processed more than once and 11ms is the
> lowest I can use.
>
> Is there anything else I should try to have exactly once message
> processing functionality.
>
> I am using Flink 1.0.0 and kafka 0.8
>
>
> Thank you.
>
>


Re: Flink recovery

2016-05-13 Thread Madhire, Naveen
Thank you Fabian.

I am using the HDFS rolling sink. This should support exactly-once output in case 
of failures, shouldn’t it? I am following the below documentation,

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks

If not, what other sinks can I use to get exactly-once output, since exactly-once 
output is critical for our use case?



Thanks,
Naveen

From: Fabian Hueske mailto:fhue...@gmail.com>>
Reply-To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Date: Friday, May 13, 2016 at 4:13 PM
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: Flink recovery

Hi,

Flink's exactly-once semantics do not mean that events are processed 
exactly-once but that events will contribute exactly-once to the state of an 
operator such as a counter.
Roughly, the mechanism works as follows:
- Flink peridically injects checkpoint markers into the data stream. This 
happens synchronously across all sources and markers.
- When an operator receives a checkpoint marker from all its sources, it 
checkpoints its state and forwards the marker
- When the marker was received by all sinks, the distributed checkpoint is 
noted as successful.

In case of a failure, the state of all operators is reset to the last 
successful checkpoint and the sources are reset to the point when the marker 
was injected.
Hence, some events are sent a second time to the operators but the state of the 
operators was reset as well. So the repeated events contribute exactly once to 
the state of an operator.

Note, you need a SinkFunction that supports Flink's checkpointing mechanism to 
achieve exactly-once output. Otherwise, it might happen that results are 
emitted multiple times.

Cheers, Fabian

2016-05-13 22:58 GMT+02:00 Madhire, Naveen 
mailto:naveen.madh...@capitalone.com>>:
I checked the JIRA and looks like FLINK-2111 should address the issue which I 
am facing. I am canceling the job from dashboard.

I am using kafka source and HDFS rolling sink.

https://issues.apache.org/jira/browse/FLINK-2111

Is this JIRA part of Flink 1.0.0?



Thanks,
Naveen

From: "Madhire, Venkat Naveen Kumar Reddy" 
mailto:naveen.madh...@capitalone.com>>
Reply-To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Date: Friday, May 13, 2016 at 10:58 AM
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Flink recovery

Hi,

We are trying to test the recovery mechanism of Flink with Kafka and HDFS sink 
during failures.

I’ve killed the job after processing some messages and restarted the same job 
again. Some of the messages I am seeing are processed more than once and not 
following the exactly once semantics.


Also, using the checkpointing mechanism and saving the state checkpoints into 
HDFS.
Below is the checkpoint code,


envStream.enableCheckpointing(11);
envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
envStream.getCheckpointConfig().setCheckpointTimeout(6);
envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);

envStream.setStateBackend(new 
FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/"));

One thing I’ve noticed is lowering the time to checkpointing is actually 
lowering the number of messages processed more than once and 11ms is the lowest 
I can use.

Is there anything else I should try to have exactly once message processing 
functionality.

I am using Flink 1.0.0 and kafka 0.8


Thank you.




Multi-tenant, deploying flink cluster

2016-05-13 Thread Alexander Smirnov
Hi,

the source data, read from MQ, contains a tenant Id.
Is there a way to route messages from a particular tenant to a particular Flink
node? Is this something that can be configured?

Thank you,
Alex
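
For what it's worth, the usual building block for this is keyBy on the tenant id:
all records with the same tenant id are then routed to the same parallel subtask
(and to its keyed state). Note that Flink decides which subtask that is, so a
tenant cannot be pinned to a specific node of your choosing. A rough sketch,
assuming the tenant id can be read from the incoming record:

// placeholder record type: (tenantId, payload)
DataStream<Tuple2<String, String>> events = ...;

events
    .keyBy(0)                        // partition by tenant id
    .map(new PerTenantMapper());     // hypothetical per-tenant processing; may use keyed state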


Re: Flink recovery

2016-05-13 Thread Fabian Hueske
Hi Naveen,

the RollingFileSink supports exactly-once output. So you should be good.
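
For reference, wiring it up usually looks something like this (a minimal sketch;
it assumes the flink-connector-filesystem dependency, where the class is called
RollingSink, and the path is just an example):

import org.apache.flink.streaming.connectors.fs.RollingSink;

RollingSink<String> sink = new RollingSink<String>("hdfs://namenode:8020/flink/output");
// optionally tune when part files are rolled, e.g.
// sink.setBatchSize(1024 * 1024 * 64);   // roll at 64 MB

stream.addSink(sink);   // "stream" is whatever DataStream<String> your job produces

Roughly speaking, on recovery the sink truncates (or records the valid length of)
the in-progress part file back to what was covered by the last successful
checkpoint, which is how duplicates are avoided in the output.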

Did you see events being emitted multiple times (should not happen with the
RollingFileSink) or being processed multiple times within the Flink program
(might happen as explained before)?

Best, Fabian

2016-05-13 23:19 GMT+02:00 Madhire, Naveen :

> Thank you Fabian.
>
> I am using HDFS rolling sink. This should support the exactly once output
> in case of failures, isn’t it? I am following the below documentation,
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks
>
> If not what other Sinks can I use to have the exactly once output since
> getting exactly once output is critical for our use case.
>
>
>
> Thanks,
> Naveen
>
> From: Fabian Hueske 
> Reply-To: "user@flink.apache.org" 
> Date: Friday, May 13, 2016 at 4:13 PM
> To: "user@flink.apache.org" 
> Subject: Re: Flink recovery
>
> Hi,
>
> Flink's exactly-once semantics do not mean that events are processed
> exactly-once but that events will contribute exactly-once to the state of
> an operator such as a counter.
> Roughly, the mechanism works as follows:
> - Flink peridically injects checkpoint markers into the data stream. This
> happens synchronously across all sources and markers.
> - When an operator receives a checkpoint marker from all its sources, it
> checkpoints its state and forwards the marker
> - When the marker was received by all sinks, the distributed checkpoint is
> noted as successful.
>
> In case of a failure, the state of all operators is reset to the last
> successful checkpoint and the sources are reset to the point when the
> marker was injected.
> Hence, some events are sent a second time to the operators but the state
> of the operators was reset as well. So the repeated events contribute
> exactly once to the state of an operator.
>
> Note, you need a SinkFunction that supports Flink's checkpointing
> mechanism to achieve exactly-once output. Otherwise, it might happen that
> results are emitted multiple times.
>
> Cheers, Fabian
>
> 2016-05-13 22:58 GMT+02:00 Madhire, Naveen 
> :
>
>> I checked the JIRA and looks like FLINK-2111 should address the issue
>> which I am facing. I am canceling the job from dashboard.
>>
>> I am using kafka source and HDFS rolling sink.
>>
>> https://issues.apache.org/jira/browse/FLINK-2111
>>
>> Is this JIRA part of Flink 1.0.0?
>>
>>
>>
>> Thanks,
>> Naveen
>>
>> From: "Madhire, Venkat Naveen Kumar Reddy" > >
>> Reply-To: "user@flink.apache.org" 
>> Date: Friday, May 13, 2016 at 10:58 AM
>> To: "user@flink.apache.org" 
>> Subject: Flink recovery
>>
>> Hi,
>>
>> We are trying to test the recovery mechanism of Flink with Kafka and HDFS
>> sink during failures.
>>
>> I’ve killed the job after processing some messages and restarted the same
>> job again. Some of the messages I am seeing are processed more than once
>> and not following the exactly once semantics.
>>
>>
>> Also, using the checkpointing mechanism and saving the state checkpoints
>> into HDFS.
>> Below is the checkpoint code,
>>
>> envStream.enableCheckpointing(11);
>> envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> envStream.getCheckpointConfig().setCheckpointTimeout(6);
>> envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);
>>
>> envStream.setStateBackend(new 
>> FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/"));
>>
>>
>> One thing I’ve noticed is lowering the time to checkpointing is actually
>> lowering the number of messages processed more than once and 11ms is the
>> lowest I can use.
>>
>> Is there anything else I should try to have exactly once message
>> processing functionality.
>>
>> I am using Flink 1.0.0 and kafka 0.8
>>
>>
>> Thank you.
>>

Re: Flink recovery

2016-05-13 Thread Madhire, Naveen
Thanks Fabian. Yes, I am seeing a few records more than once in the output.
I am running the job, canceling it from the dashboard, and running it again, using 
different HDFS output paths both times. I was thinking that when I cancel the job, 
it’s not doing a clean cancel.
Is there anything else I have to use to make the output exactly-once?

I am using a simple read from kafka, transformations and rolling file sink 
pipeline.



Thanks,
Naveen

From: Fabian Hueske mailto:fhue...@gmail.com>>
Reply-To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Date: Friday, May 13, 2016 at 4:26 PM
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: Flink recovery

Hi Naveen,

the RollingFileSink supports exactly-once output. So you should be good.

Did you see events being emitted multiple times (should not happen with the 
RollingFileSink) or being processed multiple times within the Flink program 
(might happen as explained before)?

Best, Fabian

2016-05-13 23:19 GMT+02:00 Madhire, Naveen 
mailto:naveen.madh...@capitalone.com>>:
Thank you Fabian.

I am using HDFS rolling sink. This should support the exactly once output in 
case of failures, isn’t it? I am following the below documentation,

https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks

If not what other Sinks can I use to have the exactly once output since getting 
exactly once output is critical for our use case.



Thanks,
Naveen

From: Fabian Hueske mailto:fhue...@gmail.com>>
Reply-To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Date: Friday, May 13, 2016 at 4:13 PM
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Re: Flink recovery

Hi,

Flink's exactly-once semantics do not mean that events are processed 
exactly-once but that events will contribute exactly-once to the state of an 
operator such as a counter.
Roughly, the mechanism works as follows:
- Flink peridically injects checkpoint markers into the data stream. This 
happens synchronously across all sources and markers.
- When an operator receives a checkpoint marker from all its sources, it 
checkpoints its state and forwards the marker
- When the marker was received by all sinks, the distributed checkpoint is 
noted as successful.

In case of a failure, the state of all operators is reset to the last 
successful checkpoint and the sources are reset to the point when the marker 
was injected.
Hence, some events are sent a second time to the operators but the state of the 
operators was reset as well. So the repeated events contribute exactly once to 
the state of an operator.

Note, you need a SinkFunction that supports Flink's checkpointing mechanism to 
achieve exactly-once output. Otherwise, it might happen that results are 
emitted multiple times.

Cheers, Fabian

2016-05-13 22:58 GMT+02:00 Madhire, Naveen 
mailto:naveen.madh...@capitalone.com>>:
I checked the JIRA and looks like FLINK-2111 should address the issue which I 
am facing. I am canceling the job from dashboard.

I am using kafka source and HDFS rolling sink.

https://issues.apache.org/jira/browse/FLINK-2111

Is this JIRA part of Flink 1.0.0?



Thanks,
Naveen

From: "Madhire, Venkat Naveen Kumar Reddy" 
mailto:naveen.madh...@capitalone.com>>
Reply-To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Date: Friday, May 13, 2016 at 10:58 AM
To: "user@flink.apache.org" 
mailto:user@flink.apache.org>>
Subject: Flink recovery

Hi,

We are trying to test the recovery mechanism of Flink with Kafka and HDFS sink 
during failures.

I’ve killed the job after processing some messages and restarted the same job 
again. Some of the messages I am seeing are processed more than once and not 
following the exactly once semantics.


Also, using the checkpointing mechanism and saving the state checkpoints into 
HDFS.
Below is the checkpoint code,


envStream.enableCheckpointing(11);
envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
envStream.getCheckpointConfig().setCheckpointTimeout(6);
envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);

envStream.setStateBackend(new 
FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/"));

One thing I’ve noticed is lowering the time to checkpointing is actually 
lowering the number of messages processed more than once and 11ms is the lowest 
I can use.

Is there anything else I should try to have exactly once message processing 
functionality.

I am using Flink 1.0.0 and kafka 0.8


Thank you.


