Hello Flavio,

Thank you so much for replying. However, I didn't download Flink locally; I only added the dependencies to a Maven project, so I don't think I'll be able to modify the KryoSerializer class. But yes, I also think that's the problem.

Thanks,
Ahmed
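Flavio describes his patch as creating a fresh Kryo Input and Output on every serialize/deserialize call instead of keeping one alive across calls. The sketch below is only a stdlib analogy of that idea. It uses java.io buffered streams in place of Kryo's Input/Output, touches no Flink class, and does not reproduce the actual exception. It shows that bytes left in a long-lived, unflushed buffer are invisible to whoever reads the underlying stream, while a per-call wrapper that is flushed before returning leaves nothing behind. The class and method names are hypothetical.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class PerCallBufferSketch {

    // Pattern A (hypothetical): one buffered wrapper kept across calls, never flushed per call.
    static final class ReusedWriter {
        private final DataOutputStream out;

        ReusedWriter(OutputStream target) {
            this.out = new DataOutputStream(new BufferedOutputStream(target));
        }

        void write(int value) throws IOException {
            out.writeInt(value); // the 4 bytes may stay in the wrapper's buffer after return
        }
    }

    // Pattern B (hypothetical): a fresh wrapper per call, flushed before returning.
    static void writeFresh(OutputStream target, int value) throws IOException {
        DataOutputStream out = new DataOutputStream(new BufferedOutputStream(target));
        out.writeInt(value);
        out.flush(); // nothing is left behind for the next call
        // Not closing here: closing a java.io wrapper would also close target.
    }

    // Number of bytes the underlying stream actually received with pattern A.
    static int bytesVisibleWithReusedWriter(int records) throws IOException {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        ReusedWriter writer = new ReusedWriter(target);
        for (int i = 0; i < records; i++) {
            writer.write(i);
        }
        return target.size();
    }

    // Number of bytes the underlying stream actually received with pattern B.
    static int bytesVisibleWithFreshWriter(int records) throws IOException {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        for (int i = 0; i < records; i++) {
            writeFresh(target, i);
        }
        return target.size();
    }

    public static void main(String[] args) throws IOException {
        System.out.println("reused: " + bytesVisibleWithReusedWriter(3) + " bytes visible");
        System.out.println("fresh:  " + bytesVisibleWithFreshWriter(3) + " bytes visible");
    }
}
```

With three records, the reused writer leaves all 12 bytes in BufferedOutputStream's 8192-byte default buffer, so the target sees 0 bytes. The per-call writer delivers all 12. The sketch flushes instead of closing because, with java.io wrappers, close() also closes the wrapped stream.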
On 8 June 2016 at 16:07, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Hi Ahmed,
>
> I have the same error, and it is probably caused by the KryoSerializer.
> Right now I'm testing a patch for this problem, so maybe you could also test
> it. Unfortunately I'm using Flink 1.0.2, so I don't know for sure whether you
> can use my KryoSerializer, but I think so. Basically, I just recreate the Input
> and Output on every serialize/deserialize call and then close them.
>
> This is my attempt to fix the problem (it is the KryoSerializer class
> in the flink-core module):
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements. See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
>
> package org.apache.flink.api.java.typeutils.runtime.kryo;
>
> import com.esotericsoftware.kryo.Kryo;
> import com.esotericsoftware.kryo.KryoException;
> import com.esotericsoftware.kryo.Serializer;
> import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
> import com.esotericsoftware.kryo.io.Input;
> import com.esotericsoftware.kryo.io.Output;
> import com.esotericsoftware.kryo.serializers.JavaSerializer;
> import com.google.common.base.Preconditions;
>
> import org.apache.avro.generic.GenericData;
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.common.typeutils.TypeSerializer;
> import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
> import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
> import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
> import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
> import org.apache.flink.core.memory.DataInputView;
> import org.apache.flink.core.memory.DataOutputView;
> import org.objenesis.strategy.StdInstantiatorStrategy;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.EOFException;
> import java.io.IOException;
> import java.lang.reflect.InvocationTargetException;
> import java.lang.reflect.Method;
> import java.lang.reflect.Modifier;
> import java.util.LinkedHashMap;
> import java.util.LinkedHashSet;
> import java.util.Map;
> import java.util.Objects;
>
> /**
>  * A type serializer that serializes its type using the Kryo serialization
>  * framework (https://github.com/EsotericSoftware/kryo).
>  *
>  * This serializer is intended as a fallback serializer for the cases that are
>  * not covered by the basic types, tuples, and POJOs.
>  *
>  * @param <T> The type to be serialized.
>  */
> public class KryoSerializer<T> extends TypeSerializer<T> {
>
>     private static final long serialVersionUID = 3L;
>
>     private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);
>
>     // ------------------------------------------------------------------------
>
>     private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
>     private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
>     private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
>     private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
>     private final LinkedHashSet<Class<?>> registeredTypes;
>
>     private final Class<T> type;
>
>     // ------------------------------------------------------------------------
>     // The fields below are lazily initialized after duplication or deserialization.
>
>     private transient Kryo kryo;
>     private transient T copyInstance;
>
>     // ------------------------------------------------------------------------
>
>     public KryoSerializer(Class<T> type, ExecutionConfig executionConfig) {
>         this.type = Preconditions.checkNotNull(type);
>
>         this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
>         this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
>         this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
>         this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
>         this.registeredTypes = executionConfig.getRegisteredKryoTypes();
>     }
>
>     /**
>      * Copy-constructor that does not copy transient fields. They will be initialized once required.
>      */
>     protected KryoSerializer(KryoSerializer<T> toCopy) {
>         registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
>         registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
>         defaultSerializers = toCopy.defaultSerializers;
>         defaultSerializerClasses = toCopy.defaultSerializerClasses;
>         registeredTypes = toCopy.registeredTypes;
>
>         type = toCopy.type;
>         if (type == null) {
>             throw new NullPointerException("Type class cannot be null.");
>         }
>     }
>
>     // ------------------------------------------------------------------------
>
>     @Override
>     public boolean isImmutableType() {
>         return false;
>     }
>
>     @Override
>     public KryoSerializer<T> duplicate() {
>         return new KryoSerializer<T>(this);
>     }
>
>     @Override
>     public T createInstance() {
>         if (Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers())) {
>             return null;
>         } else {
>             checkKryoInitialized();
>             try {
>                 return kryo.newInstance(type);
>             } catch (Throwable e) {
>                 return null;
>             }
>         }
>     }
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public T copy(T from) {
>         if (from == null) {
>             return null;
>         }
>         checkKryoInitialized();
>         try {
>             return kryo.copy(from);
>         } catch (KryoException ke) {
>             // kryo was unable to copy it, so we do it through serialization:
>             ByteArrayOutputStream baout = new ByteArrayOutputStream();
>             Output output = new Output(baout);
>
>             kryo.writeObject(output, from);
>
>             output.close();
>
>             ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
>             Input input = new Input(bain);
>
>             return (T) kryo.readObject(input, from.getClass());
>         }
>     }
>
>     @Override
>     public T copy(T from, T reuse) {
>         return copy(from);
>     }
>
>     @Override
>     public int getLength() {
>         return -1;
>     }
>
>     @Override
>     public void serialize(T record, DataOutputView target) throws IOException {
>         checkKryoInitialized();
>         DataOutputViewStream outputStream = new DataOutputViewStream(target);
>         Output output = new Output(outputStream);
>
>         try {
>             // Sanity check: Make sure that the output is cleared/has been flushed by the last call
>             // otherwise data might be written multiple times in case of a previous EOFException
>             if (output.position() != 0) {
>                 throw new IllegalStateException("The Kryo Output still contains data from a previous " +
>                     "serialize call. It has to be flushed or cleared at the end of the serialize call.");
>             }
>
>             kryo.writeClassAndObject(output, record);
>             output.flush();
>         } catch (KryoException ke) {
>             Throwable cause = ke.getCause();
>             if (cause instanceof EOFException) {
>                 throw (EOFException) cause;
>             } else {
>                 throw ke;
>             }
>         } finally {
>             try {
>                 output.close();
>             } catch (KryoException ke) {
>                 Throwable cause = ke.getCause();
>
>                 if (cause instanceof EOFException) {
>                     throw (EOFException) cause;
>                 } else {
>                     throw ke;
>                 }
>             }
>         }
>     }
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public T deserialize(DataInputView source) throws IOException {
>         checkKryoInitialized();
>         DataInputViewStream inputStream = new DataInputViewStream(source);
>         Input input = new NoFetchingInput(inputStream);
>
>         try {
>             return (T) kryo.readClassAndObject(input);
>         } catch (KryoException ke) {
>             Throwable cause = ke.getCause();
>
>             if (cause instanceof EOFException) {
>                 throw (EOFException) cause;
>             } else {
>                 throw ke;
>             }
>         } finally {
>             try {
>                 input.close();
>             } catch (KryoException ke) {
>                 Throwable cause = ke.getCause();
>
>                 if (cause instanceof EOFException) {
>                     throw (EOFException) cause;
>                 } else {
>                     throw ke;
>                 }
>             }
>         }
>     }
>
>     @Override
>     public T deserialize(T reuse, DataInputView source) throws IOException {
>         return deserialize(source);
>     }
>
>     @Override
>     public void copy(DataInputView source, DataOutputView target) throws IOException {
>         checkKryoInitialized();
>         if (this.copyInstance == null) {
>             this.copyInstance = createInstance();
>         }
>
>         T tmp = deserialize(copyInstance, source);
>         serialize(tmp, target);
>     }
>
>     // --------------------------------------------------------------------------------------------
>
>     @Override
>     public int hashCode() {
>         return Objects.hash(
>             type,
>             registeredTypes,
>             registeredTypesWithSerializerClasses,
>             defaultSerializerClasses);
>     }
>
>     @Override
>     public boolean equals(Object obj) {
>         if (obj instanceof KryoSerializer) {
>             KryoSerializer<?> other = (KryoSerializer<?>) obj;
>
>             // we cannot include the Serializers here because they don't implement the equals method
>             return other.canEqual(this) &&
>                 type == other.type &&
>                 registeredTypes.equals(other.registeredTypes) &&
>                 registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
>                 defaultSerializerClasses.equals(other.defaultSerializerClasses);
>         } else {
>             return false;
>         }
>     }
>
>     @Override
>     public boolean canEqual(Object obj) {
>         return obj instanceof KryoSerializer;
>     }
>
>     // --------------------------------------------------------------------------------------------
>
>     /**
>      * Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
>      * Falls back to the default Kryo serializer if it can't be found.
>      * @return The Kryo serializer instance.
>      */
>     private Kryo getKryoInstance() {
>
>         try {
>             // check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
>             // This will be true if Flink's Scala API is used.
>             Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
>             Object chillInstantiator = chillInstantiatorClazz.newInstance();
>
>             // obtain a Kryo instance through Twitter Chill
>             Method m = chillInstantiatorClazz.getMethod("newKryo");
>
>             return (Kryo) m.invoke(chillInstantiator);
>         } catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
>             IllegalAccessException | InvocationTargetException e) {
>
>             LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);
>
>             Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
>             initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
>
>             Kryo kryo = new Kryo();
>             kryo.setInstantiatorStrategy(initStrategy);
>
>             return kryo;
>         }
>     }
>
>     private void checkKryoInitialized() {
>         if (this.kryo == null) {
>             this.kryo = getKryoInstance();
>
>             // Enable reference tracking.
>             kryo.setReferences(true);
>
>             // Throwable and all subclasses should be serialized via java serialization
>             kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
>
>             // Add default serializers first, so that the type registrations without a serializer
>             // are registered with a default serializer
>             for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry : defaultSerializers.entrySet()) {
>                 kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
>             }
>
>             for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry : defaultSerializerClasses.entrySet()) {
>                 kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
>             }
>
>             // register the type of our class
>             kryo.register(type);
>
>             // register given types. we do this first so that any registration of a
>             // more specific serializer overrides this
>             for (Class<?> type : registeredTypes) {
>                 kryo.register(type);
>             }
>
>             // register given serializer classes
>             for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
>                 Class<?> typeClass = e.getKey();
>                 Class<? extends Serializer<?>> serializerClass = e.getValue();
>
>                 Serializer<?> serializer =
>                     ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
>                 kryo.register(typeClass, serializer);
>             }
>
>             // register given serializers
>             for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
>                 kryo.register(e.getKey(), e.getValue().getSerializer());
>             }
>
>             // this is needed for Avro but can not be added on demand.
>             kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());
>
>             kryo.setRegistrationRequired(false);
>             kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
>         }
>     }
>
>     // --------------------------------------------------------------------------------------------
>     // For testing
>     // --------------------------------------------------------------------------------------------
>
>     public Kryo getKryo() {
>         checkKryoInitialized();
>         return this.kryo;
>     }
> }
>
> Best,
> Flavio
>
> On Wed, Jun 8, 2016 at 3:40 PM, Ahmed Nader <ahmednader...@gmail.com> wrote:
>
>> Hello,
>> I have a TwitterSource, and I'm applying some transformations such as filter
>> and map to the resulting stream from Twitter. I'm collecting the output in
>> an iterator: iterator = DataStreamUtils.collect(datastream). Then, in a
>> parallel thread, I periodically check iterator.hasNext() and print the
>> next item. I'm using Flink 1.0.3.
>> The program works at the beginning and actually prints some items.
>> However, when I leave it running for a while longer (for example, after
>> 40 seconds or 1 minute), I get 2 exceptions:
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID
>> and java.lang.IndexOutOfBoundsException: Index: 32, Size: 0.
>> Both exceptions are thrown from the line where I check iterator.hasNext().
>>
>> I wanted to know why these exceptions happen in general, and also, if
>> anyone knows a specific solution for my program, that would be great too.
>> Thanks,
>> Ahmed
>
> --
> Flavio Pompermaier
> Development Department
> OKKAM Srl - www.okkam.it <http://www.okkam.it/>
>
> Phone: +(39) 0461 283 702
> Fax: +(39) 0461 186 6433
> Email: pomperma...@okkam.it
> Headquarters: Trento (Italy), via G.B. Trener 8
> Registered office: Trento (Italy), via Segantini 23
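On the general question of why an error like "Encountered unregistered class ID" can appear: a binary stream like Kryo's can only be decoded correctly if the reader starts each record exactly where the writer started it. The toy sketch below shows what happens otherwise. It uses an invented framing (one byte of class ID followed by a 4-byte int payload) rather than Kryo's real wire format, and the "registered" IDs 1 and 2 are arbitrary. It illustrates one plausible failure mode, not a diagnosis of the program above. All names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class DesyncSketch {

    // Toy wire format, NOT Kryo's: [1-byte class ID][4-byte big-endian int payload]
    static boolean isRegistered(int classId) {
        return classId == 1 || classId == 2; // arbitrary "registered" IDs for the demo
    }

    // Writes one record per payload, all tagged with the same class ID.
    static byte[] encode(int classId, int... payloads) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (int payload : payloads) {
            out.writeByte(classId);
            out.writeInt(payload);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Reads a single record, rejecting class IDs that were never registered.
    static int readRecord(DataInputStream in) throws IOException {
        int classId = in.readUnsignedByte();
        if (!isRegistered(classId)) {
            throw new IllegalStateException("Encountered unregistered class ID: " + classId);
        }
        return in.readInt();
    }

    // Starts decoding at an arbitrary offset, as a reader that lost sync would.
    static String readFromOffset(byte[] data, int offset) throws IOException {
        DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(data, offset, data.length - offset));
        try {
            return "payload " + readRecord(in);
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = encode(1, 42, 7); // 10 bytes: two 5-byte records
        System.out.println(readFromOffset(data, 0)); // aligned with record 1
        System.out.println(readFromOffset(data, 5)); // aligned with record 2
        System.out.println(readFromOffset(data, 4)); // misaligned by 4 bytes
    }
}
```

Reading from offset 0 or 5 returns the payloads 42 and 7. Starting at offset 4 lands on the last payload byte of the first record (0x2A), so the reader reports "unregistered class ID 42" even though only ID 1 was ever written.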