From b4f8838b15342670c31753a484abf0129e3c9653 Mon Sep 17 00:00:00 2001
From: jcduff <jcduff@localhost>
Date: Thu, 23 Oct 2008 14:04:24 +0000
Subject: [PATCH] The commit will bring the following features :     - An updated version of the underlying database. BDB JE 3.3 is now used.     - Attribute API refactoring providing a better abstraction and offering improved performances.     - A new GUI called the Control-Panel to replace the Status-Panel: the specifications for this       GUI are available on OpenDS Wiki and contains a link to a mockup.        See <https://www.opends.org/wiki/page/ControlPanelUISpecification>.     - Some changes in the replication protocol to implement "Assured Replication Mode". The        specifications are on OpenDS Wiki at <https://www.opends.org/wiki/page/AssuredMode> and section 7       described some of the replication changes required to support this. Assured Replication is not finished,       but the main replication protocol changes to support it are done. As explained by Gilles on an email on       the Dev mailing list (http://markmail.org/message/46rgo3meq3vriy4a), with these changes the newer versions       of OpenDS may not be able to replicate with OpenDS 1.0 instances.     - Support for Service Tags on the platforms where the functionality is available and enabled. Specifications       are published at <https://www.opends.org/wiki/page/OpenDSServiceTagEnabled>. For more information on       Service Tags see <http://wikis.sun.com/display/ServiceTag/Sun+Service+Tag+FAQ>.     - The Admin Connector service. In order to provide agentry of the OpenDS server at any time, a new service       has been added, dedicated to the administration, configuration and monitoring of the server.       An overview of the Admin Connector service and it's use is available on the       OpenDS wiki <https://www.opends.org/wiki/page/ManagingAdministrationTrafficToTheServer>     - Updates to the various command line tools to support the Admin Connector service.     - Some internal re-architecting of the server to put the foundation of future developments such as virtual       directory services. The new NetworkGroups and WorkFlow internal services which have been specified in       <https://www.opends.org/wiki/page/BasicOperationRoutingThroughNetworkGroup> are now implemented.     - Many bug fixes...

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java |  254 +++++++++++++++++++++++++++++++-------------------
 1 files changed, 155 insertions(+), 99 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index ef1b67c..7df0c87 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -25,6 +25,7 @@
  *      Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
+
 import org.opends.messages.Message;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
@@ -36,23 +37,26 @@
 import java.io.IOException;
 
 import org.opends.server.api.DirectoryThread;
-import org.opends.server.replication.protocol.AckMessage;
-import org.opends.server.replication.protocol.DoneMessage;
-import org.opends.server.replication.protocol.EntryMessage;
-import org.opends.server.replication.protocol.ErrorMessage;
-import org.opends.server.replication.protocol.ResetGenerationId;
-import org.opends.server.replication.protocol.InitializeRequestMessage;
-import org.opends.server.replication.protocol.InitializeTargetMessage;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.DoneMsg;
+import org.opends.server.replication.protocol.EntryMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.InitializeRequestMsg;
+import org.opends.server.replication.protocol.InitializeTargetMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.ReplicationMessage;
-import org.opends.server.replication.protocol.UpdateMessage;
-import org.opends.server.replication.protocol.WindowMessage;
-import org.opends.server.replication.protocol.WindowProbe;
-import org.opends.server.replication.protocol.ReplServerInfoMessage;
-import org.opends.server.replication.protocol.MonitorMessage;
-import org.opends.server.replication.protocol.MonitorRequestMessage;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.WindowMsg;
+import org.opends.server.replication.protocol.WindowProbeMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
 import org.opends.server.loggers.debug.DebugTracer;
-
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.
+  NotSupportedOldVersionPDUException;
 
 /**
  * This class implement the part of the replicationServer that is reading
@@ -66,11 +70,11 @@
  */
 public class ServerReader extends DirectoryThread
 {
+
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
-
   private short serverId;
   private ProtocolSession session;
   private ServerHandler handler;
@@ -86,10 +90,11 @@
    *        reader.
    */
   public ServerReader(ProtocolSession session, short serverId,
-                      ServerHandler handler,
-                      ReplicationServerDomain replicationServerDomain)
+    ServerHandler handler,
+    ReplicationServerDomain replicationServerDomain)
   {
-    super(handler.toString() + " reader");
+    super("Replication Reader for " + handler.toString() + " in RS " +
+      replicationServerDomain.getReplicationServer().getServerId());
     this.session = session;
     this.serverId = serverId;
     this.handler = handler;
@@ -104,10 +109,10 @@
     if (debugEnabled())
     {
       TRACER.debugInfo(
-          "In RS " + replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() +
-          (handler.isReplicationServer()?" RS ":" LS")+
-          " reader starting for serverId=" + serverId);
+        "In RS " + replicationServerDomain.getReplicationServer().
+        getMonitorInstanceName() +
+        (handler.isReplicationServer() ? " RS " : " LS") +
+        " reader starting for serverId=" + serverId);
     }
     /*
      * wait on input stream
@@ -118,103 +123,146 @@
     {
       while (true)
       {
-        ReplicationMessage msg = session.receive();
+        ReplicationMsg msg = session.receive();
 
         /*
         if (debugEnabled())
         {
-          TRACER.debugInfo(
-              "In RS " + replicationServerDomain.getReplicationServer().
-              getMonitorInstanceName() +
-              (handler.isReplicationServer()?" From RS ":" From LS")+
-              " with serverId=" + serverId + " receives " + msg);
+        TRACER.debugInfo(
+        "In RS " + replicationServerDomain.getReplicationServer().
+        getMonitorInstanceName() +
+        (handler.isReplicationServer()?" From RS ":" From LS")+
+        " with serverId=" + serverId + " receives " + msg);
         }
-        */
-        if (msg instanceof AckMessage)
+         */
+        if (msg instanceof AckMsg)
         {
-          AckMessage ack = (AckMessage) msg;
+          AckMsg ack = (AckMsg) msg;
           handler.checkWindow();
           replicationServerDomain.ack(ack, serverId);
-        }
-        else if (msg instanceof UpdateMessage)
+        } else if (msg instanceof UpdateMsg)
         {
-          // Ignore update received from a replica with
-          // a bad generation ID
-          long referenceGenerationId =
-                  replicationServerDomain.getGenerationId();
-          if ((referenceGenerationId>0) &&
+          boolean filtered = false;
+          /* Ignore updates in some cases */
+          if (handler.isLDAPserver())
+          {
+            /**
+             * Ignore updates from DS in bad BAD_GENID_STATUS or
+             * FULL_UPDATE_STATUS
+             *
+             * The RSD lock should not be taken here as it is acceptable to have
+             * a delay between the time the server has a wrong status and the
+             * fact we detect it: the updates that succeed to pass during this
+             * time will have no impact on remote server. But it is interesting
+             * to not saturate uselessly the network if the updates are not
+             * necessary so this check to stop sending updates is interesting
+             * anyway. Not taking the RSD lock allows to have better
+             * performances in normal mode (most of the time).
+             */
+            ServerStatus dsStatus = handler.getStatus();
+            if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
+              (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+            {
+              long referenceGenerationId =
+                replicationServerDomain.getGenerationId();
+              if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+                logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
+                  Short.toString(replicationServerDomain.getReplicationServer().
+                  getServerId()),
+                  replicationServerDomain.getBaseDn().toNormalizedString(),
+                  ((UpdateMsg) msg).getChangeNumber().toString(),
+                  Short.toString(handler.getServerId()),
+                  Long.toString(referenceGenerationId),
+                  Long.toString(handler.getGenerationId())));
+              if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+                logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
+                  Short.toString(replicationServerDomain.getReplicationServer().
+                  getServerId()),
+                  replicationServerDomain.getBaseDn().toNormalizedString(),
+                  ((UpdateMsg) msg).getChangeNumber().toString(),
+                  Short.toString(handler.getServerId())));
+              filtered = true;
+            }
+          } else
+          {
+            /**
+             * Ignore updates from RS with bad gen id
+             * (no system managed status for a RS)
+             */
+            long referenceGenerationId =
+              replicationServerDomain.getGenerationId();
+            if ((referenceGenerationId > 0) &&
               (referenceGenerationId != handler.getGenerationId()))
-          {
-            logError(ERR_IGNORING_UPDATE_FROM.get(
-                msg.toString(),
-                handler.getMonitorInstanceName()));
+            {
+              logError(ERR_IGNORING_UPDATE_FROM_RS.get(
+                Short.toString(replicationServerDomain.getReplicationServer().
+                getServerId()),
+                replicationServerDomain.getBaseDn().toNormalizedString(),
+                ((UpdateMsg) msg).getChangeNumber().toString(),
+                Short.toString(handler.getServerId()),
+                Long.toString(referenceGenerationId),
+                Long.toString(handler.getGenerationId())));
+              filtered = true;
+            }
           }
-          else
+
+          if (!filtered)
           {
-            UpdateMessage update = (UpdateMessage) msg;
+            UpdateMsg update = (UpdateMsg) msg;
             handler.decAndCheckWindow();
             replicationServerDomain.put(update, handler);
           }
-        }
-        else if (msg instanceof WindowMessage)
+        } else if (msg instanceof WindowMsg)
         {
-          WindowMessage windowMsg = (WindowMessage) msg;
+          WindowMsg windowMsg = (WindowMsg) msg;
           handler.updateWindow(windowMsg);
-        }
-        else if (msg instanceof InitializeRequestMessage)
+        } else if (msg instanceof InitializeRequestMsg)
         {
-          InitializeRequestMessage initializeMsg =
-            (InitializeRequestMessage) msg;
+          InitializeRequestMsg initializeMsg =
+            (InitializeRequestMsg) msg;
           handler.process(initializeMsg);
-        }
-        else if (msg instanceof InitializeTargetMessage)
+        } else if (msg instanceof InitializeTargetMsg)
         {
-          InitializeTargetMessage initializeMsg = (InitializeTargetMessage) msg;
+          InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
           handler.process(initializeMsg);
-        }
-        else if (msg instanceof EntryMessage)
+        } else if (msg instanceof EntryMsg)
         {
-          EntryMessage entryMsg = (EntryMessage) msg;
+          EntryMsg entryMsg = (EntryMsg) msg;
           handler.process(entryMsg);
-        }
-        else if (msg instanceof DoneMessage)
+        } else if (msg instanceof DoneMsg)
         {
-          DoneMessage doneMsg = (DoneMessage) msg;
+          DoneMsg doneMsg = (DoneMsg) msg;
           handler.process(doneMsg);
-        }
-        else if (msg instanceof ErrorMessage)
+        } else if (msg instanceof ErrorMsg)
         {
-          ErrorMessage errorMsg = (ErrorMessage) msg;
+          ErrorMsg errorMsg = (ErrorMsg) msg;
           handler.process(errorMsg);
-        }
-        else if (msg instanceof ResetGenerationId)
+        } else if (msg instanceof ResetGenerationIdMsg)
         {
-          ResetGenerationId genIdMsg = (ResetGenerationId) msg;
-          replicationServerDomain.resetGenerationId(this.handler, genIdMsg);
-        }
-        else if (msg instanceof WindowProbe)
+          ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
+          replicationServerDomain.resetGenerationId(handler, genIdMsg);
+        } else if (msg instanceof WindowProbeMsg)
         {
-          WindowProbe windowProbeMsg = (WindowProbe) msg;
+          WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
           handler.process(windowProbeMsg);
-        }
-        else if (msg instanceof ReplServerInfoMessage)
+        } else if (msg instanceof TopologyMsg)
         {
-          ReplServerInfoMessage infoMsg = (ReplServerInfoMessage)msg;
-          handler.receiveReplServerInfo(infoMsg);
-          replicationServerDomain.receiveReplServerInfo(infoMsg, handler);
-        }
-        else if (msg instanceof MonitorRequestMessage)
+          TopologyMsg topoMsg = (TopologyMsg) msg;
+          replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true);
+        } else if (msg instanceof ChangeStatusMsg)
         {
-          MonitorRequestMessage replServerMonitorRequestMsg =
-            (MonitorRequestMessage) msg;
+          ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
+          replicationServerDomain.processNewStatus(handler, csMsg);
+        } else if (msg instanceof MonitorRequestMsg)
+        {
+          MonitorRequestMsg replServerMonitorRequestMsg =
+            (MonitorRequestMsg) msg;
           handler.process(replServerMonitorRequestMsg);
-        }
-        else if (msg instanceof MonitorMessage)
+        } else if (msg instanceof MonitorMsg)
         {
-          MonitorMessage replServerMonitorMsg = (MonitorMessage) msg;
+          MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
           handler.process(replServerMonitorMsg);
-        }
-        else if (msg == null)
+        } else if (msg == null)
         {
           /*
            * The remote server has sent an unknown message,
@@ -236,9 +284,11 @@
         TRACER.debugInfo(
           "In RS " + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
-          " reader IO EXCEPTION for serverID=" + serverId
-          + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
-      Message message = NOTE_SERVER_DISCONNECT.get(handler.toString());
+          " reader IO EXCEPTION for serverID=" + serverId +
+          stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
+      Message message = NOTE_SERVER_DISCONNECT.get(handler.toString(),
+        Short.toString(replicationServerDomain.
+        getReplicationServer().getServerId()));
       logError(message);
     } catch (ClassNotFoundException e)
     {
@@ -246,30 +296,36 @@
         TRACER.debugInfo(
           "In RS <" + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
-          " reader CNF EXCEPTION serverID=" + serverId
-          + stackTraceToSingleLineString(e));
+          " reader CNF EXCEPTION serverID=" + serverId +
+          stackTraceToSingleLineString(e));
       /*
        * The remote server has sent an unknown message,
        * close the connection.
        */
       Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
       logError(message);
+    } catch (NotSupportedOldVersionPDUException e)
+    {
+      // Received a V1 PDU we do not need to support:
+      // we just trash the message and log the event for debug purpose
+      if (debugEnabled())
+      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
+        getMonitorInstanceName() + ":" + e.getMessage());
     } catch (Exception e)
     {
       if (debugEnabled())
         TRACER.debugInfo(
           "In RS <" + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
-          " server reader EXCEPTION serverID=" + serverId
-          + stackTraceToSingleLineString(e));
+          " server reader EXCEPTION serverID=" + serverId +
+          stackTraceToSingleLineString(e));
       /*
        * The remote server has sent an unknown message,
        * close the connection.
        */
       Message message = NOTE_READER_EXCEPTION.get(handler.toString());
       logError(message);
-    }
-    finally
+    } finally
     {
       /*
        * The thread only exit the loop above is some error condition
@@ -287,15 +343,15 @@
         session.close();
       } catch (IOException e)
       {
-       // ignore
+      // ignore
       }
       replicationServerDomain.stopServer(handler);
     }
     if (debugEnabled())
       TRACER.debugInfo(
-          "In RS " + replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() +
-          (handler.isReplicationServer()?" RS":" LDAP") +
-          " server reader stopped for serverID=" + serverId);
+        "In RS " + replicationServerDomain.getReplicationServer().
+        getMonitorInstanceName() +
+        (handler.isReplicationServer() ? " RS" : " LDAP") +
+        " server reader stopped for serverID=" + serverId);
   }
 }

--
Gitblit v1.10.0