From 23b1e20ff9fe938572a0b62ec5a12f12154445df Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 04 Jul 2007 15:12:04 +0000
Subject: [PATCH] The problem was that the publisher thread is stuck waiting for the window to re-open on a connection that has been closed without notifying the publisher.

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java |  510 ++++++++++++++++++++++++++++++++------------------------
 1 files changed, 294 insertions(+), 216 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index bb298eb..c1fdbb6 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -44,6 +44,7 @@
 import java.util.LinkedHashSet;
 import java.util.TreeSet;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.protocols.internal.InternalClientConnection;
@@ -60,6 +61,7 @@
 import org.opends.server.replication.protocol.SocketSession;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
 import org.opends.server.types.DN;
 import org.opends.server.types.DereferencePolicy;
 import org.opends.server.types.ErrorLogCategory;
@@ -83,7 +85,6 @@
   private boolean shutdown = false;
   private Collection<String> servers;
   private boolean connected = false;
-  private final Object lock = new Object();
   private String replicationServer = "Not connected";
   private TreeSet<FakeOperation> replayOperations;
   private ProtocolSession session = null;
@@ -120,7 +121,7 @@
   private int numLostConnections = 0;
 
   /**
-   * When the broker cannort connect to any replication server
+   * When the broker cannot connect to any replication server
    * it log an error and keeps continuing every second.
    * This boolean is set when the first failure happens and is used
    * to avoid repeating the error message for further failure to connect
@@ -129,6 +130,8 @@
    */
   private boolean connectionError = false;
 
+  private Object connectPhaseLock = new Object();
+
   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
    *
@@ -217,234 +220,243 @@
 
     boolean checkState = true;
     boolean receivedResponse = true;
-    while ((!connected) && (!shutdown) && (receivedResponse))
+    synchronized (connectPhaseLock)
     {
-      receivedResponse = false;
-      for (String server : servers)
+      while ((!connected) && (!shutdown) && (receivedResponse))
       {
-        int separator = server.lastIndexOf(':');
-        String port = server.substring(separator + 1);
-        String hostname = server.substring(0, separator);
-
-        try
+        receivedResponse = false;
+        for (String server : servers)
         {
-          /*
-           * Open a socket connection to the next candidate.
-           */
-          InetSocketAddress ServerAddr = new InetSocketAddress(
-              InetAddress.getByName(hostname), Integer.parseInt(port));
-          Socket socket = new Socket();
-          socket.setReceiveBufferSize(1000000);
-          socket.setTcpNoDelay(true);
-          socket.connect(ServerAddr, 500);
-          session = new SocketSession(socket);
+          int separator = server.lastIndexOf(':');
+          String port = server.substring(separator + 1);
+          String hostname = server.substring(0, separator);
 
-          /*
-           * Send our ServerStartMessage.
-           */
-          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
-              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-              halfRcvWindow*2, heartbeatInterval, state,
-              protocolVersion);
-          session.publish(msg);
-
-
-          /*
-           * Read the ReplServerStartMessage that should come back.
-           */
-          session.setSoTimeout(1000);
-          startMsg = (ReplServerStartMessage) session.receive();
-          receivedResponse = true;
-
-          /*
-           * We have sent our own protocol version to the replication server.
-           * The replication server will use the same one (or an older one
-           * if it is an old replication server).
-           */
-          protocolVersion = ProtocolVersion.minWithCurrent(
-              startMsg.getVersion());
-          session.setSoTimeout(timeout);
-
-          /*
-           * We must not publish changes to a replicationServer that has not
-           * seen all our previous changes because this could cause some
-           * other ldap servers to miss those changes.
-           * Check that the ReplicationServer has seen all our previous changes.
-           * If not, try another replicationServer.
-           * If no other replicationServer has seen all our changes, recover
-           * those changes and send them again to any replicationServer.
-           */
-          ChangeNumber replServerMaxChangeNumber =
-            startMsg.getServerState().getMaxChangeNumber(serverID);
-          if (replServerMaxChangeNumber == null)
-            replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
-          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
-          if ((ourMaxChangeNumber == null) ||
-              (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+          try
           {
-            replicationServer = ServerAddr.toString();
-            maxSendWindow = startMsg.getWindowSize();
-            this.sendWindow = new Semaphore(maxSendWindow);
-            connected = true;
-            startHeartBeat();
-            break;
-          }
-          else
-          {
-            if (checkState == true)
+            /*
+             * Open a socket connection to the next candidate.
+             */
+            InetSocketAddress ServerAddr = new InetSocketAddress(
+                InetAddress.getByName(hostname), Integer.parseInt(port));
+            Socket socket = new Socket();
+            socket.setReceiveBufferSize(1000000);
+            socket.setTcpNoDelay(true);
+            socket.connect(ServerAddr, 500);
+            session = new SocketSession(socket);
+
+            /*
+             * Send our ServerStartMessage.
+             */
+            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
+                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+                halfRcvWindow*2, heartbeatInterval, state,
+                protocolVersion);
+            session.publish(msg);
+
+
+            /*
+             * Read the ReplServerStartMessage that should come back.
+             */
+            session.setSoTimeout(1000);
+            startMsg = (ReplServerStartMessage) session.receive();
+            receivedResponse = true;
+
+            /*
+             * We have sent our own protocol version to the replication server.
+             * The replication server will use the same one (or an older one
+             * if it is an old replication server).
+             */
+            protocolVersion = ProtocolVersion.minWithCurrent(
+                startMsg.getVersion());
+            session.setSoTimeout(timeout);
+
+            /*
+             * We must not publish changes to a replicationServer that has not
+             * seen all our previous changes because this could cause some
+             * other ldap servers to miss those changes.
+             * Check that the ReplicationServer has seen all our previous
+             * changes.
+             * If not, try another replicationServer.
+             * If no other replicationServer has seen all our changes, recover
+             * those changes and send them again to any replicationServer.
+             */
+            ChangeNumber replServerMaxChangeNumber =
+              startMsg.getServerState().getMaxChangeNumber(serverID);
+            if (replServerMaxChangeNumber == null)
+              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+            ChangeNumber ourMaxChangeNumber =
+              state.getMaxChangeNumber(serverID);
+            if ((ourMaxChangeNumber == null) ||
+                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
             {
-              /* This replicationServer is missing some
-               * of our changes, we are going to try another server
-               * but before log a notice message
-               */
-              int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
-              String message = getMessage(msgID, server);
-              logError(ErrorLogCategory.SYNCHRONIZATION,
-                       ErrorLogSeverity.NOTICE,
-                       message, msgID);
+              replicationServer = ServerAddr.toString();
+              maxSendWindow = startMsg.getWindowSize();
+              connected = true;
+              startHeartBeat();
+              break;
             }
             else
             {
-              replayOperations.clear();
-              /*
-               * Get all the changes that have not been seen by this
-               * replicationServer and update it
-               */
-              InternalClientConnection conn =
-                  InternalClientConnection.getRootConnection();
-              LDAPFilter filter = LDAPFilter.decode(
-                  "("+ Historical.HISTORICALATTRIBUTENAME +
-                  ">=dummy:" + replServerMaxChangeNumber + ")");
-              LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
-              attrs.add(Historical.HISTORICALATTRIBUTENAME);
-              InternalSearchOperation op = conn.processSearch(
-                  new ASN1OctetString(baseDn.toString()),
-                  SearchScope.WHOLE_SUBTREE,
-                  DereferencePolicy.NEVER_DEREF_ALIASES,
-                  0, 0, false, filter,
-                  attrs, this);
-              if (op.getResultCode() != ResultCode.SUCCESS)
+              if (checkState == true)
               {
-                /*
-                 * An error happened trying to search for the updates
-                 * This server therefore can't start acepting new updates.
-                 * TODO : should stop the LDAP server (how to ?)
+                /* This replicationServer is missing some
+                 * of our changes, we are going to try another server
+                 * but before log a notice message
                  */
-                int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
-                String message = getMessage(msgID);
+                int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
+                String message = getMessage(msgID, server);
                 logError(ErrorLogCategory.SYNCHRONIZATION,
-                         ErrorLogSeverity.FATAL_ERROR,
-                         message, msgID);
+                    ErrorLogSeverity.NOTICE,
+                    message, msgID);
               }
               else
               {
-                replicationServer = ServerAddr.toString();
-                maxSendWindow = startMsg.getWindowSize();
-                this.sendWindow = new Semaphore(maxSendWindow);
-                connected = true;
-                for (FakeOperation replayOp : replayOperations)
+                replayOperations.clear();
+                logError(ErrorLogCategory.SYNCHRONIZATION,
+                    ErrorLogSeverity.NOTICE,
+                    "going to search for changes", 1);
+                /*
+                 * Get all the changes that have not been seen by this
+                 * replicationServer and update it
+                 */
+                InternalClientConnection conn =
+                  InternalClientConnection.getRootConnection();
+                LDAPFilter filter = LDAPFilter.decode(
+                    "("+ Historical.HISTORICALATTRIBUTENAME +
+                    ">=dummy:" + replServerMaxChangeNumber + ")");
+                LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+                attrs.add(Historical.HISTORICALATTRIBUTENAME);
+                InternalSearchOperation op = conn.processSearch(
+                    new ASN1OctetString(baseDn.toString()),
+                    SearchScope.WHOLE_SUBTREE,
+                    DereferencePolicy.NEVER_DEREF_ALIASES,
+                    0, 0, false, filter,
+                    attrs, this);
+                if (op.getResultCode() != ResultCode.SUCCESS)
                 {
-                  publish(replayOp.generateMessage());
+                  /*
+                   * An error happened trying to search for the updates
+                   * This server will start acepting again new updates but
+                   * some inconsistencies will stay between servers.
+                   * TODO : REPAIR : log an error for the repair tool
+                   * that will need to resynchronize the servers.
+                   */
+                  int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
+                  String message = getMessage(msgID);
+                  logError(ErrorLogCategory.SYNCHRONIZATION,
+                      ErrorLogSeverity.FATAL_ERROR,
+                      message, msgID);
                 }
-                startHeartBeat();
-                break;
+                else
+                {
+                  replicationServer = ServerAddr.toString();
+                  maxSendWindow = startMsg.getWindowSize();
+                  connected = true;
+                  for (FakeOperation replayOp : replayOperations)
+                  {
+                    logError(ErrorLogCategory.SYNCHRONIZATION,
+                        ErrorLogSeverity.NOTICE,
+                        "sendingChange", 1);
+                    session.publish(replayOp.generateMessage());
+                  }
+                  startHeartBeat();
+                  logError(ErrorLogCategory.SYNCHRONIZATION,
+                      ErrorLogSeverity.NOTICE,
+                      "changes sent", 1);
+                  break;
+                }
+              }
+            }
+          }
+          catch (ConnectException e)
+          {
+            /*
+             * There was no server waiting on this host:port
+             * Log a notice and try the next replicationServer in the list
+             */
+            if (!connectionError )
+            {
+              // the error message is only logged once to avoid overflowing
+              // the error log
+              int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
+              String message = getMessage(msgID, server);
+              logError(ErrorLogCategory.SYNCHRONIZATION,
+                  ErrorLogSeverity.NOTICE,
+                  message, msgID);
+            }
+          }
+          catch (Exception e)
+          {
+            int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
+            String message = getMessage(msgID);
+            logError(ErrorLogCategory.SYNCHRONIZATION,
+                ErrorLogSeverity.SEVERE_ERROR,
+                message + stackTraceToSingleLineString(e), msgID);
+          }
+          finally
+          {
+            if (connected == false)
+            {
+              if (session != null)
+              {
+                session.close();
+                session = null;
               }
             }
           }
         }
-        catch (ConnectException e)
+
+        if ((!connected) && (checkState == true) && receivedResponse)
         {
           /*
-           * There was no server waiting on this host:port
-           * Log a notice and try the next replicationServer in the list
+           * We could not find a replicationServer that has seen all the
+           * changes that this server has already processed, start again
+           * the loop looking for any replicationServer.
            */
-          if (!connectionError )
-          {
-            // the error message is only logged once to avoid overflowing
-            // the error log
-            int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
-            String message = getMessage(msgID, server);
-            logError(ErrorLogCategory.SYNCHRONIZATION,
-                ErrorLogSeverity.NOTICE,
-                message, msgID);
-          }
-        }
-        catch (Exception e)
-        {
-          int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
-          String message = getMessage(msgID)  + stackTraceToSingleLineString(e);
+          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
+          String message = getMessage(msgID);
           logError(ErrorLogCategory.SYNCHRONIZATION,
-                   ErrorLogSeverity.SEVERE_ERROR,
-                   message, msgID);
-        }
-        finally
-        {
-          if (connected == false)
-          {
-            if (session != null)
-            {
-              logError(ErrorLogCategory.SYNCHRONIZATION,
-                  ErrorLogSeverity.NOTICE,
-                  "Broker : connect closing session" , 1);
-
-              session.close();
-              session = null;
-            }
-          }
+              ErrorLogSeverity.NOTICE,
+              message, msgID);
+          checkState = false;
         }
       }
 
-      if ((!connected) && (checkState == true) && receivedResponse)
+      if (connected)
+      {
+        // This server has connected correctly.
+        // Log a message to let the administrator know that the failure was
+        // resolved.
+        // wakeup all the thread that were waiting on the window
+        // on the previous connection.
+        connectionError = false;
+        if (sendWindow != null)
+          sendWindow.release(Integer.MAX_VALUE);
+        this.sendWindow = new Semaphore(maxSendWindow);
+        connectPhaseLock.notify();
+        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
+        String message =
+          getMessage(msgID, replicationServer, baseDn.toString());
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.NOTICE, message, msgID);
+      }
+      else
       {
         /*
-         * We could not find a replicationServer that has seen all the
-         * changes that this server has already processed, start again
-         * the loop looking for any replicationServer.
+         * This server could not find any replicationServer
+         * It's going to start in degraded mode.
+         * Log a message
          */
-        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
-        String message = getMessage(msgID);
-        logError(ErrorLogCategory.SYNCHRONIZATION,
-            ErrorLogSeverity.NOTICE,
-            message, msgID);
-        try
+        if (!connectionError)
         {
-          Thread.sleep(500);
-        } catch (InterruptedException e)
-        {
+          checkState = false;
+          connectionError = true;
+          connectPhaseLock.notify();
+          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
+          String message = getMessage(msgID, baseDn.toString());
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.NOTICE, message, msgID);
         }
-        checkState = false;
-      }
-    }
-
-    if (connected)
-    {
-      // This server has connected correctly.
-      // let's check if it was previosuly on error, in this case log
-      // a message to let the administratot know that the failure was resolved.
-      if (connectionError)
-      {
-        connectionError = false;
-        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
-        String message = getMessage(msgID, baseDn.toString());
-        logError(ErrorLogCategory.SYNCHRONIZATION,
-            ErrorLogSeverity.NOTICE, message, msgID);
-      }
-    }
-    else
-    {
-      /*
-       * This server could not find any replicationServer
-       * It's going to start in degraded mode.
-       * Log a message
-       */
-      if (!connectionError)
-      {
-        checkState = false;
-        connectionError = true;
-        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
-        String message = getMessage(msgID, baseDn.toString());
-        logError(ErrorLogCategory.SYNCHRONIZATION,
-            ErrorLogSeverity.NOTICE, message, msgID);
       }
     }
   }
@@ -530,30 +542,90 @@
   public void publish(ReplicationMessage msg)
   {
     boolean done = false;
-    ProtocolSession failingSession = session;
 
     while (!done)
     {
       if (connectionError)
-        return;
-      synchronized (lock)
       {
-        try
+        // It was not possible to connect to any replication server.
+        // Since the operation was already processed, we have no other
+        // choice than to return without sending the ReplicationMessage
+        // and relying on the resend procedure of the connect phase to
+        // fix the problem when we finally connect.
+        return;
+      }
+
+      try
+      {
+        boolean credit;
+        ProtocolSession current_session;
+        Semaphore currentWindowSemaphore;
+
+        // save the session at the time when we acquire the
+        // sendwindow credit so that we can make sure later
+        // that the session did not change in between.
+        // This is necessary to make sure that we don't publish a message
+        // on a session with a credit that was acquired from a previous
+        // session.
+        synchronized (connectPhaseLock)
         {
-          if (this.connected == false)
-            this.reStart(failingSession);
-          if (msg instanceof UpdateMessage)
-            sendWindow.acquire();
-          session.publish(msg);
-          done = true;
-        } catch (IOException e)
-        {
-          this.reStart(failingSession);
+          current_session = session;
+          currentWindowSemaphore = sendWindow;
         }
-        catch (InterruptedException e)
+
+        if (msg instanceof UpdateMessage)
         {
-          this.reStart(failingSession);
+          // Acquiring the window credit must be done outside of the
+          // connectPhaseLock because it can be blocking and we don't
+          // want to hold off reconnection in case the connection dropped.
+          credit =
+            currentWindowSemaphore.tryAcquire(
+                (long) 500, TimeUnit.MILLISECONDS);
         }
+        else
+        {
+          credit = true;
+        }
+        if (credit)
+        {
+          synchronized (connectPhaseLock)
+          {
+            // check the session. If it has changed, some
+            // deconnection/reconnection happened and we need to restart from
+            // scratch.
+            if (session == current_session)
+            {
+              session.publish(msg);
+              done = true;
+            }
+          }
+        }
+        if (!credit)
+        {
+          // the window is still closed.
+          // Send a WindowProbe message to wakeup the receiver in case the
+          // window update message was lost somehow...
+          // then loop to check again if connection was closed.
+          session.publish(new WindowProbe());
+        }
+      } catch (IOException e)
+      {
+        // The receive threads should handle reconnection or
+        // mark this broker in error. Just retry.
+        synchronized (connectPhaseLock)
+        {
+          try
+          {
+            connectPhaseLock.wait(100);
+          } catch (InterruptedException e1)
+          {
+            // ignore
+          }
+        }
+      }
+      catch (InterruptedException e)
+      {
+        // just loop.
       }
     }
   }
@@ -561,6 +633,10 @@
 
   /**
    * Receive a message.
+   * This method is not multithread safe and should either always be
+   * called in a single thread or protected by a locking mechanism
+   * before being called.
+   *
    * @return the received message
    * @throws SocketTimeoutException if the timeout set by setSoTimeout
    *         has expired
@@ -603,10 +679,12 @@
       {
         if (shutdown == false)
         {
-          synchronized (lock)
-          {
-            this.reStart(failingSession);
-          }
+          int    msgID   = MSGID_DISCONNECTED_FROM_CHANGELOG;
+          String message = getMessage(msgID, replicationServer);
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.NOTICE,
+              message + " " + e.getMessage(), msgID);
+          this.reStart(failingSession);
         }
       }
     }

--
Gitblit v1.10.0