This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e2a6c99310 Expose bootstrap and decommission state to nodetool info
e2a6c99310 is described below

commit e2a6c99310aa93ba3506ca8f603ae1039372f533
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Wed Jun 14 17:43:50 2023 +0200

    Expose bootstrap and decommission state to nodetool info
    
    patch by Stefan Miklosovic; reviewed by Brandon Williams CASSANDRA-18555
    
    Co-authored-by: Jaydeepkumar Chovatia <[email protected]>
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   1 +
 .../apache/cassandra/service/StorageService.java   | 121 ++++++++----
 .../cassandra/service/StorageServiceMBean.java     |  19 ++
 .../cassandra/tools/nodetool/Decommission.java     |  10 +
 .../org/apache/cassandra/tools/nodetool/Info.java  |   4 +
 .../distributed/test/DecommissionTest.java         | 220 +++++++++++++++++++++
 7 files changed, 339 insertions(+), 37 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6437d2796d..334f829bfc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0
+ * Expose bootstrap and decommission state to nodetool info (CASSANDRA-18555)
  * Fix SSTabledump errors when dumping data from index (CASSANDRA-17698)
  * Avoid unnecessary deserialization of terminal arguments when executing CQL 
functions (CASSANDRA-18566)
  * Remove dependency on pytz library for setting CQLSH timezones on Python 
version >= 3.9 (CASSANDRA-17433)
diff --git a/NEWS.txt b/NEWS.txt
index 43e980582e..7027b6e489 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -164,6 +164,7 @@ New features
     - Added `sstablepartitions` offline tool to find large partitions in 
sstables.
     - `cassandra-stress` has a new option called '-jmx' which enables a user 
to pass username and password to JMX (CASSANDRA-18544)
     - It is possible to read all credentials for `cassandra-stress` from a 
file via option `-credentials-file` (CASSANDRA-18544)
+    - `nodetool info` displays the bootstrap state a node is in, as well as whether it was decommissioned or failed to decommission (CASSANDRA-18555)
 
 Upgrading
 ---------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 5f99fef4bc..1773673f61 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -263,6 +263,8 @@ import static org.apache.cassandra.net.NoPayload.noPayload;
 import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
 import static 
org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 import static 
org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor;
+import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSIONED;
+import static 
org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
 import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
@@ -414,7 +416,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     /* the probability for tracing any particular request, 0 disables tracing 
and 1 enables for all */
     private double traceProbability = 0.0;
 
-    public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, 
MOVING, DRAINING, DRAINED }
+    public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, 
DECOMMISSION_FAILED, MOVING, DRAINING, DRAINED }
     private volatile Mode operationMode = Mode.STARTING;
 
     /* Used for tracking drain progress */
@@ -626,7 +628,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
      * they get the Gossip shutdown message, so even if
      * we don't get time to broadcast this, it is not a problem.
      *
-     * See {@link Gossiper#markAsShutdown(InetAddressAndPort)}
+     * See Gossiper.markAsShutdown(InetAddressAndPort)
      */
     private void shutdownClientServers()
     {
@@ -2157,7 +2159,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     /**
      * All MVs have been created during bootstrap, so mark them as built
      */
-    private void markViewsAsBuilt() {
+    private void markViewsAsBuilt()
+    {
         for (String keyspace : Schema.instance.getUserKeyspaces().names())
         {
             for (ViewMetadata view: 
Schema.instance.getKeyspaceMetadata(keyspace).views)
@@ -2168,11 +2171,18 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     /**
      * Called when bootstrap did finish successfully
      */
-    private void bootstrapFinished() {
+    private void bootstrapFinished()
+    {
         markViewsAsBuilt();
         isBootstrapMode = false;
     }
 
+    @Override
+    public String getBootstrapState()
+    {
+        return SystemKeyspace.getBootstrapState().name();
+    }
+
     public boolean resumeBootstrap()
     {
         if (isBootstrapMode && SystemKeyspace.bootstrapInProgress())
@@ -5128,18 +5138,32 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public void decommission(boolean force) throws InterruptedException
     {
+        if (operationMode == DECOMMISSIONED)
+        {
+            logger.info("This node was already decommissioned. There is no 
point in decommissioning it again.");
+            return;
+        }
+
+        if (isDecommissioning())
+        {
+            logger.info("This node is still decommissioning.");
+            return;
+        }
+
         TokenMetadata metadata = tokenMetadata.cloneAfterAllLeft();
+        // there is no point in running these checks again if the node was already decommissioning but failed to do so
         if (operationMode != Mode.LEAVING)
         {
             if 
(!tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
                 throw new UnsupportedOperationException("local node is not a 
member of the token ring yet");
-            if (metadata.getAllEndpoints().size() < 2)
+            if (metadata.getAllEndpoints().size() < 2 && 
metadata.getAllEndpoints().contains(FBUtilities.getBroadcastAddressAndPort()))
                     throw new UnsupportedOperationException("no other normal 
nodes in the ring; decommission would be pointless");
-            if (operationMode != Mode.NORMAL)
+            if (operationMode != Mode.NORMAL && operationMode != 
DECOMMISSION_FAILED)
                 throw new UnsupportedOperationException("Node in " + 
operationMode + " state; wait for status to become normal or restart");
         }
+
         if (!isDecommissioning.compareAndSet(false, true))
-            throw new IllegalStateException("Node is still decommissioning. 
Check nodetool netstats.");
+            throw new IllegalStateException("Node is still decommissioning. 
Check nodetool netstats or nodetool info.");
 
         if (logger.isDebugEnabled())
             logger.debug("DECOMMISSIONING");
@@ -5150,27 +5174,35 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
             String dc = 
DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();
 
-            if (operationMode != Mode.LEAVING) // If we're already 
decommissioning there is no point checking RF/pending ranges
+            // If we're already decommissioning there is no point checking 
RF/pending ranges
+            if (operationMode != Mode.LEAVING)
             {
                 int rf, numNodes;
                 for (String keyspaceName : 
Schema.instance.getNonLocalStrategyKeyspaces().names())
                 {
                     if (!force)
                     {
+                        boolean notEnoughLiveNodes = false;
                         Keyspace keyspace = Keyspace.open(keyspaceName);
                         if (keyspace.getReplicationStrategy() instanceof 
NetworkTopologyStrategy)
                         {
                             NetworkTopologyStrategy strategy = 
(NetworkTopologyStrategy) keyspace.getReplicationStrategy();
                             rf = strategy.getReplicationFactor(dc).allReplicas;
-                            numNodes = 
metadata.getTopology().getDatacenterEndpoints().get(dc).size();
+                            Collection<InetAddressAndPort> datacenterEndpoints 
= metadata.getTopology().getDatacenterEndpoints().get(dc);
+                            numNodes = datacenterEndpoints.size();
+                            if (numNodes <= rf && 
datacenterEndpoints.contains(FBUtilities.getBroadcastAddressAndPort()))
+                                notEnoughLiveNodes = true;
                         }
                         else
                         {
-                            numNodes = metadata.getAllEndpoints().size();
+                            Set<InetAddressAndPort> allEndpoints = 
metadata.getAllEndpoints();
+                            numNodes = allEndpoints.size();
                             rf = 
keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
+                            if (numNodes <= rf && 
allEndpoints.contains(FBUtilities.getBroadcastAddressAndPort()))
+                                notEnoughLiveNodes = true;
                         }
 
-                        if (numNodes <= rf)
+                        if (notEnoughLiveNodes)
                             throw new UnsupportedOperationException("Not 
enough live nodes to maintain replication factor in keyspace "
                                                                     + 
keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")."
                                                                     + " 
Perform a forceful decommission to ignore.");
@@ -5182,42 +5214,48 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             }
 
             startLeaving();
-            long timeout = Math.max(RING_DELAY_MILLIS, 
BatchlogManager.instance.getBatchlogTimeout());
+            long timeout = Math.max(RING_DELAY_MILLIS, 
BatchlogManager.getBatchlogTimeout());
             setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch 
processing and pending range setup", true);
             Thread.sleep(timeout);
 
-            Runnable finishLeaving = new Runnable()
+            unbootstrap();
+
+            // shutdown cql, gossip, messaging, Stage and set state to 
DECOMMISSIONED
+
+            shutdownClientServers();
+            Gossiper.instance.stop();
+            try
             {
-                public void run()
-                {
-                    shutdownClientServers();
-                    Gossiper.instance.stop();
-                    try
-                    {
-                        MessagingService.instance().shutdown();
-                    }
-                    catch (IOError ioe)
-                    {
-                        logger.info("failed to shutdown message service: {}", 
ioe);
-                    }
+                MessagingService.instance().shutdown();
+            }
+            catch (IOError ioe)
+            {
+                logger.info("failed to shutdown message service", ioe);
+            }
 
-                    Stage.shutdownNow();
-                    
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED);
-                    setMode(Mode.DECOMMISSIONED, true);
-                    // let op be responsible for killing the process
-                }
-            };
-            unbootstrap(finishLeaving);
+            Stage.shutdownNow();
+            
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED);
+            setMode(DECOMMISSIONED, true);
+            // let op be responsible for killing the process
         }
         catch (InterruptedException e)
         {
-            throw new UncheckedInterruptedException(e);
+            setMode(DECOMMISSION_FAILED, true);
+            logger.error("Node interrupted while decommissioning");
+            throw new RuntimeException("Node interrupted while 
decommissioning");
         }
         catch (ExecutionException e)
         {
-            logger.error("Error while decommissioning node ", e.getCause());
+            setMode(DECOMMISSION_FAILED, true);
+            logger.error("Error while decommissioning node: {}", 
e.getCause().getMessage());
             throw new RuntimeException("Error while decommissioning node: " + 
e.getCause().getMessage());
         }
+        catch (Throwable t)
+        {
+            setMode(DECOMMISSION_FAILED, true);
+            logger.error("Error while decommissioning node: {}", 
t.getMessage());
+            throw t;
+        }
         finally
         {
             isDecommissioning.set(false);
@@ -5254,7 +5292,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         return () -> streamRanges(rangesToStream);
     }
 
-    private void unbootstrap(Runnable onFinish) throws ExecutionException, 
InterruptedException
+    private void unbootstrap() throws ExecutionException, InterruptedException
     {
         Supplier<Future<StreamState>> startStreaming = 
prepareUnbootstrapStreaming();
 
@@ -5290,7 +5328,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         hintsSuccess.get();
         logger.debug("stream acks all received.");
         leaveRing();
-        onFinish.run();
     }
 
     private Future streamHints()
@@ -5610,7 +5647,17 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public boolean isDecommissioned()
     {
-        return operationMode == Mode.DECOMMISSIONED;
+        return operationMode == DECOMMISSIONED;
+    }
+
+    public boolean isDecommissionFailed()
+    {
+        return operationMode == DECOMMISSION_FAILED;
+    }
+
+    public boolean isDecommissioning()
+    {
+        return isDecommissioning.get();
     }
 
     public String getDrainProgress()
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 154b0d86d3..8c3428e703 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -489,6 +489,23 @@ public interface StorageServiceMBean extends 
NotificationEmitter
      */
     public void decommission(boolean force) throws InterruptedException;
 
+    /**
+     * Returns whether a node has failed to decommission.
+     *
+     * The fact that this method returns false does not mean that there was an 
attempt to
+     * decommission this node which was successful.
+     *
+     * @return true if decommission of this node has failed, false otherwise
+     */
+    public boolean isDecommissionFailed();
+
+    /**
+     * Returns whether a node is being decommissioned or not.
+     *
+     * @return true if this node is decommissioning, false otherwise
+     */
+    public boolean isDecommissioning();
+
     /**
      * @param newToken token to move this node to.
      * This node will unload its data onto its neighbors, and bootstrap to the 
new token.
@@ -1006,6 +1023,8 @@ public interface StorageServiceMBean extends 
NotificationEmitter
      */
     public boolean resumeBootstrap();
 
+    public String getBootstrapState();
+
     /** Gets the concurrency settings for processing stages*/
     static class StageConcurrency implements Serializable
     {
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Decommission.java 
b/src/java/org/apache/cassandra/tools/nodetool/Decommission.java
index 98b6d5846c..de70932c53 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Decommission.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Decommission.java
@@ -37,6 +37,16 @@ public class Decommission extends NodeToolCmd
     {
         try
         {
+            if (probe.getStorageService().isDecommissioning())
+            {
+                probe.output().out.println("This node is still 
decommissioning.");
+                return;
+            }
+            if 
("DECOMMISSIONED".equals(probe.getStorageService().getBootstrapState()))
+            {
+                probe.output().out.println("Node was already decommissioned.");
+                return;
+            }
             probe.decommission(force);
         } catch (InterruptedException e)
         {
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Info.java 
b/src/java/org/apache/cassandra/tools/nodetool/Info.java
index 5e0d87c767..72086fa568 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Info.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Info.java
@@ -177,6 +177,10 @@ public class Info extends NodeToolCmd
         {
             out.printf("%-23s: (node is not joined to the cluster)%n", 
"Token");
         }
+
+        out.printf("%-23s: %s%n", "Bootstrap state", 
probe.getStorageService().getBootstrapState());
+        out.printf("%-23s: %s%n", "Decommissioning", 
probe.getStorageService().isDecommissioning());
+        out.printf("%-23s: %s%n", "Decommission failed", 
probe.getStorageService().isDecommissionFailed());
     }
 
     /**
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
new file mode 100644
index 0000000000..66091da3ef
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.db.SystemKeyspace.BootstrapState.COMPLETED;
+import static 
org.apache.cassandra.db.SystemKeyspace.BootstrapState.DECOMMISSIONED;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static 
org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
+import static org.apache.cassandra.service.StorageService.Mode.NORMAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DecommissionTest extends TestBaseImpl
+{
+    @Test
+    public void testDecommission() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> 
config.with(GOSSIP)
+                                                                       
.with(NETWORK))
+                                           
.withInstanceInitializer(DecommissionTest.BB::install)
+                                           .start()))
+        {
+            IInvokableInstance instance = cluster.get(1);
+
+            instance.runOnInstance(() -> {
+
+                assertEquals(COMPLETED.name(), 
StorageService.instance.getBootstrapState());
+
+                // pretend that decommissioning has failed in the middle
+
+                try
+                {
+                    StorageService.instance.decommission(true);
+                    fail("the first attempt to decommission should fail");
+                }
+                catch (Throwable t)
+                {
+                    assertEquals("simulated error in 
prepareUnbootstrapStreaming", t.getMessage());
+                }
+
+                assertFalse(StorageService.instance.isDecommissioning());
+                assertTrue(StorageService.instance.isDecommissionFailed());
+
+                // still COMPLETED, nothing has changed
+                assertEquals(COMPLETED.name(), 
StorageService.instance.getBootstrapState());
+
+                String operationMode = 
StorageService.instance.getOperationMode();
+                assertEquals(DECOMMISSION_FAILED.name(), operationMode);
+
+                // try to decommission again, now successfully
+
+                try
+                {
+                    StorageService.instance.decommission(true);
+
+                    // decommission was successful, so we reset failed 
decommission mode
+                    
assertFalse(StorageService.instance.isDecommissionFailed());
+
+                    assertEquals(DECOMMISSIONED.name(), 
StorageService.instance.getBootstrapState());
+                    assertFalse(StorageService.instance.isDecommissioning());
+                }
+                catch (Throwable t)
+                {
+                    fail("the second decommission attempt should pass but it 
failed on: " + t.getMessage());
+                }
+
+                // check that decommissioning of already decommissioned node 
has no effect
+
+                try
+                {
+                    assertEquals(DECOMMISSIONED.name(), 
StorageService.instance.getBootstrapState());
+                    
assertFalse(StorageService.instance.isDecommissionFailed());
+
+                    StorageService.instance.decommission(true);
+
+                    assertEquals(DECOMMISSIONED.name(), 
StorageService.instance.getBootstrapState());
+                    
assertFalse(StorageService.instance.isDecommissionFailed());
+                    assertFalse(StorageService.instance.isDecommissioning());
+                }
+                catch (Throwable t)
+                {
+                    fail("Decommissioning already decommissioned node should 
be no-op operation.");
+                }
+            });
+        }
+    }
+
+    @Test
+    public void testDecommissionAfterNodeRestart() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> 
config.with(GOSSIP)
+                                                                       
.with(NETWORK))
+                                           
.withInstanceInitializer((classLoader, threadGroup, num, generation) -> {
+                                               // Do not install BB when the node that failed to decommission is
+                                               // restarted. The restart is the node's second generation, which is
+                                               // "1" here because generations are counted from 0.
+                                               if (num == 1 && generation != 1)
+                                                   BB.install(classLoader, 
num);
+                                           })
+                                           .start()))
+        {
+            IInvokableInstance instance = cluster.get(1);
+
+            instance.runOnInstance(() -> {
+                assertEquals(COMPLETED.name(), 
StorageService.instance.getBootstrapState());
+
+                // pretend that decommissioning has failed in the middle
+
+                try
+                {
+                    StorageService.instance.decommission(true);
+                    fail("the first attempt to decommission should fail");
+                }
+                catch (Throwable t)
+                {
+                    assertEquals("simulated error in 
prepareUnbootstrapStreaming", t.getMessage());
+                }
+
+                // node is in DECOMMISSION_FAILED mode
+                String operationMode = 
StorageService.instance.getOperationMode();
+                assertEquals(DECOMMISSION_FAILED.name(), operationMode);
+            });
+
+            // restart the node which we failed to decommission
+            stopUnchecked(instance);
+            instance.startup();
+
+            // it is back to normal so let's decommission again
+
+            String oprationMode = instance.callOnInstance(() -> 
StorageService.instance.getOperationMode());
+            assertEquals(NORMAL.name(), oprationMode);
+
+            instance.runOnInstance(() -> {
+                try
+                {
+                    StorageService.instance.decommission(true);
+                }
+                catch (InterruptedException e)
+                {
+                    fail("Should decommission the node");
+                }
+
+                assertEquals(DECOMMISSIONED.name(), 
StorageService.instance.getBootstrapState());
+                assertFalse(StorageService.instance.isDecommissionFailed());
+                assertFalse(StorageService.instance.isDecommissioning());
+            });
+        }
+    }
+
+
+    public static class BB
+    {
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            new ByteBuddy().rebase(StorageService.class)
+                           .method(named("prepareUnbootstrapStreaming"))
+                           
.intercept(MethodDelegation.to(DecommissionTest.BB.class))
+                           .make()
+                           .load(classLoader, 
ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        private static int invocations = 0;
+
+        @SuppressWarnings("unused")
+        public static Supplier<Future<StreamState>> 
prepareUnbootstrapStreaming(@SuperCall Callable<Supplier<Future<StreamState>>> 
zuper)
+        {
+            ++invocations;
+
+            if (invocations == 1)
+                throw new RuntimeException("simulated error in 
prepareUnbootstrapStreaming");
+
+            try
+            {
+                return zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to