Hello Till,
Thanks  so much for your reply. Here's my program:
So that's TwitterSource:

public class TwitterSource extends Stream {
    private static final long serialVersionUID = 1L;
    protected transient BlockingQueue<String> queue;
    protected int queueSize = 10000;
    private transient BasicClient client;
    private int waitSec = 5;

    private int maxNumberOfTweets;
    private int currentNumberOfTweets;

    private volatile boolean isRunning = true;

    public TwitterSource(int numberOfTweets) {
        this.maxNumberOfTweets = numberOfTweets;
       currentNumberOfTweets = 0;
    }

    public void initializeConnection() {
        queue = new LinkedBlockingQueue<>(queueSize);

        UserstreamEndpoint endpoint = new UserstreamEndpoint ();
        endpoint.stallWarnings(false);

        Authentication auth = authenticate();

        initializeClient(endpoint, auth);

    }

    public OAuth1 authenticate() {
        return new OAuth1("---");
    }

    protected void initializeClient(DefaultStreamingEndpoint endpoint,
Authentication auth) {
        client = new
ClientBuilder().name("twitterSourceClient").hosts(Constants.USERSTREAM_HOST)
                .endpoint(endpoint).authentication(auth)
                .processor(new StringDelimitedProcessor(queue)).build();

        client.connect();
    }

    @Override
    public void run(SourceContext<Object> sourceContext) throws Exception {
        initializeConnection();
        while (isRunning) {
            sourceContext.collect(queue.take());
            currentNumberOfTweets++;
            if (maxNumberOfTweets != -1 && currentNumberOfTweets >=
maxNumberOfTweets) {
                break;
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

then i initialize it:

List<Object> globalEntities = new ArrayList<>();
Iterator iterator;

public void runModel(String key) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment(); DataStream twitter = env.addSource(new
TwitterSource(30)).flatMap(new processTweets()); twitter.filter(new
FilterFunction<Object>()
{

    @Override
    public boolean filter(Object tweet) throws Exception {
        Tweet Singletweet = (Tweet) tweet;
        return Singletweet.search(key);
    }
    }).print();

iterator = DataStreamUtils.collect(datastream); } //this method is called
periodically with an Ajax call every 2 seconds

public void viewResults(Model model) {

if (iterator != null) {
    if (iterator.hasNext()) {
        globalEntities.add(iter.next());
    }
}
if (!globalEntities.isEmpty()) {
    model.addAttribute("list", globalEntities);
}

}

public static class processTweets extends JSONParseFlatMap<Object, Object> {
    @Override
    public void flatMap(Object value, Collector<Object> out) throws Exception {
        try {
            //if (getString((String)value, "user.lang").equals("en")) {
                Tweet tweet = new Tweet();
                // message of tweet
                tweet.setText(getString((String) value, "text"));
                tweet.setUser(getString((String) value, "user.name"));
                out.collect(tweet);
           // }
        } catch (JSONException e) {
            // the JSON was not parsed correctly
        }
    }
}

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class Tweet {
    private String user;
    private String text;

    public Tweet() {

    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public String toString() {
        return this.user+" : "+this.text;
    }

    public boolean search(String key) {
        String patternString = ".*"+key+".*";
        Pattern pattern = Pattern.compile(patternString,
Pattern.CASE_INSENSITIVE);
        Matcher matcher = pattern.matcher(this.toString());
        return matcher.find();
    }
}

*And that's the stack trace:*
2016-06-08 17:20:10.091  INFO 13564 --- [om Source (1/1)]
o.apache.flink.runtime.taskmanager.Task  : Source: Custom Source (1/1)
switched to RUNNING
2016-06-08 17:20:10.096  INFO 13564 --- [lt-dispatcher-2]
o.a.f.r.executiongraph.ExecutionGraph    : Sink: Unnamed (1/1)
(50d42a893093705f278bd0aa099a53d3) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-5]
o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink:
Unnamed (4/4) (3c98bbdab04256d73f1f405669d007a8) switched from DEPLOYING to
RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-3]
o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink:
Unnamed (3/4) (49c23924bae61cead7158bc817c22d0b) switched from DEPLOYING to
RUNNING
2016-06-08 17:20:10.097  INFO 13564 --- [lt-dispatcher-4]
o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink:
Unnamed (2/4) (7cfc6b655d7bb70beb901012094db0e5) switched from DEPLOYING to
RUNNING
2016-06-08 17:20:10.099  INFO 13564 --- [lt-dispatcher-7]
o.a.f.r.executiongraph.ExecutionGraph    : Flat Map -> Filter -> Sink:
Unnamed (1/4) (6afafe555579dc31e1c974c7238d486c) switched from DEPLOYING to
RUNNING
2016-06-08 17:20:10.099  INFO 13564 --- [lt-dispatcher-6]
o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Sink:
Unnamed(1/1) switched to RUNNING
06/08/2016 17:20:10 Sink: Unnamed(1/1) switched to RUNNING
2016-06-08 17:20:10.100  INFO 13564 --- [lt-dispatcher-9]
o.a.f.r.executiongraph.ExecutionGraph    : Source: Custom Source (1/1)
(92f08ef0a7d715478bf9fe60e8bc4dea) switched from DEPLOYING to RUNNING
2016-06-08 17:20:10.107  INFO 13564 --- [lt-dispatcher-6]
o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map ->
Filter -> Sink: Unnamed(4/4) switched to RUNNING
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(4/4) switched to
RUNNING
2016-06-08 17:20:10.108  INFO 13564 --- [lt-dispatcher-6]
o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map ->
Filter -> Sink: Unnamed(3/4) switched to RUNNING
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(3/4) switched to
RUNNING
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6]
o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map ->
Filter -> Sink: Unnamed(2/4) switched to RUNNING
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(2/4) switched to
RUNNING
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6]
o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Flat Map ->
Filter -> Sink: Unnamed(1/4) switched to RUNNING
06/08/2016 17:20:10 Flat Map -> Filter -> Sink: Unnamed(1/4) switched to
RUNNING
2016-06-08 17:20:10.109  INFO 13564 --- [lt-dispatcher-6]
o.a.flink.runtime.client.JobClientActor  : 06/08/2016 17:20:10 Source:
Custom Source(1/1) switched to RUNNING
06/08/2016 17:20:10 Source: Custom Source(1/1) switched to RUNNING
2016-06-08 17:20:10.124  WARN 13564 --- [: Unnamed (4/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.125  INFO 13564 --- [: Unnamed (4/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.129  WARN 13564 --- [: Unnamed (3/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.132  INFO 13564 --- [: Unnamed (3/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.132  WARN 13564 --- [: Unnamed (1/1)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.132  INFO 13564 --- [: Unnamed (1/1)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.136  WARN 13564 --- [: Unnamed (4/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.136  INFO 13564 --- [: Unnamed (4/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.137  WARN 13564 --- [: Unnamed (4/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.137  INFO 13564 --- [: Unnamed (4/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.138  WARN 13564 --- [: Unnamed (2/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.138  INFO 13564 --- [: Unnamed (2/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.139  WARN 13564 --- [: Unnamed (2/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.139  INFO 13564 --- [: Unnamed (2/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.140  WARN 13564 --- [: Unnamed (2/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.140  INFO 13564 --- [: Unnamed (2/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.148  WARN 13564 --- [: Unnamed (3/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.148  INFO 13564 --- [: Unnamed (3/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.149  WARN 13564 --- [: Unnamed (3/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.149  INFO 13564 --- [: Unnamed (3/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.150  WARN 13564 --- [om Source (1/1)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.150  INFO 13564 --- [om Source (1/1)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.158  WARN 13564 --- [: Unnamed (1/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.159  INFO 13564 --- [: Unnamed (1/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.159  WARN 13564 --- [: Unnamed (1/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.160  INFO 13564 --- [: Unnamed (1/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.160  WARN 13564 --- [: Unnamed (1/4)]
o.a.f.s.runtime.tasks.StreamTask         : No state backend has been
specified, using default state backend (Memory / JobManager)
2016-06-08 17:20:10.160  INFO 13564 --- [: Unnamed (1/4)]
o.a.f.s.runtime.tasks.StreamTask         : State backend is set to heap
memory (checkpoint to jobmanager)
2016-06-08 17:20:10.359  INFO 13564 --- [om Source (1/1)]
com.twitter.hbc.httpclient.BasicClient   : New connection executed:
twitterSourceClient, endpoint: /1.1/user.json?delimited=length
2016-06-08 17:20:10.622  INFO 13564 --- [ent-io-thread-0]
com.twitter.hbc.httpclient.ClientBase    : twitterSourceClient Establishing
a connection
2016-06-08 17:20:11.687  INFO 13564 --- [ent-io-thread-0]
com.twitter.hbc.httpclient.ClientBase    : twitterSourceClient Processing
connection data

*Then Here are some results printed:*

2> Fast Company : Homemade brings an Etsy mindset to food
https://t.co/9uaOTPka58 https://t.co/VB7IFIxrdM
3> tagesthemen : Noch immer #Flüchtlinge auf der #Balkanroute. An der
Grenze zu #Ungarn wartet Stacheldraht - der hat aber Löcher.
https://t.co/YHW25gnTzA
4> BuzzFeed News : Maria Sharapova says that she will fight back against
the ITF's decision to suspend her https://t.co/DuNhDUv64f
https://t.co/3BZcfR9Vid

*Then  the exceptions are thrown:*

2016-06-08 17:20:42.326 ERROR 13564 --- [io-8080-exec-10]
o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet
[dispatcherServlet] in context with path [] threw exception [Request
processing failed; nested exception is
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
97] with root cause

com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID:
97 at
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
~[kryo-2.24.0.jar:na] at
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
~[kryo-2.24.0.jar:na] at
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
~[kryo-2.24.0.jar:na] at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
~[flink-core-1.0.3.jar:1.0.3] at
org.apache.flink.contrib.streaming.DataStreamIterator.readNextFromStream(DataStreamIterator.java:108)
~[flink-streaming-contrib-0.10.2.jar:0.10.2] at
org.apache.flink.contrib.streaming.DataStreamIterator.hasNext(DataStreamIterator.java:78)
~[flink-streaming-contrib-0.10.2.jar:0.10.2] at
com.example.controllers.MiningModelController.viewResults(MiningModelController.java:559)
~[classes/:na] at
com.example.controllers.MiningModelController$$FastClassBySpringCGLIB$$d468bf0d.invoke(<generated>)
~[classes/:na] at
org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
~[spring-core-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651)
~[spring-aop-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
com.example.controllers.MiningModelController$$EnhancerBySpringCGLIB$$de7f687c.viewResults(<generated>)
~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[na:1.8.0_73] at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[na:1.8.0_73] at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[na:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497)
~[na:1.8.0_73] at
org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(HttpServlet.service(HttpServlet.java:729)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
~[tomcat-embed-websocket-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
ApplicationFilterChain.java:240) ~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:121)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522)
AbstractConnectionHandler.process(AbstractProtocol.java:672)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_73] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_73] at
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]

java.lang.IndexOutOfBoundsException: Index: 99, Size: 0 at
java.util.ArrayList.rangeCheck(ArrayList.java:653) ~[na:1.8.0_73] at
java.util.ArrayList.get(ArrayList.java:429) ~[na:1.8.0_73] at
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
~[kryo-2.24.0.jar:na] at
com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
~[kryo-2.24.0.jar:na] at
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
~[kryo-2.24.0.jar:na] at
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
~[flink-core-1.0.3.jar:1.0.3] at
org.apache.flink.contrib.streaming.DataStreamIterator.readNextFromStream(DataStreamIterator.java:108)
~[flink-streaming-contrib-0.10.2.jar:0.10.2] at
org.apache.flink.contrib.streaming.DataStreamIterator.hasNext(DataStreamIterator.java:78)
~[flink-streaming-contrib-0.10.2.jar:0.10.2] at
com.example.controllers.MiningModelController.viewResults(MiningModelController.java:559)
~[classes/:na] at
com.example.controllers.MiningModelController$$FastClassBySpringCGLIB$$d468bf0d.invoke(<generated>)
~[classes/:na] at
org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
~[spring-core-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:651)
~[spring-aop-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
com.example.controllers.MiningModelController$$EnhancerBySpringCGLIB$$de7f687c.viewResults(<generated>)
~[classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) ~[na:1.8.0_73] at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[na:1.8.0_73] at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[na:1.8.0_73] at java.lang.reflect.Method.invoke(Method.java:497)
~[na:1.8.0_73] at
org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:292)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
~[tomcat-embed-websocket-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99)
~[spring-org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:87)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:77)
~[spring-web-4.2.5.RELEASE.jar:4.2.5.RELEASE] at
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212)
~[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:522)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1095)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:672)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1500)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1456)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_73] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_73] at
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
[tomcat-embed-core-8.0.32.jar:8.0.32] at
java.lang.Thread.run(Thread.java:745) [na:1.8.0_73]

And please note that the line where these exceptions point is the line that
I'm checking the condition if(iterator.hasNext())

Thanks,
Ahmed




On 8 June 2016 at 16:07, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Ahmed,
>
> the problem usually occurs, if you use differently initialized Kryo
> instances where one instance has a different set of classes registered. But
> your data could also be corrupted because you see an
> IndexOutOfBoundsException where you try to access an element of an array
> with size 0 at index 32.
>
> In order to debug the problem it would be helpful to see the full stack
> traces of the errors and the complete error message. Additionally, it would
> be helpful to see your program so that we could try to reproduce the
> problem.
>
> Cheers,
> Till
>
> On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ahmednader...@gmail.com>
> wrote:
>
>> Hello,
>> I have a TwitterSource and I'm applying some transformations as filter
>> and map on the resulting stream from twitter. I'm collecting the output in
>> an iterator: iterator = DataStreamUtils.collect(datastream). Then in a
>> parallel thread i periodically check if this iterator.hasNext() and print
>> the next item. I'm using Flink 1.0.3.
>> That program works at the beginning and actually prints some items,
>> however when i leave it running for some more time (Like for example after
>> 40 seconds or 1 minute) then i get 2 exceptions which are:
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class
>> ID and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
>> These 2 exceptions result from the line where i'm checking if the
>> iterator hasNext().
>>
>> I wanted to know why do these exceptions happen in general and also if
>> anyone knows a specific solution for my program, that would be great too.
>> Thanks,
>> Ahmed
>>
>
>

Reply via email to