Hi Ahmed,

I tried setting up your use case, and everything seems to work for me. However, I didn't use the Spring framework, and I executed the program on a local Flink cluster.
Maybe you can put together a self-contained example (including example data) that reproduces your problem and send it to us.

Cheers,
Till

On Wed, Jun 8, 2016 at 5:48 PM, Ahmed Nader <ahmednader...@gmail.com> wrote:

> Hello Flavio,
> Thank you so much for replying. However, I didn't download Flink locally; I
> only added the dependencies to a Maven project, so I don't think I'll be able
> to modify the KryoSerializer class. But yes, I also think that's the
> problem.
> Thanks,
> Ahmed
>
> On 8 June 2016 at 16:07, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Hi Ahmed,
>> I also get the same error, which is probably caused by the KryoSerializer.
>> Right now I'm testing a patch for this problem, so maybe you could test it
>> too. Unfortunately I'm using Flink 1.0.2, so I don't know whether you can
>> use my KryoSerializer, but I think you can. Essentially, I just recreate the
>> Input and Output on every serialize/deserialize call and then close them.
>>
>> This is my attempt to fix the problem (it replaces the KryoSerializer class
>> in the flink-core module):
>>
>>
>> /*
>>  * Licensed to the Apache Software Foundation (ASF) under one
>>  * or more contributor license agreements. See the NOTICE file
>>  * distributed with this work for additional information
>>  * regarding copyright ownership. The ASF licenses this file
>>  * to you under the Apache License, Version 2.0 (the
>>  * "License"); you may not use this file except in compliance
>>  * with the License. You may obtain a copy of the License at
>>  *
>>  * http://www.apache.org/licenses/LICENSE-2.0
>>  *
>>  * Unless required by applicable law or agreed to in writing, software
>>  * distributed under the License is distributed on an "AS IS" BASIS,
>>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>  * See the License for the specific language governing permissions and
>>  * limitations under the License.
>>  */
>>
>> package org.apache.flink.api.java.typeutils.runtime.kryo;
>>
>> import com.esotericsoftware.kryo.Kryo;
>> import com.esotericsoftware.kryo.KryoException;
>> import com.esotericsoftware.kryo.Serializer;
>> import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
>> import com.esotericsoftware.kryo.io.Input;
>> import com.esotericsoftware.kryo.io.Output;
>> import com.esotericsoftware.kryo.serializers.JavaSerializer;
>> import com.google.common.base.Preconditions;
>>
>> import org.apache.avro.generic.GenericData;
>> import org.apache.flink.api.common.ExecutionConfig;
>> import org.apache.flink.api.common.typeutils.TypeSerializer;
>> import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
>> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
>> import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
>> import org.apache.flink.core.memory.DataInputView;
>> import org.apache.flink.core.memory.DataOutputView;
>> import org.objenesis.strategy.StdInstantiatorStrategy;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.ByteArrayInputStream;
>> import java.io.ByteArrayOutputStream;
>> import java.io.EOFException;
>> import java.io.IOException;
>> import java.lang.reflect.InvocationTargetException;
>> import java.lang.reflect.Method;
>> import java.lang.reflect.Modifier;
>> import java.util.LinkedHashMap;
>> import java.util.LinkedHashSet;
>> import java.util.Map;
>> import java.util.Objects;
>>
>> /**
>>  * A type serializer that serializes its type using the Kryo serialization
>>  * framework (https://github.com/EsotericSoftware/kryo).
>>  *
>>  * This serializer is intended as a fallback serializer for the cases that are
>>  * not covered by the basic types, tuples, and POJOs.
>>  *
>>  * @param <T> The type to be serialized.
>>  */
>> public class KryoSerializer<T> extends TypeSerializer<T> {
>>
>>     private static final long serialVersionUID = 3L;
>>
>>     private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);
>>
>>     // ------------------------------------------------------------------------
>>
>>     private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
>>     private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
>>     private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
>>     private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
>>     private final LinkedHashSet<Class<?>> registeredTypes;
>>
>>     private final Class<T> type;
>>
>>     // ------------------------------------------------------------------------
>>     // The fields below are lazily initialized after duplication or deserialization.
>>
>>     private transient Kryo kryo;
>>     private transient T copyInstance;
>>
>>     // ------------------------------------------------------------------------
>>
>>     public KryoSerializer(Class<T> type, ExecutionConfig executionConfig) {
>>         this.type = Preconditions.checkNotNull(type);
>>
>>         this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
>>         this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
>>         this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
>>         this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
>>         this.registeredTypes = executionConfig.getRegisteredKryoTypes();
>>     }
>>
>>     /**
>>      * Copy-constructor that does not copy transient fields. They will be initialized once required.
>>      */
>>     protected KryoSerializer(KryoSerializer<T> toCopy) {
>>         registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
>>         registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
>>         defaultSerializers = toCopy.defaultSerializers;
>>         defaultSerializerClasses = toCopy.defaultSerializerClasses;
>>         registeredTypes = toCopy.registeredTypes;
>>
>>         type = toCopy.type;
>>         if (type == null) {
>>             throw new NullPointerException("Type class cannot be null.");
>>         }
>>     }
>>
>>     // ------------------------------------------------------------------------
>>
>>     @Override
>>     public boolean isImmutableType() {
>>         return false;
>>     }
>>
>>     @Override
>>     public KryoSerializer<T> duplicate() {
>>         return new KryoSerializer<T>(this);
>>     }
>>
>>     @Override
>>     public T createInstance() {
>>         if (Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers())) {
>>             return null;
>>         } else {
>>             checkKryoInitialized();
>>             try {
>>                 return kryo.newInstance(type);
>>             } catch (Throwable e) {
>>                 return null;
>>             }
>>         }
>>     }
>>
>>     @SuppressWarnings("unchecked")
>>     @Override
>>     public T copy(T from) {
>>         if (from == null) {
>>             return null;
>>         }
>>         checkKryoInitialized();
>>         try {
>>             return kryo.copy(from);
>>         } catch (KryoException ke) {
>>             // Kryo was unable to copy it, so we do it through serialization:
>>             ByteArrayOutputStream baout = new ByteArrayOutputStream();
>>             Output output = new Output(baout);
>>
>>             kryo.writeObject(output, from);
>>
>>             output.close();
>>
>>             ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
>>             Input input = new Input(bain);
>>
>>             return (T) kryo.readObject(input, from.getClass());
>>         }
>>     }
>>
>>     @Override
>>     public T copy(T from, T reuse) {
>>         return copy(from);
>>     }
>>
>>     @Override
>>     public int getLength() {
>>         return -1;
>>     }
>>
>>     @Override
>>     public void serialize(T record, DataOutputView target) throws IOException {
>>         checkKryoInitialized();
>>         DataOutputViewStream outputStream = new DataOutputViewStream(target);
>>         Output output = new Output(outputStream);
>>
>>         try {
>>             // Sanity check: Make sure that the output is cleared/has been flushed by the last call
>>             // otherwise data might be written multiple times in case of a previous EOFException
>>             if (output.position() != 0) {
>>                 throw new IllegalStateException("The Kryo Output still contains data from a previous " +
>>                     "serialize call. It has to be flushed or cleared at the end of the serialize call.");
>>             }
>>
>>             kryo.writeClassAndObject(output, record);
>>             output.flush();
>>         } catch (KryoException ke) {
>>             Throwable cause = ke.getCause();
>>             if (cause instanceof EOFException) {
>>                 throw (EOFException) cause;
>>             } else {
>>                 throw ke;
>>             }
>>         } finally {
>>             try {
>>                 output.close();
>>             } catch (KryoException ke) {
>>                 Throwable cause = ke.getCause();
>>
>>                 if (cause instanceof EOFException) {
>>                     throw (EOFException) cause;
>>                 } else {
>>                     throw ke;
>>                 }
>>             }
>>         }
>>     }
>>
>>     @SuppressWarnings("unchecked")
>>     @Override
>>     public T deserialize(DataInputView source) throws IOException {
>>         checkKryoInitialized();
>>         DataInputViewStream inputStream = new DataInputViewStream(source);
>>         Input input = new NoFetchingInput(inputStream);
>>
>>         try {
>>             return (T) kryo.readClassAndObject(input);
>>         } catch (KryoException ke) {
>>             Throwable cause = ke.getCause();
>>
>>             if (cause instanceof EOFException) {
>>                 throw (EOFException) cause;
>>             } else {
>>                 throw ke;
>>             }
>>         } finally {
>>             try {
>>                 input.close();
>>             } catch (KryoException ke) {
>>                 Throwable cause = ke.getCause();
>>
>>                 if (cause instanceof EOFException) {
>>                     throw (EOFException) cause;
>>                 } else {
>>                     throw ke;
>>                 }
>>             }
>>         }
>>     }
>>
>>     @Override
>>     public T deserialize(T reuse, DataInputView source) throws IOException {
>>         return deserialize(source);
>>     }
>>
>>     @Override
>>     public void copy(DataInputView source, DataOutputView target) throws IOException {
>>         checkKryoInitialized();
>>         if (this.copyInstance == null) {
>>             this.copyInstance = createInstance();
>>         }
>>
>>         T tmp = deserialize(copyInstance, source);
>>         serialize(tmp, target);
>>     }
>>
>>     // --------------------------------------------------------------------------------------------
>>
>>     @Override
>>     public int hashCode() {
>>         return Objects.hash(
>>             type,
>>             registeredTypes,
>>             registeredTypesWithSerializerClasses,
>>             defaultSerializerClasses);
>>     }
>>
>>     @Override
>>     public boolean equals(Object obj) {
>>         if (obj instanceof KryoSerializer) {
>>             KryoSerializer<?> other = (KryoSerializer<?>) obj;
>>
>>             // we cannot include the Serializers here because they don't implement the equals method
>>             return other.canEqual(this) &&
>>                 type == other.type &&
>>                 registeredTypes.equals(other.registeredTypes) &&
>>                 registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
>>                 defaultSerializerClasses.equals(other.defaultSerializerClasses);
>>         } else {
>>             return false;
>>         }
>>     }
>>
>>     @Override
>>     public boolean canEqual(Object obj) {
>>         return obj instanceof KryoSerializer;
>>     }
>>
>>     // --------------------------------------------------------------------------------------------
>>
>>     /**
>>      * Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
>>      * Falls back to the default Kryo serializer if it can't be found.
>>      * @return The Kryo serializer instance.
>>      */
>>     private Kryo getKryoInstance() {
>>
>>         try {
>>             // check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
>>             // This will be true if Flink's Scala API is used.
>>             Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
>>             Object chillInstantiator = chillInstantiatorClazz.newInstance();
>>
>>             // obtain a Kryo instance through Twitter Chill
>>             Method m = chillInstantiatorClazz.getMethod("newKryo");
>>
>>             return (Kryo) m.invoke(chillInstantiator);
>>         } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
>>                 IllegalAccessException | InvocationTargetException e) {
>>
>>             LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);
>>
>>             Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
>>             initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
>>
>>             Kryo kryo = new Kryo();
>>             kryo.setInstantiatorStrategy(initStrategy);
>>
>>             return kryo;
>>         }
>>     }
>>
>>     private void checkKryoInitialized() {
>>         if (this.kryo == null) {
>>             this.kryo = getKryoInstance();
>>
>>             // Enable reference tracking.
>>             kryo.setReferences(true);
>>
>>             // Throwable and all subclasses should be serialized via Java serialization
>>             kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
>>
>>             // Add default serializers first, so that type registrations without a serializer
>>             // are registered with a default serializer
>>             for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry : defaultSerializers.entrySet()) {
>>                 kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
>>             }
>>
>>             for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry : defaultSerializerClasses.entrySet()) {
>>                 kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
>>             }
>>
>>             // register the type of our class
>>             kryo.register(type);
>>
>>             // register given types. We do this first so that any registration of a
>>             // more specific serializer overrides this
>>             for (Class<?> type : registeredTypes) {
>>                 kryo.register(type);
>>             }
>>
>>             // register given serializer classes
>>             for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
>>                 Class<?> typeClass = e.getKey();
>>                 Class<? extends Serializer<?>> serializerClass = e.getValue();
>>
>>                 Serializer<?> serializer = ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
>>                 kryo.register(typeClass, serializer);
>>             }
>>
>>             // register given serializers
>>             for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
>>                 kryo.register(e.getKey(), e.getValue().getSerializer());
>>             }
>>
>>             // this is needed for Avro but can not be added on demand.
>>             kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());
>>
>>             kryo.setRegistrationRequired(false);
>>             kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
>>         }
>>     }
>>
>>     // --------------------------------------------------------------------------------------------
>>     // For testing
>>     // --------------------------------------------------------------------------------------------
>>
>>     public Kryo getKryo() {
>>         checkKryoInitialized();
>>         return this.kryo;
>>     }
>> }
>>
>> Best,
>> Flavio
>>
>> On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ahmednader...@gmail.com> wrote:
>>
>>> Hello,
>>> I have a TwitterSource, and I'm applying some transformations, such as filter
>>> and map, to the resulting stream from Twitter. I'm collecting the output in
>>> an iterator: iterator = DataStreamUtils.collect(datastream). Then, in a
>>> parallel thread, I periodically check iterator.hasNext() and print the
>>> next item. I'm using Flink 1.0.3.
>>> The program works at first and does print some items. However, when I
>>> leave it running a little longer (for example, 40 seconds or 1 minute),
>>> I get 2 exceptions:
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID
>>> and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
>>> Both exceptions are thrown from the line where I check whether the
>>> iterator hasNext().
>>>
>>> I would like to know why these exceptions happen in general, and if
>>> anyone knows a specific solution for my program, that would be great too.
>>> Thanks,
>>> Ahmed
>>>
>>
>> --
>>
>> Flavio Pompermaier
>> Development Department, OKKAM Srl - www.okkam.it
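Flavio's patch creates a fresh Kryo Input/Output on every serialize/deserialize call. Recreating the input side per call is only safe if the wrapper never reads ahead of the current record, because bytes buffered inside a discarded wrapper are lost to the next call; presumably this is why the patch wraps the source in NoFetchingInput rather than a plain Input. The sketch below illustrates that effect with java.io stand-ins only (DataInputStream plays the non-prefetching input, BufferedInputStream a prefetching one). It uses neither Kryo nor Flink, and the class name PerCallReadDemo is hypothetical.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: java.io stand-ins for Kryo's Input/Output, not Kryo itself.
final class PerCallReadDemo {

    // Writes two length-prefixed strings through a fresh wrapper, then flushes it.
    static byte[] twoRecords() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeUTF("first");
        out.writeUTF("second");
        out.flush();
        return bytes.toByteArray();
    }

    // Fresh wrapper per call that consumes exactly one record from the source
    // (assumed to be the role NoFetchingInput plays in the patch).
    static String readExact(InputStream source) throws IOException {
        return new DataInputStream(source).readUTF();
    }

    // Fresh *buffering* wrapper per call: BufferedInputStream pulls as many bytes
    // as the source offers into its buffer, and they vanish with the wrapper.
    static String readBuffered(InputStream source) throws IOException {
        return new DataInputStream(new BufferedInputStream(source)).readUTF();
    }

    // Reads the two records once per strategy and records what each call returned.
    static List<String> run() {
        List<String> results = new ArrayList<>();
        try {
            InputStream exact = new ByteArrayInputStream(twoRecords());
            results.add(readExact(exact));
            results.add(readExact(exact));

            InputStream buffered = new ByteArrayInputStream(twoRecords());
            results.add(readBuffered(buffered));
            try {
                results.add(readBuffered(buffered));
            } catch (EOFException e) {
                results.add("EOF"); // second record was swallowed by the first wrapper
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [first, second, first, EOF]
    }
}
```

Flavio's patch also closes each wrapper in a finally block; the sketch skips that step because closing a java.io filter stream also closes the shared source stream it wraps.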
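Ahmed's setup obtains the iterator from DataStreamUtils.collect(datastream) and then polls hasNext() from a second thread. Whether that iterator may safely be used from another thread is not settled in this discussion, so what follows is only a sketch under that assumption. It confines every hasNext()/next() call to one dedicated thread and hands elements to any consumer through a java.util.concurrent BlockingQueue, so a blocking take() replaces the periodic polling. The class IteratorPump is hypothetical, it is exercised here with a plain List iterator rather than a Flink stream, and it has not been tested against the Kryo exceptions reported above.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: only the pump thread ever touches the source iterator.
final class IteratorPump<T> {

    private static final Object END = new Object(); // end-of-stream marker
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    IteratorPump(Iterator<T> source) {
        Thread worker = new Thread(() -> {
            try {
                while (source.hasNext()) {
                    queue.put(source.next()); // assumes the source never yields null
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                queue.add(END); // unbounded queue, so add() does not block
            }
        }, "iterator-pump");
        worker.setDaemon(true);
        worker.start();
    }

    // Blocks until the next element arrives; returns null once the source is exhausted.
    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        Object item = queue.take();
        if (item == END) {
            queue.add(END); // keep the marker so later calls also see end-of-stream
            return null;
        }
        return (T) item;
    }

    public static void main(String[] args) throws InterruptedException {
        IteratorPump<String> pump = new IteratorPump<>(Arrays.asList("a", "b", "c").iterator());
        for (String item = pump.take(); item != null; item = pump.take()) {
            System.out.println(item); // this thread never calls hasNext()/next() itself
        }
    }
}
```

With Flink, the source argument would presumably be the iterator returned by DataStreamUtils.collect(datastream), wrapped once right after it is created; consumer threads then call take() instead of hasNext().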