Re: Flink QuickStart: On start up, running into ClassNotFoundException: org.apache.flink.streaming.runtime.tasks.OneInputStreamTask

2016-01-14 Thread Stephan Ewen
Hi!

I think this is a solid fix. Adding the classloader that loads Flink's
classes as the parent is a good approach.

Do you want to open a pull request with that?


Greetings,
Stephan

On Thu, Jan 14, 2016 at 2:26 AM, Prez Cannady 
wrote:

> Simply passing FlinkUserCodeClassLoader.class.getClassLoader to the
> parent constructor cleared the impasse.
>
> 2016-01-13 20:06:43.637  INFO 35403 --- [   main]
> o.o.e.j.s.SocketTextStreamWordCount$ : Started
> SocketTextStreamWordCount. in 5.176 seconds (JVM running for 12.58)
>
> [INFO]
> 
>
> [INFO] BUILD SUCCESS
>
> [INFO]
> 
>
> [INFO] Total time: 11.734 s
>
> [INFO] Finished at: 2016-01-13T20:06:43-05:00
>
> [INFO] Final Memory: 49M/4986M
>
> [INFO]
> 
>
> 2016-01-13 20:06:43.804  INFO 35403 --- [   Thread-3]
> s.c.a.AnnotationConfigApplicationContext : Closing
> org.springframework.context.annotation.AnnotationConfigApplicationContext@33248c18:
> startup date [Wed Jan 13 20:06:38 EST 2016]; root of context hierarchy
>
> 2016-01-13 20:06:43.806  INFO 35403 --- [   Thread-3]
> o.s.j.e.a.AnnotationMBeanExporter: Unregistering JMX-exposed beans
> on shutdown
>
>
> All tests in flink-runtime passed after the change to
> `BlobLibraryCacheManager’, but I haven’t run the full test suite.
>
> Is this actually an appropriate fix, or just a way to highlight a
> configuration problem?
>
> I assume that injecting a parent class loader when registering a task
> might break things, but I don’t know nearly enough about Flink and this
> code to say one way or another.
>
>
> Prez Cannady
> p: 617 500 3378
> e: revp...@opencorrelate.org
> GH: https://github.com/opencorrelate
> LI: https://www.linkedin.com/in/revprez
>
>
>
>
>
>
>
>
>
> On Jan 13, 2016, at 6:50 PM, Stephan Ewen  wrote:
>
> Hi!
>
> Running this in Spring, the whole classloader configuration is probably a
> bit different than in Flink's standalone, YARN, or local mode.
>
> Can you try if the following solves your problem:
>
> At the end of the file "BlobLibraryCacheManager", there is the private
> class "FlinkUserCodeClassloader".
>
> Can you replace the current FlinkUserCodeClassloader with this?
>
>
> private static class FlinkUserCodeClassLoader extends URLClassLoader {
>
> public FlinkUserCodeClassLoader(URL[] urls) {
> super(urls, FlinkUserCodeClassLoader.class.getClassLoader());
> }
> }
>
> You can also try using "Thread.currentThread().getContextClassLoader()"
> instead of "FlinkUserCodeClassLoader.class.getClassLoader()".
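
For reference, a minimal self-contained sketch of the two variants described above (in Flink the class is nested inside BlobLibraryCacheManager; the second class name below is made up purely for illustration):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Variant 1: the classloader that loaded Flink's classes becomes the parent.
class FlinkUserCodeClassLoader extends URLClassLoader {
    public FlinkUserCodeClassLoader(URL[] urls) {
        super(urls, FlinkUserCodeClassLoader.class.getClassLoader());
    }
}

// Variant 2: the current thread's context classloader becomes the parent
// (hypothetical class name, only to show the alternative mentioned above).
class ContextUserCodeClassLoader extends URLClassLoader {
    public ContextUserCodeClassLoader(URL[] urls) {
        super(urls, Thread.currentThread().getContextClassLoader());
    }
}
```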
>
> Let me know if one of the two solves the problem.
>
> Greetings,
> Stephan
>
>
> On Wed, Jan 13, 2016 at 7:20 PM, Prez Cannady 
> wrote:
>
>> I’m experimenting combining Spring with Flink.  I’ve successfully
>> instrumented for Gradle, but Maven is emitting ClassNotFoundExceptions for
>> items ostensibly on the class path.
>>
>> Project is currently configured for:
>>
>> 1. Scala 2.10.4
>> 2. Flink 0.9.1
>>
>> I execute the following
>>
>> ```
>> # In one terminal
>> $ nc -lk -p  --sh-exec "cat /usr/share/dict/words | head -n 10”
>>
>>
>> # In another terminal
>> $ mvn clean install spring-boot:run -Drun.arguments=“localhost,”
>>
>> # observe output
>> ```
>>
>> The specific class not found is
>> *org.apache.flink.streaming.runtime.tasks.OneInputStreamTask*.  However,
>> Spring Boot Plugin is configured to repackage a fat jar, and I can see that
>> the class is present in the included flink-streaming-core jar.
>> Additionally, LogBack shows that the flink-streaming-core jar is in my
>> classpath.
>>
>>
>> I’m hoping I’m just missing something that should be obvious.  While I
>> wish I could move forward with just Gradle, unfortunately I have to support
>> Maven builds.
>>
>> For reference, the complete project is available here:
>>
>> https://github.com/OCExercise/wordcount-processing
>>
>> Additionally
>>
>> 1. pom.xml (
>> https://github.com/OCExercise/wordcount-processing/blob/master/pom.xml)
>> 2. build.grade (
>> https://github.com/OCExercise/wordcount-processing/blob/master/build.gradle
>> )
>> 3. Gist containing the full exception (
>> https://gist.github.com/revprez/2c1fb01c40e5d6790247)
>>
>> Prez Cannady
>> p: 617 500 3378
>> e: revp...@opencorrelate.org
>> GH: https://github.com/opencorrelate
>> LI: https://www.linkedin.com/in/revprez
>>
>>
>
>


Re: Flink DataStream and KeyBy

2016-01-14 Thread Aljoscha Krettek
Hi,
using .keyBy(0) on a Scala DataStream[Tuple2] where Tuple2 is a Scala Tuple 
should work. Look, for example, at the SocketTextStreamWordCount example in 
Flink.
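
A minimal Java sketch of the two ways to specify the key (the Scala API behaves the same way; the pipeline here is just a placeholder):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByExamples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> words = env.fromElements(
                new Tuple2<>("flink", 1), new Tuple2<>("storm", 1), new Tuple2<>("flink", 1));

        // Position-based key: only valid on Tuple types.
        words.keyBy(0).sum(1).print();

        // KeySelector-based key: works for arbitrary element types.
        words.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) {
                return value.f0;
            }
        }).sum(1).print();

        env.execute("keyBy examples");
    }
}
```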

Cheers,
Aljoscha
> On 13 Jan 2016, at 18:25, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Saiph,
> 
> In Flink, the key for keyBy() can be provided in different ways:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#specifying-keys
> (the doc is for DataSet API, but specifying keys is basically the same for
> DataStream and DataSet).
> 
> As described in the documentation, calls like keyBy(0) are meant for Tuples,
> so it only works for DataStream[Tuple]. Other key definition types like
> keyBy(new KeySelector() {...}) can basically take any DataStream of
> arbitrary data type. Flink finds out whether or not there is a conflict
> between the type of the data in the DataStream and the way the key is
> defined at runtime.
> 
> Hope this helps!
> 
> Cheers,
> Gordon
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-DataStream-and-KeyBy-tp4271p4272.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: UpdateTaskExecutionState during JobManager failover

2016-01-14 Thread Stephan Ewen
Hi!

That is a super interesting idea. If I understand you correctly, you are
suggesting to try and reconcile the TaskManagers and the JobManager before
restarting the job. That would mean that in case of a master failure, the
jobs may simply continue to run. That would be a nice enhancement, but I
think it is slightly more complicated, for two reasons:

1)

An assumption that we currently make is that if the JobManager view and
TaskManager view of the task status get out of sync (a JobManager failure
is one reason that can happen), then the task is restarted. That is a quite
robust solution, but may lead to restarts in cases that could be recovered
otherwise as well. For example, only certain state transitions are valid
(like RUNNING to FINISHED). If the JobManager gets an update that the state
is FINISHED when the JobManager thought it was CANCELED or CREATED, it will
reject this under the assumption that something went wrong in the
distributed coordination.

If we want to keep these safety checks, it would probably need something
like the JobManager asking TaskManagers that connect for their current
status and resetting the job status to that.

2)

To make sure that JobManagers and TaskManagers do not confuse messages from
different sessions (a session being a JobManager holding the leader role), we
filter the critical messages by a "leaderSessionId", which is again very
robust. This would also need a careful change.


Hope that helps in understanding the current rationale. If you want to work
on improving this, it would be great. We should probably talk more about
the detailed changes needed.


Greetings,
Stephan





On Thu, Jan 14, 2016 at 3:49 AM, wangzhijiang999  wrote:

> Hi,
>
> As I know, when the TaskManager sends UpdateTaskExecutionState to the
> JobManager, if the JobManager fails over and the future response fails, the
> task will be failed. Is it feasible to retry sending UpdateTaskExecutionState
> again when the future response fails, until it succeeds? In JobManager HA mode,
> the UpdateTaskExecutionState should succeed once the leader JobManager is
> active. Or are there any suggestions for sending messages during JobManager
> failover instead of failing the task?
>
> Thanks for any help in advance!
>
>
> Zhijiang Wang
>


Redeployments and state

2016-01-14 Thread Niels Basjes
Hi,

I'm working on a streaming application using Flink.
Several steps in the processing are stateful (I use custom Windows and
stateful operators).

Now if, during a normal run, a worker fails, the checkpointing system will be
used to recover.

But what if the entire application is stopped (deliberately) or stops/fails
because of a problem?

At this moment I have three main reasons/causes for doing this:
1) The application just dies because of a bug on my side or a problem like
for example this (which I'm actually confronted with):  *Failed to Update
HDFS Delegation Token for long running application in HA mode *
https://issues.apache.org/jira/browse/HDFS-9276
2) I need to rebalance my application (i.e. stop, change parallelism, start)
3) I need a new version of my software to be deployed. (i.e. I fixed a bug,
changed the topology and need to continue)

I assume the solution will in some part be specific to my application.
The question is what features exist in Flink to support such a clean
"continue where I left off" scenario?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Redeployments and state

2016-01-14 Thread Gábor Gévay
Hello,

You are probably looking for this feature:
https://issues.apache.org/jira/browse/FLINK-2976

Best,
Gábor




2016-01-14 11:05 GMT+01:00 Niels Basjes :
> Hi,
>
> I'm working on a streaming application using Flink.
> Several steps in the processing are state-full (I use custom Windows and
> state-full operators ).
>
> Now if during a normal run an worker fails the checkpointing system will be
> used to recover.
>
> But what if the entire application is stopped (deliberately) or stops/fails
> because of a problem?
>
> At this moment I have three main reasons/causes for doing this:
> 1) The application just dies because of a bug on my side or a problem like
> for example this (which I'm actually confronted with):  Failed to Update
> HDFS Delegation Token for long running application in HA mode
> https://issues.apache.org/jira/browse/HDFS-9276
> 2) I need to rebalance my application (i.e. stop, change parallelism, start)
> 3) I need a new version of my software to be deployed. (i.e. I fixed a bug,
> changed the topology and need to continue)
>
> I assume the solution will be in some part be specific for my application.
> The question is what features exist in Flink to support such a clean
> "continue where I left of" scenario?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes


Re: Working with storm compatibility layer

2016-01-14 Thread Matthias J. Sax
Hi,

I can submit the topology without any problems. Your code is fine.

If your program "exits silently", I would actually assume that you
submitted the topology successfully. Can you see the topology in the
JobManager WebFrontend? If not, do you see any errors in the log files?

-Matthias

On 01/14/2016 07:37 AM, Shinhyung Yang wrote:
> Dear Matthias,
> 
> Thank you for the reply! I am so sorry to respond late on the matter.
> 
>> I just double checked the Flink code and during translation from Storm
>> to Flink declareOutputFields() is called twice. You are right that it
>> does the same job twice, but that is actually not a problem. The Flink
>> code is cleaner this way so I guess we will not change it.
> 
> Thank you for checking. I don't think it contributed to my
> current problem anyway. In my case, though, it is called 3 times, if
> the number is important at all.
> 
>> About lifecyle:
>> If you submit your code, during deployment, Spout.open() and
>> Bolt.prepare() should be called for each parallel instance on each
>> Spout/Bolt of your topology.
>>
>> About your submission (I guess this should solve your current problem):
>> If you use bin/start-local.sh, you should *not* use FlinkLocalCluster,
>> but FlinkSubmitter. You have to distinguish three cases:
>>
>>   - local/debug/IDE mode: use FlinkLocalCluster
>> => you do not need to start any Flink cluster before --
>> FlinkLocalCluster is started up in your current JVM
>> * the purpose is local debugging in an IDE (this allows you to easily
>> set break points and debug code)
>>
>>   - pseudo-distributed mode: use FlinkSubmitter
>> => you start up a local Flink cluster via bin/start-local.sh
>> * this local Flink cluster runs in its own JVM and looks like a real
>> cluster to the Flink client, ie, "bin/flink run"
>> * thus, you just use FlinkSubmitter as for a real cluster (with
>> JobManager/Nimbus hostname "localhost")
>> * in contrast to FlinkLocalCluster, no "internal Flink Cluster" is
>> started in your current JVM, but your code is shipped to the local
>> cluster you started up beforehand via bin/start-local.sh and executed in
>> this JVM
>>
>>   - distributed mode: use FlinkSubmitter
>> => you start up Flink in a real cluster using bin/start-cluster.sh
>> * you use "bin/flink run" to submit your code to the real cluster
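
Mapping those three cases onto code, only the submission call changes; a minimal sketch (the class and method names here are made up, but the Flink/Storm API calls are the same ones used in the program further below):

```java
package myexample;

import backtype.storm.Config;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkSubmitter;
import org.apache.flink.storm.api.FlinkTopology;

public class SubmitModes {

    public static void submit(FlinkTopology topology, boolean local) throws Exception {
        Config conf = new Config();

        if (local) {
            // local/debug/IDE mode: an in-JVM cluster is started for you
            FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
            cluster.submitTopology("topology", conf, topology);
        } else {
            // pseudo-distributed or distributed mode: ship the job to a cluster
            // started beforehand via bin/start-local.sh or bin/start-cluster.sh
            FlinkSubmitter.submitTopology("topology", conf, topology);
        }
    }
}
```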
> 
> Thank you for the explanation, now I have clearer understanding of
> clusters and submitters. However my problem is not fixed yet. Here's
> my code:
> 
> 
> // ./src/main/java/myexample/App.java
> 
> 
> package myexample;
> 
> import backtype.storm.Config;
> import backtype.storm.LocalCluster;
> import myexample.spout.StandaloneSpout;
> import backtype.storm.generated.StormTopology;
> import backtype.storm.topology.IRichSpout;
> import backtype.storm.topology.TopologyBuilder;
> import backtype.storm.topology.base.BaseBasicBolt;
> 
> import myexample.bolt.Node;
> import myexample.bolt.StandardBolt;
> 
> import java.util.Arrays;
> import java.util.List;
> 
> //
> import org.apache.flink.storm.api.FlinkTopology;
> //import org.apache.flink.storm.api.FlinkLocalCluster;
> import org.apache.flink.storm.api.FlinkSubmitter;
> //import org.apache.flink.storm.api.FlinkClient;
> import org.apache.flink.storm.api.FlinkTopologyBuilder;
> 
> public class App
> {
> public static void main( String[] args ) throws Exception
> {
> int layer = 0;
> StandaloneSpout spout = new StandaloneSpout();
> Config conf = new Config();
> conf.put(Config.TOPOLOGY_DEBUG, false);
> //FlinkLocalCluster cluster = new FlinkLocalCluster();
> //FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
> //LocalCluster cluster = new LocalCluster();
> 
> layer = Integer.parseInt(args[0]);
> //cluster.submitTopology("topology", conf, BinaryTopology(spout, layer));
> FlinkSubmitter.submitTopology("topology", conf, BinaryTopology(spout, layer));
> //Thread.sleep(5 * 1000);
> //FlinkClient.getConfiguredClient(conf).killTopology("topology");
> //cluster.killTopology("topology");
> //cluster.shutdown();
> }
> 
> public static FlinkTopology BinaryTopology(IRichSpout input, int n) {
> //public static StormTopology BinaryTopology(IRichSpout input, int n) {
> return BinaryTopology(input, n, Arrays.asList((BaseBasicBolt) new StandardBolt()));
> }
> 
> public static FlinkTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
> //public static StormTopology BinaryTopology(IRichSpout input, int n, List<BaseBasicBolt> boltList) {
> FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> //TopologyBuilder builder = new TopologyBuilder();
> String sourceId = "src";
> builder.setSpout(sourceId

Re: Questions re: ExecutionGraph & ResultPartitions for interactive use a la Spark

2016-01-14 Thread Ufuk Celebi
Hey Kovas

sorry for the long delay.

> On 10 Jan 2016, at 06:20, kovas boguta  wrote:
> 1) How can I prevent ResultPartitions from being released?
> 
> In interactive use, RPs should not necessarily be released when there are no 
> pending tasks to consume them. 

Max Michels did some work along those lines in a by now very outdated pull 
request: https://github.com/apache/flink/pull/640

> Looking at the code, it's hard to understand the high-level logic of who 
> triggers their release, how the refcounting works, etc. For instance, is 
> releasePartitionsProducedBy called by the producer, or the consumer, or both? 
> Is the refcount initialized at the beginning of the task setup, and 
> decremented every time it's read out? 

I fully agree that the respective part of the system is very much
under-documented. Sorry about that.

- ResultPartitionManager: the result partition manager keeps track of all 
partitions of a task manager. Each task registers the produced partitions when 
it is instantiated (see Task constructor and NetworkEnvironment#registerTask). 
This is the final truth about which partitions are available etc.

- releasePartitionsProducedBy: This is not part of the regular life cycle of 
the results, but only triggered by the job manager to get rid of old results in 
case of cancellation/failure.

- Ref counts: The release of the results during normal operation happens via 
the ref counts. Currently, the ref counts are always initialised to the number 
of sub partitions (A result partition consists of 1 or more sub partitions for 
each parallel receiver of the result). Decrementing happens when a sub 
partition has been fully consumed (via ResultPartition#onConsumedSubpartition). 
And as soon as the ref count reaches 0, they are released. The final release 
happens in the ResultPartitionManager.
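
As a rough illustration of that life cycle (this is not Flink's actual class, just the counting logic described above):

```java
// Simplified illustration of the described ref counting; not Flink's actual code.
class PartitionReferenceCounter {

    private int refCount;

    PartitionReferenceCounter(int numberOfSubpartitions) {
        // Initialised to the number of sub partitions of the result partition.
        this.refCount = numberOfSubpartitions;
    }

    /**
     * Called once a sub partition has been fully consumed. Returns true when
     * the whole result partition can be released.
     */
    synchronized boolean onConsumedSubpartition() {
        return --refCount == 0;
    }
}
```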

The behaviour that would need to change is the last step in the result 
partition manager, imo. I think #640 has some modifications in that respect, 
which might be helpful in figuring out the details. I think what can happen 
instead of the final release is that a result becomes “cached” and stays around.

> Ideally, I could force certain ResultPartitions to only be manually 
> releasable, so I can consume them over and over.  

How would you like to have this controlled by the user? Offer an API operation 
like `.pin()`? Do you need it pinned permanently until you release it or would 
it be ok to just cache it and maybe recompute if another task needs more 
memory/disk space?

> 2) Can I call attachJobGraph on the ExecutionGraph after the job has begun 
> executing to add more nodes?
> 
> I read that Flink does not support changing the running the topology. But 
> what about extending the topology to add more nodes?

I think that should be possible. The problem at the moment is that this will 
recompute everything from the sources. Your suggestion makes sense and actually 
this was one of the motivations for #640. The newly attached nodes would back 
track to the produced results of the initial topology and go on from there.

> If the IntermediateResultPartitions are just sitting around from previously 
> completed tasks, this seems straightforward in principle.. would one have to 
> fiddle with the scheduler/event system to kick things off again?

I think it will be possible to just submit the new parts. But I’m not sure 
about the details. 

> 3) Can I have a ConnectionManager without a TaskManager?
> 
> For interactive use, I want to selectively pull data from ResultPartitions 
> into my local REPL process. But I don't want my local process to be a 
> TaskManager node, and have tasks assigned to it. 
> 
> So this question boils down to, how to hook a ConnectionManager into the 
> Flink communication/actor system?

In theory it should be possible, yes. In practice I see problems with setting 
up all the parameters.

You have to instantiate a NettyConnectionManager. For the start method, only 
the NetworkBufferPool is relevant, which is easy to instantiate as well. This 
will take part of the network transfers.

To get the data you need a SingleInputGate and set it up with RemoteChannel 
instances for each consumed subpartition. This is where you need to know all 
the partition IDs and task managers (The connection ID is a wrapper around 
InetSocketAddress with a connection index for connection sharing).

When you have the gate setup, you can query it with the RecordReader. The input 
gate itself just returns byte buffers.

Do you have an idea about how you want to figure out the partition and 
connection IDs in your repl process? If yes, I could provide some more concrete 
code snippets on the setup.

---

If you would like to contribute to Flink, we can also think about splitting this up 
into some smaller tasks and addressing them. Your ideas are definitely in line 
with what we wanted to have in Flink anyways. The recent focus on the streaming 
part of the system has pulled most contributors away from 

Re: Working with storm compatibility layer

2016-01-14 Thread Shinhyung Yang
Dear Matthias,

Thank you for a quick reply. It failed again; however, I was able to
access its WebFrontend and it gave me some logs. I wanted to show the
logs immediately, before digging down into it.

19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager
  - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
(topology).
19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
  - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
(topology).
19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
  - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
changed to RUNNING.
19:48:18,014 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
CREATED to SCHEDULED
19:48:18,014 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
SCHEDULED to DEPLOYING
19:48:18,015 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Deploying Source: src (1/1) (attempt #0) to localhost
19:48:18,015 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
(1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
SCHEDULED
19:48:18,015 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
(1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
DEPLOYING
19:48:18,015 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Deploying bolt (1/8) (attempt #0) to localhost
19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager
  - Received task Source: src (1/1)
19:48:18,015 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
(2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
SCHEDULED
19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task
  - Loading JAR files for task Source: src (1/1)
19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager
  - Received task bolt (1/8)
19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache
  - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
localhost/127.0.0.1:36498
19:48:18,017 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
DEPLOYING to CANCELING
19:48:18,018 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
(1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
CANCELING
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
  - Attempting to cancel task Source: src (1/1)
19:48:18,018 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
(2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
CANCELED
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
  - Source: src (1/1) switched to CANCELING
19:48:18,018 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
(3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
CANCELED
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
  - Attempting to cancel task bolt (1/8)
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
  - bolt (1/8) switched to CANCELING
19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
  - Loading JAR files for task bolt (1/8)
19:48:18,018 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
(4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
CANCELED
19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager
  - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
changed to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the
operator parallelism or increase the number of slots per TaskManager
in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
(unassigned) - [SCHEDULED] > with groupID <
52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
[52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>. Resources available to scheduler: Number of instances=1, total
number of slots=1, available slots=0
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleImmediately(Scheduler.java:131)
at 
org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:298)
at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:458)
at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.scheduleAll(ExecutionJobVertex.java:322)
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:686)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apac

Re: Redeployments and state

2016-01-14 Thread Ufuk Celebi
Hey Niels,

as Gabor wrote, this feature has been merged to the master branch recently.

The docs are online here: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/savepoints.html
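
From the command line, the workflow described there roughly looks like this (a sketch based on the linked docs on the master branch; the jar name is a placeholder):

```
# trigger a savepoint for a running job
bin/flink savepoint <jobID>

# later: resume the job (e.g. after a bug fix) from that savepoint
bin/flink run -s <savepointPath> my-streaming-job.jar
```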

Feel free to report back your experience with it if you give it a try.

– Ufuk

> On 14 Jan 2016, at 11:09, Gábor Gévay  wrote:
> 
> Hello,
> 
> You are probably looking for this feature:
> https://issues.apache.org/jira/browse/FLINK-2976
> 
> Best,
> Gábor
> 
> 
> 
> 
> 2016-01-14 11:05 GMT+01:00 Niels Basjes :
>> Hi,
>> 
>> I'm working on a streaming application using Flink.
>> Several steps in the processing are state-full (I use custom Windows and
>> state-full operators ).
>> 
>> Now if during a normal run an worker fails the checkpointing system will be
>> used to recover.
>> 
>> But what if the entire application is stopped (deliberately) or stops/fails
>> because of a problem?
>> 
>> At this moment I have three main reasons/causes for doing this:
>> 1) The application just dies because of a bug on my side or a problem like
>> for example this (which I'm actually confronted with):  Failed to Update
>> HDFS Delegation Token for long running application in HA mode
>> https://issues.apache.org/jira/browse/HDFS-9276
>> 2) I need to rebalance my application (i.e. stop, change parallelism, start)
>> 3) I need a new version of my software to be deployed. (i.e. I fixed a bug,
>> changed the topology and need to continue)
>> 
>> I assume the solution will be in some part be specific for my application.
>> The question is what features exist in Flink to support such a clean
>> "continue where I left of" scenario?
>> 
>> --
>> Best regards / Met vriendelijke groeten,
>> 
>> Niels Basjes



Re: Working with storm compatibility layer

2016-01-14 Thread Shinhyung Yang
Dear Matthias,

Thank you very much again. I changed the value of
taskmanager.numberOfTaskSlots from 1 to 128 in
flink-0.10.1/conf/flink-conf.yaml, stopped the local cluster and
started the local cluster again. And it works fine. (It is still
running and I can check it clearly on the webfrontend.) Although I'm not
sure whether it would be safe to keep the value like this or not.

Thank you.
Best regards,
Shinhyung

On Thu, Jan 14, 2016 at 7:53 PM, Shinhyung Yang
 wrote:
> Dear Matthias,
>
> Thank you for a quick reply. It failed again, however I was able to
> access to its WebFrontend and it gave me some logs. I wanted to show
> logs immediately before digging down into it.
>
> 19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to RUNNING.
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> CREATED to SCHEDULED
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> SCHEDULED to DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Deploying Source: src (1/1) (attempt #0) to localhost
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
> SCHEDULED
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
> DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Deploying bolt (1/8) (attempt #0) to localhost
> 19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>   - Received task Source: src (1/1)
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
> SCHEDULED
> 19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Loading JAR files for task Source: src (1/1)
> 19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>   - Received task bolt (1/8)
> 19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache
>   - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
> localhost/127.0.0.1:36498
> 19:48:18,017 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> DEPLOYING to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
> CANCELING
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Attempting to cancel task Source: src (1/1)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Source: src (1/1) switched to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Attempting to cancel task bolt (1/8)
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - bolt (1/8) switched to CANCELING
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Loading JAR files for task bolt (1/8)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>>. Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
> at 
> org.ap

Flink Execution Plan

2016-01-14 Thread lofifnc
Hi,

I'm trying to figure out what graph the execution plan represents when you
call env.getExecutionPlan on the StreamExecutionEnvironment. From my
understanding the StreamGraph is what you call an APIGraph, which will be
used to create the JobGraph.
So is the ExecutionPlan a full representation of the StreamGraph?
And is there a way to get a human-interpretable representation of the
JobGraph? :)

Best,
Alex





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Execution-Plan-tp4290.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Working with storm compatibility layer

2016-01-14 Thread Matthias J. Sax
Hi,

the logs show:

> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Resources available to scheduler: Number of instances=1, total number of 
> slots=1, available slots=0

You need to increase your task slots in conf/flink-conf.yaml. Look for
parameter "taskmanager.numberOfTaskSlots".


-Matthias


On 01/14/2016 11:53 AM, Shinhyung Yang wrote:
> Dear Matthias,
> 
> Thank you for a quick reply. It failed again, however I was able to
> access to its WebFrontend and it gave me some logs. I wanted to show
> logs immediately before digging down into it.
> 
> 19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
> (topology).
> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to RUNNING.
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> CREATED to SCHEDULED
> 19:48:18,014 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> SCHEDULED to DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Deploying Source: src (1/1) (attempt #0) to localhost
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
> SCHEDULED
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
> DEPLOYING
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Deploying bolt (1/8) (attempt #0) to localhost
> 19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>   - Received task Source: src (1/1)
> 19:48:18,015 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
> SCHEDULED
> 19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Loading JAR files for task Source: src (1/1)
> 19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>   - Received task bolt (1/8)
> 19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache
>   - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
> localhost/127.0.0.1:36498
> 19:48:18,017 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
> DEPLOYING to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
> CANCELING
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Attempting to cancel task Source: src (1/1)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Source: src (1/1) switched to CANCELING
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Attempting to cancel task bolt (1/8)
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - bolt (1/8) switched to CANCELING
> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>   - Loading JAR files for task bolt (1/8)
> 19:48:18,018 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
> CANCELED
> 19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager
>   - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
> changed to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager
> in the configuration. Task to schedule: < Attempt #0 (bolt (2/8)) @
> (unassigned) - [SCHEDULED] > with groupID <
> 52431fe365bb7d0f12dd8d20e2528427 > in sharing group < SlotSharingGroup
> [52431fe365bb7d0f12dd8d20e2528427, be3fd4192759bf8d5b39e028f005d7f4]
>> . Resources available to scheduler: Number of instances=1, total
> number of slots=1, available slots=0
> at 
> org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:256)
> at 
> org

Re: Working with storm compatibility layer

2016-01-14 Thread Matthias J. Sax
Just saw your email after my answer...

Have a look here about task slots.
https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots

Also have a look here (starting from 18:30):
https://www.youtube.com/watch?v=UEkjRN8jRx4

-Matthias


On 01/14/2016 12:05 PM, Shinhyung Yang wrote:
> Dear Matthias,
> 
> Thank you very much again. I changed the value of
> taskmanager.numberOfTaskSlots from 1 to 128 in
> flink-0.10.1/conf/flink-conf.yaml, stopped the local cluster and
> started local cluster again. And it works fine and well. (It is still
> running and I can check it clear on the webfrontend) Although I'm not
> sure whether it would be safe to keep the value like this or not.
> 
> Thank you.
> Best regards,
> Shinhyung
> 
> On Thu, Jan 14, 2016 at 7:53 PM, Shinhyung Yang
>  wrote:
>> Dear Matthias,
>>
>> Thank you for a quick reply. It failed again, however I was able to
>> access to its WebFrontend and it gave me some logs. I wanted to show
>> logs immediately before digging down into it.
>>
>> 19:48:18,011 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>   - Submitting job 6f52281fb987a3def8d3c01e1fc0bdcb
>> (topology).
>> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>   - Scheduling job 6f52281fb987a3def8d3c01e1fc0bdcb
>> (topology).
>> 19:48:18,014 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>   - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
>> changed to RUNNING.
>> 19:48:18,014 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> CREATED to SCHEDULED
>> 19:48:18,014 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> SCHEDULED to DEPLOYING
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>> Deploying Source: src (1/1) (attempt #0) to localhost
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from CREATED to
>> SCHEDULED
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from SCHEDULED to
>> DEPLOYING
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>> Deploying bolt (1/8) (attempt #0) to localhost
>> 19:48:18,015 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>   - Received task Source: src (1/1)
>> 19:48:18,015 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
>> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from CREATED to
>> SCHEDULED
>> 19:48:18,016 INFO  org.apache.flink.runtime.taskmanager.Task
>>   - Loading JAR files for task Source: src (1/1)
>> 19:48:18,017 INFO  org.apache.flink.runtime.taskmanager.TaskManager
>>   - Received task bolt (1/8)
>> 19:48:18,017 INFO  org.apache.flink.runtime.blob.BlobCache
>>   - Downloading fac8ddfb4668c9f76559766001b7c9fd07cbd0bc from
>> localhost/127.0.0.1:36498
>> 19:48:18,017 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>> Source: src (1/1) (3535644576ae695d2685a65401e16fc4) switched from
>> DEPLOYING to CANCELING
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
>> (1/8) (391ac2875a2fdc86d8af4f2d51e3e849) switched from DEPLOYING to
>> CANCELING
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>   - Attempting to cancel task Source: src (1/1)
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
>> (2/8) (8c7c675e42deeef5e9ad17581bafe457) switched from SCHEDULED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>   - Source: src (1/1) switched to CANCELING
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
>> (3/8) (0fed2e4059a37fe25b8b15480547a550) switched from CREATED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>   - Attempting to cancel task bolt (1/8)
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>   - bolt (1/8) switched to CANCELING
>> 19:48:18,018 INFO  org.apache.flink.runtime.taskmanager.Task
>>   - Loading JAR files for task bolt (1/8)
>> 19:48:18,018 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph- bolt
>> (4/8) (647c524f0a73b936656d6c6a67ecfbc9) switched from CREATED to
>> CANCELED
>> 19:48:18,018 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>   - Status of job 6f52281fb987a3def8d3c01e1fc0bdcb (topology)
>> changed to FAILING.
>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>> Not enough free slots available to run the job. You can decre

Re: Redeployments and state

2016-01-14 Thread Niels Basjes
Yes, that is exactly the type of solution I was looking for.

I'll dive into this.
Thanks guys!

Niels

On Thu, Jan 14, 2016 at 11:55 AM, Ufuk Celebi  wrote:

> Hey Niels,
>
> as Gabor wrote, this feature has been merged to the master branch recently.
>
> The docs are online here:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/savepoints.html
>
> Feel free to report back your experience with it if you give it a try.
>
> – Ufuk
>
> > On 14 Jan 2016, at 11:09, Gábor Gévay  wrote:
> >
> > Hello,
> >
> > You are probably looking for this feature:
> > https://issues.apache.org/jira/browse/FLINK-2976
> >
> > Best,
> > Gábor
> >
> >
> >
> >
> > 2016-01-14 11:05 GMT+01:00 Niels Basjes :
> >> Hi,
> >>
> >> I'm working on a streaming application using Flink.
> >> Several steps in the processing are state-full (I use custom Windows and
> >> state-full operators ).
> >>
> >> Now if during a normal run an worker fails the checkpointing system
> will be
> >> used to recover.
> >>
> >> But what if the entire application is stopped (deliberately) or
> stops/fails
> >> because of a problem?
> >>
> >> At this moment I have three main reasons/causes for doing this:
> >> 1) The application just dies because of a bug on my side or a problem
> like
> >> for example this (which I'm actually confronted with):  Failed to Update
> >> HDFS Delegation Token for long running application in HA mode
> >> https://issues.apache.org/jira/browse/HDFS-9276
> >> 2) I need to rebalance my application (i.e. stop, change parallelism,
> start)
> >> 3) I need a new version of my software to be deployed. (i.e. I fixed a
> bug,
> >> changed the topology and need to continue)
> >>
> >> I assume the solution will be in some part be specific for my
> application.
> >> The question is what features exist in Flink to support such a clean
> >> "continue where I left of" scenario?
> >>
> >> --
> >> Best regards / Met vriendelijke groeten,
> >>
> >> Niels Basjes
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Exception using flink-connector-elasticsearch

2016-01-14 Thread Lopez, Javier
Hi,

Thanks Aljoscha, the libraries solved the problem. It worked perfectly!
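
For reference, the fix described in the quoted reply below boils down to dropping the two Lucene jars into Flink's lib folder, roughly like this (the Flink directory name is an assumption):

```
cd flink-0.10.1/lib
wget https://repo1.maven.org/maven2/org/apache/lucene/lucene-core/4.10.4/lucene-core-4.10.4.jar
wget https://repo1.maven.org/maven2/org/apache/lucene/lucene-codecs/4.10.4/lucene-codecs-4.10.4.jar
# restart the Flink cluster so the new jars are picked up
```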

On 12 January 2016 at 14:03, Aljoscha Krettek 
wrote:

> Hi,
> could you please try adding the lucene-core-4.10.4.jar file to your lib
> folder of Flink. (
> https://repo1.maven.org/maven2/org/apache/lucene/lucene-core/4.10.4/)
> Elasticsearch uses dependency injection to resolve the classes and maven is
> not really aware of this.
>
> Also you could try adding lucene-codecs-4.10.4.jar as well (
> https://repo1.maven.org/maven2/org/apache/lucene/lucene-codecs/4.10.4/).
>
> Cheers,
> Aljoscha
> > On 12 Jan 2016, at 11:55, Lopez, Javier  wrote:
> >
> > Hi,
> >
> > We are using the sink for ElasticSearch and when we try to run our job
> we get the following exception:
> >
> > java.lang.ExceptionInInitializerError Caused by:
> java.lang.IllegalArgumentException: An SPI class of type
> org.apache.lucene.codecs.Codec with name 'Lucene410' does not exist.  You
> need to add the corresponding JAR file supporting this SPI to your
> classpath.  The current classpath supports the following names: []
> >
> > We are using embedded nodes and we don't know if we are missing some
> configuration for the elasticsearch client. This is the code we are using:
> >
> > Map config = Maps.newHashMap();
> >
> >   config.put("bulk.flush.max.actions", "1");
> >
> >   config.put("cluster.name", "flink-test");
> >
> >
> >
> >   result.addSink(new ElasticsearchSink<>(config, new
> IndexRequestBuilder>() {
> >   @Override
> >   public org.elasticsearch.action.index.IndexRequest
> createIndexRequest(Tuple4 element,
> RuntimeContext ctx) {
> >   Map json = new HashMap<>();
> >   json.put("data", element);
> >   return org.elasticsearch.client.Requests.indexRequest()
> >   .index("stream_test_1")
> >   .type("aggregation_test")
> >   .source(json);
> >   }
> >   }));
> >
> > The flink server as well as the elasticsearch server are in the same
> local machine.
> >
> > Thanks for your help
>
>


Re: Flink Execution Plan

2016-01-14 Thread Márton Balassi
Hey Alex,

Flink currently has 3 abstractions with a Graph suffix in place for
streaming jobs:

  * StreamGraph: Used for representing the logical plan of a streaming job
that is under construction in the API. This one is the only streaming
specific in this list.
  * JobGraph: Used for representing the logical plan of a streaming job
that has finished construction.
  * ExecutionGraph: The physical plan of the JobGraph, contains
parallelism, estimated input sizes etc.

env.getExecutionPlan gives you a JSON String representation of the
ExecutionGraph, which should contain most of the info you need. To
visualize that, go to your Flink binary distribution and open up
tools/planVisualizer.html in a browser, paste the JSON there and hit the
button. :)
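
From a user program, getting and printing that JSON looks roughly like this (a minimal sketch; the pipeline is just a placeholder):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintPlan {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // A trivial pipeline, just so the plan has some nodes in it.
        env.fromElements("to", "be", "or", "not", "to", "be")
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           })
           .print();

        // JSON representation of the plan; paste it into tools/planVisualizer.html.
        System.out.println(env.getExecutionPlan());
    }
}
```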

You might find it useful that the new Flink Dashboard also comes with this
feature integrated, so you can visualize jobs that have been submitted to
the cluster.

Hope that helps,

Marton

On Thu, Jan 14, 2016 at 11:56 AM, lofifnc  wrote:

> Hi,
>
> I'm trying to figure out what graph the execution plan represents when you
> call env.getExecutionPlan on the StreamExecutionEnvironment. From my
> understanding the StreamGraph is what you call an APIGraph, which will be
> used to create the JobGraph.
> So is the ExecutionPlan is a full representation of the StreamGraph?
> And Is there a way to get a human-interpretable representation of the
> JobGraph? :)
>
> Best,
> Alex
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Execution-Plan-tp4290.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Flink Execution Plan

2016-01-14 Thread Christian Kreutzfeldt
Hi

Is there a way to map a JSON representation back to an executable flink
job? If there is no such API, what is the best starting point to implement
such a feature?

Best
  Christian

2016-01-14 15:18 GMT+01:00 Márton Balassi :

> Hey Alex,
>
> Flink has 3 abstractions having a Graph suffix in place currently for
> streaming jobs:
>
>   * StreamGraph: Used for representing the logical plan of a streaming job
> that is under construction in the API. This one is the only streaming
> specific in this list.
>   * JobGraph: Used for representing the logical plan of a streaming job
> that is finished construction.
>   * ExecutionGraph: The physical plan of the JobGraph, contains
> parallelism, estimated input sizes etc.
>
> env.getExecutionPlan gives you a JSON String representation of the
> ExecutionGraph, which should contain must of the info you need. To
> visualize that go to your flink binary distribution and open up
> tools/planVisualizer.html in a browser, paste the JSON there and hit the
> button. :)
>
> You might find it useful that the new Flink Dashboard also comes with this
> feature integrated, so you can visualize jobs that have been submitted to
> the cluster.
>
> Hope that helps,
>
> Marton
>
> On Thu, Jan 14, 2016 at 11:56 AM, lofifnc 
> wrote:
>
>> Hi,
>>
>> I'm trying to figure out what graph the execution plan represents when you
>> call env.getExecutionPlan on the StreamExecutionEnvironment. From my
>> understanding the StreamGraph is what you call an APIGraph, which will be
>> used to create the JobGraph.
>> So is the ExecutionPlan is a full representation of the StreamGraph?
>> And Is there a way to get a human-interpretable representation of the
>> JobGraph? :)
>>
>> Best,
>> Alex
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Execution-Plan-tp4290.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: Flink Execution Plan

2016-01-14 Thread lofifnc
Hi Márton, 

Thanks for your answer. But now I'm even more confused as it somehow
conflicts with the documentation. ;)
According to the wiki and the Stratosphere paper, the JobGraph will be
submitted to the JobManager. And the JobManager will then translate it into
the ExecutionGraph.

> In order to track the status of the parallel vertex and channel
> instances individually, the Job Manager spans the Job Graph
> to the Execution Graph, as shown in Fig. 9. The Execution
> Graph contains a node for each parallel instance of a vertex,
> which we refer to as a task.

So the ExecutionGraph should only be available at the JobManager and contain
a node for each parallel instance of an operator and the corresponding
vertices.

The question is in the context of my master thesis, as I'm trying to describe
the deployment process of Flink. And I want to use a visualization of the
execution plan as a concrete example of one of these three Graphs.

Best Alex!
 



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Execution-Plan-tp4290p4297.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Flink QuickStart: On start up, running into ClassNotFoundException: org.apache.flink.streaming.runtime.tasks.OneInputStreamTask

2016-01-14 Thread Prez Cannady
Sure thing.  Opened pull request #1506 against master.  Also submitted pull 
request #1507 against release-0.9.1.rc1, seeing as we’re still pinned to 0.9.1 
for our project.  Not sure if you guys are interested in hot fixes to previous 
releases, but there you have it.

Prez Cannady  
p: 617 500 3378  
e: revp...@opencorrelate.org   
GH: https://github.com/opencorrelate   
LI: https://www.linkedin.com/in/revprez   


> On Jan 14, 2016, at 4:07 AM, Stephan Ewen  wrote:
> 
> Hi!
> 
> I think this is a solid fix. Adding the classloader that loads Flink's 
> classes as the parent is a good approach.
> 
> Do you want to open a pull request with that?
> 
> 
> Greetings,
> Stephan
> 
> On Thu, Jan 14, 2016 at 2:26 AM, Prez Cannady  > wrote:
> Simply passing FlinkUserCodeClassLoader.class.getClassLoader to the parent 
> constructor cleared the impasse. 
> 
> 2016-01-13 20:06:43.637  INFO 35403 --- [   main] 
> o.o.e.j.s.SocketTextStreamWordCount$ : Started SocketTextStreamWordCount. 
> in 5.176 seconds (JVM running for 12.58)
> [INFO] 
> 
> [INFO] BUILD SUCCESS
> [INFO] 
> 
> [INFO] Total time: 11.734 s
> [INFO] Finished at: 2016-01-13T20:06:43-05:00
> [INFO] Final Memory: 49M/4986M
> [INFO] 
> 
> 2016-01-13 20:06:43.804  INFO 35403 --- [   Thread-3] 
> s.c.a.AnnotationConfigApplicationContext : Closing 
> org.springframework.context.annotation.AnnotationConfigApplicationContext@33248c18:
>  startup date [Wed Jan 13 20:06:38 EST 2016]; root of context hierarchy
> 2016-01-13 20:06:43.806  INFO 35403 --- [   Thread-3] 
> o.s.j.e.a.AnnotationMBeanExporter: Unregistering JMX-exposed beans on 
> shutdown
> 
> 
> All tests in flink-runtime passed after the change to `BlobLibraryCacheManager’, 
> but I haven’t run the full test suite. 
> 
> Is this actually an appropriate fix, or just a way to highlight a 
> configuration problem? 
> 
> I assume that injecting a parent class loader when registering a task might 
> break things, but I don’t know nearly enough about Flink and this code to say 
> one way or another.
> 
> 
> 
> Prez Cannady  
> p: 617 500 3378  
> e: revp...@opencorrelate.org   
> GH: https://github.com/opencorrelate   
> LI: https://www.linkedin.com/in/revprez  
>  
> 
> 
> 
> 
> 
> 
> 
> 
> 
>> On Jan 13, 2016, at 6:50 PM, Stephan Ewen > > wrote:
>> 
>> Hi!
>> 
>> Running this in Spring, the whole classloader configuration is probably a 
>> bit different than in Flink's standalone, YARN, or local mode.
>> 
>> Can you try if the following solves your problem: 
>> 
>> At the end of the file "BlobLibraryCacheManager", there is the private class 
>> "FlinkUserCodeClassloader".
>> 
>> Can you replace the current FlinkUserCodeClassloader with this?
>> 
>> 
>> private static class FlinkUserCodeClassLoader extends URLClassLoader {
>> 
>>  public FlinkUserCodeClassLoader(URL[] urls) {
>>  super(urls, FlinkUserCodeClassLoader.class.getClassLoader());
>>  }
>> }
>> 
>> You can also try using 
>> "Thread.currentThread().getContextClassLoader()" instead of 
>> "FlinkUserCodeClassLoader.class.getClassLoader()".
>> 
>> Let me know if one of the two solves the problem.
>> 
>> Greetings,
>> Stephan
>> 
>> 
>> On Wed, Jan 13, 2016 at 7:20 PM, Prez Cannady > > wrote:
>> I’m experimenting combining Spring with Flink.  I’ve successfully 
>> instrumented for Gradle, but Maven is emitting ClassNotFoundExceptions for 
>> items ostensibly on the class path.
>> 
>> Project is currently configured for:
>> 
>> 1. Scala 2.10.4
>> 2. Flink 0.9.1
>> 
>> I execute the following
>> 
>> ```
>> # In one terminal
>> $ nc -lk -p  --sh-exec "cat /usr/share/dict/words | head -n 10”
>> 
>> 
>> # In another terminal
>> $ mvn clean install spring-boot:run -Drun.arguments=“localhost,”
>> 
>> # observe output
>> ```
>> 
>> The specific class not found is 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.  However, 
>> Spring Boot Plugin is configured to repackage a fat jar, and I can see that 
>> the class is present in the included flink-streaming-core jar.  
>> Additionally, LogBack shows that the flink-streaming-core jar is in my 
>> classpath.
>> 
>> 
>> I’m hoping I’m just missing something that should be obvious.  While I wish 
>> I could move forward with just Gradle, unfortunately I have to support Maven 
>> builds.
>> 
>> For reference, the complete project is available here:
>> 
>> https://github.com/OCExercise/wordcount-processing 
>> 

Re: Flink Execution Plan

2016-01-14 Thread Fabian Hueske
@Christian: I don't think that is possible.

There are quite a few things missing in the JSON including:
- User function objects (Flink ships objects not class names)
- Function configuration objects
- Data types

Best, Fabian

2016-01-14 16:02 GMT+01:00 lofifnc :

> Hi Márton,
>
> Thanks for your answer. But now I'm even more confused as it somehow
> conflicts with the documentation. ;)
> According to the wiki and the stratosphere paper the JobGraph will be
> submitted to the JobManager. And the JobManager will then translate it into
> the ExecutionGraph.
>
> > In order to track the status of the parallel vertex and channel
> > instances individually, the Job Manager spans the Job Graph
> > to the Execution Graph, as shown in Fig. 9. The Execution
> > Graph contains a node for each parallel instance of a vertex,
> > which we refer to as a task.
>
> So the ExecutionGraph should only be available at the JobManager and
> contain a node for each parallel instance of an operator and the
> corresponding vertices.
>
> The question is in the context of my master's thesis, as I'm trying to
> describe the deployment process of Flink and want to use a visualization
> of the execution plan as a concrete example for one of these three graphs.
>
> Best Alex!
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Execution-Plan-tp4290p4297.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Questions re: ExecutionGraph & ResultPartitions for interactive use a la Spark

2016-01-14 Thread kovas boguta
On Thu, Jan 14, 2016 at 5:52 AM, Ufuk Celebi  wrote:

> Hey Kovas
>
> sorry for the long delay.
>

It was worth the wait! Thanks for the detailed response.

> Ideally, I could force certain ResultPartitions to only be manually
> releasable, so I can consume them over and over.
>
> How would you like to have this controlled by the user? Offer an API
> operation like `.pin()`? Do you need it pinned permanently until you
> release it or would it be ok to just cache it and maybe recompute if
> another task needs more memory/disk space?
>

If another task needs resources, I would expect cached data to be moved to
lower levels of the storage hierarchy. We don't want to block/degrade new
computations, because it is impossible to tell the user upfront whether their
new request will exceed the available resources.

I think extending the approach of Tachyon might be interesting. Currently,
they have up to 3 tiers: memory, local disk, and a durable "under store" (HDFS,
S3, etc.). One could add a fourth possibility, call it "null", which would
mean: just drop the data and recompute it if needed. Then one could specify a
policy with a list like [memory, null] or [memory, local disk, null] and
have pretty reasonable control.
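
To make that policy idea concrete, here is a purely hypothetical sketch (none of these types exist in Flink; they only model the [memory, local disk, null] fallback ordering described above):

```
// Hypothetical storage tiers for a cached result partition.
sealed trait StorageTier
case object Memory     extends StorageTier
case object LocalDisk  extends StorageTier
case object UnderStore extends StorageTier // durable store such as HDFS or S3
case object Null       extends StorageTier // drop the data, recompute on demand

// A policy is an ordered list of tiers to fall back through under pressure.
final case class RetentionPolicy(tiers: List[StorageTier]) {
  def demote(current: StorageTier): Option[StorageTier] = {
    val idx = tiers.indexOf(current)
    if (idx >= 0 && idx < tiers.size - 1) Some(tiers(idx + 1)) else None
  }
}

object RetentionPolicyExample extends App {
  val policy = RetentionPolicy(List(Memory, LocalDisk, Null))
  // Under memory pressure a partition held in Memory is demoted to LocalDisk;
  // from LocalDisk it falls through to Null, i.e. recompute when next consumed.
  println(policy.demote(Memory))    // Some(LocalDisk)
  println(policy.demote(LocalDisk)) // Some(Null)
}
```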

However it's done, I hope it's in the nice compositional, interface-driven
style that Flink enjoys today. "Simple" is much better than "easy" for me.
There's a bunch of questions, and having a well-thought-out architecture to
achieve different behaviors seems like the most valuable thing to me vs. any
specific top-level API approach.

One issue/question:

How would the user refer to an existing dataset in subsequent operations?
When you create a dataset at the top-level API, is it automatically
assigned an ID?

The separation of logical & physical plan is a Good Thing. When one is
submitting additional plans, one needs to refer to previous nodes. Whether
nodes are assigned identity by value versus by reference makes a
difference.

>
> To get the data you need a SingleInputGate and set it up with
> RemoteChannel instances for each consumed subpartition. This is where you
> need to know all the partition IDs and task managers (The connection ID is
> a wrapper around InetSocketAddress with a connection index for connection
> sharing).


> When you have the gate set up, you can query it with the RecordReader. The
> input gate itself just returns byte buffers.
>

This part is fairly well described in the docs, and I was able to more or
less figure it out. Cool stuff.


> Do you have an idea about how you want to figure out the partition and
> connection IDs in your repl process? If yes, I could provide some more
> concrete code snippets on the setup.
>

For hacking purposes my plan was to manually instantiate the execution
graph and instrument some of the nodes so I can get information out. My
REPL would run in the same process as the JobManager.

For a "real" solution, the REPL needs seem related to the WebUI, which I
haven't studied yet. One would want a fairly detailed view into the running
execution graph, possibly but not necessarily as an HTTP api.

I haven't studied how the execution graph is instantiated yet; again, I'd
rather inject this logic via composition than have it hard-coded into the
existing implementation. I will have to study more.

Thanks for the pointers!




>
> ---
>
> If you would like to contribute to Flink, we can also think about splitting
> this up into some smaller tasks and addressing them. Your ideas are definitely
> in line with what we wanted to have in Flink anyway. The recent focus on
> the streaming part of the system has pulled most contributors away from the
> batch parts, though. I would suggest also looking at the changes in #640.
> Although the PR is rather old, the network stack has not seen many changes
> in recent times.
>
> Feel free to post further questions! :)
>
> – Ufuk
>
>


Re: Questions re: ExecutionGraph & ResultPartitions for interactive use a la Spark

2016-01-14 Thread kovas boguta
On Thu, Jan 14, 2016 at 4:00 PM, kovas boguta 
wrote:
>
> For a "real" solution, the REPL needs seem related to the WebUI, which I
> haven't studied yet. One would want a fairly detailed view into the running
> execution graph, possibly but not necessarily as an HTTP api.
>
> I haven't studied how the execution graph is instantiated yet, again I'd
> rather inject this logic via composition than have it hard-coded into the
> existing implementation. Will have to study more.
>

Is there a way to hook into the global or 'per-job' event stream?

One approach could be to allow the user to submit a listener on the event
stream, which presumably contains all the relevant information about
availability and location of  result partitions (and other stuff of
interest). Writing a little service to send that info down to a client is
pretty easy from there.
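
As an illustration of what such a listener hook could look like, here is a hypothetical sketch (the trait and event names below are invented and do not exist in Flink; the real status updates flow through JobManager actor messages):

```
import java.net.InetSocketAddress

// Hypothetical events a REPL client would care about.
sealed trait JobEvent
final case class ResultPartitionAvailable(
    jobId: String,
    partitionId: String,
    location: InetSocketAddress) extends JobEvent
final case class TaskStateChanged(jobId: String, taskName: String, state: String) extends JobEvent

// Hypothetical hook: a user-supplied listener registered with a job.
trait JobEventListener {
  def onEvent(event: JobEvent): Unit
}

// A tiny listener that records partition locations so a REPL could later
// set up remote channels to consume them.
final class PartitionTracker extends JobEventListener {
  private val locations = scala.collection.mutable.Map.empty[String, InetSocketAddress]

  override def onEvent(event: JobEvent): Unit = event match {
    case ResultPartitionAvailable(_, partitionId, location) =>
      locations += partitionId -> location
    case _ => // ignore other events
  }

  def locationOf(partitionId: String): Option[InetSocketAddress] = locations.get(partitionId)
}
```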


Re: Flink v0.10.2

2016-01-14 Thread Nick Dimiduk
I would also find a 0.10.2 release useful :)

On Wed, Jan 13, 2016 at 1:24 AM, Welly Tambunan  wrote:

> Hi Robert,
>
> We are on a deadline right now for a demo stage for management, before
> production, so it would be great to have a stable 0.10.2 version within
> this week if possible.
>
> Cheers
>
> On Wed, Jan 13, 2016 at 4:13 PM, Robert Metzger 
> wrote:
>
>> Hi,
>>
>> there are currently no planned releases. I would actually like to start
>> preparing for the 1.0 release soon, but the community needs to discuss that
>> first.
>>
>> How urgently do you need a 0.10.2 release? If this is the last blocker
>> for using Flink in production at your company, I can push for the bugfix
>> release.
>>
>>
>> On Wed, Jan 13, 2016 at 8:39 AM, Welly Tambunan 
>> wrote:
>>
>>> Hi All,
>>>
>>> We are currently using the snapshot version for development, as we hit a
>>> DataStream union error. For deployment we may need to build Flink from
>>> master.
>>>
>>>
>>> When will this version be released? Is there a roadmap or plan
>>> I can look at for this release?
>>>
>>>
>>> Thanks a lot
>>>
>>> Cheers
>>> --
>>> Welly Tambunan
>>> Triplelands
>>>
>>> http://weltam.wordpress.com
>>> http://www.triplelands.com 
>>>
>>
>>
>
>
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


flink 1.0-SNAPSHOT scala 2.11 compilation error

2016-01-14 Thread David Kim
Hi,

I have a scala project depending on flink scala_2.11 and am seeing a
compilation error when using sbt.

I'm using flink 1.0-SNAPSHOT and my build was working yesterday. I was
wondering if maybe a recent change to flink could be the cause?

Usually we see flink resolving the scala _2.11 counterparts for akka and
scalatest:

[info] Resolving com.typesafe.akka#akka-actor_2.11;2.3.7 ...
[info] Resolving com.typesafe#config;1.2.1 ...
[info] Resolving com.typesafe.akka#akka-remote_2.11;2.3.7 ...
[info] Resolving org.scalatest#scalatest_2.11;2.2.4 ...



but am seeing it pick up the _2.10 counterparts:

[info] Resolving com.typesafe.akka#akka-actor_2.10;2.3.7 ...
[info] Resolving com.typesafe.akka#akka-actor_2.10;2.3.7 ...
[info] Resolving com.typesafe#config;1.2.1 ...
[info] Resolving org.scalatest#scalatest_2.10;2.2.2 ...



This ultimately gives us the following compilation error:

[error]org.scalatest:scalatest _2.10, _2.11
java.lang.RuntimeException: Conflicting cross-version suffixes in:
org.scalatest:scalatest
at scala.sys.package$.error(package.scala:27)
at sbt.ConflictWarning$.processCrossVersioned(ConflictWarning.scala:46)
at sbt.ConflictWarning$.apply(ConflictWarning.scala:32)



Maybe the latest 1.0-SNAPSHOT build was erroneously built with Scala 2.10 for
the 2.11 profile? Any guidance appreciated!

Thanks,
David
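
As a temporary workaround while the snapshot publishing is sorted out, one option is to exclude the mismatched _2.10 artifacts in sbt. Below is a sketch of a build.sbt fragment; the module names and versions are assumptions based on the messages above, and the proper fix is of course a snapshot correctly built for the 2.11 profile:

```
// build.sbt sketch (hypothetical): pin the _2.11 Flink artifacts and keep the
// _2.10 scalatest that leaks in transitively off the classpath.
libraryDependencies ++= Seq(
  ("org.apache.flink" % "flink-scala_2.11" % "1.0-SNAPSHOT")
    .exclude("org.scalatest", "scalatest_2.10"),
  ("org.apache.flink" % "flink-streaming-scala_2.11" % "1.0-SNAPSHOT")
    .exclude("org.scalatest", "scalatest_2.10"),
  "org.scalatest" %% "scalatest" % "2.2.4" % "test"
)
```

Depending on which Flink modules are pulled in, the same kind of exclude may be needed for the akka _2.10 artifacts as well.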


Re: UpdateTaskExecutionState during JobManager failover

2016-01-14 Thread wangzhijiang999
Hi Stephan,
 Thank you for the detailed explanation. As you said, my preference is to keep the task 
running during JobManager failover, even though sending the status update 
failed.
Regarding the first reason you mentioned, if I understand correctly, the key issue is 
that the status gets out of sync between the TaskManager and the JobManager. For 
example, when the JobManager fails over, the task is in the CREATED state. When 
the task transitions to RUNNING, the updateStatus message cannot be received 
because of the JobManager failover, so the TaskManager keeps retrying the message 
until it succeeds. When the JobManager recovers, the previous state of the task 
is still CREATED from the JobManager's view, while the task may actually have 
transitioned to FINISHED from the TaskManager's view. The key problem is that if 
the JobManager receives the FINISHED message earlier than the RUNNING message, it 
will reject the FINISHED message. If the task maintains a queue of outgoing 
messages during the JobManager failover, so that the messages are guaranteed to 
arrive at the JobManager in sequence when it recovers, then the RUNNING status 
message must arrive before the FINISHED status message. Are there any problems 
with that approach?
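
A purely illustrative sketch of that queue idea (not Flink code; the types below are invented, and the real updates are actor messages sent by the TaskManager):

```
import scala.collection.mutable

// Hypothetical task status update, produced in order by the task.
final case class StatusUpdate(taskId: String, state: String)

// FIFO queue of un-acknowledged updates. An update is only removed once the
// JobManager accepts it, and updates are always resent front-to-back, so a
// FINISHED message can never overtake the preceding RUNNING message after a
// failover.
final class OrderedUpdateQueue(send: StatusUpdate => Boolean) {
  private val pending = mutable.Queue.empty[StatusUpdate]

  def enqueue(update: StatusUpdate): Unit = {
    pending.enqueue(update)
    flush()
  }

  // Called again when a (re)connected JobManager becomes available.
  def flush(): Unit = {
    // Stop at the first update that cannot be delivered; later ones must wait.
    while (pending.nonEmpty && send(pending.head)) {
      pending.dequeue()
    }
  }
}
```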
Regarding the second reason you mentioned, I am not very clear on the mechanism of 
filtering critical messages by leaderSessionID; could you explain it in more 
detail?
I am trying to improve the JobManager and TaskManager failover process. Thank 
you for your help!
Zhijiang Wang