From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java |  113 ++++++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 92 insertions(+), 21 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index ed6270d..6733366 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,8 +25,7 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
+import org.opends.messages.*;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -93,6 +92,7 @@
   private int maxRcvWindow;
   private int timeout = 0;
   private short protocolVersion;
+  private long generationId = -1;
   private ReplSessionSecurity replSessionSecurity;
 
   /**
@@ -143,12 +143,15 @@
    * @param window The size of the send and receive window to use.
    * @param heartbeatInterval The interval between heartbeats requested of the
    * replicationServer, or zero if no heartbeats are requested.
+   *
+   * @param generationId The generationId for the server associated to the
+   * provided serverID and for the domain associated to the provided baseDN.
    * @param replSessionSecurity The session security configuration.
    */
   public ReplicationBroker(ServerState state, DN baseDn, short serverID,
       int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
       int maxSendDelay, int window, long heartbeatInterval,
-      ReplSessionSecurity replSessionSecurity)
+      long generationId, ReplSessionSecurity replSessionSecurity)
   {
     this.baseDn = baseDn;
     this.serverID = serverID;
@@ -164,6 +167,7 @@
     this.halfRcvWindow = window/2;
     this.heartbeatInterval = heartbeatInterval;
     this.protocolVersion = ProtocolVersion.currentVersion();
+    this.generationId = generationId;
     this.replSessionSecurity = replSessionSecurity;
   }
 
@@ -198,7 +202,7 @@
    */
   private void connect()
   {
-    ReplServerStartMessage startMsg;
+    ReplServerStartMessage replServerStartMsg;
 
     // Stop any existing heartbeat monitor from a previous session.
     if (heartbeatMonitor != null)
@@ -207,8 +211,24 @@
       heartbeatMonitor = null;
     }
 
+    // checkState is true for the first loop over all replication servers,
+    // which looks for one that is already up-to-date.
+    // If some replication servers responded but none was up-to-date,
+    // we set checkState to false and do a second loop in which the first
+    // server found is the one elected; we will then update this replication
+    // server.
     boolean checkState = true;
     boolean receivedResponse = true;
+
+    // TODO: We are doing 2 loops here, opening, closing and reopening
+    // sessions to the same servers, which risks 'same server id' errors.
+    // It would be better to do only one loop, keeping the best candidate
+    // while traversing the list of replication servers to connect to.
+    if (servers.size()==1)
+    {
+      checkState = false;
+    }
+
     synchronized (connectPhaseLock)
     {
       while ((!connected) && (!shutdown) && (receivedResponse))
@@ -240,7 +260,7 @@
             ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
                 maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
                 halfRcvWindow*2, heartbeatInterval, state,
-                protocolVersion, isSslEncryption);
+                protocolVersion, generationId, isSslEncryption);
             session.publish(msg);
 
 
@@ -248,7 +268,7 @@
              * Read the ReplServerStartMessage that should come back.
              */
             session.setSoTimeout(1000);
-            startMsg = (ReplServerStartMessage) session.receive();
+            replServerStartMsg = (ReplServerStartMessage) session.receive();
             receivedResponse = true;
 
             /*
@@ -257,7 +277,7 @@
              * if it is an old replication server).
              */
             protocolVersion = ProtocolVersion.minWithCurrent(
-                startMsg.getVersion());
+                replServerStartMsg.getVersion());
             session.setSoTimeout(timeout);
 
             if (!isSslEncryption)
@@ -276,7 +296,7 @@
              * those changes and send them again to any replicationServer.
              */
             ChangeNumber replServerMaxChangeNumber =
-              startMsg.getServerState().getMaxChangeNumber(serverID);
+              replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
             if (replServerMaxChangeNumber == null)
               replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
             ChangeNumber ourMaxChangeNumber =
@@ -285,7 +305,7 @@
                 (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
             {
               replicationServer = ServerAddr.toString();
-              maxSendWindow = startMsg.getWindowSize();
+              maxSendWindow = replServerStartMsg.getWindowSize();
               connected = true;
               startHeartBeat();
               break;
@@ -298,7 +318,8 @@
                  * of our changes, we are going to try another server
                  * but before log a notice message
                  */
-                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server);
+                Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
+                    baseDn.toNormalizedString());
                 logError(message);
               }
               else
@@ -341,7 +362,7 @@
                 else
                 {
                   replicationServer = ServerAddr.toString();
-                  maxSendWindow = startMsg.getWindowSize();
+                  maxSendWindow = replServerStartMsg.getWindowSize();
                   connected = true;
                   for (FakeOperation replayOp : replayOperations)
                   {
@@ -371,8 +392,9 @@
           }
           catch (Exception e)
           {
-            Message message =
-                    ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage());
+            Message message = NOTE_EXCEPTION_STARTING_SESSION.get(
+                baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+                stackTraceToSingleLineString(e));
             logError(message);
           }
           finally
@@ -392,7 +414,9 @@
               }
             }
           }
-        }
+        } // for servers
+
+        // We have traversed all the replication servers
 
         if ((!connected) && (checkState == true) && receivedResponse)
         {
@@ -401,12 +425,16 @@
            * changes that this server has already processed, start again
            * the loop looking for any replicationServer.
            */
-          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get();
+          Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
+              baseDn.toNormalizedString());
           logError(message);
           checkState = false;
         }
       }
 
+      // We have traversed all the replication servers as many times as needed
+      // to find one if one is up and running.
+
       if (connected)
       {
         // This server has connected correctly.
@@ -462,9 +490,9 @@
   /**
    * restart the ReplicationBroker.
    */
-  private void reStart()
+  public void reStart()
   {
-    reStart(null);
+    reStart(this.session);
   }
 
   /**
@@ -472,7 +500,7 @@
    *
    * @param failingSession the socket which failed
    */
-  private void reStart(ProtocolSession failingSession)
+  public void reStart(ProtocolSession failingSession)
   {
     try
     {
@@ -498,7 +526,8 @@
       } catch (Exception e)
       {
         MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_EXCEPTION_STARTING_SESSION.get(e.getMessage()));
+        mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
+         baseDn.toNormalizedString(), e.getLocalizedMessage()));
         mb.append(stackTraceToSingleLineString(e));
         logError(mb.toMessage());
       }
@@ -533,6 +562,13 @@
         // choice than to return without sending the ReplicationMessage
         // and relying on the resend procedure of the connect phase to
         // fix the problem when we finally connect.
+
+        if (debugEnabled())
+        {
+          debugInfo("ReplicationBroker.publish() Publishing a " +
+              " message is not possible due to existing connection error.");
+        }
+
         return;
       }
 
@@ -601,12 +637,22 @@
           } catch (InterruptedException e1)
           {
             // ignore
+            if (debugEnabled())
+            {
+              debugInfo("ReplicationBroker.publish() " +
+                  "IO exception raised : " + e.getLocalizedMessage());
+            }
           }
         }
       }
       catch (InterruptedException e)
       {
         // just loop.
+        if (debugEnabled())
+        {
+          debugInfo("ReplicationBroker.publish() " +
+              "Interrupted exception raised." + e.getLocalizedMessage());
+        }
       }
     }
   }
@@ -628,7 +674,7 @@
     {
       if (!connected)
       {
-        reStart();
+        reStart(null);
       }
 
       ProtocolSession failingSession = session;
@@ -663,6 +709,9 @@
           Message message =
               NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
           logError(message);
+
+          debugInfo("ReplicationBroker.receive() " + baseDn +
+              " Exception raised." + e + e.getLocalizedMessage());
           this.reStart(failingSession);
         }
       }
@@ -683,7 +732,8 @@
     {
       if (debugEnabled())
       {
-        TRACER.debugInfo("ReplicationBroker Stop Closing session");
+        debugInfo("ReplicationBroker is stopping. and will" +
+          "close the connection");
       }
 
       if (session != null)
@@ -713,6 +763,20 @@
   }
 
   /**
+   * Set the value of the generationId for this broker. Normally the
+   * generationId is set through the constructor, but there are cases
+   * where the value of the generationId must be changed while the broker
+   * already exists, for example after an on-line import.
+   *
+   * @param generationId The value of the generationId.
+   *
+   */
+  public void setGenerationId(long generationId)
+  {
+    this.generationId = generationId;
+  }
+
+  /**
    * Get the name of the replicationServer to which this broker is currently
    * connected.
    *
@@ -857,6 +921,13 @@
     return !connectionError;
   }
 
+  private boolean debugEnabled() { return true; } // NOTE(review): always on
+  private static final void debugInfo(String s)
+  {
+    logError(Message.raw(Category.SYNC, Severity.NOTICE, s)); // error logger
+    TRACER.debugInfo(s); // and the debug tracer as well
+  }
+
   /**
    * Determine whether the connection to the replication server is encrypted.
    * @return true if the connection is encrypted, false otherwise.

--
Gitblit v1.10.0