Hi Geoffroy,

I put up a PR https://github.com/apache/incubator-pulsar/pull/2004 to
address the issue. Could you take a look and see whether that approach
addresses your problem?

- Sijie

On Fri, Jun 8, 2018 at 8:32 AM Sijie Guo <guosi...@gmail.com> wrote:

> Hi Geoffroy,
>
> I created an issue for this:
> https://github.com/apache/incubator-pulsar/issues/1943.
>
> We will address it for the upcoming 2.1 release.
>
> - Sijie
>
> On Fri, Jun 8, 2018 at 6:39 AM Geoffroy Fouquier <
> geoffroy.fouqu...@exensa.com> wrote:
>
>> On 08/06/2018 12:01, Sijie Guo wrote:
>> > Geoffroy,
>> >
>> > Thank you for reporting this. The change was made to evolve Pulsar into
>> > a type-safe client with schemas and more validation. Along with that, we
>> > changed the client to a more fluent, builder-style API and made the
>> > configuration data an implementation detail of the Pulsar client.
>> >
>> > However, we missed the use cases in the data-computing world, where
>> > people tend to rely on Java Serializable to share configuration.
>> >
>> > We will incorporate your use cases and try to bring the configuration
>> > object back.
>> >
>> > - Sijie
>>
>> That would be great, thanks!
>>
>> >
>> >
>> > On Fri, Jun 8, 2018 at 1:09 AM Geoffroy Fouquier <
>> > geoffroy.fouqu...@exensa.com> wrote:
>> >
>> >> We are using Pulsar 1.22 within a Spark framework, and I am currently
>> >> upgrading my cluster to Pulsar 2.0. One of the main changes concerns the
>> >> configuration classes, which were replaced by builders. Although the
>> >> ProducerBuilder and ConsumerBuilder interfaces extend Serializable, the
>> >> corresponding implementations aren't serializable, because a Pulsar
>> >> client object is embedded in them. The schema object is also a problem
>> >> for serialization.
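A minimal, self-contained Scala sketch of why this fails, assuming plain Java serialization (the mechanism Spark uses to ship closures to executors). `FakeClient`, `BuilderWithClient`, `PlainProducerSettings` and `SerializabilityCheck` are hypothetical names, not Pulsar classes: declaring `Serializable` on a type is not enough if one of its fields holds a non-serializable object such as a live client.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a live client (sockets, thread pools): not Serializable.
class FakeClient

// Hypothetical builder that declares Serializable but embeds a live client,
// mirroring the situation described above. Serializing it fails at runtime.
class BuilderWithClient(val client: FakeClient, val topic: String) extends Serializable

// Hypothetical plain-data settings: only strings and numbers, so it serializes.
final case class PlainProducerSettings(serviceUrl: String, topic: String, sendTimeoutMs: Int)

object SerializabilityCheck {
  // Returns true if `value` can be written with Java serialization,
  // false if some object reachable from it is not Serializable.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

The failure only shows up when the object graph is actually walked, which is why it surfaces when Spark serializes the task rather than at compile time.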
>> >>
>> >>
>> >> Basically, with 1.22 my producer app instantiates a client and a
>> >> producer on each executor, and only the configuration classes need to be
>> >> serialized:
>> >>
>> >> ---8<---
>> >>
>> >> WarcRecordExtractor
>> >>           .load(sc, input)
>> >>           .foreachPartition{ ite =>
>> >>
>> >>               val client = PulsarClient.create(ServiceUrl, clientConf)
>> >>               val producer = client.createProducer(topic, producerConf)
>> >>               ite.foreach{ x => producer.sendAsync(x._2.content) }
>> >>               producer.close()
>> >>
>> >>               client.close()
>> >>
>> >>           }
>> >>
>> >> ---8<---
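In the snippet above, `producer.close()` and `client.close()` only run when every send succeeds; if an exception escapes from `ite.foreach`, both stay open on the executor. A minimal sketch of a loan-pattern helper that closes a resource even when the body throws. `ResourceHelpers.usingResource` and `TrackedResource` are hypothetical names, and the sketch assumes the client and producer implement `AutoCloseable`.

```scala
object ResourceHelpers {
  // Opens a resource, runs `body` on it, and always closes it afterwards,
  // even if `body` throws. If both `body` and `close()` throw, the exception
  // from `close()` replaces the original one, which keeps the sketch short.
  def usingResource[R <: AutoCloseable, A](open: => R)(body: R => A): A = {
    val resource = open
    try body(resource)
    finally resource.close()
  }
}

// Hypothetical resource that records whether it was closed (for illustration only).
final class TrackedResource extends AutoCloseable {
  @volatile var closed: Boolean = false
  override def close(): Unit = { closed = true }
}

// Hypothetical use inside foreachPartition, reusing the names from the snippet above:
//   ResourceHelpers.usingResource(PulsarClient.create(ServiceUrl, clientConf)) { client =>
//     ResourceHelpers.usingResource(client.createProducer(topic, producerConf)) { producer =>
//       ite.foreach { x => producer.sendAsync(x._2.content) }
//     }
//   }
```

On Scala 2.13 and later, `scala.util.Using.resource` provides the same behavior and keeps the body's exception, attaching a failing `close()` as a suppressed exception.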
>> >>
>> >>
>> >> With 2.0, I tried to replace the configuration classes with the
>> >> corresponding builders, but this doesn't work because the
>> >> producerBuilder isn't serializable:
>> >>
>> >> ---8<---
>> >>
>> >>               [...]
>> >>
>> >>               val client = clientBuilder.serviceUrl(serviceUrl).build()
>> >>               val producer = producerBuilder.topic(topicName).create
>> >>
>> >>               [...]
>> >>
>> >> ---8<---
>> >>
>> >>
>> >> My current workaround is to write my own ProducerBuilderImpl without
>> >> the Pulsar client or schemas (the createAsync method still exists but
>> >> throws an exception), and then to instantiate a producer like this:
>> >>
>> >> ---8<---
>> >>
>> >>               [...]
>> >>
>> >>               val client: PulsarClientImpl =
>> >>                 clientBuilder.serviceUrl(broker).build().asInstanceOf[PulsarClientImpl]
>> >>               val producer = client.createProducerAsync(
>> >>                 producerBuilder.topic(topic).getConf(),
>> >>                 org.apache.pulsar.client.api.Schema.BYTES).get()
>> >>
>> >>               [...]
>> >>
>> >> ---8<---
>> >>
>> >> where producerBuilder is my own implementation of the ProducerBuilder
>> >> interface.
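A minimal sketch of the idea behind this workaround: keep the fluent style, but let the driver-side object carry only plain settings and defer creating the producer until a client exists on the executor. `ProducerSpec`, its `withX` methods, its default values and `JavaSerialization.roundTrip` are hypothetical names for illustration, not Pulsar APIs; the factory function passed to `create` is where real code would call the Pulsar client.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical fluent settings object: each "setter" returns a modified copy,
// so it can be built on the driver and shipped to executors as plain data.
final case class ProducerSpec(
    topic: Option[String] = None,
    sendTimeoutMs: Int = 30000 // hypothetical default, for illustration
) {
  def withTopic(t: String): ProducerSpec = copy(topic = Some(t))
  def withSendTimeoutMs(ms: Int): ProducerSpec = copy(sendTimeoutMs = ms)

  // Builds the real producer through a caller-supplied function, so no client
  // object is ever stored in (or serialized with) the spec.
  def create[P](factory: ProducerSpec => P): P = factory(this)
}

object JavaSerialization {
  // Serializes and deserializes a value, mimicking what happens when Spark
  // ships a closure that captures it to an executor.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

On an executor, the factory would build the real producer from a client created there, for example `spec => client.newProducer().topic(spec.topic.get).create()` with the Pulsar 2.x builder API; because the spec never holds the client, Spark can serialize it along with the closure.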
>> >>
>> >>
>> >> I did the same thing for ConsumerBuilderImpl and also rewrote
>> >> SparkStreamingPulsarReceiver with the same kind of workaround. I don't
>> >> know why the configuration classes were replaced by builders, but it
>> >> has become more difficult to use Pulsar with Spark (and probably with
>> >> other distributed frameworks). Maybe I did it wrong, though.
