Hi Arvid,

I have implemented the code with the envelope schema as you suggested, but now I am facing issues with the consumer. I have written the code like this:

    FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010(
        KAFKA_TOPICS,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);

And the deserialization class looks like this:

    public class KafkaGenericAvroDeserializationSchema
        implements KeyedDeserializationSchema<GenericRecord> {

      private final String registryUrl;
      private transient KafkaAvroDeserializer inner;

      public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
      }

      @Override
      public GenericRecord deserialize(
          byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
      }

      @Override
      public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
      }

      @Override
      public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
      }

      private void checkInitialized() {
        if (inner == null) {
          Map<String, Object> props = new HashMap<>();
          props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
          props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
          SchemaRegistryClient client =
              new CachedSchemaRegistryClient(
                  registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
          inner = new KafkaAvroDeserializer(client, props);
        }
      }
    }

It works locally on my machine, but when I deploy it on a YARN cluster I get the exception below:

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    ... 13 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"

Please help me to resolve this issue.

Thanks,
Anuj

On Mon, Jan 20, 2020 at 9:42 PM aj <ajainje...@gmail.com> wrote:

> Thanks, Arvid, for all the clarification. I will work on the approach you
> suggested.
>
> Thanks,
> Anuj
>
> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Anuj,
>>
>> I think that there may be a fundamental misunderstanding about the role
>> of a schema registry in Kafka. So let me first clarify that.
>> In each Avro/Parquet file, all records have the same schema. The schema
>> is stored within the file, such that we can always retrieve the writer
>> schema for the records.
>> When Avro was first applied to Kafka, there was the basic question of how
>> the writer schema for any record is known to the consumer. Storing the
>> complete schema on each record would mean that each record would be much
>> larger than needed. Hence, they added the schema registry, which assigns a
>> unique id to each schema; that id is then embedded into the records.
>> Now, whenever I update a schema in my producer, I would have old records
>> with the old schema id and new records with the new schema id.
>> In my consumer, I'd use a fixed reader schema, such that my application
>> would not need to worry if the record is written with old or new schema; my
>> consumer would only see records with the reader schema.
>>
>> Given that background information, you see that in general, it's
>> impossible with a generic approach to write the parquet with the same
>> schema as it has been written in Kafka: the parquet schema needs to be
>> supplied statically during query compilation, while the Avro schema
>> actually used in Kafka is only known when actually consuming data.
>>
>> But looking further down the road:
>> * since you need one schema to write the parquet files, you'd need to
>> decide: do you want to write with the new or the old schema in case of a
>> schema update? That should also be the reader schema of your application
>> for a given event type.
>> * this decision has further implications: your application needs to
>> extract exactly one specific version of the schema from the schema registry
>> at query compilation. That could be either a specific schema id or the
>> latest schema for the event type.
>> * that means that the output schema is locked until you restart your
>> application and fetch a new latest schema in case of an update.
>> * at that point, it might just be easier to use the approach that I
>> outlined previously by bundling a specific schema with your application.
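To make the "unique id ... embedded into the records" remark above concrete, here is a minimal sketch using only the JDK. It assumes the framing used by the Confluent Avro serializers: one magic byte (0), a 4-byte big-endian schema id, then the Avro binary payload. The class and method names (WireFormatSketch, frame, schemaId, payload) are hypothetical and are not part of any library.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustration only: [magic 0x00][4-byte big-endian schema id][Avro payload]. */
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5;

    /** Prepends the 5-byte header to an already Avro-encoded payload. */
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    /** Reads the schema id a consumer would look up in the registry. */
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.remaining() < HEADER_SIZE || buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a schema-registry framed message");
        }
        return buf.getInt();
    }

    /** Returns the Avro bytes that follow the header. */
    static byte[] payload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {1, 2, 3});
        System.out.println("schema id = " + schemaId(framed));
        System.out.println("payload bytes = " + payload(framed).length);
    }
}
```

The point of the sketch is that the record carries only the id, so the writer schema is known only once a record has been read, which is why a sink that needs a schema up front cannot rely on it.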
>>
>> If you want to extract the latest schema for a subject:
>>
>> // getAllVersions returns version numbers, not schema ids, so ask for the
>> // latest schema metadata of the subject directly
>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>> var schema = new Schema.Parser().parse(metadata.getSchema());
>>
>>
>> On Sat, Jan 18, 2020 at 5:22 PM aj <ajainje...@gmail.com> wrote:
>>
>>> Thanks, Arvid.
>>>
>>> I do not fully understand the above approach,
>>> so currently, I am thinking of going with the envelope approach that you
>>> suggested.
>>>
>>> One more question: I do not want to keep the schema in my consumer
>>> project, even if it's a single envelope schema. I want it to be fetched from the
>>> schema registry and passed to my parquet sink so that I always use the same
>>> schema that is used by the producer. Can you provide sample code showing how
>>> I can infer the schema from the generic record or get it from the schema registry?
>>>
>>>
>>> Regards,
>>> Anuj
>>>
>>>
>>>
>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> (Readded user mailing list)
>>>>
>>>> Hi Anuj,
>>>>
>>>> since I'd still recommend going with distinct sources/sinks, let me try
>>>> to solve your issues in this mail. If that doesn't work out, I'd address
>>>> your concerns about the envelope format later.
>>>>
>>>> In Flink, you can have several subtopologies in the same application.
>>>>
>>>> Thus, for each event type, you can add
>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>> AvroSink(eventType)
>>>> for each event.
>>>>
>>>> I'd put all avro schema in one project and use an avro plugin to
>>>> generate the respective Java Classes. Then I'd simply create a map of Avro
>>>> Schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>>> (event-a, event-b, ...).
>>>> Next, I'd iterate over the map to add the respective subtopologies to
>>>> env.
>>>> Finally, execute everything.
>>>>
>>>> You have one project where all validations reside. But you'd have
>>>> almost no overhead to process a given source of eventType. The downside of
>>>> that approach is, of course, that each new event type would require a
>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <ajainje...@gmail.com> wrote:
>>>>
>>>>> Thanks, Arvid.
>>>>>
>>>>> 1. I like your approach as I can write a single consumer and put the
>>>>> data in S3 in parquet format. The only challenge is that there are extra
>>>>> columns that are always going to be null, as I will get only one type of
>>>>> event at a time.
>>>>>
>>>>> 2. If I go with a separate schema, I am not sure how I can solve it
>>>>> using a single generalized consumer. My understanding so far is that I
>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>> the whole data, then filter the respective events from it, and then I can
>>>>> pass this stream to the sink. But this does not look like a scalable
>>>>> solution: as new events keep being added, I have to write a consumer for
>>>>> each new type.
>>>>>
>>>>>
>>>>> DataStreamSource<GenericRecord> input = env
>>>>>     .addSource(
>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>             config).setStartFromEarliest());
>>>>>
>>>>> Example:
>>>>>
>>>>> 1st Consumer:
>>>>> DataStreamSource<GenericRecord> input = env
>>>>>     .addSource(
>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>             config).setStartFromEarliest());
>>>>> // keep only events of type "a"
>>>>> DataStream<GenericRecord> aInput =
>>>>>     input.filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
>>>>>
>>>>> 2nd Consumer:
>>>>> DataStreamSource<GenericRecord> input = env
>>>>>     .addSource(
>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>             config).setStartFromEarliest());
>>>>> // keep only events of type "b"
>>>>> DataStream<GenericRecord> bInput =
>>>>>     input.filter(r -> "b".equals(String.valueOf(r.get("event_name"))));
>>>>>
>>>>>
>>>>> Can you help me solve this using a single consumer? I do not
>>>>> want to write a separate consumer for each type of schema.
>>>>>
>>>>> For example, this is my consumer, which receives different types of
>>>>> records.
>>>>>
>>>>> DataStreamSource<GenericRecord> input = env
>>>>>     .addSource(
>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>             config).setStartFromEarliest());
>>>>>
>>>>> Now I cannot write this stream directly, as there is no common schema
>>>>> for the records in this stream. So one possible way I am thinking of is:
>>>>>
>>>>> 1. Can I create multiple streams from this stream using a keyBy on
>>>>> "event_name" and then write each stream separately?
>>>>>
>>>>> I just want to know whether this is possible.
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Anuj
>>>>>
>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Anuj,
>>>>>>
>>>>>> I originally understood that you would like to store data in the same
>>>>>> Kafka topic and also want to store it in the same parquet file. In the
>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>> only store a schema for a key and value respectively. To use different
>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>> under the same subject.
>>>>>>
>>>>>> Your approach is much better. You can ensure full schema
>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>> want to read/write everything into the same place. Also you will never be
>>>>>> able to write one consistent file, as they can only have one schema (both
>>>>>> on Avro and Parquet).
>>>>>> So you only have two options:
>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>> files per record type.
>>>>>> * have a common schema (either my outlined approach or any other
>>>>>> wrapper schema).
>>>>>> The approach with a common schema makes only sense if you want to
>>>>>> write it into one table/kafka topic.
>>>>>>
>>>>>> However, in the last mail you pointed out that you actually want to
>>>>>> store the record types separately. Then, you should keep everything
>>>>>> separated. Then you should have a sink for each type each getting the
>>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>>> the schema registry when creating the query as you would need to pass it
>>>>>> to the sink.
>>>>>>
>>>>>> Btw, do you actually have a need to write all events into one Kafka
>>>>>> topic? The only real use case is to preserve the time order per key.
>>>>>> Everything else is much more complicated than storing events
>>>>>> individually.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <ajainje...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Arvid,
>>>>>>> Thanks for the quick response. I am new to this Avro design, so can
>>>>>>> you please help me understand and design for my use case?
>>>>>>>
>>>>>>> I have a use case like this:
>>>>>>>
>>>>>>> 1. We have an app where a lot of actions happen on the user side.
>>>>>>> 2. For each action we collect a set of information that is
>>>>>>> defined using key-value pairs. We want to define this information as
>>>>>>> proper schemas so that we maintain the proper format and do not push
>>>>>>> random data.
>>>>>>> 3. So we are defining a schema for each action and registering it in the
>>>>>>> schema registry using topic+record.name as the subject.
>>>>>>> 4. So I do not think the producer side has any issue, as whenever we
>>>>>>> push the event to Kafka we register a new schema with the above subject.
>>>>>>>
>>>>>>> Example:
>>>>>>>
>>>>>>> {
>>>>>>>   "event_name": "a",
>>>>>>>   "timestamp": <timestamp>,
>>>>>>>   "properties": {
>>>>>>>     "key-1": "val-1",
>>>>>>>     "key-2": "val-2"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> {
>>>>>>>   "event_name": "b",
>>>>>>>   "timestamp": <timestamp>,
>>>>>>>   "properties": {
>>>>>>>     "key-3": "val-3",
>>>>>>>     "key-4": "val-4"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Now I have a consumer that will parse the data by fetching the
>>>>>>> schema from the schema registry and deserialize it into a stream of
>>>>>>> generic records.
>>>>>>>
>>>>>>> Why do you think it will break, as I am always deserializing with the
>>>>>>> writer schema only?
>>>>>>>
>>>>>>> You suggested keeping an envelope Avro schema and not a separate
>>>>>>> schema for each type of event that I am generating. I have some doubts
>>>>>>> about that:
>>>>>>>
>>>>>>> 1. How do I enforce a schema on each event, as it is a subtype in the main
>>>>>>> schema? So when I am getting a JSON event of type "a", how do I enforce and
>>>>>>> convert it to the subschema type of "a" and push it to Kafka?
>>>>>>> 2. I want to create a separate hive table for each of the events, so
>>>>>>> when I write this data and, let's say, I have 20 events, then for 19
>>>>>>> columns I am always getting null values in the data.
>>>>>>>
>>>>>>> Please help me do this the right way. It will be a great help and
>>>>>>> learning for me.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anuj
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Anuj,
>>>>>>>>
>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>> the same topic/dataset. You will break the compatibility features of
>>>>>>>> the schema registry and your consumer/producer code is always hard to
>>>>>>>> maintain.
>>>>>>>>
>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>> envelope format.
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "namespace": "example",
>>>>>>>>   "name": "Envelope",
>>>>>>>>   "type": "record",
>>>>>>>>   "fields": [
>>>>>>>>     {
>>>>>>>>       "name": "type1",
>>>>>>>>       "type": ["null", {
>>>>>>>>         "type": "record",
>>>>>>>>         "fields": [ ... ]
>>>>>>>>       }],
>>>>>>>>       "default": null
>>>>>>>>     },
>>>>>>>>     {
>>>>>>>>       "name": "type2",
>>>>>>>>       "type": ["null", {
>>>>>>>>         "type": "record",
>>>>>>>>         "fields": [ ... ]
>>>>>>>>       }],
>>>>>>>>       "default": null
>>>>>>>>     }
>>>>>>>>   ]
>>>>>>>> }
>>>>>>>>
>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>>> types, which by themselves can be evolved), and adds only a little
>>>>>>>> overhead (1 byte per subtype). The downside is that you cannot enforce
>>>>>>>> that exactly one of the subtypes is set.
>>>>>>>>
>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>> need to parse anything manually.
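The "1 byte per subtype" figure quoted above follows from Avro's binary encoding: a union value is prefixed with the index of the selected branch, written as a zigzag varint, and a null value itself takes no bytes. The sketch below uses only the JDK and is not the Avro library; UnionOverheadSketch, encodeLong and envelopeOverheadBytes are hypothetical names, and it assumes every envelope field is a two-branch ["null", record] union as in the schema above.

```java
import java.io.ByteArrayOutputStream;

/** Illustration only: counts the union branch-index bytes of an envelope record. */
final class UnionOverheadSketch {

    /** Encodes a long the way Avro does: zigzag, then 7 bits per byte, low bits first. */
    static byte[] encodeLong(long n) {
        long z = (n << 1) ^ (n >> 63); // zigzag maps small magnitudes to small values
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // continuation bit set
            z >>>= 7;
        }
        out.write((int) z);
        return out.toByteArray();
    }

    /**
     * Bytes spent on branch indexes for an envelope with subtypeCount optional
     * fields of which exactly one is set (index 1); the others are null (index 0).
     */
    static int envelopeOverheadBytes(int subtypeCount) {
        int total = 0;
        for (int i = 0; i < subtypeCount; i++) {
            long branch = (i == 0) ? 1 : 0; // pretend the first subtype is the one set
            total += encodeLong(branch).length;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("overhead for 20 subtypes = " + envelopeOverheadBytes(20) + " bytes");
    }
}
```

Both branch indexes 0 and 1 fit in a single byte, so with 20 subtypes the envelope costs 20 bytes per record on the wire on top of the one populated sub-record. This says nothing about how the null columns are stored in Parquet.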
>>>>>>>>
>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>> the input format anymore, you can at least use that approach on your
>>>>>>>> output.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <ajainje...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I have a use case where I am getting a different set of Avro
>>>>>>>>> records in Kafka. I am using the schema registry to store the Avro
>>>>>>>>> schemas. One topic can also have different types of records.
>>>>>>>>>
>>>>>>>>> Now I have created a GenericRecord stream using
>>>>>>>>> KafkaAvroDeserializer by defining a custom
>>>>>>>>> deserializer class like this:
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>     byte[] messageKey, byte[] message, String topic, int partition,
>>>>>>>>>     long offset) {
>>>>>>>>>   checkInitialized();
>>>>>>>>>   return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> private void checkInitialized() {
>>>>>>>>>   if (inner == null) {
>>>>>>>>>     Map<String, Object> props = new HashMap<>();
>>>>>>>>>     props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>>         registryUrl);
>>>>>>>>>     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>>>>>         false);
>>>>>>>>>     SchemaRegistryClient client =
>>>>>>>>>         new CachedSchemaRegistryClient(
>>>>>>>>>             registryUrl,
>>>>>>>>>             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>     inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> And this is my consumer code:
>>>>>>>>>
>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>     .addSource(
>>>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>             new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>             config).setStartFromEarliest());
>>>>>>>>>
>>>>>>>>> Now I want to write this stream partitioned on
>>>>>>>>> event_name="a"/year=/month=/day= in parquet format so that I
>>>>>>>>> can expose hive tables directly on top of this data.
>>>>>>>>> event_name is a common field for all types of records that I am
>>>>>>>>> getting in Kafka.
>>>>>>>>> I am stuck, as the parquet writer needs a schema to write but my
>>>>>>>>> different records have different schemas. So how do I write this
>>>>>>>>> stream to S3 in the above partition format?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks & Regards,
>>>>>>>>> Anuj Jain
>>>>>>>>>
>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>