I am processing data and then sending it to kafka by kafka sink .
this is method where I am producing the data
nudgeDetailsDataStream.keyBy(NudgeDetails::getCarSugarID).addSink(NudgeCarLevelProducer.getProducer(config))
.name("nudge-details-producer")
.uid("nudge-details-producer");
its my producer
public class NudgeCarLevelProducer {
static Logger logger = LoggerFactory.getLogger(PeakLocationFinder.class);
public static FlinkKafkaProducer010<NudgeDetails>
getProducer(PeakLocationFinderGlobalConfig config) {
return new FlinkKafkaProducer010(config.getFabricIncentiveTopic(),
new NudgeCarLevelSchema(config),
FlinkKafkaProducerBase.getPropertiesFromBrokerList(config.getInstrumentationBrokers()));
}
}
class NudgeCarLevelSchema implements SerializationSchema<NudgeDetails>
{
Logger logger = LoggerFactory.getLogger(NudgeCarLevelSchema.class);
ObjectMapper mapper = new ObjectMapper();
PeakLocationFinderGlobalConfig config;
public NudgeCarLevelSchema(PeakLocationFinderGlobalConfig config)
{
this.config = config;
}
@Override
public byte[] serialize(NudgeDetails element) {
byte [] bytes = null;
Document document = new Document();
document.setId(UUID.randomUUID().toString());
Metadata metadata = new Metadata();
metadata.setSchema(config.getFabricCarLevelDataStream());
metadata.setSchemaVersion(1);
metadata.setTenant(config.getTenantId());
metadata.setTimestamp(System.currentTimeMillis());
metadata.setType(Type.EVENT);
metadata.setSender("nudge");
metadata.setStream(config.getFabricCarLevelDataStream());
document.setMetadata(metadata);
document.setData(element);
try {
bytes = mapper.writeValueAsString(document).getBytes();
} catch (Exception e) {
logger.error("error while serializing nudge car level Schema");
}
return bytes;
}
}
On Mon, Sep 24, 2018 at 12:24 PM miki haiat <[email protected]> wrote:
> What are you trying to do , can you share some code ?
> This is the reason for the exeption
> Proceeding to force close the producer since pending requests could not be
> completed within timeout 9223372036854775807 ms.
>
>
>
> On Mon, 24 Sep 2018, 9:23 yuvraj singh, <[email protected]> wrote:
>
>> Hi all ,
>>
>>
>> I am getting this error with flink 1.6.0 , please help me .
>>
>>
>>
>>
>>
>>
>> 2018-09-23 07:15:08,846 ERROR
>> org.apache.kafka.clients.producer.KafkaProducer -
>> Interrupted while joining ioThread
>>
>> java.lang.InterruptedException
>>
>> at java.lang.Object.wait(Native Method)
>>
>> at java.lang.Thread.join(Thread.java:1257)
>>
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>>
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>>
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>>
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>>
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>>
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>>
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 2018-09-23 07:15:08,847 INFO org.apache.kafka.clients.producer.KafkaProducer
>> - Proceeding to force close the producer since pending
>> requests could not be completed within timeout 9223372036854775807 ms.
>>
>> 2018-09-23 07:15:08,860 ERROR
>> org.apache.flink.streaming.runtime.tasks.StreamTask - Error
>> during disposal of stream operator.
>>
>> org.apache.kafka.common.KafkaException: Failed to close kafka producer
>>
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:734)
>>
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:682)
>>
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:661)
>>
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
>>
>> at
>> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>>
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
>>
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Caused by: java.lang.InterruptedException
>>
>> at java.lang.Object.wait(Native Method)
>>
>> at java.lang.Thread.join(Thread.java:1257)
>>
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:703)
>>
>> ... 9 more
>>
>>
>> Thanks
>>
>> Yubraj Singh
>>
>