From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

---
 opends/src/server/org/opends/server/replication/server/ServerReader.java |  151 ++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 127 insertions(+), 24 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index e61446f..cb66bd2 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -30,6 +30,7 @@
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 
 import java.io.IOException;
@@ -39,6 +40,7 @@
 import org.opends.server.replication.protocol.DoneMessage;
 import org.opends.server.replication.protocol.EntryMessage;
 import org.opends.server.replication.protocol.ErrorMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
 import org.opends.server.replication.protocol.InitializeRequestMessage;
 import org.opends.server.replication.protocol.InitializeTargetMessage;
 import org.opends.server.replication.protocol.ProtocolSession;
@@ -76,7 +78,7 @@
    * Constructor for the LDAP server reader part of the replicationServer.
    *
    * @param session The ProtocolSession from which to read the data.
-   * @param serverId The server ID of the server from which we read changes.
+   * @param serverId The server ID of the server from which we read messages.
    * @param handler The server handler for this server reader.
    * @param replicationCache The ReplicationCache for this server reader.
    */
@@ -97,14 +99,11 @@
   {
     if (debugEnabled())
     {
-      if (handler.isReplicationServer())
-      {
-        TRACER.debugInfo("Replication server reader starting " + serverId);
-      }
-      else
-      {
-        TRACER.debugInfo("LDAP server reader starting " + serverId);
-      }
+      TRACER.debugInfo(
+          "In RS <" + replicationCache.getReplicationServer().
+          getMonitorInstanceName() +
+          (handler.isReplicationServer()?" RS ":" LS")+
+          " reader starting for serverId=" + serverId);
     }
     /*
      * wait on input stream
@@ -116,6 +115,25 @@
       {
         ReplicationMessage msg = session.receive();
 
+        if (debugEnabled())
+        {
+          if (handler.isReplicationServer())
+          {
+            TRACER.debugInfo(
+                "In RS <" + replicationCache.getReplicationServer().
+                getMonitorInstanceName() +
+                "> from RS server with serverId=" + serverId +
+                " receives " + msg);
+          }
+          else
+          {
+            TRACER.debugInfo(
+                "In RS <" + replicationCache.getReplicationServer().
+                getMonitorInstanceName() +
+                "> from LDAP server with serverId=" + serverId +
+                " receives " + msg);
+          }
+        }
         if (msg instanceof AckMessage)
         {
           AckMessage ack = (AckMessage) msg;
@@ -124,9 +142,22 @@
         }
         else if (msg instanceof UpdateMessage)
         {
-          UpdateMessage update = (UpdateMessage) msg;
-          handler.decAndCheckWindow();
-          replicationCache.put(update, handler);
+          // Ignore update received from a replica with
+          // a bad generation ID
+          long referenceGenerationId = replicationCache.getGenerationId();
+          if ((referenceGenerationId>0) &&
+              (referenceGenerationId != handler.getGenerationId()))
+          {
+            logError(ERR_IGNORING_UPDATE_FROM.get(
+                msg.toString(),
+                handler.getMonitorInstanceName()));
+          }
+          else
+          {
+            UpdateMessage update = (UpdateMessage) msg;
+            handler.decAndCheckWindow();
+            replicationCache.put(update, handler);
+          }
         }
         else if (msg instanceof WindowMessage)
         {
@@ -159,6 +190,11 @@
           ErrorMessage errorMsg = (ErrorMessage) msg;
           handler.process(errorMsg);
         }
+        else if (msg instanceof ResetGenerationId)
+        {
+          ResetGenerationId genIdMsg = (ResetGenerationId) msg;
+          replicationCache.resetGenerationId(this.handler);
+        }
         else if (msg instanceof WindowProbe)
         {
           WindowProbe windowProbeMsg = (WindowProbe) msg;
@@ -168,6 +204,52 @@
         {
           ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
           handler.setReplServerInfo(infoMsg);
+
+          if (debugEnabled())
+          {
+            if (handler.isReplicationServer())
+              TRACER.debugInfo(
+               "In RS " + replicationCache.getReplicationServer().
+               getServerId() +
+               " Receiving replServerInfo from " + handler.getServerId() +
+               " baseDn=" + replicationCache.getBaseDn() +
+               " genId=" + infoMsg.getGenerationId());
+          }
+
+          if (replicationCache.getGenerationId()<0)
+          {
+            // Here is the case where a ReplicationServer receives from
+            // another ReplicationServer the generationId for a domain
+            // for which the generation ID has never been set.
+            replicationCache.setGenerationId(infoMsg.getGenerationId(), false);
+          }
+          else
+          {
+            if (infoMsg.getGenerationId()<0)
+            {
+              // Here is the case where another ReplicationServer
+              // signals that it has no generationId set for the domain.
+              // If we have generationId set locally and no server currently
+              // connected for that domain in the topology then we may also
+              // reset the generationId localy.
+              replicationCache.mayResetGenerationId();
+            }
+
+            if (replicationCache.getGenerationId() != infoMsg.getGenerationId())
+            {
+              Message message = NOTE_BAD_GENERATION_ID.get(
+                  replicationCache.getBaseDn().toNormalizedString(),
+                  Short.toString(handler.getServerId()),
+                  Long.toString(infoMsg.getGenerationId()),
+                  Long.toString(replicationCache.getGenerationId()));
+
+              ErrorMessage errorMsg = new ErrorMessage(
+                  replicationCache.getReplicationServer().getServerId(),
+                  handler.getServerId(),
+                  message);
+              session.publish(errorMsg);
+            }
+          }
         }
         else if (msg == null)
         {
@@ -187,21 +269,40 @@
        * Log a message and exit from this loop
        * So that this handler is stopped.
        */
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In RS <" + replicationCache.getReplicationServer().
+          getMonitorInstanceName() +
+          " reader IO EXCEPTION serverID=" + serverId
+          + stackTraceToSingleLineString(e) + e.getLocalizedMessage() +
+          e.getCause());
       Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
       logError(message);
     } catch (ClassNotFoundException e)
     {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In RS <" + replicationCache.getReplicationServer().
+          getMonitorInstanceName() +
+          " reader CNF EXCEPTION serverID=" + serverId
+          + stackTraceToSingleLineString(e));
       /*
        * The remote server has sent an unknown message,
-       * close the conenction.
+       * close the connection.
        */
       Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
       logError(message);
     } catch (Exception e)
     {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In RS <" + replicationCache.getReplicationServer().
+          getMonitorInstanceName() +
+          " server reader EXCEPTION serverID=" + serverId
+          + stackTraceToSingleLineString(e));
       /*
        * The remote server has sent an unknown message,
-       * close the conenction.
+       * close the connection.
        */
       Message message = NOTE_READER_EXCEPTION.get(handler.toString());
       logError(message);
@@ -213,6 +314,12 @@
        * happen.
        * Attempt to close the socket and stop the server handler.
        */
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In RS <" + replicationCache.getReplicationServer().
+          getMonitorInstanceName() +
+          " reader CLOSE serverID=" + serverId
+          + stackTraceToSingleLineString(new Exception()));
       try
       {
         session.close();
@@ -223,15 +330,11 @@
       replicationCache.stopServer(handler);
     }
     if (debugEnabled())
-    {
-      if (handler.isReplicationServer())
-      {
-        TRACER.debugInfo("Replication server reader stopping " + serverId);
-      }
-      else
-      {
-        TRACER.debugInfo("LDAP server reader stopping " + serverId);
-      }
-    }
+      TRACER.debugInfo(
+          "In RS <" + replicationCache.getReplicationServer().
+          getMonitorInstanceName() +
+          (handler.isReplicationServer()?"RS":"LDAP") +
+          " server reader stopped for serverID=" + serverId
+          + stackTraceToSingleLineString(new Exception()));
   }
 }

--
Gitblit v1.10.0