Flink optimizer optimizations

2016-04-15 Thread CPC
Hi

When I looked into what kind of optimizations Flink does, I found
https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals. Is it
up to date? Also, I couldn't understand this part:

"Reusing of partitionings and sort orders across operators. If one operator
leaves the data in partitioned fashion (and or sorted order), the next
operator will automatically try and reuse these characteristics. The
planning for this is done holistically and can cause earlier operators to
pick more expensive algorithms, if they allow for better reusing of
sort-order and partitioning."

Can you give an example of "earlier operators to pick more expensive
algorithms"?

Regards


Re: Flink optimizer optimizations

2016-04-17 Thread CPC
Hmmm, I understand now. Thank you guys :)
On Apr 16, 2016 2:21 PM, "Matthias J. Sax"  wrote:

> Sure. WITHOUT.
>
> Thanks. Good catch :)
>
> On 04/16/2016 01:18 PM, Ufuk Celebi wrote:
> > On Sat, Apr 16, 2016 at 1:05 PM, Matthias J. Sax 
> wrote:
> >> (with the need to sort the data, because both
> >> datasets will be sorted on A already). Thus, the overhead of sorting in
> >> the group might pay off in the join.
> >
> > I think you meant to write withOUT the need to sort the data, right?
> >
>
>
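To make this concrete, consider a job that aggregates a dataset grouped on a
key and then joins the result with another dataset on the same key. The
sketch below is a minimal illustration of the plan choice described above,
not code from the thread:

import org.apache.flink.api.scala._

object SortReuseSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val left  = env.fromElements((1, 10L), (1, 20L), (2, 30L))
    val right = env.fromElements((1, "a"), (2, "b"))

    // The grouping could use a hash-based strategy, but if the join below is
    // executed as a sort-merge join on the same key, the optimizer may pick
    // the locally more expensive sort-based grouping: the sort order and
    // partitioning on field 0 can then be reused, and the left join input
    // needs no additional sort.
    val aggregated = left.groupBy(0).sum(1)

    val joined = aggregated.join(right).where(0).equalTo(0)
    joined.print()
  }
}

With hash-based grouping, the join would have to sort or hash its left input
again; the holistic planning trades a cheaper local strategy early for a
cheaper join later.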


Data locality and scheduler

2016-04-26 Thread CPC
Hi,

I looked at some scheduler documentation but could not find an answer to my
question. Suppose I have a big file on a 40-node Hadoop cluster, and since it
is a big file, every node has at least one chunk of it. If I write a Flink
job that filters the file, and the job has a parallelism of 4 (less than 40),
how does data locality work? Do some tasks read some chunks from remote
nodes, or does the scheduler keep the maximum parallelism at 4 but schedule
tasks on every node?

Regards


Re: Data locality and scheduler

2016-04-26 Thread CPC
Hi

But can't this behaviour cause a lot of network activity? Is there any
roadmap or plan to change it?
On Apr 26, 2016 7:06 PM, "Fabian Hueske"  wrote:

> Hi,
>
> Flink starts four tasks and then lazily assigns input splits to these tasks
> with locality preference. So each task may consume more than one split.
> This is different from Hadoop MapReduce or Spark which schedule a new task
> for each input split.
> In your case, the four tasks would be scheduled to four of the 40 machines
> and most of the splits will be remotely read.
>
> Best, Fabian
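To make Fabian's description concrete, here is a minimal sketch of the
scenario (paths, data, and job are illustrative assumptions, not from the
thread). Only four source subtasks are started; each one repeatedly requests
input splits from the JobManager, which prefers handing out a split local to
the requesting TaskManager and falls back to remote splits otherwise:

import org.apache.flink.api.scala._

object LocalityDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4) // 4 source tasks on a 40-node cluster

    // Each of the 4 source subtasks consumes many splits, one after another;
    // with 40 nodes holding chunks, most splits end up being read remotely.
    val lines = env.readTextFile("hdfs:///data/big-file") // hypothetical path
    lines
      .filter(_.contains("ERROR")) // hypothetical filter
      .writeAsText("hdfs:///data/filtered")

    env.execute("data locality demo")
  }
}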


Dataset split/demultiplex

2016-05-12 Thread CPC
Hi folks,

Is there any way in the DataSet API to split a Dataset[A] into a Dataset[A]
and a Dataset[B]? The use case is a custom filter component that we want to
implement: we want to direct the input elements for which the predicate is
false to one output, and we also want to direct input elements that throw an
exception to another output (a demultiplexer-like component).

Thank you in advance...


Re: Dataset split/demultiplex

2016-05-12 Thread CPC
Hi Gabor,

Yes, functionally this helps. But in this case I am processing every element
twice and sending the whole dataset to two different operators. What I am
trying to achieve is something like the DataStream split functionality, or a
little bit more. In a filter-like scenario, I want to perform the
pseudo-operation below:

def function(iter: Iterator[URLOutputData],
             trueEvents: Collector[URLOutputData],
             falseEvents: Collector[URLOutputData],
             errEvents: Collector[URLOutputData]): Unit = {
  iter.foreach { i =>
    try {
      if (predicate(i))
        trueEvents.collect(i)
      else
        falseEvents.collect(i)
    } catch {
      case _: Throwable => errEvents.collect(i)
    }
  }
}

Another case could be: suppose I have an input set of web events coming from
different web apps, and I want to split the dataset based on application
category.

Thanks,


On 12 May 2016 at 17:28, Gábor Gévay  wrote:

> Hello,
>
> You can split a DataSet into two DataSets with two filters:
>
> val xs: DataSet[A] = ...
> val split1: DataSet[A] = xs.filter(f1)
> val split2: DataSet[A] = xs.filter(f2)
>
> where f1 and f2 are true for those elements that should go into the
> first and second DataSets respectively. So far, the splits will just
> contain elements from the input DataSet, but you can of course apply
> some map after one of the filters.
>
> Does this help?
>
> Best,
> Gábor
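A self-contained sketch of the two-filter workaround, extended with the error
output from the pseudo-code above. Data, names, and the tagging trick are
illustrative; the predicate (and any exception it throws) is evaluated once
per element, after which cheap filters split the tagged result:

import org.apache.flink.api.scala._

import scala.util.Try

object FilterDemux {
  // Illustrative predicate; throws for "malformed" input.
  def predicate(s: String): Boolean =
    if (s.isEmpty) throw new IllegalArgumentException("empty event")
    else s.length > 3

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val events: DataSet[String] = env.fromElements("alpha", "ab", "", "delta")

    // Evaluate the predicate (and catch its failures) once per element,
    // then split the tagged result with three cheap filters.
    val tagged: DataSet[(String, String)] = events.map { e =>
      Try(predicate(e))
        .map(r => (e, if (r) "true" else "false"))
        .getOrElse((e, "error"))
    }

    val trueEvents  = tagged.filter(_._2 == "true").map(_._1)
    val falseEvents = tagged.filter(_._2 == "false").map(_._1)
    val errEvents   = tagged.filter(_._2 == "error").map(_._1)

    trueEvents.print() // print() triggers execution of this branch
  }
}

Note that this only avoids re-evaluating the predicate; the tagged elements
are still shipped to all three filters, which is exactly the duplication
discussed above.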


Re: Dataset split/demultiplex

2016-05-12 Thread CPC
Hi,

If it just requires implementing a custom operator (I mean, it does not
require changes to the network stack or other engine-level changes), I can
try to implement it, since I have been working on the optimizer and plan
generation for a month. Also, we are going to build our ETL framework on
Flink, and this kind of scenario is a good fit and a common requirement in
ETL-like flows. If you can point me to the parts of the project I should look
at, I can try it.

Thanks
On May 12, 2016 6:54 PM, "Aljoscha Krettek"  wrote:

> Hi,
> I agree that this would be very nice. Unfortunately, Flink only allows
> one output from an operation right now. Maybe we can extend this somehow
> in the future.
>
> Cheers,
> Aljoscha


DataStream split/select behaviour

2016-06-07 Thread CPC
Hello everyone,

When I use DataStream split/select, it always sends all selected records to
the same taskmanager. Is there any reason for this behaviour? Also, is it
possible to implement the same split/select behaviour for the DataSet API
(without using a different filter for every output)? I found the
https://issues.apache.org/jira/browse/FLINK-87 issue, but it is still open...

Thanks...


Re: DataStream split/select behaviour

2016-06-07 Thread CPC
Sorry, I think I misunderstood the issue. But it seems that when the
DataStream is partitioned by some field and I select on that field, only one
taskmanager processes the data. I can achieve the same result when I use a
filter. Below is the code piece:


import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.scala._

case class WikiData(prevID: Option[Int], curID: Int, num: Int,
                    prevTitle: String, curTitle: String, ttype: String)

object StreamingSelect {

  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.getExecutionEnvironment
    //val rootPath = "gs://cpcflink/wikistream/"
    val stream: DataStream[String] = env.createInput(
      new TextInputFormat(
        new Path("/home/capacman/Data/wiki/2015_01_en_clickstream.tsv")))

    val wikiStream = stream.map { line =>
      val values = line.split("\t")
      WikiData(
        if (values(0).isEmpty) None else Some(Integer.parseInt(values(0))),
        Integer.parseInt(values(1)),
        Integer.parseInt(values(2)),
        values(3),
        values(4),
        if (values.length < 6) null else values(5))
    }

    val split = wikiStream
      .split(i => if (i.curID == 14533) List("14533") else List.empty)
    val stream14533 = split.select("14533").map(i => (i.curID, i.num))

    stream14533.writeAsCsv("/home/capacman/Data/wiki/14533")

    env.execute()
  }
}



Re: incremental Checkpointing, Rocksdb HA

2016-06-09 Thread CPC
A Cassandra backend would be interesting, especially if Flink could benefit
from Cassandra's data locality. The Cassandra/Spark integration uses this
information to schedule Spark tasks.

On 9 June 2016 at 19:55, Nick Dimiduk  wrote:

> You might also consider support for a Bigtable
> backend: HBase/Accumulo/Cassandra. The data model should be similar
> (identical?) to RocksDB and you get HA, recoverability, and support for
> really large state "for free".
>
> On Thursday, June 9, 2016, Chen Qin  wrote:
>
> > Hi there,
> >
> > What is the progress on incremental checkpointing? Does the Flink dev
> > team plan to work on this, or is there a JIRA to track it? Super
> > interested to know.
> >
> > I have also researched and considered using the RocksDB state backend
> > without running an HDFS cluster or talking to S3. A primitive idea is to
> > use ZK to store and notify state-propagation progress, and to propagate
> > state via chain replication implemented on top of YARN-provisioned
> > storage nodes.
> >
> > Thanks,
> > Chen
> >
>


DagConnection tempMode vs breakPipeline property

2016-06-16 Thread CPC
Hi everybody,

I am trying to understand the Flink optimizer in the DataSet API, but I don't
understand the tempMode and breakPipeline properties. The TempMode enum also
has a breakPipeline property, but the two are accessed and set in different
places: DagConnection.breakPipeline is used to select the network I/O
DataExchangeMode, whereas I don't understand DagConnection's tempMode, nor
the tempMode in Channel. Could somebody explain why there are two different
breakPipeline properties, and what TempMode is responsible for? Thank you in
advance.


offheap memory allocation and memory leak bug

2016-06-17 Thread CPC
Hi,

I am running some tests on off-heap memory usage and encountered an odd
behavior. My taskmanager heap limit is 12288 MB, and when I set
"taskmanager.memory.off-heap: true", every job allocates at most 11673 MB of
off-heap memory, which is heapsize * 0.95 (the value of
taskmanager.memory.fraction). But when I submit a second job, it allocates
another 11 GB and does not free the memory, since MaxDirectMemorySize is set
via -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE} with
TM_MAX_OFFHEAP_SIZE="8388607T"; my laptop goes to swap and then the kernel
OOM-kills the taskmanager. If I trigger a GC from VisualVM between jobs, the
direct memory is released, but the memory usage of the taskmanager reported
by ps is still around 20 GB (RSS) and 27 GB (virtual size). In that case I
could submit my test job a few times without the taskmanager being
OOM-killed, but after about 10 submissions it was killed again. I don't
understand why the JVM memory usage is still high even after all direct
memory is released. Do you have any idea? I then set MaxDirectMemorySize to
12 GB; in this case the direct memory was freed without any explicit GC
triggered from VisualVM, but the JVM process memory usage was still high,
around 20 GB (RSS) and 27 GB (virtual size). Again, after maybe 10
submissions the taskmanager was killed. I think this is a bug, and it makes
it impossible to reuse taskmanagers without restarting them in standalone
mode.



Re: offheap memory allocation and memory leak bug

2016-06-18 Thread CPC
Hello,

I repeated the same test with the following conf values:

> taskmanager.heap.mb: 6500
>
> taskmanager.memory.off-heap: true
>
> taskmanager.memory.fraction: 0.9
>
>
I set TM_MAX_OFFHEAP_SIZE="6G" in taskmanager.sh. The taskmanager started with:

> capacman 14543  323 56.0 17014744 13731328 pts/1 Sl 16:23  35:25
> /home/capacman/programlama/java/jdk1.7.0_75/bin/java
> -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -Xms650M -Xmx650M
> -XX:MaxDirectMemorySize=6G -XX:MaxPermSize=256m
> -Dlog.file=/home/capacman/Data/programlama/flink-1.0.3/log/flink-capacman-taskmanager-0-capacman-Aspire-V3-771.log
> -Dlog4j.configuration=file:/home/capacman/Data/programlama/flink-1.0.3/conf/log4j.properties
> -Dlogback.configurationFile=file:/home/capacman/Data/programlama/flink-1.0.3/conf/logback.xml
> -classpath
> /home/capacman/Data/programlama/flink-1.0.3/lib/flink-dist_2.11-1.0.3.jar:/home/capacman/Data/programlama/flink-1.0.3/lib/flink-python_2.11-1.0.3.jar:/home/capacman/Data/programlama/flink-1.0.3/lib/log4j-1.2.17.jar:/home/capacman/Data/programlama/flink-1.0.3/lib/slf4j-log4j12-1.7.7.jar:::
> org.apache.flink.runtime.taskmanager.TaskManager --configDir
> /home/capacman/Data/programlama/flink-1.0.3/conf
>

but memory usage reached up to 13 GB. Could somebody explain to me why memory
usage is so high? I expected it to be at most 8 GB with some JVM-internal
overhead.


>


Re: offheap memory allocation and memory leak bug

2016-06-19 Thread CPC
Hi,

I think I found some information regarding this behavior. In the JVM it is
almost impossible to free memory allocated via ByteBuffer.allocateDirect;
there is no explicit way to tell the JVM "free this direct ByteBuffer". In
some forums it is said you can free the memory with the method below:

import java.nio.ByteBuffer

def releaseBuffers(buffers: List[ByteBuffer]): List[ByteBuffer] = {
  if (buffers.nonEmpty) {
    // every direct ByteBuffer carries an internal sun.misc Cleaner that
    // frees its native memory; invoke it reflectively
    val cleanerMethod = buffers.head.getClass.getMethod("cleaner")
    cleanerMethod.setAccessible(true)
    buffers.foreach { buffer =>
      val cleaner = cleanerMethod.invoke(buffer)
      val cleanMethod = cleaner.getClass.getMethod("clean")
      cleanMethod.setAccessible(true)
      cleanMethod.invoke(cleaner)
    }
  }
  List.empty[ByteBuffer]
}

but since the cleaner method is an internal API, the above is not
recommended, does not work on every JVM, and Java 9 will not support it
either. I also ran some tests with the above method, and the behavior is not
predictable: if the memory was allocated by some other thread and that thread
exits, the memory is released. In effect, the GC controls the direct-memory
buffers. If there is no GC activity and memory is allocated and then
dereferenced by different threads, memory usage grows beyond what was
intended, the machine goes to swap, and the OS kills the taskmanager. In my
tests I saw this behaviour:

Suppose thread A allocates 8 GB of memory and exits, with no remaining
reference to the allocated memory; then thread B allocates 8 GB and exits,
again with no remaining reference to the allocated memory.

When I look at the direct memory usage from jvisualvm, it looks like the
screenshot below (-Xmx512m -XX:MaxDirectMemorySize=12G):

[screenshot: jvisualvm direct-memory usage]

but the RSS of the process is 16 GB. If I call System.gc at that point, RSS
drops to 8 GB, but not to the expected level.
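A minimal sketch of the repro (illustrative code, assuming the flags above:
-Xmx512m -XX:MaxDirectMemorySize=12G):

import java.nio.ByteBuffer

object DirectMemoryRepro {
  // allocate 8 x 1 GB of direct buffers; the references die with the method
  def allocate8G(): Unit = {
    val buffers =
      (1 to 8).map(_ => ByteBuffer.allocateDirect(1024 * 1024 * 1024))
    ()
  }

  // run a body in a fresh thread and wait for it to exit
  def inThread(body: => Unit): Unit = {
    val t = new Thread(new Runnable { def run(): Unit = body })
    t.start()
    t.join()
  }

  def main(args: Array[String]): Unit = {
    inThread(allocate8G()) // thread A: 8 GB allocated, dereferenced, exits
    inThread(allocate8G()) // thread B: another 8 GB
    // With a 512 MB heap no collection has run, so the Cleaners never fired
    // and the process RSS can still be around 16 GB at this point.
    Thread.sleep(60000) // observe RSS with `ps` or jvisualvm
  }
}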

This is why the Apache Cassandra developers chose sun.misc.Unsafe (
http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/Off-heap-caching-through-ByteBuffer-allocateDirect-when-JNA-not-available-td6977711.html
).

I think the only way to limit memory usage in Flink at the moment, if you
want to reuse the same taskmanager across jobs, is via
"taskmanager.memory.preallocate: true". Since the memory is allocated once at
the beginning and never freed, memory usage stays constant.
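As a flink-conf.yaml sketch of that workaround (sizes are the ones from the
tests above, shown for illustration):

taskmanager.heap.mb: 12288
taskmanager.memory.off-heap: true
taskmanager.memory.fraction: 0.9
# allocate all managed memory once at TaskManager start-up, so the
# direct-buffer footprint stays constant across jobs
taskmanager.memory.preallocate: true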

PS: Sorry for my English, I am not a native speaker. I hope I managed to
explain what I intended to :)




Re: offheap memory allocation and memory leak bug

2016-06-20 Thread CPC
Hi Till,

I saw the JIRA issue. Do you want me to upload the input dataset as well? If
you like, I can prepare a GitHub repo if that would be easier.
On Jun 20, 2016 1:10 PM, "Till Rohrmann"  wrote:

> Hi,
>
> your observation sounds like a bug to me and we have to further investigate
> it. I assume that you’re running a batch job, right? Could you maybe share
> your complete configuration and the job to reproduce the problem with us?
>
> I think that your investigation that direct buffers are not properly freed
> and garbage collected can be right. I will open a JIRA issue to further
> investigate and solve the problem. Thanks for reporting :-)
>
> At the moment, one way to solve this problem is, as you’ve already stated,
> to set taskmanager.memory.preallocate: true in your configuration. For
> batch jobs, this should actually improve the runtime performance at the
> cost of a slightly longer start-up time for your TaskManagers.
>
> Cheers,
> Till

Re: DagConnection tempMode vs breakPipeline property

2016-07-04 Thread CPC
Thank you, Ufuk. Yes, it helps a lot. Thanks for your help.
On Jul 4, 2016 2:02 PM, "Ufuk Celebi"  wrote:

> The data exchange mode has been introduced recently as a replacement
> for the pipeline break logic, which was buggy. I'm not too familiar
> with the optimizer, but I would expect everything that goes back to
> the DataExchangeMode to be correct. The rest should be an artifact of
> the old pipeline breaker logic and not be used any more. Does this
> help in any way?


release 1.1 rc2 confusion

2016-08-07 Thread CPC
Hi,

On the Flink downloads page it says that the latest version is 1.1, and there
are links to 1.1 binaries, but wasn't RC2 voted on recently? Are those
binaries the 1.1 release or RC2?


Re: [Discuss] FLIP-13 Side Outputs in Flink

2016-10-26 Thread CPC
Is this just related to the streaming API? This feature could be really
useful for ETL scenarios with the DataSet API as well.

On Oct 26, 2016 22:29, "Fabian Hueske"  wrote:

> Hi Chen,
>
> thanks for this interesting proposal. I think side output would be a very
> valuable feature to have!
>
> I went over the FLIP and have a few questions.
>
> - Will multiple side outputs of the same type be supported?
> - If I got it right, the FLIP proposes to change the signatures of many
> user-defined functions (FlatMapFunction, WindowFunction, ...). Most of
> these interfaces/classes are annotated with @Public, which means we cannot
> change them in the Flink 1.x release line. What would be alternatives? I
> can think of a) casting the Collector into a RichCollector (as you do in
> your prototype) or b) retrieve the RichCollector from the RuntimeContext
> that a RichFunction provides.
>
> I'm not so familiar with the internals of the DataStream API, so I leave
> comments on that to other.
>
> Best, Fabian
>
> 2016-10-25 18:00 GMT+02:00 Chen Qin :
>
> > Hey folks,
> >
> > Please give feedback on FLIP-13!
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-
> > 13+Side+Outputs+in+Flink
> > JIRA task link to google doc
> > https://issues.apache.org/jira/browse/FLINK-4460
> >
> > Thanks,
> > Chen Qin
> >
>
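The proposal discussed here later shipped in the DataStream API (Flink 1.3+)
as side outputs via OutputTag. A minimal sketch of that eventual API, shown
only to illustrate the feature under discussion (version-dependent; data and
names are illustrative):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

object SideOutputSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements("1", "2", "oops", "4")

    // Elements that fail to parse go to a named side output instead of
    // failing the job or requiring a second pass over the stream.
    val errors = OutputTag[String]("parse-errors")

    val parsed: DataStream[Int] = input.process(
      new ProcessFunction[String, Int] {
        override def processElement(value: String,
                                    ctx: ProcessFunction[String, Int]#Context,
                                    out: Collector[Int]): Unit =
          try out.collect(value.toInt)
          catch { case _: NumberFormatException => ctx.output(errors, value) }
      })

    parsed.print()
    parsed.getSideOutput(errors).print()
    env.execute("side output sketch")
  }
}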


Dataset and eager scheduling

2017-03-02 Thread CPC
Hi all,

Currently our team is trying to implement a runtime operator and is also
playing with the scheduler. We are trying to understand the batch optimizer,
but that will take some time. What we want to know is whether changing the
batch scheduling mode from LAZY_FROM_SOURCES to EAGER could affect the
optimizer. I mean, does the optimizer make strong assumptions that the
scheduling mode of batch jobs is always LAZY_FROM_SOURCES?

Thanks...


Re: Dataset and eager scheduling

2017-03-02 Thread CPC
Hi Till,

Thank you.



On 2 March 2017 at 18:13, Till Rohrmann  wrote:

> Hi CPC,
>
> I think that the optimizer does not take the scheduling mode into account
> when optimizing a Flink job.
>
> Cheers,
> Till
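For context: the scheduling mode lives on the generated JobGraph rather than
in the optimizer (batch jobs are generated with LAZY_FROM_SOURCES, streaming
jobs with EAGER). A minimal sketch of flipping it, assuming the 1.2-era
runtime classes (exact names are an assumption, not confirmed by the thread):

import org.apache.flink.runtime.jobgraph.{JobGraph, ScheduleMode}

object EagerScheduling {
  // Force eager scheduling on an already-generated JobGraph. Per Till's
  // answer above, the optimizer's plan choice does not depend on this flag.
  def makeEager(jobGraph: JobGraph): JobGraph = {
    jobGraph.setScheduleMode(ScheduleMode.EAGER)
    jobGraph
  }
}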


Dataset and select/split functionality

2017-03-02 Thread CPC
Hi all,

We are going to try to implement select/split functionality for the batch
API. We looked at the streaming side and understand how it works there, but
since the streaming side does not include an optimizer, that was easier.
Since adding such a runtime operator will affect the optimizer layer as well,
is there a part you want us to pay particular attention to?

Thanks...


Re: Dataset and select/split functionality

2017-03-03 Thread CPC
Hi Fabian,

Thank you for your explanation. Also, can you give an example of how the
optimizer relies on the assumption that the outputs of a function are
replicated?

Thank you...

On 3 March 2017 at 13:52, Fabian Hueske  wrote:

> Hi CPC,
>
> we had several requests in the past to add this feature. However, adding
> select/split for DataSet is much! more work than you would expect.
> As you pointed out, we have to go through the optimizer, which assumes that
> the outputs of a function are replicated.
> This is pretty much wired in and you would have to touch a lot of code.
>
> I'm sorry, but I am not comfortable doing such a big change.
> IMO, the potential gains are not worth the effort of implementation and
> verification and the risk of breaking something.
>
> Best, Fabian
>