Hi,

At this point, this seems more like a Kafka question than a Flink question. I
think you need to configure high availability for Kafka with ZooKeeper. You can
probably find more about this here:
https://www.cloudera.com/documentation/kafka/latest/topics/kafka_ha.html
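
On the client side, it may also help to let the Kafka producer retry when a
partition leader changes, instead of failing the job on the first failed send.
The following is only a minimal sketch under assumptions: the object and method
names are hypothetical, the values are illustrative rather than recommendations,
and it assumes these properties are handed to the Kafka sink as its producer
configuration. `retries`, `retry.backoff.ms` and `acks` are standard Kafka
producer settings.

```scala
import java.util.Properties

// Hypothetical helper: the object and method names are illustrative only.
object KafkaProducerRetryConfig {

  // Builds Kafka producer settings that retry transient errors such as a
  // leader change instead of giving up on the first failed send.
  def producerProperties(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Retry failed sends a few times (illustrative value).
    props.setProperty("retries", "5")
    // Wait between retries so that a new partition leader can be elected.
    props.setProperty("retry.backoff.ms", "1000")
    // Require acknowledgement from all in-sync replicas.
    props.setProperty("acks", "all")
    props
  }
}
```

Retries only bridge a short leader election. The topic still needs a
replication factor greater than 1 so that another broker can take over as
leader, and retries can reorder records if several requests are in flight.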

Best,
Stefan


> On 20.12.2017 at 12:06, Shivam Sharma <28shivamsha...@gmail.com> wrote:
> 
> Hi Stefan,
> 
> One Kafka node was down, but I want it to restart automatically. How can I
> solve this?
> 
> Thanks
> 
> On Wed, Dec 20, 2017 at 4:24 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
> 
>> Hi,
>> 
>> Did you see that the problem starts with a Kafka exception, "Failed to send
>> data to Kafka: This server is not the leader for that topic-partition."? Is
>> it possible that you had a network issue and the producer could not find
>> the leader broker?
>> 
>> Best,
>> Stefan
>> 
>>> On 20.12.2017 at 10:57, Shivam Sharma <28shivamsha...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I keep running into this issue with my Flink job on YARN.
>>> Basically, I am reading data from Kafka, transforming it, and writing it
>>> back to Kafka.
>>> 
>>> My build.sbt is:
>>> 
>>> val flinkVersion = "1.3.2"
>>> val flinkKafkaConnect = "0.10.2"
>>> 
>>> libraryDependencies ++= Seq(
>>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
>>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>>   "org.apache.flink" %% "flink-table" % flinkVersion,
>>>   "org.json4s" %% "json4s-native" % "3.5.3",
>>>   "org.json4s" %% "json4s-jackson" % "3.5.3"
>>> 
>>> )
>>> 
>>> *Note: One of the nodes in our Kafka cluster was down.*
>>> 
>>> 
>>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>>>     at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:58)
>>>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:75)
>>>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:65)
>>>     at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:36)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:44)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:597)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:504)
>>>     at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>>>     at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>>>     ... 7 more
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
>>>     at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
>>>     at DataStreamCalcRule$127.processElement(Unknown Source)
>>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:67)
>>>     at org.apache.flink.table.runtime.CRowProcessRunner.processElement(CRowProcessRunner.scala:35)
>>>     at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     ... 23 more
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     ... 35 more
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>>>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:622)
>>>     at scala.collection.immutable.List.foreach(List.scala:381)
>>>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:622)
>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     ... 41 more
>>> Caused by: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition.
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:373)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:302)
>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke(FlinkKafkaProducer010.java:407)
>>>     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>>>     ... 52 more
>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>> 2017-12-20 05:42:16,008 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Persuasion (b65f972af4866ff0548bf7f0438caf8f) switched from state RUNNING to FAILING.
>>> 
>>> 
>>> Thanks
>>> 
>>> --
>>> Shivam Sharma
>>> Data Engineer @ Goibibo
>>> Indian Institute Of Information Technology, Design and Manufacturing
>>> Jabalpur
>>> Mobile No- (+91) 8882114744
>>> Email:- 28shivamsha...@gmail.com
>>> LinkedIn: https://www.linkedin.com/in/28shivamsharma
>> 
>> 
> 
> 
