Hi,

what is the exact error message you are getting?

Best,

Dawid


On 10/10/18 11:51, 远远 wrote:
> invoke FlinkKafkaProducer011 constructor in scala:
> val producer = new 
> FlinkKafkaProducer011[PVEvent.Entity](appConf.getPvEventTopic, new 
> PvEventSerializeSchema, producerProps, 
> Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))
> and the constructor is :
>
> /** * Creates a FlinkKafkaProducer for a given topic. The sink
> produces its input to * the topic. It accepts a keyed {@link
> KeyedSerializationSchema} and possibly a custom {@link
> FlinkKafkaPartitioner}. * * <p>If a partitioner is not provided,
> written records will be partitioned by the attached key of each *
> record (as determined by {@link
> KeyedSerializationSchema#serializeKey(Object)}). If written records do
> not * have a key (i.e., {@link
> KeyedSerializationSchema#serializeKey(Object)} returns {@code null}),
> they * will be distributed to Kafka partitions in a round-robin
> fashion. * * @param defaultTopicId The default topic to write data to
> * @param serializationSchema A serializable serialization schema for
> turning user objects into a kafka-consumable byte[] supporting
> key/value messages * @param producerConfig Configuration properties
> for the KafkaProducer. 'bootstrap.servers.' is the only required
> argument. * @param customPartitioner A serializable partitioner for
> assigning messages to Kafka partitions. * If a partitioner is not
> provided, records will be partitioned by the key of each record *
> (determined by {@link KeyedSerializationSchema#serializeKey(Object)}).
> If the keys * are {@code null}, then records will be distributed to
> Kafka partitions in a * round-robin fashion. */ public
> FlinkKafkaProducer011(
>       String defaultTopicId, KeyedSerializationSchema<IN> 
> serializationSchema, Properties producerConfig, 
> Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>    this(
>       defaultTopicId, serializationSchema, producerConfig, customPartitioner, 
> Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); }
> but cannot complie pass, and IDEA show ''cannot resolve constructor"
> error. 
> and i invoke other constructor that without java8 Optional params, it 
> will no
> error。  because of java8 Optional param?what should i do?
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to