[ 
https://issues.apache.org/jira/browse/AVRO-3856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17769421#comment-17769421
 ] 

Edmond Wong edited comment on AVRO-3856 at 9/27/23 5:12 AM:
------------------------------------------------------------

I do not think my pull request [https://github.com/apache/avro/pull/2519] to 
the Apache Avro project will be approved anytime soon.  In the meantime, I 
found out I don't need the information in the deepest structure of the Avro 
schema I got.  As a workaround, I have removed the deepest structure from the 
schema, regenerated the SpecificRecord classes, and used a custom deserializer 
that reads with the minimized schema.  Since I can no longer use the original 
schema stored in Schema Registry without the Apache Avro schema parser failing, 
I will have to maintain the minimized schema and make sure it stays compatible 
with the original schema.

 

Here is the custom deserializer:
{code:java}
using System.Diagnostics;
using System.Net;
using Avro.Generic;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;

namespace EntitlementConsumer;

/// <summary>
/// Deserializes Confluent-framed Avro payloads (one magic byte, a 4-byte
/// big-endian schema id, then the Avro binary body) using a fixed,
/// caller-supplied writer schema instead of the schema registered under
/// the id found in the frame.
/// </summary>
/// <typeparam name="T">
/// The target type: an <see cref="ISpecificRecord"/> implementation, one of
/// int, bool, double, string, float, long, byte[], or <see cref="Null"/>.
/// </typeparam>
public class MinAvroDeserializer<T> : IAsyncDeserializer<T>
{
    /// <summary>First byte of every Confluent-framed payload.</summary>
    private const byte MagicByte = 0;

    /// <summary>Magic byte plus the 4-byte schema id.</summary>
    private const int HeaderSize = 5;

    private readonly string _minSchema;
    private readonly Dictionary<int, DatumReader<T>> _datumReaderBySchemaId =
        new();
    private readonly SemaphoreSlim _deserializeMutex = new(1);

    // Parsed form of _minSchema. Created on first use and only touched
    // while _deserializeMutex is held.
    private Avro.Schema? _writerSchema;

    private Avro.Schema ReaderSchema { get; }

    /// <summary>
    /// Creates a deserializer that decodes every payload with
    /// <paramref name="minSchema"/> as the writer schema.
    /// </summary>
    /// <param name="minSchema">Avro schema JSON used as writer schema.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="minSchema"/> is null.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// <typeparamref name="T"/> is not a supported target type.
    /// </exception>
    public MinAvroDeserializer(string minSchema)
    {
        _minSchema = minSchema
            ?? throw new ArgumentNullException(nameof(minSchema));
        ReaderSchema = ResolveReaderSchema();
    }

    /// <summary>
    /// Deserializes one message payload.
    /// </summary>
    /// <param name="data">The raw, framed message bytes.</param>
    /// <param name="isNull">True when the message payload is null.</param>
    /// <param name="context">Serialization context (not used here).</param>
    /// <returns>
    /// The decoded value, or the default (null) value when
    /// <paramref name="isNull"/> is true and <typeparamref name="T"/> can
    /// hold null.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="isNull"/> is true and <typeparamref name="T"/> is a
    /// non-nullable value type.
    /// </exception>
    /// <exception cref="InvalidDataException">
    /// The payload is shorter than the frame header or has a wrong magic
    /// byte.
    /// </exception>
    public async Task<T> DeserializeAsync(
        ReadOnlyMemory<byte> data,
        bool isNull,
        SerializationContext context)
    {
        if (isNull)
        {
            // A null payload maps to null whenever T can represent it.
            if (default(T) == null)
            {
                return default!;
            }

            throw new InvalidOperationException(
                "Cannot deserialize null to a Value Type");
        }

        try
        {
            return await Deserialize(data.ToArray()).ConfigureAwait(false);
        }
        catch (AggregateException ex) when (ex.InnerException is not null)
        {
            // Surface the underlying failure and keep its original stack
            // trace instead of resetting it with "throw ex.InnerException".
            System.Runtime.ExceptionServices.ExceptionDispatchInfo
                .Capture(ex.InnerException)
                .Throw();
            throw; // Unreachable; keeps the compiler's flow analysis happy.
        }
    }

    /// <summary>
    /// Picks the reader schema that corresponds to <typeparamref name="T"/>.
    /// </summary>
    private static Avro.Schema ResolveReaderSchema()
    {
        var type = typeof(T);
        if (typeof(ISpecificRecord).IsAssignableFrom(type))
        {
            // Generated records expose their schema on an instance.
            return ((ISpecificRecord)Activator.CreateInstance<T>()!).Schema;
        }

        // NOTE(review): the message below mentions SpecificFixed, but no
        // branch here handles SpecificFixed subclasses - confirm intent.
        var primitiveName = type switch
        {
            _ when type == typeof(int) => "int",
            _ when type == typeof(bool) => "boolean",
            _ when type == typeof(double) => "double",
            _ when type == typeof(string) => "string",
            _ when type == typeof(float) => "float",
            _ when type == typeof(long) => "long",
            _ when type == typeof(byte[]) => "bytes",
            _ when type == typeof(Null) => "null",
            _ => throw new InvalidOperationException(
                "AvroDeserializer only accepts type parameters of int, "
                + "bool, double, string, float, long, byte[], instances "
                + "of ISpecificRecord and subclasses of SpecificFixed."),
        };
        return Avro.Schema.Parse(primitiveName);
    }

    /// <summary>
    /// Validates the frame header and decodes the Avro body that follows.
    /// </summary>
    private async Task<T> Deserialize(byte[] array)
    {
        if (array.Length < HeaderSize)
        {
            throw new InvalidDataException(
                "Expecting data framing of length 5 bytes or more but "
                + $"total data size is {array.Length} bytes");
        }

        using var stream = new MemoryStream(array);
        using var reader = new BinaryReader(stream);

        var magicByte = reader.ReadByte();
        if (magicByte != MagicByte)
        {
            throw new InvalidDataException(
                "Expecting data with Confluent Schema Registry framing. "
                + $"Magic byte was {magicByte}, expecting {MagicByte}");
        }

        // The id is stored big-endian; BinaryReader reads little-endian.
        var writerId = IPAddress.NetworkToHostOrder(reader.ReadInt32());
        var datumReader =
            await GetDatumReaderAsync(writerId).ConfigureAwait(false);

        // The stream is now positioned just past the header, at the body.
        var reuse = typeof(ISpecificRecord).IsAssignableFrom(typeof(T))
            ? Activator.CreateInstance<T>()
            : default;
        return datumReader.Read(reuse!, new BinaryDecoder(stream));
    }

    /// <summary>
    /// Returns the cached reader for <paramref name="writerId"/>, creating
    /// it on first use. The writer schema is always the minimized schema,
    /// which is parsed only once.
    /// </summary>
    private async Task<DatumReader<T>> GetDatumReaderAsync(int writerId)
    {
        await _deserializeMutex.WaitAsync().ConfigureAwait(false);
        try
        {
            if (!_datumReaderBySchemaId.TryGetValue(
                    writerId, out var datumReader))
            {
                _writerSchema ??= Avro.Schema.Parse(_minSchema);
                datumReader =
                    new SpecificReader<T>(_writerSchema, ReaderSchema);
                _datumReaderBySchemaId[writerId] = datumReader;
            }

            return datumReader;
        }
        finally
        {
            _deserializeMutex.Release();
        }
    }
} {code}


was (Author: tradercentric):
I do not think my pull request [https://github.com/apache/avro/pull/2519] to 
the Apache Avro project will be approved anytime soon.  In the meantime, I 
found out I don't need the information in the deepest structure of the Avro 
schema I got.  As a workaround, I have removed the deepest structure from the 
schema, regenerated the SpecificRecord classes, and used a custom deserializer 
that reads with the minimized schema.  Since I can no longer use the original 
schema stored in Schema Registry without the Apache Avro schema parser failing, 
I will have to maintain the minimized schema and make sure it stays compatible 
with the original schema.      Here is the custom deserializer:
{code:java}
using System.Diagnostics;
using System.Net;
using Avro.Generic;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;

namespace EntitlementConsumer;

/// <summary>
/// Deserializes Confluent-framed Avro payloads (one magic byte, a 4-byte
/// big-endian schema id, then the Avro binary body) using a fixed,
/// caller-supplied writer schema instead of the schema registered under
/// the id found in the frame.
/// </summary>
/// <typeparam name="T">
/// The target type: an <see cref="ISpecificRecord"/> implementation, one of
/// int, bool, double, string, float, long, byte[], or <see cref="Null"/>.
/// </typeparam>
public class MinAvroDeserializer<T> : IAsyncDeserializer<T>
{
    /// <summary>First byte of every Confluent-framed payload.</summary>
    private const byte MagicByte = 0;

    /// <summary>Magic byte plus the 4-byte schema id.</summary>
    private const int HeaderSize = 5;

    private readonly string _minSchema;
    private readonly Dictionary<int, DatumReader<T>> _datumReaderBySchemaId =
        new();
    private readonly SemaphoreSlim _deserializeMutex = new(1);

    // Parsed form of _minSchema. Created on first use and only touched
    // while _deserializeMutex is held.
    private Avro.Schema? _writerSchema;

    private Avro.Schema ReaderSchema { get; }

    /// <summary>
    /// Creates a deserializer that decodes every payload with
    /// <paramref name="minSchema"/> as the writer schema.
    /// </summary>
    /// <param name="minSchema">Avro schema JSON used as writer schema.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="minSchema"/> is null.
    /// </exception>
    /// <exception cref="InvalidOperationException">
    /// <typeparamref name="T"/> is not a supported target type.
    /// </exception>
    public MinAvroDeserializer(string minSchema)
    {
        _minSchema = minSchema
            ?? throw new ArgumentNullException(nameof(minSchema));
        ReaderSchema = ResolveReaderSchema();
    }

    /// <summary>
    /// Deserializes one message payload.
    /// </summary>
    /// <param name="data">The raw, framed message bytes.</param>
    /// <param name="isNull">True when the message payload is null.</param>
    /// <param name="context">Serialization context (not used here).</param>
    /// <returns>
    /// The decoded value, or the default (null) value when
    /// <paramref name="isNull"/> is true and <typeparamref name="T"/> can
    /// hold null.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="isNull"/> is true and <typeparamref name="T"/> is a
    /// non-nullable value type.
    /// </exception>
    /// <exception cref="InvalidDataException">
    /// The payload is shorter than the frame header or has a wrong magic
    /// byte.
    /// </exception>
    public async Task<T> DeserializeAsync(
        ReadOnlyMemory<byte> data,
        bool isNull,
        SerializationContext context)
    {
        if (isNull)
        {
            // A null payload maps to null whenever T can represent it.
            if (default(T) == null)
            {
                return default!;
            }

            throw new InvalidOperationException(
                "Cannot deserialize null to a Value Type");
        }

        try
        {
            return await Deserialize(data.ToArray()).ConfigureAwait(false);
        }
        catch (AggregateException ex) when (ex.InnerException is not null)
        {
            // Surface the underlying failure and keep its original stack
            // trace instead of resetting it with "throw ex.InnerException".
            System.Runtime.ExceptionServices.ExceptionDispatchInfo
                .Capture(ex.InnerException)
                .Throw();
            throw; // Unreachable; keeps the compiler's flow analysis happy.
        }
    }

    /// <summary>
    /// Picks the reader schema that corresponds to <typeparamref name="T"/>.
    /// </summary>
    private static Avro.Schema ResolveReaderSchema()
    {
        var type = typeof(T);
        if (typeof(ISpecificRecord).IsAssignableFrom(type))
        {
            // Generated records expose their schema on an instance.
            return ((ISpecificRecord)Activator.CreateInstance<T>()!).Schema;
        }

        // NOTE(review): the message below mentions SpecificFixed, but no
        // branch here handles SpecificFixed subclasses - confirm intent.
        var primitiveName = type switch
        {
            _ when type == typeof(int) => "int",
            _ when type == typeof(bool) => "boolean",
            _ when type == typeof(double) => "double",
            _ when type == typeof(string) => "string",
            _ when type == typeof(float) => "float",
            _ when type == typeof(long) => "long",
            _ when type == typeof(byte[]) => "bytes",
            _ when type == typeof(Null) => "null",
            _ => throw new InvalidOperationException(
                "AvroDeserializer only accepts type parameters of int, "
                + "bool, double, string, float, long, byte[], instances "
                + "of ISpecificRecord and subclasses of SpecificFixed."),
        };
        return Avro.Schema.Parse(primitiveName);
    }

    /// <summary>
    /// Validates the frame header and decodes the Avro body that follows.
    /// </summary>
    private async Task<T> Deserialize(byte[] array)
    {
        if (array.Length < HeaderSize)
        {
            throw new InvalidDataException(
                "Expecting data framing of length 5 bytes or more but "
                + $"total data size is {array.Length} bytes");
        }

        using var stream = new MemoryStream(array);
        using var reader = new BinaryReader(stream);

        var magicByte = reader.ReadByte();
        if (magicByte != MagicByte)
        {
            throw new InvalidDataException(
                "Expecting data with Confluent Schema Registry framing. "
                + $"Magic byte was {magicByte}, expecting {MagicByte}");
        }

        // The id is stored big-endian; BinaryReader reads little-endian.
        var writerId = IPAddress.NetworkToHostOrder(reader.ReadInt32());
        var datumReader =
            await GetDatumReaderAsync(writerId).ConfigureAwait(false);

        // The stream is now positioned just past the header, at the body.
        var reuse = typeof(ISpecificRecord).IsAssignableFrom(typeof(T))
            ? Activator.CreateInstance<T>()
            : default;
        return datumReader.Read(reuse!, new BinaryDecoder(stream));
    }

    /// <summary>
    /// Returns the cached reader for <paramref name="writerId"/>, creating
    /// it on first use. The writer schema is always the minimized schema,
    /// which is parsed only once.
    /// </summary>
    private async Task<DatumReader<T>> GetDatumReaderAsync(int writerId)
    {
        await _deserializeMutex.WaitAsync().ConfigureAwait(false);
        try
        {
            if (!_datumReaderBySchemaId.TryGetValue(
                    writerId, out var datumReader))
            {
                _writerSchema ??= Avro.Schema.Parse(_minSchema);
                datumReader =
                    new SpecificReader<T>(_writerSchema, ReaderSchema);
                _datumReaderBySchemaId[writerId] = datumReader;
            }

            return datumReader;
        }
        finally
        {
            _deserializeMutex.Release();
        }
    }
} {code}

> Cannot customize MaxDepth when parsing Avro schema in C#
> --------------------------------------------------------
>
>                 Key: AVRO-3856
>                 URL: https://issues.apache.org/jira/browse/AVRO-3856
>             Project: Apache Avro
>          Issue Type: Bug
>          Components: csharp
>    Affects Versions: 1.11.2
>            Reporter: Emanuele Sabellico
>            Priority: Minor
>              Labels: pull-request-available
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> When parsing the Avro schema with Newtonsoft.Json, the max depth is limited 
> to 64, the default. It cannot be customized because the parser uses 
> JObject.Parse, which does not expose a way to change it.
> Using JsonConvert instead allows the default MaxDepth to be changed:
> {code:c#}
>                 JsonConvert.DefaultSettings = () =>
>                 {
>                     return new JsonSerializerSettings
>                     {
>                         MaxDepth = 100
>                     };
>                 };
>                 var schema = File.ReadAllText("schema.avsc");
>                 var json = JsonConvert.DeserializeObject<JObject>(schema);
> {code}
> [https://github.com/apache/avro/blob/41b3c08ca5da192786c2b08546e691b3126e1856/lang/csharp/src/apache/main/Schema/Schema.cs#L250]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to