From e2447ca29d7539529ef05a40a26abc2f7ae35d8c Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Wed, 04 Jul 2007 15:12:04 +0000
Subject: [PATCH] The problem was that the publisher thread is stuck waiting for the window to re-open on a connection that has been closed without notifying the publisher.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                                |  510 ++++++++++++++++++++++----------------
 opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java                                        |   11 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java |   14 +
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java                |    3 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java                                    |   31 ++
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java                                   |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java                                     |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java                             |    4 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java    |   60 ++++
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java                               |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java                                |    4 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java                                  |   12 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbe.java                                    |   84 ++++++
 13 files changed, 516 insertions(+), 229 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java b/opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java
index 938c941..72f4ad0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/messages/ReplicationMessages.java
@@ -444,6 +444,12 @@
 
 
   /**
+   * The connection to the curent Replication Server has failed.
+   */
+  public static final int MSGID_DISCONNECTED_FROM_CHANGELOG =
+    CATEGORY_MASK_SYNC | SEVERITY_MASK_NOTICE | 63;
+
+  /**
    * Register the messages from this class in the core server.
    *
    */
@@ -607,6 +613,9 @@
         "The Replication is configured for suffix  %s "
         + "but was not able to connect to any Replication Server");
     registerMessage(MSGID_NOW_FOUND_CHANGELOG,
-        "A Replication Server was found for suffix %s");
+        "Replication Server %s now used for Replication Domain %s");
+    registerMessage(MSGID_DISCONNECTED_FROM_CHANGELOG,
+        "The connection to Replication Server %s has been dropped by the "
+        + "Replication Server");
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
index 70d43f9..36b7003 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingMatchingRule.java
@@ -54,7 +54,6 @@
   public HistoricalCsnOrderingMatchingRule()
   {
     super();
-    // TODO Auto-generated constructor stub
   }
 
   /**
@@ -81,7 +80,7 @@
   @Override
   public void initializeMatchingRule(OrderingMatchingRuleCfg configuration)
   {
-    // TODO Auto-generated method stub
+    // No implementation needed here.
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index 7d181f0..ff5105f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -106,7 +106,7 @@
    *
    * @return The number of update currently in the list.
    */
-  public synchronized int size()
+  public int size()
   {
     return pendingChanges.size();
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index bb298eb..c1fdbb6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -44,6 +44,7 @@
 import java.util.LinkedHashSet;
 import java.util.TreeSet;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.opends.server.protocols.asn1.ASN1OctetString;
 import org.opends.server.protocols.internal.InternalClientConnection;
@@ -60,6 +61,7 @@
 import org.opends.server.replication.protocol.SocketSession;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
 import org.opends.server.types.DN;
 import org.opends.server.types.DereferencePolicy;
 import org.opends.server.types.ErrorLogCategory;
@@ -83,7 +85,6 @@
   private boolean shutdown = false;
   private Collection<String> servers;
   private boolean connected = false;
-  private final Object lock = new Object();
   private String replicationServer = "Not connected";
   private TreeSet<FakeOperation> replayOperations;
   private ProtocolSession session = null;
@@ -120,7 +121,7 @@
   private int numLostConnections = 0;
 
   /**
-   * When the broker cannort connect to any replication server
+   * When the broker cannot connect to any replication server
    * it log an error and keeps continuing every second.
    * This boolean is set when the first failure happens and is used
    * to avoid repeating the error message for further failure to connect
@@ -129,6 +130,8 @@
    */
   private boolean connectionError = false;
 
+  private Object connectPhaseLock = new Object();
+
   /**
    * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
    *
@@ -217,234 +220,243 @@
 
     boolean checkState = true;
     boolean receivedResponse = true;
-    while ((!connected) && (!shutdown) && (receivedResponse))
+    synchronized (connectPhaseLock)
     {
-      receivedResponse = false;
-      for (String server : servers)
+      while ((!connected) && (!shutdown) && (receivedResponse))
       {
-        int separator = server.lastIndexOf(':');
-        String port = server.substring(separator + 1);
-        String hostname = server.substring(0, separator);
-
-        try
+        receivedResponse = false;
+        for (String server : servers)
         {
-          /*
-           * Open a socket connection to the next candidate.
-           */
-          InetSocketAddress ServerAddr = new InetSocketAddress(
-              InetAddress.getByName(hostname), Integer.parseInt(port));
-          Socket socket = new Socket();
-          socket.setReceiveBufferSize(1000000);
-          socket.setTcpNoDelay(true);
-          socket.connect(ServerAddr, 500);
-          session = new SocketSession(socket);
+          int separator = server.lastIndexOf(':');
+          String port = server.substring(separator + 1);
+          String hostname = server.substring(0, separator);
 
-          /*
-           * Send our ServerStartMessage.
-           */
-          ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
-              maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
-              halfRcvWindow*2, heartbeatInterval, state,
-              protocolVersion);
-          session.publish(msg);
-
-
-          /*
-           * Read the ReplServerStartMessage that should come back.
-           */
-          session.setSoTimeout(1000);
-          startMsg = (ReplServerStartMessage) session.receive();
-          receivedResponse = true;
-
-          /*
-           * We have sent our own protocol version to the replication server.
-           * The replication server will use the same one (or an older one
-           * if it is an old replication server).
-           */
-          protocolVersion = ProtocolVersion.minWithCurrent(
-              startMsg.getVersion());
-          session.setSoTimeout(timeout);
-
-          /*
-           * We must not publish changes to a replicationServer that has not
-           * seen all our previous changes because this could cause some
-           * other ldap servers to miss those changes.
-           * Check that the ReplicationServer has seen all our previous changes.
-           * If not, try another replicationServer.
-           * If no other replicationServer has seen all our changes, recover
-           * those changes and send them again to any replicationServer.
-           */
-          ChangeNumber replServerMaxChangeNumber =
-            startMsg.getServerState().getMaxChangeNumber(serverID);
-          if (replServerMaxChangeNumber == null)
-            replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
-          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
-          if ((ourMaxChangeNumber == null) ||
-              (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+          try
           {
-            replicationServer = ServerAddr.toString();
-            maxSendWindow = startMsg.getWindowSize();
-            this.sendWindow = new Semaphore(maxSendWindow);
-            connected = true;
-            startHeartBeat();
-            break;
-          }
-          else
-          {
-            if (checkState == true)
+            /*
+             * Open a socket connection to the next candidate.
+             */
+            InetSocketAddress ServerAddr = new InetSocketAddress(
+                InetAddress.getByName(hostname), Integer.parseInt(port));
+            Socket socket = new Socket();
+            socket.setReceiveBufferSize(1000000);
+            socket.setTcpNoDelay(true);
+            socket.connect(ServerAddr, 500);
+            session = new SocketSession(socket);
+
+            /*
+             * Send our ServerStartMessage.
+             */
+            ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
+                maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+                halfRcvWindow*2, heartbeatInterval, state,
+                protocolVersion);
+            session.publish(msg);
+
+
+            /*
+             * Read the ReplServerStartMessage that should come back.
+             */
+            session.setSoTimeout(1000);
+            startMsg = (ReplServerStartMessage) session.receive();
+            receivedResponse = true;
+
+            /*
+             * We have sent our own protocol version to the replication server.
+             * The replication server will use the same one (or an older one
+             * if it is an old replication server).
+             */
+            protocolVersion = ProtocolVersion.minWithCurrent(
+                startMsg.getVersion());
+            session.setSoTimeout(timeout);
+
+            /*
+             * We must not publish changes to a replicationServer that has not
+             * seen all our previous changes because this could cause some
+             * other ldap servers to miss those changes.
+             * Check that the ReplicationServer has seen all our previous
+             * changes.
+             * If not, try another replicationServer.
+             * If no other replicationServer has seen all our changes, recover
+             * those changes and send them again to any replicationServer.
+             */
+            ChangeNumber replServerMaxChangeNumber =
+              startMsg.getServerState().getMaxChangeNumber(serverID);
+            if (replServerMaxChangeNumber == null)
+              replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+            ChangeNumber ourMaxChangeNumber =
+              state.getMaxChangeNumber(serverID);
+            if ((ourMaxChangeNumber == null) ||
+                (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
             {
-              /* This replicationServer is missing some
-               * of our changes, we are going to try another server
-               * but before log a notice message
-               */
-              int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
-              String message = getMessage(msgID, server);
-              logError(ErrorLogCategory.SYNCHRONIZATION,
-                       ErrorLogSeverity.NOTICE,
-                       message, msgID);
+              replicationServer = ServerAddr.toString();
+              maxSendWindow = startMsg.getWindowSize();
+              connected = true;
+              startHeartBeat();
+              break;
             }
             else
             {
-              replayOperations.clear();
-              /*
-               * Get all the changes that have not been seen by this
-               * replicationServer and update it
-               */
-              InternalClientConnection conn =
-                  InternalClientConnection.getRootConnection();
-              LDAPFilter filter = LDAPFilter.decode(
-                  "("+ Historical.HISTORICALATTRIBUTENAME +
-                  ">=dummy:" + replServerMaxChangeNumber + ")");
-              LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
-              attrs.add(Historical.HISTORICALATTRIBUTENAME);
-              InternalSearchOperation op = conn.processSearch(
-                  new ASN1OctetString(baseDn.toString()),
-                  SearchScope.WHOLE_SUBTREE,
-                  DereferencePolicy.NEVER_DEREF_ALIASES,
-                  0, 0, false, filter,
-                  attrs, this);
-              if (op.getResultCode() != ResultCode.SUCCESS)
+              if (checkState == true)
               {
-                /*
-                 * An error happened trying to search for the updates
-                 * This server therefore can't start acepting new updates.
-                 * TODO : should stop the LDAP server (how to ?)
+                /* This replicationServer is missing some
+                 * of our changes, we are going to try another server
+                 * but before log a notice message
                  */
-                int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
-                String message = getMessage(msgID);
+                int    msgID   = MSGID_CHANGELOG_MISSING_CHANGES;
+                String message = getMessage(msgID, server);
                 logError(ErrorLogCategory.SYNCHRONIZATION,
-                         ErrorLogSeverity.FATAL_ERROR,
-                         message, msgID);
+                    ErrorLogSeverity.NOTICE,
+                    message, msgID);
               }
               else
               {
-                replicationServer = ServerAddr.toString();
-                maxSendWindow = startMsg.getWindowSize();
-                this.sendWindow = new Semaphore(maxSendWindow);
-                connected = true;
-                for (FakeOperation replayOp : replayOperations)
+                replayOperations.clear();
+                logError(ErrorLogCategory.SYNCHRONIZATION,
+                    ErrorLogSeverity.NOTICE,
+                    "going to search for changes", 1);
+                /*
+                 * Get all the changes that have not been seen by this
+                 * replicationServer and update it
+                 */
+                InternalClientConnection conn =
+                  InternalClientConnection.getRootConnection();
+                LDAPFilter filter = LDAPFilter.decode(
+                    "("+ Historical.HISTORICALATTRIBUTENAME +
+                    ">=dummy:" + replServerMaxChangeNumber + ")");
+                LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+                attrs.add(Historical.HISTORICALATTRIBUTENAME);
+                InternalSearchOperation op = conn.processSearch(
+                    new ASN1OctetString(baseDn.toString()),
+                    SearchScope.WHOLE_SUBTREE,
+                    DereferencePolicy.NEVER_DEREF_ALIASES,
+                    0, 0, false, filter,
+                    attrs, this);
+                if (op.getResultCode() != ResultCode.SUCCESS)
                 {
-                  publish(replayOp.generateMessage());
+                  /*
+                   * An error happened trying to search for the updates
+                   * This server will start acepting again new updates but
+                   * some inconsistencies will stay between servers.
+                   * TODO : REPAIR : log an error for the repair tool
+                   * that will need to resynchronize the servers.
+                   */
+                  int    msgID   = MSGID_CANNOT_RECOVER_CHANGES;
+                  String message = getMessage(msgID);
+                  logError(ErrorLogCategory.SYNCHRONIZATION,
+                      ErrorLogSeverity.FATAL_ERROR,
+                      message, msgID);
                 }
-                startHeartBeat();
-                break;
+                else
+                {
+                  replicationServer = ServerAddr.toString();
+                  maxSendWindow = startMsg.getWindowSize();
+                  connected = true;
+                  for (FakeOperation replayOp : replayOperations)
+                  {
+                    logError(ErrorLogCategory.SYNCHRONIZATION,
+                        ErrorLogSeverity.NOTICE,
+                        "sendingChange", 1);
+                    session.publish(replayOp.generateMessage());
+                  }
+                  startHeartBeat();
+                  logError(ErrorLogCategory.SYNCHRONIZATION,
+                      ErrorLogSeverity.NOTICE,
+                      "changes sent", 1);
+                  break;
+                }
+              }
+            }
+          }
+          catch (ConnectException e)
+          {
+            /*
+             * There was no server waiting on this host:port
+             * Log a notice and try the next replicationServer in the list
+             */
+            if (!connectionError )
+            {
+              // the error message is only logged once to avoid overflowing
+              // the error log
+              int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
+              String message = getMessage(msgID, server);
+              logError(ErrorLogCategory.SYNCHRONIZATION,
+                  ErrorLogSeverity.NOTICE,
+                  message, msgID);
+            }
+          }
+          catch (Exception e)
+          {
+            int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
+            String message = getMessage(msgID);
+            logError(ErrorLogCategory.SYNCHRONIZATION,
+                ErrorLogSeverity.SEVERE_ERROR,
+                message + stackTraceToSingleLineString(e), msgID);
+          }
+          finally
+          {
+            if (connected == false)
+            {
+              if (session != null)
+              {
+                session.close();
+                session = null;
               }
             }
           }
         }
-        catch (ConnectException e)
+
+        if ((!connected) && (checkState == true) && receivedResponse)
         {
           /*
-           * There was no server waiting on this host:port
-           * Log a notice and try the next replicationServer in the list
+           * We could not find a replicationServer that has seen all the
+           * changes that this server has already processed, start again
+           * the loop looking for any replicationServer.
            */
-          if (!connectionError )
-          {
-            // the error message is only logged once to avoid overflowing
-            // the error log
-            int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
-            String message = getMessage(msgID, server);
-            logError(ErrorLogCategory.SYNCHRONIZATION,
-                ErrorLogSeverity.NOTICE,
-                message, msgID);
-          }
-        }
-        catch (Exception e)
-        {
-          int    msgID   = MSGID_EXCEPTION_STARTING_SESSION;
-          String message = getMessage(msgID)  + stackTraceToSingleLineString(e);
+          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
+          String message = getMessage(msgID);
           logError(ErrorLogCategory.SYNCHRONIZATION,
-                   ErrorLogSeverity.SEVERE_ERROR,
-                   message, msgID);
-        }
-        finally
-        {
-          if (connected == false)
-          {
-            if (session != null)
-            {
-              logError(ErrorLogCategory.SYNCHRONIZATION,
-                  ErrorLogSeverity.NOTICE,
-                  "Broker : connect closing session" , 1);
-
-              session.close();
-              session = null;
-            }
-          }
+              ErrorLogSeverity.NOTICE,
+              message, msgID);
+          checkState = false;
         }
       }
 
-      if ((!connected) && (checkState == true) && receivedResponse)
+      if (connected)
+      {
+        // This server has connected correctly.
+        // Log a message to let the administrator know that the failure was
+        // resolved.
+        // wakeup all the thread that were waiting on the window
+        // on the previous connection.
+        connectionError = false;
+        if (sendWindow != null)
+          sendWindow.release(Integer.MAX_VALUE);
+        this.sendWindow = new Semaphore(maxSendWindow);
+        connectPhaseLock.notify();
+        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
+        String message =
+          getMessage(msgID, replicationServer, baseDn.toString());
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.NOTICE, message, msgID);
+      }
+      else
       {
         /*
-         * We could not find a replicationServer that has seen all the
-         * changes that this server has already processed, start again
-         * the loop looking for any replicationServer.
+         * This server could not find any replicationServer
+         * It's going to start in degraded mode.
+         * Log a message
          */
-        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES;
-        String message = getMessage(msgID);
-        logError(ErrorLogCategory.SYNCHRONIZATION,
-            ErrorLogSeverity.NOTICE,
-            message, msgID);
-        try
+        if (!connectionError)
         {
-          Thread.sleep(500);
-        } catch (InterruptedException e)
-        {
+          checkState = false;
+          connectionError = true;
+          connectPhaseLock.notify();
+          int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
+          String message = getMessage(msgID, baseDn.toString());
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.NOTICE, message, msgID);
         }
-        checkState = false;
-      }
-    }
-
-    if (connected)
-    {
-      // This server has connected correctly.
-      // let's check if it was previosuly on error, in this case log
-      // a message to let the administratot know that the failure was resolved.
-      if (connectionError)
-      {
-        connectionError = false;
-        int    msgID   = MSGID_NOW_FOUND_CHANGELOG;
-        String message = getMessage(msgID, baseDn.toString());
-        logError(ErrorLogCategory.SYNCHRONIZATION,
-            ErrorLogSeverity.NOTICE, message, msgID);
-      }
-    }
-    else
-    {
-      /*
-       * This server could not find any replicationServer
-       * It's going to start in degraded mode.
-       * Log a message
-       */
-      if (!connectionError)
-      {
-        checkState = false;
-        connectionError = true;
-        int    msgID   = MSGID_COULD_NOT_FIND_CHANGELOG;
-        String message = getMessage(msgID, baseDn.toString());
-        logError(ErrorLogCategory.SYNCHRONIZATION,
-            ErrorLogSeverity.NOTICE, message, msgID);
       }
     }
   }
@@ -530,30 +542,90 @@
   public void publish(ReplicationMessage msg)
   {
     boolean done = false;
-    ProtocolSession failingSession = session;
 
     while (!done)
     {
       if (connectionError)
-        return;
-      synchronized (lock)
       {
-        try
+        // It was not possible to connect to any replication server.
+        // Since the operation was already processed, we have no other
+        // choice than to return without sending the ReplicationMessage
+        // and relying on the resend procedure of the connect phase to
+        // fix the problem when we finally connect.
+        return;
+      }
+
+      try
+      {
+        boolean credit;
+        ProtocolSession current_session;
+        Semaphore currentWindowSemaphore;
+
+        // save the session at the time when we acquire the
+        // sendwindow credit so that we can make sure later
+        // that the session did not change in between.
+        // This is necessary to make sure that we don't publish a message
+        // on a session with a credit that was acquired from a previous
+        // session.
+        synchronized (connectPhaseLock)
         {
-          if (this.connected == false)
-            this.reStart(failingSession);
-          if (msg instanceof UpdateMessage)
-            sendWindow.acquire();
-          session.publish(msg);
-          done = true;
-        } catch (IOException e)
-        {
-          this.reStart(failingSession);
+          current_session = session;
+          currentWindowSemaphore = sendWindow;
         }
-        catch (InterruptedException e)
+
+        if (msg instanceof UpdateMessage)
         {
-          this.reStart(failingSession);
+          // Acquiring the window credit must be done outside of the
+          // connectPhaseLock because it can be blocking and we don't
+          // want to hold off reconnection in case the connection dropped.
+          credit =
+            currentWindowSemaphore.tryAcquire(
+                (long) 500, TimeUnit.MILLISECONDS);
         }
+        else
+        {
+          credit = true;
+        }
+        if (credit)
+        {
+          synchronized (connectPhaseLock)
+          {
+            // check the session. If it has changed, some
+            // deconnection/reconnection happened and we need to restart from
+            // scratch.
+            if (session == current_session)
+            {
+              session.publish(msg);
+              done = true;
+            }
+          }
+        }
+        if (!credit)
+        {
+          // the window is still closed.
+          // Send a WindowProbe message to wakeup the receiver in case the
+          // window update message was lost somehow...
+          // then loop to check again if connection was closed.
+          session.publish(new WindowProbe());
+        }
+      } catch (IOException e)
+      {
+        // The receive threads should handle reconnection or
+        // mark this broker in error. Just retry.
+        synchronized (connectPhaseLock)
+        {
+          try
+          {
+            connectPhaseLock.wait(100);
+          } catch (InterruptedException e1)
+          {
+            // ignore
+          }
+        }
+      }
+      catch (InterruptedException e)
+      {
+        // just loop.
       }
     }
   }
@@ -561,6 +633,10 @@
 
   /**
    * Receive a message.
+   * This method is not multithread safe and should either always be
+   * called in a single thread or protected by a locking mechanism
+   * before being called.
+   *
    * @return the received message
    * @throws SocketTimeoutException if the timeout set by setSoTimeout
    *         has expired
@@ -603,10 +679,12 @@
       {
         if (shutdown == false)
         {
-          synchronized (lock)
-          {
-            this.reStart(failingSession);
-          }
+          int    msgID   = MSGID_DISCONNECTED_FROM_CHANGELOG;
+          String message = getMessage(msgID, replicationServer);
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.NOTICE,
+              message + " " + e.getMessage(), msgID);
+          this.reStart(failingSession);
         }
       }
     }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 29f3aa6..ac429e8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -561,7 +561,7 @@
   {
     if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
     {
-      // this policy imply that we always aceept updates.
+      // this policy imply that we always accept updates.
       return true;
     }
     if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
@@ -587,7 +587,7 @@
       }
     }
     // we should never get there as the only possible policies are
-    // ACCEPT_UPDATES and DENY_UPDATES
+    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
     return true;
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
index 7cac348..3dffce0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -59,7 +59,7 @@
   @Override()
   public void initializeMonitorProvider(MonitorProviderCfg configuration)
   {
-    // TODO Auto-generated method stub
+    // no implementation needed.
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
index 3cd873d..15847ed 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -52,6 +52,7 @@
   static final byte MSG_TYPE_ENTRY = 12;
   static final byte MSG_TYPE_DONE = 13;
   static final byte MSG_TYPE_ERROR = 14;
+  static final byte MSG_TYPE_WINDOW_PROBE = 15;
   // Adding a new type of message here probably requires to
   // change accordingly generateMsg method below
 
@@ -136,6 +137,9 @@
       case MSG_TYPE_ERROR:
         msg = new ErrorMessage(buffer);
       break;
+      case MSG_TYPE_WINDOW_PROBE:
+        msg = new WindowProbe(buffer);
+      break;
       default:
         throw new DataFormatException("received message with unknown type");
     }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
index de39bf3..c311b3b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
@@ -32,9 +32,13 @@
 
 
 /**
- * This message is used by LDAP server when they first connect.
- * to a replication server to let them know who they are and what is their state
- * (their RUV)
+ * This message is used by LDAP server or by Replication Servers to
+ * update the send window of the remote entities.
+ *
+ * A receiving entity should create such a message with a given credit
+ * when it wants to open the send window of the remote entity.
+ * A LDAP or Replication Server should increase its send window when receiving
+ * such a message.
  */
 public class WindowMessage extends ReplicationMessage implements
     Serializable
@@ -47,7 +51,7 @@
    * Create a new WindowMessage.
    *
    * @param numAck The number of acknowledged messages.
-   *               The window will be increase by this number.
+   *               The window will be increase by this credit number.
    */
   public WindowMessage(int numAck)
   {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbe.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbe.java
new file mode 100644
index 0000000..f30a219
--- /dev/null
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbe.java
@@ -0,0 +1,84 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.Serializable;
+import java.util.zip.DataFormatException;
+
+
+/**
+ * This message is used by LDAP or Replication Server that have been
+ * out of credit for a while and want to check that the remote servers.
+ *
+ * A sending entity that is blocked because its send window is closed
+ * for a while should create such a message to check that the window
+ * closure is valid.
+ *
+ * An entity that received such a message should respond with a
+ * WindowUpdate message indicating the curent credit available.
+ */
+public class WindowProbe extends ReplicationMessage implements
+    Serializable
+{
+  private static final long serialVersionUID = 8442267608764026867L;
+
+  /**
+   * Create a new WindowProbe message.
+   */
+  public WindowProbe()
+  {
+  }
+
+  /**
+   * Creates a new WindowProbe from its encoded form.
+   *
+   * @param in The byte array containing the encoded form of the
+   *           WindowMessage.
+   * @throws DataFormatException If the byte array does not contain a valid
+   *                             encoded form of the WindowMessage.
+   */
+  public WindowProbe(byte[] in) throws DataFormatException
+  {
+    // WindowProbe Message only contains its type.
+    if (in[0] != MSG_TYPE_WINDOW_PROBE)
+      throw new DataFormatException("input is not a valid Window Message");
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes()
+  {
+    // WindowProbe Message only contains its type.
+
+    byte[] resultByteArray = new byte[1];
+    resultByteArray[0] = MSG_TYPE_WINDOW_PROBE;
+
+    return resultByteArray;
+  }
+}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a98df94..7a0f72c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -60,6 +60,7 @@
 import org.opends.server.replication.protocol.ReplicationMessage;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeType;
 import org.opends.server.types.AttributeValue;
@@ -1372,4 +1373,34 @@
 
     session.publish(msg);
   }
+
+  /**
+   * Process the reception of a WindowProbe message.
+   *
+   * @param  windowProbeMsg The message to process.
+   *
+   * @throws IOException    When the session becomes unavailable.
+   */
+  public void process(WindowProbe windowProbeMsg) throws IOException
+  {
+    if (rcvWindow > 0)
+    {
+      // The LDAP server believes that its window is closed
+      // while it is not, this means that some problem happened in the
+      // window exchange procedure !
+      // lets update the LDAP server with out current window size and hope
+      // that everything will work better in the futur.
+      // TODO also log an error message.
+      WindowMessage msg = new WindowMessage(rcvWindow);
+      session.publish(msg);
+      outAckCount++;
+    }
+    else
+    {
+      // Both the LDAP server and the replication server believes that the
+      // window is closed. Lets check the flowcontrol in case we
+      // can now resume operations and send a windowMessage if necessary.
+      checkWindow();
+    }
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 41fb483..16373fc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -45,6 +45,7 @@
 import org.opends.server.replication.protocol.ReplicationMessage;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
 import org.opends.server.loggers.debug.DebugTracer;
@@ -159,6 +160,11 @@
           ErrorMessage errorMsg = (ErrorMessage) msg;
           handler.process(errorMsg);
         }
+        else if (msg instanceof WindowProbe)
+        {
+          WindowProbe windowProbeMsg = (WindowProbe) msg;
+          handler.process(windowProbeMsg);
+        }
         else if (msg == null)
         {
           /*
@@ -184,7 +190,7 @@
       String message = getMessage(msgID, handler.toString());
       logError(ErrorLogCategory.SYNCHRONIZATION,
                ErrorLogSeverity.NOTICE,
-               message + e.getMessage(), msgID);
+               message + ": " + e.getMessage(), msgID);
     } catch (ClassNotFoundException e)
     {
       /*
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index bd48b77..c203fea 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -534,7 +534,7 @@
 
   /**
    * Test that WindowMessageTest encoding and decoding works
-   * by checking that : msg == new WindowMessageTest(msg.getBytes()).
+   * by checking that : msg == new WindowMessage(msg.getBytes()).
    */
   @Test()
   public void WindowMessageTest() throws Exception
@@ -543,6 +543,18 @@
     WindowMessage newMsg = new WindowMessage(msg.getBytes());
     assertEquals(msg.getNumAck(), newMsg.getNumAck());
   }
+  
+  /**
+   * Test that WindowProbe encoding and decoding works
+   * by checking that : new WindowProbe(msg.getBytes()) does not throws
+   * an exception.
+   */
+  @Test()
+  public void WindowProbeTest() throws Exception
+  {
+    WindowProbe msg = new WindowProbe();
+    new WindowProbe(msg.getBytes());
+  }
 
   /**
    * Test that EntryMessage encoding and decoding works
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index b1a1eb2..15a03e3 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -27,11 +27,15 @@
 package org.opends.server.replication.server;
 
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
 import static org.opends.server.replication.protocol.OperationContext.*;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.SortedSet;
@@ -49,7 +53,13 @@
 import org.opends.server.replication.protocol.ModifyDNMsg;
 import org.opends.server.replication.protocol.ModifyDnContext;
 import org.opends.server.replication.protocol.ModifyMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMessage;
 import org.opends.server.replication.protocol.ReplicationMessage;
+import org.opends.server.replication.protocol.ServerStartMessage;
+import org.opends.server.replication.protocol.SocketSession;
+import org.opends.server.replication.protocol.WindowMessage;
+import org.opends.server.replication.protocol.WindowProbe;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.DN;
@@ -748,6 +758,56 @@
   }
 
   /**
+   * Test that the Replication sends back correctly WindowsUpdate
+   * when we send a WindowProbe.
+   */
+  @Test()
+  public void windowProbeTest() throws Exception
+  {
+    final int WINDOW = 10;
+    /*
+     * Open a socket connection to the replication server
+     */
+    InetSocketAddress ServerAddr = new InetSocketAddress(
+        InetAddress.getByName("localhost"), replicationServerPort);
+    Socket socket = new Socket();
+    socket.setReceiveBufferSize(1000000);
+    socket.setTcpNoDelay(true);
+    socket.connect(ServerAddr, 500);
+    SocketSession session = new SocketSession(socket);
+
+    /*
+     * Send our ServerStartMessage.
+     */
+    ServerStartMessage msg =
+      new ServerStartMessage((short) 1723, DN.decode("dc=example,dc=com"),
+          0, 0, 0, 0, WINDOW, (long) 5000, new ServerState(),
+          ProtocolVersion.currentVersion());
+    session.publish(msg);
+
+    /*
+     * Read the ReplServerStartMessage that should come back.
+     */
+    session.setSoTimeout(10000);
+    ReplServerStartMessage replStartMsg =
+      (ReplServerStartMessage) session.receive();
+    int serverwindow = replStartMsg.getWindowSize();
+
+    // push a WindowProbe message 
+    session.publish(new WindowProbe());
+    
+    WindowMessage windowMsg = (WindowMessage) session.receive();
+    assertEquals(serverwindow, windowMsg.getNumAck());
+    
+    // check that this did not change the window by sending a probe again.
+    session.publish(new WindowProbe());
+    
+    windowMsg = (WindowMessage) session.receive();
+    assertEquals(serverwindow, windowMsg.getNumAck());
+  }
+
+
+  /**
    * After the tests stop the replicationServer.
    */
   @AfterClass()

--
Gitblit v1.10.0