SLF4j logging system gets clobbered?

2017-10-18 Thread Jared Stehler
I’m having an issue where I’ve got logging set up and functioning for my 
flink-mesos deployment, and it works fine up to a point (the same point every 
time), where it then seems to fall back to “defaults” and loses all of my 
configured filtering.

2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-17] INFO  
o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  - TaskManager 
taskmanager-8 has started.
2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-16] INFO  
org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager at 
ip-10-80-54-201 
(akka.tcp://fl...@ip-10-80-54-201.us-west-2.compute.internal:31014/user/taskmanager)
 as 697add78bd00fe7dc6a7aa60bc8d75fb. Current number of registered hosts is 39. 
Current number of alive task slots is 39.
2017-10-11 21:37:18.820 [flink-akka.actor.default-dispatcher-17] INFO  
org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager at 
ip-10-80-54-201 
(akka.tcp://fl...@ip-10-80-54-201.us-west-2.compute.internal:31018/user/taskmanager)
 as a6cff0f18d71aabfb3b112f5e2c36c2b. Current number of registered hosts is 40. 
Current number of alive task slots is 40.
2017-10-11 21:37:18.821 [flink-akka.actor.default-dispatcher-17] INFO  
o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  - TaskManager 
taskmanager-00010 has started.
2017-10-11 21:39:04,371:6171(0x7f67fe9cd700):ZOO_WARN@zookeeper_interest@1570: 
Exceeded deadline by 13ms

--- here is where it turns over into default pattern layout ---
21:39:05.616 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient - 
Blob client connecting to akka://flink/user/jobmanager

21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.client.JobClient - 
Checking and uploading JAR files
21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient - 
Blob client connecting to akka://flink/user/jobmanager
21:39:09.788 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.m.r.c.MesosJobManager - Submitting job 005b570ff2866023aa905f2bc850f7a3 
(Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3).
21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.m.r.c.MesosJobManager - Using restart strategy 
FailureRateRestartStrategy(failuresInterval=12 msdelayInterval=1000 
msmaxFailuresPerInterval=3) for 005b570ff2866023aa905f2bc850f7a3.
21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.r.e.ExecutionGraph - Job recovers via failover strategy: full graph 
restart
21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.m.r.c.MesosJobManager - Running initialization on master for job 
Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3 
(005b570ff2866023aa905f2bc850f7a3).
21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.m.r.c.MesosJobManager - Successfully ran initialization on master in 0 ms.
21:39:09.791 [flink-akka.actor.default-dispatcher-4] WARN  
o.a.f.configuration.Configuration - Config uses deprecated configuration key 
'high-availability.zookeeper.storageDir' instead of proper key 
'high-availability.storageDir'
21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.c.GlobalConfiguration - Loading configuration property: 
mesos.failover-timeout, 60
21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.c.GlobalConfiguration - Loading configuration property: 
mesos.initial-tasks, 1
21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.c.GlobalConfiguration - Loading configuration property: 
mesos.maximum-failed-tasks, -1
21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO  
o.a.f.c.GlobalConfiguration - Loading configuration property: 
mesos.resourcemanager.framework.role, '*'

The reason this is a vexing issue is that the app master then proceeds to dump 
megabytes of "o.a.f.c.GlobalConfiguration - Loading configuration property:" 
messages into the log, and I’m unable to filter them out.

My logback config is (the XML markup was stripped by the mail archive; the 
surviving fragments are the pattern layout 
%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n 
and an ERROR-level threshold).
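A minimal logback.xml reconstructed around those fragments would look roughly as 
follows (a sketch, not the original file; the logger override for 
GlobalConfiguration is an assumption reflecting the filtering described above):

<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <!-- surviving pattern from the original config -->
      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
    </encoder>
  </appender>

  <!-- assumption: raise this logger to WARN to drop the per-property INFO messages -->
  <logger name="org.apache.flink.configuration.GlobalConfiguration" level="WARN"/>

  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>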

--
Jared Stehler
Chief Architect - Intellify Learning
o: 617.701.6330 x703





Re: Task Manager was lost/killed due to full GC

2017-10-18 Thread Fabian Hueske
Thanks for the heads-up and explaining how you resolve the issue!

Best, Fabian

2017-10-18 3:50 GMT+02:00 ShB :

> I just wanted to leave an update about this issue, for someone else who
> might
> come across it. The problem was with memory, but it was disk space and not
> heap/off-heap memory. YARN was killing off my containers as they exceeded
> the threshold for disk utilization and this was manifesting as Task manager
> was lost/killed or JobClientActorConnectionTimeoutException: Lost
> connection
> to the JobManager. Digging deep into the individual instance node manager
> logs provided some hints about it being a disk issue.
>
> Some fixes for this problem:
> yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage
> -- can be increased to alleviate the problem temporarily.
> Increasing the disk capacity on each task manager is a more long-term fix.
> Increasing the number of task managers increases the available disk space and
> hence is also a fix.
>
> Thanks!
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>
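For reference, the NodeManager threshold mentioned above lives in yarn-site.xml; 
a minimal sketch (the 95.0 value is illustrative, the YARN default being 90.0):

<property>
  <name>yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage</name>
  <value>95.0</value>
</property>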


Re: Off heap memory issue

2017-10-18 Thread Javier Lopez
Hi Robert,

Sorry to reply this late. We did a lot of tests, trying to identify if the
problem was in our custom sources/sinks. We figured out that none of our
custom components is causing this problem. We came up with a small test,
and realized that the Flink nodes run out of non-heap JVM memory and crash
after deployment of thousands of jobs.

When rapidly deploying thousands or hundreds of thousands of Flink jobs -
depending on job complexity in terms of resource consumption - Flink nodes
non-heap JVM memory consumption grows until there is no more memory left on
the machine and the Flink process crashes. Both TaskManagers and JobManager
exhibit the same behavior. The TaskManagers die faster though. The memory
consumption doesn't decrease after stopping the deployment of new jobs,
with the cluster being idle (no running jobs).

We could replicate the behavior by the rapid deployment of the WordCount
Job provided in the Quickstart with a Python script.  We started 24
instances of the deployment script to run in parallel.

The non-heap JVM memory consumption grows faster with more complex jobs,
e.g. reading 10K events from Kafka and printing to STDOUT ( * ). Thus fewer
deployed jobs are needed before the TaskManagers/JobManager die.

We employ Flink 1.3.2 in standalone mode on AWS EC2 t2.large nodes with 4GB
RAM inside Docker containers. For the test, we used 2 TaskManagers and 1
JobManager.

( * ) a slightly changed Python script was used, which waited after
deployment 15 seconds for the 10K events to be read from Kafka, then it
canceled the freshly deployed job via Flink REST API.

If you want we can provide the Scripts and Jobs we used for this test. We
have a workaround for this, which restarts the Flink nodes once a memory
threshold is reached. But this has lowered the availability of our services.

Thanks for your help.

On 30 August 2017 at 10:39, Robert Metzger  wrote:

> I just saw that your other email is about the same issue.
>
> Since you've done a heapdump already, did you see any pattern in the
> allocated objects? Ideally none of the classes from your user code should
> stick around when no job is running.
> What's the size of the heap dump? I'm happy to take a look at it if it's
> reasonably small.
>
> On Wed, Aug 30, 2017 at 10:27 AM, Robert Metzger 
> wrote:
>
>> Hi Javier,
>>
>> I'm not aware of such issues with Flink, but if you could give us some
>> more details on your setup, I might get some more ideas on what to look for.
>>
>> are you using the RocksDBStateBackend? (RocksDB is doing some JNI
>> allocations, that could potentially leak memory)
>> Also, are you passing any special garbage collector options? (Maybe some
>> classes are not unloaded)
>> Are you using anything else that is special (such as protobuf or avro
>> formats, or any other big library)?
>>
>> Regards,
>> Robert
>>
>>
>>
>> On Mon, Aug 28, 2017 at 5:04 PM, Javier Lopez 
>> wrote:
>>
>>> Hi all,
>>>
>>> we are starting a lot of Flink jobs (streaming), and after we have
>>> started 200 or more jobs we see that the non-heap memory in the
>>> taskmanagers increases a lot, to the point of killing the instances. We
>>> found out that every time we start a new job, the committed non-heap memory
>>> increases by 5 to 10MB. Is this an expected behavior? Are there ways to
>>> prevent this?
>>>
>>
>>
>


Re: Off heap memory issue

2017-10-18 Thread Flavio Pompermaier
We also faced the same problem, but the number of jobs we can run before
restarting the cluster depends on the volume of the data to shuffle around
the network. We even had problems with a single job and in order to avoid
OOM issues we had to put some configuration to limit Netty memory usage,
i.e.:
 - Add to flink.yaml -> env.java.opts: -Dio.netty.recycler.maxCapacity.default=1
 - Edit taskmanager.sh and change TM_MAX_OFFHEAP_SIZE from 8388607T to 5g
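Concretely, the two changes look roughly like this (a sketch; "flink.yaml" above 
refers to the Flink configuration file, typically conf/flink-conf.yaml):

# conf/flink-conf.yaml
env.java.opts: -Dio.netty.recycler.maxCapacity.default=1

# bin/taskmanager.sh -- lower the off-heap cap from its default
# TM_MAX_OFFHEAP_SIZE="8388607T"
TM_MAX_OFFHEAP_SIZE="5g"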

For this purpose we wrote a small test to reproduce the problem and we
opened an issue for that [1].
We still don't know whether the problems are related, however.

I hope that could be helpful,
Flavio

[1] https://issues.apache.org/jira/browse/FLINK-7845

On Wed, Oct 18, 2017 at 10:48 AM, Javier Lopez 
wrote:

> Hi Robert,
>
> Sorry to reply this late. We did a lot of tests, trying to identify if the
> problem was in our custom sources/sinks. We figured out that none of our
> custom components is causing this problem. We came up with a small test,
> and realized that the Flink nodes run out of non-heap JVM memory and crash
> after deployment of thousands of jobs.
>
> When rapidly deploying thousands or hundreds of thousands of Flink jobs -
> depending on job complexity in terms of resource consumption - Flink nodes
> non-heap JVM memory consumption grows until there is no more memory left on
> the machine and the Flink process crashes. Both TaskManagers and JobManager
> exhibit the same behavior. The TaskManagers die faster though. The memory
> consumption doesn't decrease after stopping the deployment of new jobs,
> with the cluster being idle (no running jobs).
>
> We could replicate the behavior by the rapid deployment of the WordCount
> Job provided in the Quickstart with a Python script.  We started 24
> instances of the deployment script to run in parallel.
>
> The non-heap JVM memory consumption grows faster with more complex jobs,
> i.e. reading from Kafka 10K events and printing to STDOUT( * ). Thus less
> deployed jobs are needed until the TaskManagers/JobManager dies.
>
> We employ Flink 1.3.2 in standalone mode on AWS EC2 t2.large nodes with
> 4GB RAM inside Docker containers. For the test, we used 2 TaskManagers and
> 1 JobManager.
>
> ( * ) a slightly changed Python script was used, which waited after
> deployment 15 seconds for the 10K events to be read from Kafka, then it
> canceled the freshly deployed job via Flink REST API.
>
> If you want we can provide the Scripts and Jobs we used for this test. We
> have a workaround for this, which restarts the Flink nodes once a memory
> threshold is reached. But this has lowered the availability of our services.
>
> Thanks for your help.
>
> On 30 August 2017 at 10:39, Robert Metzger  wrote:
>
>> I just saw that your other email is about the same issue.
>>
>> Since you've done a heapdump already, did you see any pattern in the
>> allocated objects? Ideally none of the classes from your user code should
>> stick around when no job is running.
>> What's the size of the heap dump? I'm happy to take a look at it if it's
>> reasonably small.
>>
>> On Wed, Aug 30, 2017 at 10:27 AM, Robert Metzger 
>> wrote:
>>
>>> Hi Javier,
>>>
>>> I'm not aware of such issues with Flink, but if you could give us some
>>> more details on your setup, I might get some more ideas on what to look for.
>>>
>>> are you using the RocksDBStateBackend? (RocksDB is doing some JNI
>>> allocations, that could potentially leak memory)
>>> Also, are you passing any special garbage collector options? (Maybe some
>>> classes are not unloaded)
>>> Are you using anything else that is special (such as protobuf or avro
>>> formats, or any other big library)?
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>>
>>> On Mon, Aug 28, 2017 at 5:04 PM, Javier Lopez 
>>> wrote:
>>>
 Hi all,

 we are starting a lot of Flink jobs (streaming), and after we have
 started 200 or more jobs we see that the non-heap memory in the
 taskmanagers increases a lot, to the point of killing the instances. We
 found out that every time we start a new job, the committed non-heap memory
 increases by 5 to 10MB. Is this an expected behavior? Are there ways to
 prevent this?

>>>
>>>
>>
>


Garbage collection concerns with Task Manager memory

2017-10-18 Thread Marchant, Hayden
I read in the Flink documentation that the TaskManager runs all tasks within 
its own JVM, and that the recommendation is to set taskmanager.heap.mb to as 
much as is available on the server. I have a very large server with 192GB, so 
I am thinking of giving most of it to the TaskManager.

I recall that there are concerns about long stop-the-world garbage collection 
pauses when allocating too much memory to a JVM - is this still a concern with 
G1?


Thanks,
Hayden Marchant





Re: Maven release

2017-10-18 Thread Gary Yao
Hi Biswajit,

The distribution management configuration can be found in the parent pom of
flink-parent:


<parent>
  <groupId>org.apache</groupId>
  <artifactId>apache</artifactId>
  <version>18</version>
</parent>

<distributionManagement>
  <repository>
    <id>apache.releases.https</id>
    <name>Apache Release Distribution Repository</name>
    <url>https://repository.apache.org/service/local/staging/deploy/maven2</url>
  </repository>
  <snapshotRepository>
    <id>apache.snapshots.https</id>
    <name>${distMgmtSnapshotsName}</name>
    <url>${distMgmtSnapshotsUrl}</url>
  </snapshotRepository>
</distributionManagement>

The properties are set to the following values in the parent pom:

  <distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
  <distMgmtSnapshotsUrl>https://repository.apache.org/content/repositories/snapshots</distMgmtSnapshotsUrl>


With -DdistMgmtSnapshotsUrl you should be able to provide the URL of your own
repository.
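For example (a sketch; the URL is a placeholder for your own Nexus, the matching 
credentials go into settings.xml under the corresponding server id, and the 
override only takes effect for -SNAPSHOT versions since the release URL is 
hard-coded in the parent pom):

mvn clean deploy -DskipTests \
  -DdistMgmtSnapshotsUrl=https://nexus.example.com/repository/maven-snapshots/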

Best,

Gary


On Wed, Oct 18, 2017 at 12:56 AM, Biswajit Das 
wrote:

> Hi Team ,
>
> Is there any instruction anywhere on how to publish a release? I have
> been trying to publish a release to my own private Nexus repository, but
> somehow it always tries to upload to
> https://repository.apache.org/service/local/staging/deploy/ even though I
> tried to set *--settings* to my *settings.xml* and *-Durl*, but somewhere
> it always picks up the Apache Nexus repository. I don't see any
> distribution entry in any POM.
>
> Thank you .
> Biswajit
>
>
>


Re: Garbage collection concerns with Task Manager memory

2017-10-18 Thread Kien Truong

Hi,

Yes, GC is still a major concern. Even G1 has a hard time dealing with 
>64GB heap in our experience.


To mitigate, we run multiple TMs with smaller heap per machine, and use 
RocksDBStateBackend.
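As an illustration, switching a job to the RocksDB backend looks roughly like 
this (a sketch against the Flink 1.3 API; the checkpoint URI is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keep state in RocksDB (native memory + local disk) instead of on the JVM heap
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        // ... define sources, operators and sinks as usual, then env.execute(...)
    }
}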


Best regards,

Kien

On 10/18/2017 4:40 PM, Marchant, Hayden wrote:

I read in the Flink documentation that the TaskManager runs all tasks within 
its own JVM, and that the recommendation is to set taskmanager.heap.mb to as 
much as is available on the server. I have a very large server with 192GB, so 
I am thinking of giving most of it to the TaskManager.

I recall that there are concerns about long stop-the-world garbage collection 
pauses when allocating too much memory to a JVM - is this still a concern with 
G1?


Thanks,
Hayden Marchant





Re: Stumped writing to KafkaJSONSink

2017-10-18 Thread Fabian Hueske
Hi Kenny,

this looks almost correct.
The Table class has a method writeToSink(TableSink) that should address
your use case (so the same as yours but without the TableEnvironment
argument).

Does that work for you?
If not what kind of error and error message do you get?

Best, Fabian

2017-10-18 1:28 GMT+02:00 Kenny Gorman :

> I am hoping you guys can help me. I am stumped how to actually write to
> Kafka using Kafka09JsonTableSink using the Table API. Here is my code
> below, I am hoping you guys can shed some light on how this should be done.
> I don’t see any methods for the actual write to Kafka. I am probably doing
> something stupid. TIA.
>
> Thanks!
> Kenny
>
> // run some SQL to filter results where a key is not null
> String sql = "SELECT icao FROM flights WHERE icao is not null";
> tableEnv.registerTableSource("flights", kafkaTableSource);
> Table result = tableEnv.sql(sql);
>
> // create a partition for the data going into kafka
> FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();
>
> // create new tablesink of JSON to kafka
> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
> params.getRequired("write-topic"),
> params.getProperties(),
> partition);
>
> result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do
> this, but no such method..
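Per Fabian's reply, the final call then becomes simply (the rest of the snippet 
stays unchanged):

result.writeToSink(kafkaTableSink);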


Re: Off heap memory issue

2017-10-18 Thread Kien Truong

Hi,

We saw a similar issue in one of our jobs due to a ByteBuffer memory leak [1].

We fixed it using the solution in the article, setting -Djdk.nio.maxCachedBufferSize.


This variable is available for Java > 8u102
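In a Flink setup this flag would typically be passed via env.java.opts, e.g. 
(a sketch; the 262144-byte cap is an illustrative value, not one taken from 
this thread):

# conf/flink-conf.yaml
env.java.opts: -Djdk.nio.maxCachedBufferSize=262144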

Best regards,

Kien

[1]http://www.evanjones.ca/java-bytebuffer-leak.html


On 10/18/2017 4:06 PM, Flavio Pompermaier wrote:
We also faced the same problem, but the number of jobs we can run 
before restarting the cluster depends on the volume of the data to 
shuffle around the network. We even had problems with a single job and 
in order to avoid OOM issues we had to put some configuration to limit 
Netty memory usage, i.e.:
 - Add to flink.yaml -> env.java.opts: 
-Dio.netty.recycler.maxCapacity.default=1

 - Edit taskmanager.sh and change TM_MAX_OFFHEAP_SIZE from 8388607T to 5g

At this purpose we wrote a small test to reproduce the problem and we 
opened an issue for that [1].

We still don't know if the problems are related however..

I hope that could be helpful,
Flavio

[1] https://issues.apache.org/jira/browse/FLINK-7845 



On Wed, Oct 18, 2017 at 10:48 AM, Javier Lopez <javier.lo...@zalando.de> wrote:


Hi Robert,

Sorry to reply this late. We did a lot of tests, trying to
identify if the problem was in our custom sources/sinks. We
figured out that none of our custom components is causing this
problem. We came up with a small test, and realized that the Flink
nodes run out of non-heap JVM memory and crash after deployment of
thousands of jobs.

When rapidly deploying thousands or hundreds of thousands of Flink
jobs - depending on job complexity in terms of resource
consumption - Flink nodes non-heap JVM memory consumption grows
until there is no more memory left on the machine and the Flink
process crashes. Both TaskManagers and JobManager exhibit the same
behavior. The TaskManagers die faster though. The memory
consumption doesn't decrease after stopping the deployment of new
jobs, with the cluster being idle (no running jobs).

We could replicate the behavior by the rapid deployment of the
WordCount Job provided in the Quickstart with a Python script.  We
started 24 instances of the deployment script to run in parallel.

The non-heap JVM memory consumption grows faster with more complex
jobs, i.e. reading from Kafka 10K events and printing to STDOUT( *
). Thus less deployed jobs are needed until the
TaskManagers/JobManager dies.

We employ Flink 1.3.2 in standalone mode on AWS EC2 t2.large nodes
with 4GB RAM inside Docker containers. For the test, we used 2
TaskManagers and 1 JobManager.

( * ) a slightly changed Python script was used, which waited
after deployment 15 seconds for the 10K events to be read from
Kafka, then it canceled the freshly deployed job via Flink REST API.

If you want we can provide the Scripts and Jobs we used for this
test. We have a workaround for this, which restarts the Flink
nodes once a memory threshold is reached. But this has lowered the
availability of our services.

Thanks for your help.

On 30 August 2017 at 10:39, Robert Metzger <rmetz...@apache.org> wrote:

I just saw that your other email is about the same issue.

Since you've done a heapdump already, did you see any pattern
in the allocated objects? Ideally none of the classes from
your user code should stick around when no job is running.
What's the size of the heap dump? I'm happy to take a look at
it if it's reasonably small.

On Wed, Aug 30, 2017 at 10:27 AM, Robert Metzger <rmetz...@apache.org> wrote:

Hi Javier,

I'm not aware of such issues with Flink, but if you could
give us some more details on your setup, I might get some
more ideas on what to look for.

are you using the RocksDBStateBackend? (RocksDB is doing
some JNI allocations, that could potentially leak memory)
Also, are you passing any special garbage collector
options? (Maybe some classes are not unloaded)
Are you using anything else that is special (such as
protobuf or avro formats, or any other big library)?

Regards,
Robert



On Mon, Aug 28, 2017 at 5:04 PM, Javier Lopez <javier.lo...@zalando.de> wrote:

Hi all,

we are starting a lot of Flink jobs (streaming), and
after we have started 200 or more jobs we see that the
non-heap memory in the taskmanagers increases a lot,
to the point of killing the instances. We found out
that every time we start a new job, the committed
non-heap memory increases by 5 to 10MB. Is this an
expected behavior? Are the

Re: start-cluster.sh not working in HA mode

2017-10-18 Thread Fabian Hueske
Hi Hayden,

I tried to reproduce the problem you described and followed the HA setup
instructions of the documentation [1].
For me the instructions worked and start-cluster.sh started two JobManagers
on my local machine (master contained two localhost entries).

The bash scripts tend to be a bit fragile, especially when it comes to
handling spaces in variables and quotes.
What kind of environment are you running on (I'm on macOS) and do you try
to start the JMs on localhost or remote machines?

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/jobmanager_high_availability.html#configuration

2017-10-16 11:53 GMT+02:00 Marchant, Hayden :

> I am attempting to run Flink 1.3.2 in HA mode with zookeeper.
>
> When I run the start-cluster.sh, the job manager is not started, even
> though the task manager is started. When I delved into this, I saw that
> the  command:
>
> ssh -n $FLINK_SSH_OPTS $master -- "nohup /bin/bash -l
> \"${FLINK_BIN_DIR}/jobmanager.sh\" start cluster ${master} ${webuiport} &"
>
> is not actually running anything on the host. i.e. I do not see "Starting
> jobmanager daemon on host ."
>
> Only when I remove ALL quotes, do I see it working. i.e. if I run:
>
> ssh -n $FLINK_SSH_OPTS $master -- nohup /bin/bash -l
> ${FLINK_BIN_DIR}/jobmanager.sh start cluster ${master} ${webuiport} &
>
> I see that it manages to run the job manager - I see " Starting jobmanager
> daemon on host.".
>
> Did anyone else experience a similar problem? Any elegant workarounds
> without having to change source code?
>
> Thanks,
> Hayden Marchant
>
>


Re: Stumped writing to KafkaJSONSink

2017-10-18 Thread Kenny Gorman
Yep we hung out and got it working. I should have replied sooner! Thx for the 
reply.

-kg

> On Oct 18, 2017, at 7:06 AM, Fabian Hueske  wrote:
> 
> Hi Kenny,
> 
> this look almost correct. 
> The Table class has a method writeToSink(TableSink) that should address your 
> use case (so the same as yours but without the TableEnvironment argument).
> 
> Does that work for you?
> If not what kind of error and error message do you get?
> 
> Best, Fabian
> 
> 2017-10-18 1:28 GMT+02:00 Kenny Gorman :
>> I am hoping you guys can help me. I am stumped how to actually write to 
>> Kafka using Kafka09JsonTableSink using the Table API. Here is my code below, 
>> I am hoping you guys can shed some light on how this should be done. I don’t 
>> see any methods for the actual write to Kafka. I am probably doing something 
>> stupid. TIA.
>> 
>> Thanks!
>> Kenny
>> 
>> // run some SQL to filter results where a key is not null
>> String sql = "SELECT icao FROM flights WHERE icao is not null";
>> tableEnv.registerTableSource("flights", kafkaTableSource);
>> Table result = tableEnv.sql(sql);
>> 
>> // create a partition for the data going into kafka
>> FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();
>> 
>> // create new tablesink of JSON to kafka
>> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>> params.getRequired("write-topic"),
>> params.getProperties(),
>> partition);
>> 
>> result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do 
>> this, but no such method..
> 


Re: Stumped writing to KafkaJSONSink

2017-10-18 Thread Fabian Hueske
No worries :-) Thanks for the notice.

2017-10-18 15:07 GMT+02:00 Kenny Gorman :

> Yep we hung out and got it working. I should have replied sooner! Thx for
> the reply.
>
> -kg
>
> On Oct 18, 2017, at 7:06 AM, Fabian Hueske  wrote:
>
> Hi Kenny,
>
> this look almost correct.
> The Table class has a method writeToSink(TableSink) that should address
> your use case (so the same as yours but without the TableEnvironment
> argument).
>
> Does that work for you?
> If not what kind of error and error message do you get?
>
> Best, Fabian
>
> 2017-10-18 1:28 GMT+02:00 Kenny Gorman :
>
>> I am hoping you guys can help me. I am stumped how to actually write to
>> Kafka using Kafka09JsonTableSink using the Table API. Here is my code
>> below, I am hoping you guys can shed some light on how this should be done.
>> I don’t see any methods for the actual write to Kafka. I am probably doing
>> something stupid. TIA.
>>
>> Thanks!
>> Kenny
>>
>> // run some SQL to filter results where a key is not null
>> String sql = "SELECT icao FROM flights WHERE icao is not null";
>> tableEnv.registerTableSource("flights", kafkaTableSource);
>> Table result = tableEnv.sql(sql);
>>
>> // create a partition for the data going into kafka
>> FlinkFixedPartitioner partition =  new FlinkFixedPartitioner();
>>
>> // create new tablesink of JSON to kafka
>> KafkaJsonTableSink kafkaTableSink = new Kafka09JsonTableSink(
>> params.getRequired("write-topic"),
>> params.getProperties(),
>> partition);
>>
>> result.writeToSink(tableEnv, kafkaTableSink);  // Logically, I want to do
>> this, but no such method..
>
>
>


RE: GROUP BY TUMBLE on ROW range

2017-10-18 Thread Stefano Bortoli
Great, thanks for the explanation. I now see that the examples are indeed for 
the Table API. I believe the over window is sufficient for our purpose right 
now; I was just curious.

Best,
Stefano

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Tuesday, October 17, 2017 9:24 PM
To: Stefano Bortoli 
Cc: user@flink.apache.org
Subject: Re: GROUP BY TUMBLE on ROW range

Hi Stefano,
this is not supported in Flink's SQL and we would need new Group Window 
functions (like TUMBLE) for this.
A TUMBLE_COUNT function would be somewhat similar to SESSION, which also 
requires checks on the sorted neighboring rows to identify the window of a row.
Such a function would first need to be added to Calcite and then integrated 
with Flink.

A tumble count could also be expressed in plain SQL but wouldn't be very 
intuitive. You would have to
- define an over window (maybe partitioned on some key) sorted on time with a 
ROW_NUMBER function that assigns increasing numbers to rows.
- do a group by on the row number modulo the window size.
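A rough sketch of that pattern (illustrative standard SQL rather than verified 
Flink 1.3 streaming SQL; consecutive rows are bucketed here by integer division 
of the row number so that each group holds one window's worth of rows):

SELECT COUNT(*)
FROM (
  SELECT ROW_NUMBER() OVER (ORDER BY rowtime) AS rn
  FROM T1
) numbered
GROUP BY (rn - 1) / 2  -- buckets of 2 consecutive rows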
Btw. count windows are supported by the Table API.
Best, Fabian


2017-10-17 17:16 GMT+02:00 Stefano Bortoli <stefano.bort...@huawei.com>:
Hi all,
Is there a way to use a tumble window group by with row range in streamSQL?
I mean, something like this:
//  "SELECT COUNT(*) " +
// "FROM T1 " +
//"GROUP BY TUMBLE(rowtime, INTERVAL '2' ROWS PRECEDING )"

However, even looking at tests and looking at the “row interval expression 
generation” I could not find any examples in SQL. I know it is supported by the 
stream APIs, and countWindow is the chosen abstraction.

table
  .window(Tumble over 2.rows on 'long as 'w)
  .groupBy('w)
  .select('int.count)
  .toDataSet[Row]

I fear I am missing something simple. Thanks a lot for the support guys!

Best,
Stefano



Problems with taskmanagers in Mesos Cluster

2017-10-18 Thread Manuel Montesino
Hi,

We have deployed a Mesos cluster with Marathon, and we deploy Flink sessions 
through Marathon with multiple taskmanagers configured. In earlier stages we 
often change the Marathon JSON configuration (memory and other settings), and 
when we redeploy the Flink session the jobmanagers stop and start with the new 
configuration, but the taskmanagers do not pick up the new configuration. So 
we have to kill/stop the Docker containers of each taskmanager task.

Is there a way to kill or stop the taskmanagers when the session is 
redeployed?

Some environment configuration from marathon json file related to taskmanagers:

```
"flink_akka.ask.timeout": "1min",
"flink_akka.framesize": "102400k",
"flink_high-availability": "zookeeper",
"flink_high-availability.zookeeper.path.root": "/flink",
"flink_jobmanager.web.history": "200",
"flink_mesos.failover-timeout": "86400",
"flink_mesos.initial-tasks": "16",
"flink_mesos.maximum-failed-tasks": "-1",
"flink_mesos.resourcemanager.tasks.container.type": "docker",
"flink_mesos.resourcemanager.tasks.mem": "6144",
"flink_metrics.reporters": "jmx",
"flink_metrics.reporter.jmx.class": "org.apache.flink.metrics.jmx.JMXReporter",
"flink_state.backend": 
"org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory",
"flink_taskmanager.maxRegistrationDuration": "10 min",
"flink_taskmanager.network.numberOfBuffers": "8192",
"flink_jobmanager.heap.mb": "768",
"flink_taskmanager.debug.memory.startLogThread": "true",
"flink_mesos.resourcemanager.tasks.cpus": "1.3",
"flink_env.java.opts.taskmanager": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 
-XX:ConcGCThreads=1 -XX:InitiatingHeapOccupancyPercent=35 
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 
-XX:MaxMetaspaceFreeRatio=80 -XX:+DisableExplicitGC -Djava.awt.headless=true 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=10M",
"flink_containerized.heap-cutoff-ratio": "0.67"
```

Thanks in advance and kind regards,

Manuel Montesino
Devops Engineer

E manuel.montesino@piksel(dot)com

Marie Curie,1. Ground Floor. Campanillas, Malaga 29590