[ 
https://issues.apache.org/jira/browse/BEAM-13705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jordan Moore updated BEAM-13705:
--------------------------------
    Resolution: Not A Bug
        Status: Resolved  (was: Triage Needed)

It's an error in the Confluent library.

This alone throws an OOM:

{code}
    final CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(
        "http://localhost:8081",
        2147483647, // Integer.MAX_VALUE, used as the client's cache capacity
        ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));
{code}
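For context, the stack trace below shows the OOM is thrown inside {{BoundedConcurrentHashMap$Segment.<init>}} during {{CachedSchemaRegistryClient}} construction, i.e. the map's segments eagerly pre-allocate their entry arrays from the requested capacity before any schema is cached. The class below is a simplified, hypothetical sketch of that failure mode (not the actual Confluent code) to show why a capacity of 2147483647 is fatal while a lazily-allocating map would not be:

```java
// Hypothetical sketch: a segmented map that eagerly sizes each
// segment's backing array from the requested total capacity.
public class CapacitySketch {

    // Approximate per-segment entry-array length for a segmented map
    // with the given total capacity and concurrency level, rounded up
    // to the next power of two (a common sizing convention).
    static long eagerSegmentArrayLength(long capacity, int concurrencyLevel) {
        long perSegment = capacity / concurrencyLevel;
        long size = 1;
        while (size < perSegment) {
            size <<= 1;
        }
        return size;
    }

    public static void main(String[] args) {
        int segments = 16; // a common default concurrency level (assumption)
        long len = eagerSegmentArrayLength(2147483647L, segments);
        // 16 segments, each pre-allocating ~2^27 reference slots, far
        // exceeds a default JVM heap before a single schema is cached.
        System.out.println("per-segment array length: " + len);
        System.out.println("total pre-allocated slots: " + len * segments);
    }
}
```

A map that allocates lazily (e.g. {{java.util.HashMap}}, which also caps its table size) would tolerate the same capacity argument, which is why the OOM only appeared once the client switched to this bounded map implementation.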

> OOM / memory-leak in KafkaIO.read using Confluent KafkaAvroDeserializer with 
> SpecificRecord 
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-13705
>                 URL: https://issues.apache.org/jira/browse/BEAM-13705
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.35.0
>            Reporter: Jordan Moore
>            Priority: P3
>         Attachments: Screen Shot 2022-01-19 at 5.30.51 PM.png
>
>
> *Details* - Trying to use a generated Avro SpecificRecord subclass with 
> KafkaIO.read (I was able to use KafkaIO.write fine with it). 
> *Problem* - OOM happens while constructing the deserializer with 
> SpecificRecord, but not GenericRecord. I am unable to use my generated class 
> because I get errors saying it cannot be cast to a GenericRecord (even though 
> it extends/implements it through a chain of other classes).
> {code}
> 2022-01-19 17:17:47,163 DEBUG [main] options.PipelineOptionsFactory$Builder 
> (PipelineOptionsFactory.java:325) - Provided Arguments: {}
> 2022-01-19 17:17:47,345 DEBUG [main] sdk.Pipeline (Pipeline.java:158) - 
> Creating Pipeline#817686795
> 2022-01-19 17:17:47,382 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - 
> Adding KafkaIO.Read [KafkaIO.TypedWithoutMetadata] to Pipeline#817686795
> 2022-01-19 17:17:47,383 DEBUG [main] sdk.Pipeline (Pipeline.java:544) - 
> Adding KafkaIO.Read to Pipeline#817686795
> 2022-01-19 17:17:47,445 DEBUG [main] coders.CoderRegistry 
> (CoderRegistry.java:635) - Coder for [B: ByteArrayCoder
> java.lang.OutOfMemoryError: Java heap space
> Dumping heap to /tmp/beam-dump ...
> Heap dump file created [1086964638 bytes in 1.315 secs]
> Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
>       at 
> io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$HashEntry.newArray(BoundedConcurrentHashMap.java:247)
>       at 
> io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap$Segment.<init>(BoundedConcurrentHashMap.java:1200)
>       at 
> io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1637)
>       at 
> io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1670)
>       at 
> io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1654)
>       at 
> io.confluent.kafka.schemaregistry.utils.BoundedConcurrentHashMap.<init>(BoundedConcurrentHashMap.java:1683)
>       at 
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:181)
>       at 
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:170)
>       at 
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:136)
>       at 
> io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.<init>(CachedSchemaRegistryClient.java:98)
>       at 
> org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.lambda$of$282520f2$1(ConfluentSchemaRegistryDeserializerProvider.java:93)
>       at 
> org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider$$Lambda$70/1932332324.apply(Unknown
>  Source)
>       at 
> org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaRegistryClient(ConfluentSchemaRegistryDeserializerProvider.java:134)
>       at 
> org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getSchemaMetadata(ConfluentSchemaRegistryDeserializerProvider.java:126)
>       at 
> org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getAvroSchema(ConfluentSchemaRegistryDeserializerProvider.java:120)
>       at 
> org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider.getCoder(ConfluentSchemaRegistryDeserializerProvider.java:116)
>       at 
> org.apache.beam.sdk.io.kafka.KafkaIO$Read.getValueCoder(KafkaIO.java:1476)
>       at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:1256)
>       at org.apache.beam.sdk.io.kafka.KafkaIO$Read.expand(KafkaIO.java:605)
>       at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
>       at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
>       at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
>       at 
> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1555)
>       at 
> org.apache.beam.sdk.io.kafka.KafkaIO$TypedWithoutMetadata.expand(KafkaIO.java:1529)
>       at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
>       at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
>       at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
>       at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
>       at cricket.jmoore.jmx.Main.main(Main.java:98)
> {code}
> Small example, with Kafka and Confluent Schema Registry running locally:
> {code}
>   public static void main(String[] args) throws Exception {
>     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
> //    Pipeline p = getWritePipeline(options);
>     Pipeline p = Pipeline.create(options);
>     final String topic = "foobar-2";
>     final SubjectNameStrategy subjectStrategy = new TopicNameStrategy();
>     final String valueSubject = subjectStrategy.subjectName(topic, false, null); // schema not used
>     final ConfluentSchemaRegistryDeserializerProvider<SpecificRecord> valueProvider =
>         ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081",
>             valueSubject, null,
>             // TODO: This doesn't seem to work to get the SpecificRecord subclass in the apply function below
>             ImmutableMap.of(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true));
>     p
>         .apply(KafkaIO.<byte[], SpecificRecord>read()
>                    .withBootstrapServers("localhost:9092")
>                    .withTopic(topic)
>                    .withKeyDeserializer(ByteArrayDeserializer.class) // Don't have any keys, but this is required
>                    .withValueDeserializer(valueProvider)
>                    .withConsumerConfigUpdates(ImmutableMap.of(
>                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.name().toLowerCase(Locale.ROOT),
>                        ConsumerConfig.GROUP_ID_CONFIG, "beam-" + UUID.randomUUID()
>                    ))
>                    .withoutMetadata()
>         ).apply(Values.create())
>         // TODO: How to get SpecificRecord subclass?
>         .apply(MapElements.via(new SimpleFunction<SpecificRecord, Void>() {
>           @Override
>           public Void apply(SpecificRecord input) {
>             log.info("{}", input);
>             return null;
>           }
>         }));
>     p.run().waitUntilFinish();
>   }
> {code}
> Avro schema that I am using, which generates a class Product.java that I 
> would like to use in place of SpecificRecord above:
> {code}
> {"type":"record","name":"Product","namespace":"cricket.jmoore.avro","fields":[{"name":"name","type":"string"}]}
> {code}
> *Beam Version*: 2.35.0
> *Confluent Version*: 7.0.1 (the error seems to come from here... will try to 
> downgrade this)
> Dependencies: 
> {code}
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka-clients</artifactId>
>             <version>2.8.1</version>
>         </dependency>
>         <dependency>
>             <groupId>io.confluent</groupId>
>             <artifactId>kafka-avro-serializer</artifactId>
>             <version>7.0.1</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.beam</groupId>
>             <artifactId>beam-runners-direct-java</artifactId>
>             <version>${beam.version}</version> <!-- 2.35.0 -->
>             <scope>provided</scope>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.beam</groupId>
>             <artifactId>beam-sdks-java-core</artifactId>
>             <version>${beam.version}</version>
>         </dependency>
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
