[ 
https://issues.apache.org/jira/browse/KAFKA-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16985429#comment-16985429
 ] 

John Roesler edited comment on KAFKA-9231 at 11/30/19 8:32 PM:
---------------------------------------------------------------

In addition to the error I listed above (ProducerFencedException), I have also 
observed the following recoverable exceptions leading to threads dying:

UnknownProducerIdException:
{noformat}
org.apache.kafka.streams.errors.StreamsException: task [1_0] Abort sending 
since an error caught with a previous record (timestamp 1574960233670) to topic 
stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog due to 
org.apache.kafka.common.errors.UnknownProducerIdException: This exception is 
raised by the broker if it could not locate the producer metadata associated 
with the producerId in question. This could happen if, for instance, the 
producer's records were deleted because their retention time had elapsed. Once 
the last records of the producerId are removed, the producer's metadata is 
removed from the broker, and future appends by the producer will return this 
exception.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:143)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:51)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:202)
        at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716)
        at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
        at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
        at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
        at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
        at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:562)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:554)
        at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
        at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This 
exception is raised by the broker if it could not locate the producer metadata 
associated with the producerId in question. This could happen if, for instance, 
the producer's records were deleted because their retention time had elapsed. 
Once the last records of the producerId are removed, the producer's metadata is 
removed from the broker, and future appends by the producer will return this 
exception.
{noformat}

and an internal assertion:
{noformat}
java.lang.IllegalStateException: RocksDB metrics recorder for store 
"KSTREAM-AGGREGATE-STATE-STORE-0000000049" of task 3_1 has already been added. 
This is a bug in Kafka Streams.
        at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30)
        at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:93)
        at org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:205)
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:191)
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:227)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:203)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:201)
        at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:211)
        at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:323)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:76)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
[2019-11-29 08:27:08,883] INFO [stream-soak-test-a0363f9f-ff9c-4dbd-b38a-8e898d77a22e-StreamThread-2] stream-thread [stream-soak-test-a0363f9f-ff9c-4dbd-b38a-8e898d77a22e-StreamThread-2] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}

This is a specific internal assertion. What happened there is that the 
RocksDBStore initializer registers itself with the metric collector before 
fully creating the store. The store creation failed, but the registration 
remained, so every later attempt to create the store tripped the assertion and 
the store could no longer be created at all.
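
To make the ordering problem concrete, here is a minimal, self-contained sketch. It is not the Streams code and not necessarily how the PR fixes it: Registry, Opener, initBuggy and initSafe are hypothetical stand-ins for the metrics trigger and the store initializer. It only shows why registering before a fallible open poisons later retries, and one way (undoing the registration on failure) to avoid that.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class MetricsRegistrationSketch {

    /** Hypothetical stand-in for the metrics trigger: rejects duplicate registrations. */
    static class Registry {
        private final Set<String> recorders = new HashSet<>();

        void add(String storeName) {
            if (!recorders.add(storeName)) {
                throw new IllegalStateException(
                    "Metrics recorder for store \"" + storeName + "\" has already been added.");
            }
        }

        void remove(String storeName) {
            recorders.remove(storeName);
        }

        boolean contains(String storeName) {
            return recorders.contains(storeName);
        }
    }

    /** Hypothetical stand-in for opening the underlying database; it may fail. */
    interface Opener {
        void open() throws Exception;
    }

    /** Problematic ordering: register, then open. A failed open leaves the registration behind. */
    static void initBuggy(Registry registry, String storeName, Opener opener) throws Exception {
        registry.add(storeName);
        opener.open();
    }

    /** One possible remedy: undo the registration if the open fails, so a retry starts clean. */
    static void initSafe(Registry registry, String storeName, Opener opener) throws Exception {
        registry.add(storeName);
        try {
            opener.open();
        } catch (Exception e) {
            registry.remove(storeName);
            throw e;
        }
    }

    public static void main(String[] args) {
        Opener failing = () -> { throw new IOException("simulated open failure"); };
        Opener working = () -> { };

        Registry buggyRegistry = new Registry();
        try {
            initBuggy(buggyRegistry, "store-49", failing);
        } catch (Exception e) {
            System.out.println("buggy, first attempt: " + e.getMessage());
        }
        try {
            initBuggy(buggyRegistry, "store-49", working);
        } catch (Exception e) {
            // The retry fails even though the open itself would now succeed.
            System.out.println("buggy, retry: " + e.getMessage());
        }

        Registry safeRegistry = new Registry();
        try {
            initSafe(safeRegistry, "store-49", failing);
        } catch (Exception e) {
            System.out.println("safe, first attempt: " + e.getMessage());
        }
        try {
            initSafe(safeRegistry, "store-49", working);
            System.out.println("safe, retry: succeeded");
        } catch (Exception e) {
            System.out.println("safe, retry: " + e.getMessage());
        }
    }
}
```

Registering only after a successful open would work equally well in this sketch; the point is that a failed initialization must not leave a registration behind.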

My testing setup is to run a Streams application processing data at a modest 
rate (~20K records/sec) on three instances, each of which has three 
StreamThreads. I'm introducing network partitions separating each instance, 
one at a time, from the brokers, at a rate of about one network partition per 
hour, each lasting up to 5 minutes.

I've observed each of these exceptions killing Streams threads within a few 
occurrences of network partitioning. Note that Streams is configured with 
appropriate timeouts/retries to tolerate network interruptions lasting longer 
than 5 minutes, so any thread deaths are unexpected. All the thread deaths I've 
observed are the results of bugs in the Streams exception handling code.
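
For readers who want to set up something similar, the following is a rough sketch of the kind of configuration meant by "appropriate timeouts/retries". The property names are standard Kafka Streams and producer settings, but every value here is an assumption chosen only to exceed a 5-minute outage; these are not the values used in the soak test. Only the three threads per instance and EOS come from the description above.

```java
import java.util.Properties;

public class SoakConfigSketch {

    /** Builds an illustrative Streams configuration; all timeout values are assumed. */
    static Properties build() {
        Properties props = new Properties();
        // From the description above: EOS enabled, three threads per instance.
        props.put("processing.guarantee", "exactly_once");
        props.put("num.stream.threads", "3");
        // Assumed values: 10 minutes, i.e. longer than the 5-minute partitions.
        // The "producer." prefix routes a setting to the embedded producers.
        props.put("producer.delivery.timeout.ms", "600000");
        // Must not exceed the broker's transaction.max.timeout.ms.
        props.put("producer.transaction.timeout.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A real deployment would also set application.id and bootstrap.servers, which are omitted here because they are environment-specific.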

There are two main categories of exception:
1. ProducerFencedException and UnknownProducerIdException. While they reflect 
different root causes, both of these are expected with EOS enabled, if the 
producer is silent for too long and ceases to be considered a valid member by 
the broker. Streams is supposed to handle this situation by rejoining the group 
(which includes discarding and re-creating its Producers). These were uncovered 
by repeated rebalances ultimately caused by the injected network partitions.
2. IllegalStateException. A specific internal assertion revealed buggy store 
initialization logic. This was also uncovered by repeated rebalances ultimately 
caused by the injected network partitions.
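
The intended handling for the first category can be sketched as follows. This is an illustration under stated assumptions, not the Streams implementation or the change in the PR: the exception classes are local stand-ins for the Kafka ones, and runOnce, isRecoverable and the rejoinGroup callback are hypothetical names. The sketch walks the cause chain because, in the traces on this ticket, the fenced-producer error arrives wrapped in other exceptions.

```java
public class RecoverableErrorSketch {

    /** Local stand-ins for the Kafka exceptions of the same names. */
    static class ProducerFencedException extends RuntimeException {
        ProducerFencedException(String message) { super(message); }
    }

    static class UnknownProducerIdException extends RuntimeException {
        UnknownProducerIdException(String message) { super(message); }
    }

    enum Outcome { OK, REJOINED, DEAD }

    /** True if the exception, or anything in its cause chain, is one of the recoverable types. */
    static boolean isRecoverable(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof ProducerFencedException || t instanceof UnknownProducerIdException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs one unit of work. On a recoverable error the thread rejoins the group
     * (which would include discarding and re-creating its producers) instead of dying.
     */
    static Outcome runOnce(Runnable work, Runnable rejoinGroup) {
        try {
            work.run();
            return Outcome.OK;
        } catch (RuntimeException e) {
            if (isRecoverable(e)) {
                rejoinGroup.run();
                return Outcome.REJOINED;
            }
            return Outcome.DEAD;
        }
    }

    public static void main(String[] args) {
        Runnable rejoin = () -> System.out.println("rejoining group with fresh producers");

        // A fenced producer wrapped in an outer exception, as in the traces on this ticket.
        Runnable fencedWork = () -> {
            throw new RuntimeException("Failed to flush state store",
                new ProducerFencedException("old epoch"));
        };
        Runnable brokenWork = () -> {
            throw new IllegalArgumentException("genuinely unexpected");
        };

        System.out.println(runOnce(() -> { }, rejoin));
        System.out.println(runOnce(fencedWork, rejoin));
        System.out.println(runOnce(brokenWork, rejoin));
    }
}
```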

I have addressed all of these bugs in my PR 
https://github.com/apache/kafka/pull/7748

I'm proposing to consider this a Blocker for the 2.4.0 release. It is both 
severe (the exceptions above have reliably caused my Streams cluster to die 
completely within about 12-24 hours) and a regression.

To verify the latter claim, I ran the exact same workload with the exact 
same scenario using Kafka Streams 2.3. It should be noted that I still observed 
threads dying on that branch, but *only* due to UnknownProducerIdException. 
So, there is some overlap with what I'm seeing on 2.4, but aside 
from that one cause, the fact that Streams is losing threads at a high rate 
from both ProducerFencedExceptions and its own internal assertion 
(IllegalStateException) leads me to think that Streams would be less stable in 
production using EOS on 2.4 than it was on 2.3.

For completeness, note that I've run the same test on 2.4 and 2.3 _without_ 
EOS, and Streams is quite stable. Also note that the exceptions killing my 
applications seem to be directly caused by frequent rebalances and network 
interruptions. I do not expect users running EOS Streams in reliable and 
stable conditions to suffer thread deaths.

I know that everyone is waiting for the much-delayed 2.4.0 release, so I'm not 
taking a hard stance on it, but from where I'm sitting, the situation seems to 
warrant a new RC once the fix is prepared. Also note, a new RC was just 
announced this morning, so assuming I can merge my PR on Monday, we're only 
setting the release back a couple of extra days.

Note that we have not previously tested Streams under these conditions, which 
is why we're discovering these bugs so late in the game.



> Streams Threads may die from recoverable errors with EOS enabled
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9231
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9231
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Blocker
>
> While testing Streams in EOS mode under frequent and heavy network 
> partitions, I've encountered the following error, leading to thread death:
> {noformat}
> [2019-11-26 04:54:02,650] ERROR 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Failed 
> to rebalance.
>       at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:739)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] failed 
> to suspend stream tasks
>       at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:253)
>       at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:116)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
>       at org.apache.kafka.streams.processor.internals.StreamThread.enforceRebalance(StreamThread.java:716)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:710)
>       ... 1 more
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task 
> [1_1] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000007
>       at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:279)
>       at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:175)
>       at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:581)
>       at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:535)
>       at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:660)
>       at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:628)
>       at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendRunningTasks(AssignedStreamsTasks.java:145)
>       at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendOrCloseTasks(AssignedStreamsTasks.java:128)
>       at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:246)
>       ... 7 more
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with the same transactionalId, or the producer's transaction has been expired 
> by the broker.
> [2019-11-26 04:54:02,650] INFO 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State 
> transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2019-11-26 04:54:02,650] INFO 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
> [2019-11-26 04:54:02,650] INFO 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> [Consumer 
> clientId=stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2-restore-consumer,
>  groupId=null] Unsubscribed all topics or patterns and assigned partitions 
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2019-11-26 04:54:02,653] INFO 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State 
> transition from PENDING_SHUTDOWN to DEAD 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> {noformat}
> Elsewhere in the code, we catch ProducerFencedExceptions and trigger a 
> rebalance instead of killing the thread. It seems like one possible avenue 
> has slipped through the cracks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
