Hello Till, Thanks so much for your reply. Here's my program: So that's TwitterSource:
public class TwitterSource extends Stream { private static final long serialVersionUID = 1L; protected transient BlockingQueue<String> queue; protected int queueSize = 10000; private transient BasicClient client; private int waitSec = 5; private int maxNumberOfTweets; private int currentNumberOfTweets; private volatile boolean isRunning = true; public TwitterSource(int numberOfTweets) { this.maxNumberOfTweets = numberOfTweets; currentNumberOfTweets = 0; } public void initializeConnection() { queue = new LinkedBlockingQueue<>(queueSize); UserstreamEndpoint endpoint = new UserstreamEndpoint (); endpoint.stallWarnings(false); Authentication auth = authenticate(); initializeClient(endpoint, auth); } public OAuth1 authenticate() { return new OAuth1("---"); } protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) { client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.USERSTREAM_HOST) .endpoint(endpoint).authentication(auth) .processor(new StringDelimitedProcessor(queue)).build(); client.connect(); } @Override public void run(SourceContext<Object> sourceContext) throws Exception { initializeConnection(); while (isRunning) { sourceContext.collect(queue.take()); currentNumberOfTweets++; if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) { break; } Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } } then i initialize it: List<Object> globalEntities = new ArrayList<>(); Iterator iterator; public void runModel(String key) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment. getExecutionEnvironment(); DataStream twitter = env.addSource(new TwitterSource(30)).flatMap(new processTweets()); twitter.filter(new FilterFunction<Object>() { @Override public boolean filter(Object tweet) throws Exception { Tweet Singletweet = (Tweet) tweet; return Singletweet.search(key); } }).print(); iterator = DataStreamUtils.collect(datastream); } //this method is called periodically with an Ajax call every 2 seconds public void viewResults(Model model) { if (iterator != null) { if (iterator.hasNext()) { globalEntities.add(iter.next()); } } if (!globalEntities.isEmpty()) { model.addAttribute("list", globalEntities); } } public static class processTweets extends JSONParseFlatMap<Object, Object> { @Override public void flatMap(Object value, Collector<Object> out) throws Exception { try { //if (getString((String)value, "user.lang").equals("en")) { Tweet tweet = new Tweet(); // message of tweet tweet.setText(getString((String) value, "text")); tweet.setUser(getString((String) value, "user.name")); out.collect(tweet); // } } catch (JSONException e) { // the JSON was not parsed correctly } } } import java.util.regex.Matcher; import java.util.regex.Pattern; public class Tweet { private String user; private String text; public Tweet() { } public String getUser() { return user; } public void setUser(String user) { this.user = user; } public String getText() { return text; } public void setText(String text) { this.text = text; } public String toString() { return this.user+" : "+this.text; } public boolean search(String key) { String patternString = ".*"+key+".*"; Pattern pattern = Pattern.compile(patternString, Pattern.CASE_INSENSITIVE); Matcher matcher = pattern.matcher(this.toString()); return matcher.find(); } } *And that's the stack trace:* 2016-06-08 17:20:10.091 INFO 13564 --- [om Source (1/1)] o.apache.flink.runtime.taskmanager.Task : Source: Custom Source (1/1) switched to RUNNING 2016-06-08 17:20:10.096 INFO 13564 --- [lt-dispatcher-2] o.a.f.r.executiongraph.ExecutionGraph : Sink: Unnamed (1/1) (50d42a893093705f278bd0aa099a53d3) switched from DEPLOYING to RUNNING 2016-06-08 17:20:10.097 INFO 13564 --- [lt-dispatcher-5] o.a.f.r.executiongraph.ExecutionGraph : Flat Map -> Filter -> Sink: Unnamed (4/4) (3c98bbdab04256d73f1f405669d007a8) switched from DEPLOYING to RUNNING 2016-06-08 17:20:10.097 INFO 13564 --- [lt-dispatcher-3] o.a.f.r.executiongraph.ExecutionGraph : Flat Map -> Filter -> Sink: Unnamed (3/4) (49c23924bae61cead7158bc817c22d0b) switched from DEPLOYING to RUNNING 2016-06-08 17:20:10.097 INFO 13564 --- [lt-dispatcher-4] o.a.f.r.executiongraph.ExecutionGraph : Flat Map -> Filter -> Sink: Unnamed (2/4) (7cfc6b655d7bb70beb901012094db0e5) switched from DEPLOYING to RUNNING 2016-06-08 17:20:10.099 INFO 13564 --- [lt-dispatcher-7] o.a.f.r.executiongraph.ExecutionGraph : Flat Map -> Filter -> Sink: Unnamed (1/4) (6afafe555579dc31e1c974c7238d486c) switched from DEPLOYING to RUNNING 2016-06-08 17:20:10.099 INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor : 06/08/2016 17:20:10 Sink: Unnamed(1/1) switched to RUNNING 06/08/2016 17:20:10 Sink: Unnamed(1/1) switched to RUNNING 2016-06-08 17:20:10.100 INFO 13564 --- [lt-dispatcher-9] o.a.f.r.executiongraph.ExecutionGraph : Source: Custom Source (1/1) (92f08ef0a7d715478bf9fe60e8bc4dea) switched from DEPLOYING to RUNNING 2016-06-08 17:20:10.107 INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(4/4) switched to RUNNING 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(4/4) switched to RUNNING 2016-06-08 17:20:10.108 INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(3/4) switched to RUNNING 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(3/4) switched to RUNNING 2016-06-08 17:20:10.109 INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(2/4) switched to RUNNING 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(2/4) switched to RUNNING 2016-06-08 17:20:10.109 INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor : 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(1/4) switched to RUNNING 06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(1/4) switched to RUNNING 2016-06-08 17:20:10.109 INFO 13564 --- [lt-dispatcher-6] o.a.flink.runtime.client.JobClientActor : 06/08/2016 17:20:10 Source: Custom Source(1/1) switched to RUNNING 06/08/2016 17:20:10 Source: Custom Source(1/1) switched to RUNNING 2016-06-08 17:20:10.124 WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.125 INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.129 WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.132 INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.132 WARN 13564 --- [: Unnamed (1/1)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.132 INFO 13564 --- [: Unnamed (1/1)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.136 WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.136 INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.137 WARN 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.137 INFO 13564 --- [: Unnamed (4/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.138 WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.138 INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.139 WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.139 INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.140 WARN 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.140 INFO 13564 --- [: Unnamed (2/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.148 WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.148 INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.149 WARN 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.149 INFO 13564 --- [: Unnamed (3/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.150 WARN 13564 --- [om Source (1/1)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.150 INFO 13564 --- [om Source (1/1)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.158 WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.159 INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.159 WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.160 INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.160 WARN 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask : No state backend has been specified, using default state backend (Memory / JobManager) 2016-06-08 17:20:10.160 INFO 13564 --- [: Unnamed (1/4)] o.a.f.s.runtime.tasks.StreamTask : State backend is set to heap memory (checkpoint to jobmanager) 2016-06-08 17:20:10.359 INFO 13564 --- [om Source (1/1)] com.twitter.hbc.httpclient.BasicClient : New connection executed: twitterSourceClient, endpoint: /1.1/user.json?delimited=length 2016-06-08 17:20:10.622 INFO 13564 --- [ent-io-thread-0] com.twitter.hbc.httpclient.ClientBase : twitterSourceClient Establishing a connection 2016-06-08 17:20:11.687 INFO 13564 --- [ent-io-thread-0] com.twitter.hbc.httpclient.ClientBase : twitterSourceClient Processing connection data *Then Here are some results printed:* 2> Fast Company : Homemade brings an Etsy mindset to food https://t.co/9uaOTPka58 https://t.co/VB7IFIxrdM 3> tagesthemen : Noch immer #Flüchtlinge auf der #Balkanroute. An der Grenze zu #Ungarn wartet Stacheldraht - der hat aber Löcher. https://t.co/YHW25gnTzA 4> BuzzFeed News : Maria Sharapova says that she will fight back against the ITF's decision to suspend her https://t.co/DuNhDUv64f https://t.co/3BZcfR9Vid *Then the exceptions are thrown:* 2016-06-08 17:20:42.326 ERROR 13564 --- [io-8080-exec-10] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 97] with root cause com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 97 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) ~[kryo-2.24.0.jar:na] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) ~[flink-core-1.0.3.jar:1.0.3] at org.apache.flink.contrib.streaming.DataStreamIterator.readNextFromStream(DataStreamIterator.java:108) ~[flink-streaming-contrib-0.10.2.jar:0.10.2] at org.apache.flink.contrib.streaming.DataStreamIterator.hasNext(DataStreamIterator.java:78) ~[flink-streaming-contrib-0.10.2.jar:0.10.2] at com.example.controllers.MiningModelController.viewResults(MiningModelController.java:559) ~[classes/:na] at com.example.controllers.MiningModelController$$FastClassBySpringCGLIB$$d468bf0d.invoke(<generated>) ~[classes/:na] at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651) ~[spring-aop-4.2.5.RELEASE.jar:4.2.5.RELEASE] at com.example.controllers.MiningModelController$$EnhancerBySpringCGLIB$$de7f687c.viewResults(<generated>) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_73] at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(HttpServlet.service(HttpServlet.java:729) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:121) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522) AbstractConnectionHandler.process(AbstractProtocol.java:672) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) [tomcat-embed-core-8.0.32.jar:8.0.32] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73] at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.0.32.jar:8.0.32] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73] java.lang.IndexOutOfBoundsException: Index: 99, Size: 0 at java.util.ArrayList.rangeCheck(ArrayList.java:653) ~[na:1.8.0_73] at java.util.ArrayList.get(ArrayList.java:429) ~[na:1.8.0_73] at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) ~[kryo-2.24.0.jar:na] at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) ~[kryo-2.24.0.jar:na] at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) ~[flink-core-1.0.3.jar:1.0.3] at org.apache.flink.contrib.streaming.DataStreamIterator.readNextFromStream(DataStreamIterator.java:108) ~[flink-streaming-contrib-0.10.2.jar:0.10.2] at org.apache.flink.contrib.streaming.DataStreamIterator.hasNext(DataStreamIterator.java:78) ~[flink-streaming-contrib-0.10.2.jar:0.10.2] at com.example.controllers.MiningModelController.viewResults(MiningModelController.java:559) ~[classes/:na] at com.example.controllers.MiningModelController$$FastClassBySpringCGLIB$$d468bf0d.invoke(<generated>) ~[classes/:na] at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) ~[spring-core-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651) ~[spring-aop-4.2.5.RELEASE.jar:4.2.5.RELEASE] at com.example.controllers.MiningModelController$$EnhancerBySpringCGLIB$$de7f687c.viewResults(<generated>) ~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_73] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_73] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_73] at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77) ~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1095) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:672) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500) [tomcat-embed-core-8.0.32.jar:8.0.32] at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456) [tomcat-embed-core-8.0.32.jar:8.0.32] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_73] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_73] at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.0.32.jar:8.0.32] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_73] And please note that the line where these exceptions point is the line that I'm checking the condition if(iterator.hasNext()) Thanks, Ahmed On 8 June 2016 at 16:07, Till Rohrmann <trohrm...@apache.org> wrote: > Hi Ahmed, > > the problem usually occurs, if you use differently initialized Kryo > instances where one instance has a different set of classes registered. But > your data could also be corrupted because you see an > IndexOutOfBoundsException where you try to access an element of an array > with size 0 at index 32. > > In order to debug the problem it would be helpful to see the full stack > traces of the errors and the complete error message. Additionally, it would > be helpful to see your program so that we could try to reproduce the > problem. > > Cheers, > Till > > On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ahmednader...@gmail.com> > wrote: > >> Hello, >> I have a TwitterSource and I'm applying some transformations as filter >> and map on the resulting stream from twitter. I'm collecting the output in >> an iterator: iterator = DataStreamUtils.collect(datastream). Then in a >> parallel thread i periodically check if this iterator.hasNext() and print >> the next item. I'm using Flink 1.0.3. >> That program works at the beginning and actually prints some items, >> however when i leave it running for some more time (Like for example after >> 40 seconds or 1 minute) then i get 2 exceptions which are: >> com.esotericsoftware.kryo.KryoException: Encountered unregistered class >> ID and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0. >> These 2 exceptions result from the line where i'm checking if the >> iterator hasNext(). >> >> I wanted to know why do these exceptions happen in general and also if >> anyone knows a specific solution for my program, that would be great too. >> Thanks, >> Ahmed >> > >