From a733cedcf54ab0b979f7f1b762d086e49bd59d72 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Tue, 27 Oct 2009 09:34:04 +0000
Subject: [PATCH] Fix for issue 4316 : Replication takes too much time to shutdown

---
 opends/src/server/org/opends/server/replication/server/ServerWriter.java                                  |    2 
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                             |    6 -
 opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java                  |    2 
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                              |    2 
 opends/src/server/org/opends/server/replication/server/ECLServerWriter.java                               |    2 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                                 |    4 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java |    2 
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                       |   62 ++++++++++-----
 opends/src/server/org/opends/server/replication/server/DbHandler.java                                     |    4 
 opends/src/server/org/opends/server/replication/server/ServerReader.java                                  |   14 +-
 opends/src/server/org/opends/server/replication/common/MutableBoolean.java                                |   67 ++++++++++++++++
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                            |   23 +++++
 12 files changed, 145 insertions(+), 45 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/common/MutableBoolean.java b/opends/src/server/org/opends/server/replication/common/MutableBoolean.java
new file mode 100644
index 0000000..19c9322
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/common/MutableBoolean.java
@@ -0,0 +1,67 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2009 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.common;
+
+/**
+ * The MutableBoolean wraps a boolean in a mutable way.
+ * This can be usable when one wishes to use a boolean object with condition
+ * variables.
+ */
+public class MutableBoolean
+{
+  boolean value;
+
+  /**
+   * A MutableBoolean with the given initial value.
+   *
+   * @param value  The initial value of the mutable Boolean
+   */
+  public MutableBoolean(boolean value)
+  {
+    this.value = value;
+  }
+
+  /**
+   * Retrieves the current value of this MutableBoolean.
+   *
+   * @return The current value of this MutableBoolean.
+   */
+  public boolean get()
+  {
+    return value;
+  }
+
+  /**
+   * Sets the current value of this MutableBoolean.
+   *
+   * @param value The new value of this MutableBoolean.
+   */
+  public void set(boolean value)
+  {
+    this.value = value;
+  }
+}
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 96a48b7..c92f881 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -321,9 +321,9 @@
     }
 
     shutdown  = true;
-    synchronized (this)
+    synchronized (msgQueue)
     {
-      this.notifyAll();
+      msgQueue.notifyAll();
     }
 
     synchronized (this)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 660521f..1d53436 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -988,7 +988,7 @@
       sendWindow = new Semaphore(sendWindowSize);
 
       // create reader
-      reader = new ServerReader(session, serverId, this);
+      reader = new ServerReader(session, this);
       reader.start();
 
       if (writer == null)
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 31ce932..ee10192 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -194,7 +194,7 @@
         }
       }
       if (replicationServerDomain!=null)
-        replicationServerDomain.stopServer(handler);
+        replicationServerDomain.stopServer(handler, false);
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java b/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
index b66e8d0..bd52427 100644
--- a/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
+++ b/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -93,6 +93,6 @@
   public void close()
   {
     if (handler.getDomain() != null)
-      handler.getDomain().stopServer(handler);
+      handler.getDomain().stopServer(handler, false);
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index c36cc3b..c0c1655 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -569,10 +569,6 @@
         serverId , this);
       connectThread.start();
 
-      // FIXME : Is it better to have the time to receive the ReplServerInfo
-      // from all the other replication servers since this info is necessary
-      // to route an early received total update request.
-      try { Thread.sleep(300);} catch(Exception e) {}
       if (debugEnabled())
         TRACER.debugInfo("RS " +getMonitorInstanceName()+
             " creates listen thread");
@@ -1048,7 +1044,7 @@
       // Have a new group id: Disconnect every servers.
       for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
       {
-        replicationServerDomain.stopAllServers();
+        replicationServerDomain.stopAllServers(true);
       }
     }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index d41ee2c..4ef072c 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -808,7 +808,7 @@
               cn.toString(), baseDn));
             mb.append(stackTraceToSingleLineString(e));
             logError(mb.toMessage());
-            stopServer(origServer);
+            stopServer(origServer, false);
           }
           // Mark the ack info object as completed to prevent potential timeout
           // code parallel run
@@ -887,7 +887,7 @@
                 cn.toString(), baseDn));
             mb.append(stackTraceToSingleLineString(e));
             logError(mb.toMessage());
-            stopServer(origServer);
+            stopServer(origServer, false);
           }
           // Increment assured counters
           boolean safeRead =
@@ -979,25 +979,28 @@
     for (ReplicationServerHandler handler : replicationServers.values())
     {
       if (replServers.contains(handler.getServerAddressURL()))
-        stopServer(handler);
+        stopServer(handler, false);
     }
   }
 
   /**
    * Stop operations with all servers this domain is connected with (RS and DS).
+   *
+   * @param shutdown A boolean indicating if the stop is due to a
+   *                 shutdown condition.
    */
-  public void stopAllServers()
+  public void stopAllServers(boolean shutdown)
   {
     // Close session with other replication servers
     for (ReplicationServerHandler serverHandler : replicationServers.values())
     {
-      stopServer(serverHandler);
+      stopServer(serverHandler, shutdown);
     }
 
     // Close session with other LDAP servers
     for (DataServerHandler serverHandler : directoryServers.values())
     {
-      stopServer(serverHandler);
+      stopServer(serverHandler, shutdown);
     }
   }
 
@@ -1026,9 +1029,11 @@
   /**
    * Stop operations with a given server.
    *
-   * @param handler the server for which we want to stop operations
+   * @param handler the server for which we want to stop operations.
+   * @param shutdown A boolean indicating if the stop is due to a
+   *                 shutdown condition.
    */
-  public void stopServer(ServerHandler handler)
+  public void stopServer(ServerHandler handler, boolean shutdown)
   {
       if (debugEnabled())
         TRACER.debugInfo(
@@ -1049,9 +1054,13 @@
       {
         try
         {
+
           // Acquire lock on domain (see more details in comment of start()
           // method of ServerHandler)
-          lock();
+          if (!shutdown)
+          {
+            lock();
+          }
         } catch (InterruptedException ex)
         {
           // Try doing job anyway...
@@ -1066,9 +1075,12 @@
 
             // Check if generation id has to be reset
             mayResetGenerationId();
-            // Warn our DSs that a RS or DS has quit (does not use this
-            // handler as already removed from list)
-            buildAndSendTopoInfoToDSs(null);
+            if (!shutdown)
+            {
+              // Warn our DSs that a RS or DS has quit (does not use this
+              // handler as already removed from list)
+              buildAndSendTopoInfoToDSs(null);
+            }
           }
         } else
         {
@@ -1093,10 +1105,13 @@
             mayResetGenerationId();
             // Update the remote replication servers with our list
             // of connected LDAP servers
-            buildAndSendTopoInfoToRSs();
-            // Warn our DSs that a RS or DS has quit (does not use this
-            // handler as already removed from list)
-            buildAndSendTopoInfoToDSs(null);
+            if (!shutdown)
+            {
+              buildAndSendTopoInfoToRSs();
+              // Warn our DSs that a RS or DS has quit (does not use this
+              // handler as already removed from list)
+              buildAndSendTopoInfoToDSs(null);
+            }
           }
           else if (otherHandlers.contains(handler))
           {
@@ -1113,7 +1128,10 @@
       }
       finally
       {
-        release();
+        if (!shutdown)
+        {
+          release();
+        }
       }
     }
   }
@@ -1710,7 +1728,7 @@
         mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
         mb2.append(stackTraceToSingleLineString(ioe));
         logError(mb2.toMessage());
-        stopServer(senderHandler);
+        stopServer(senderHandler, false);
       }
     } else
     {
@@ -1746,8 +1764,8 @@
             // an error happened on the sender session trying to recover
             // from an error on the receiver session.
             // We don't have much solution left beside closing the sessions.
-            stopServer(senderHandler);
-            stopServer(targetHandler);
+            stopServer(senderHandler, false);
+            stopServer(targetHandler, false);
           }
         // TODO Handle error properly (sender timeout in addition)
         }
@@ -1766,7 +1784,7 @@
     // Terminate the assured timer
     assuredTimeoutTimer.cancel();
 
-    stopAllServers();
+    stopAllServers(true);
 
     stopDbHandlers();
   }
@@ -3163,7 +3181,7 @@
           {
             TRACER.debugCaught(DebugLogLevel.ERROR, e);
             logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName()));
-            stopServer(rsHandler);
+            stopServer(rsHandler, false);
           }
         }
       }
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 22bb668..22adad3 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -375,7 +375,7 @@
 
       writer = new ServerWriter(session, serverId,
           this, replicationServerDomain);
-      reader = new ServerReader(session, serverId, this);
+      reader = new ServerReader(session, this);
 
       reader.start();
       writer.start();
@@ -1421,6 +1421,6 @@
   public void doStop()
   {
     if (replicationServerDomain!=null)
-      replicationServerDomain.stopServer(this);
+      replicationServerDomain.stopServer(this, false);
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 395b5a8..8121a0d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -58,7 +58,6 @@
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
-  private int serverId;
   private ProtocolSession session;
   private ServerHandler handler;
 
@@ -66,16 +65,14 @@
    * Constructor for the LDAP server reader part of the replicationServer.
    *
    * @param session The ProtocolSession from which to read the data.
-   * @param serverId The server ID of the server from which we read messages.
    * @param handler The server handler for this server reader.
    */
-  public ServerReader(ProtocolSession session, int serverId,
+  public ServerReader(ProtocolSession session,
       ServerHandler handler)
   {
     super("Replication Reader Thread for RS handler " +
         handler.getMonitorInstanceName());
     this.session = session;
-    this.serverId = serverId;
     this.handler = handler;
   }
 
@@ -302,9 +299,12 @@
       if (debugEnabled())
         TRACER.debugInfo(
             "In " + this.getName() + " " + stackTraceToSingleLineString(e));
-      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
-        Integer.toString(handler.getReplicationServerId()));
-      logError(errMessage);
+      if (!handler.shuttingDown())
+      {
+        errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
+            Integer.toString(handler.getReplicationServerId()));
+        logError(errMessage);
+      }
     }
     catch (ClassNotFoundException e)
     {
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 80ad900..1bdce8c 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -246,7 +246,7 @@
       {
        // Can't do much more : ignore
       }
-      replicationServerDomain.stopServer(handler);
+      replicationServerDomain.stopServer(handler, false);
       if (debugEnabled())
       {
         TRACER.debugInfo(this.getName() + " stopped " + errMessage);
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 9b8f67d..992fe7c 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -79,6 +79,7 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.MutableBoolean;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachine;
@@ -309,7 +310,7 @@
    * This object is used as a conditional event to be notified about
    * the reception of monitor information from the Replication Server.
    */
-  private Object monitorResponse = new Object();
+  private final MutableBoolean monitorResponse = new MutableBoolean(false);
 
 
   /**
@@ -585,6 +586,8 @@
    */
   public Map<Integer, ServerState> getReplicaStates()
   {
+    monitorResponse.set(false);
+
     // publish Monitor Request Message to the Replication Server
     broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
 
@@ -593,7 +596,10 @@
     {
       synchronized (monitorResponse)
       {
-        monitorResponse.wait(10000);
+        if (monitorResponse.get() == false)
+        {
+          monitorResponse.wait(10000);
+        }
       }
     } catch (InterruptedException e)
     {}
@@ -844,6 +850,7 @@
           // Notify the sender that the response was received.
           synchronized (monitorResponse)
           {
+            monitorResponse.set(true);
             monitorResponse.notify();
           }
         }
@@ -1901,6 +1908,18 @@
     disableService();
     enableService();
 
+    // wait for the domain to reconnect.
+    int count = 0;
+    while (!isConnected() && (count < 10))
+    {
+      try
+      {
+        Thread.sleep(100);
+      } catch (InterruptedException e)
+      {
+      }
+    }
+
     resetGenerationId(getGenerationID());
 
     // check that at least one ReplicationServer did change its generation-id
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index d4e3302..0620f0a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -419,7 +419,7 @@
         openReplicationSession(DN.decode(TEST_ROOT_DN_STRING),  3,
                              100, replicationServerPort, 5000, state);
 
-      assertTrue(broker.isConnected());
+      assertTrue(broker.isConnected(), "Broker could not connect to RS");
 
       ReplicationMsg msg2 = broker.receive();
       broker.updateWindowAfterReplay();

--
Gitblit v1.10.0