From 7ee84fd2ff21c5e25b9a234f974c08b49070fcaf Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Thu, 30 Oct 2008 16:30:36 +0000
Subject: [PATCH] Fix for #3543: Replication protocol incompatibility between v1 and v2: cannot upgrade a running replicated topology from v1 to v2

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java |  282 ++++++++++++++++++++++++++++---------------------------
 1 files changed, 144 insertions(+), 138 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 7df0c87..78c70fa 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -123,50 +123,80 @@
     {
       while (true)
       {
-        ReplicationMsg msg = session.receive();
+        try
+        {
+          ReplicationMsg msg = session.receive();
 
-        /*
-        if (debugEnabled())
-        {
-        TRACER.debugInfo(
-        "In RS " + replicationServerDomain.getReplicationServer().
-        getMonitorInstanceName() +
-        (handler.isReplicationServer()?" From RS ":" From LS")+
-        " with serverId=" + serverId + " receives " + msg);
-        }
-         */
-        if (msg instanceof AckMsg)
-        {
-          AckMsg ack = (AckMsg) msg;
-          handler.checkWindow();
-          replicationServerDomain.ack(ack, serverId);
-        } else if (msg instanceof UpdateMsg)
-        {
-          boolean filtered = false;
-          /* Ignore updates in some cases */
-          if (handler.isLDAPserver())
+          /*
+          if (debugEnabled())
           {
-            /**
-             * Ignore updates from DS in bad BAD_GENID_STATUS or
-             * FULL_UPDATE_STATUS
-             *
-             * The RSD lock should not be taken here as it is acceptable to have
-             * a delay between the time the server has a wrong status and the
-             * fact we detect it: the updates that succeed to pass during this
-             * time will have no impact on remote server. But it is interesting
-             * to not saturate uselessly the network if the updates are not
-             * necessary so this check to stop sending updates is interesting
-             * anyway. Not taking the RSD lock allows to have better
-             * performances in normal mode (most of the time).
-             */
-            ServerStatus dsStatus = handler.getStatus();
-            if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
-              (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+          TRACER.debugInfo(
+          "In RS " + replicationServerDomain.getReplicationServer().
+          getMonitorInstanceName() +
+          (handler.isReplicationServer()?" From RS ":" From LS")+
+          " with serverId=" + serverId + " receives " + msg);
+          }
+           */
+          if (msg instanceof AckMsg)
+          {
+            AckMsg ack = (AckMsg) msg;
+            handler.checkWindow();
+            replicationServerDomain.ack(ack, serverId);
+          } else if (msg instanceof UpdateMsg)
+          {
+            boolean filtered = false;
+            /* Ignore updates in some cases */
+            if (handler.isLDAPserver())
             {
+              /**
+               * Ignore updates from DS in bad BAD_GENID_STATUS or
+               * FULL_UPDATE_STATUS
+               *
+               * The RSD lock should not be taken here as it is acceptable to
+               * have a delay between the time the server has a wrong status and
+               * the fact we detect it: the updates that succeed to pass during
+               * this time will have no impact on remote server. But it is
+               * interesting to not saturate uselessly the network if the
+               * updates are not necessary so this check to stop sending updates
+               * is interesting anyway. Not taking the RSD lock allows to have
+               * better performances in normal mode (most of the time).
+               */
+              ServerStatus dsStatus = handler.getStatus();
+              if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
+                (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+              {
+                long referenceGenerationId =
+                  replicationServerDomain.getGenerationId();
+                if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+                  logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
+                    Short.toString(replicationServerDomain.
+                    getReplicationServer().getServerId()),
+                    replicationServerDomain.getBaseDn().toNormalizedString(),
+                    ((UpdateMsg) msg).getChangeNumber().toString(),
+                    Short.toString(handler.getServerId()),
+                    Long.toString(referenceGenerationId),
+                    Long.toString(handler.getGenerationId())));
+                if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+                  logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
+                    Short.toString(replicationServerDomain.
+                    getReplicationServer().getServerId()),
+                    replicationServerDomain.getBaseDn().toNormalizedString(),
+                    ((UpdateMsg) msg).getChangeNumber().toString(),
+                    Short.toString(handler.getServerId())));
+                filtered = true;
+              }
+            } else
+            {
+              /**
+               * Ignore updates from RS with bad gen id
+               * (no system managed status for a RS)
+               */
               long referenceGenerationId =
                 replicationServerDomain.getGenerationId();
-              if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
-                logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
+              if ((referenceGenerationId > 0) &&
+                (referenceGenerationId != handler.getGenerationId()))
+              {
+                logError(ERR_IGNORING_UPDATE_FROM_RS.get(
                   Short.toString(replicationServerDomain.getReplicationServer().
                   getServerId()),
                   replicationServerDomain.getBaseDn().toNormalizedString(),
@@ -174,103 +204,86 @@
                   Short.toString(handler.getServerId()),
                   Long.toString(referenceGenerationId),
                   Long.toString(handler.getGenerationId())));
-              if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
-                logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
-                  Short.toString(replicationServerDomain.getReplicationServer().
-                  getServerId()),
-                  replicationServerDomain.getBaseDn().toNormalizedString(),
-                  ((UpdateMsg) msg).getChangeNumber().toString(),
-                  Short.toString(handler.getServerId())));
-              filtered = true;
+                filtered = true;
+              }
             }
-          } else
-          {
-            /**
-             * Ignore updates from RS with bad gen id
-             * (no system managed status for a RS)
-             */
-            long referenceGenerationId =
-              replicationServerDomain.getGenerationId();
-            if ((referenceGenerationId > 0) &&
-              (referenceGenerationId != handler.getGenerationId()))
-            {
-              logError(ERR_IGNORING_UPDATE_FROM_RS.get(
-                Short.toString(replicationServerDomain.getReplicationServer().
-                getServerId()),
-                replicationServerDomain.getBaseDn().toNormalizedString(),
-                ((UpdateMsg) msg).getChangeNumber().toString(),
-                Short.toString(handler.getServerId()),
-                Long.toString(referenceGenerationId),
-                Long.toString(handler.getGenerationId())));
-              filtered = true;
-            }
-          }
 
-          if (!filtered)
+            if (!filtered)
+            {
+              UpdateMsg update = (UpdateMsg) msg;
+              handler.decAndCheckWindow();
+              replicationServerDomain.put(update, handler);
+            }
+          } else if (msg instanceof WindowMsg)
           {
-            UpdateMsg update = (UpdateMsg) msg;
-            handler.decAndCheckWindow();
-            replicationServerDomain.put(update, handler);
+            WindowMsg windowMsg = (WindowMsg) msg;
+            handler.updateWindow(windowMsg);
+          } else if (msg instanceof InitializeRequestMsg)
+          {
+            InitializeRequestMsg initializeMsg =
+              (InitializeRequestMsg) msg;
+            handler.process(initializeMsg);
+          } else if (msg instanceof InitializeTargetMsg)
+          {
+            InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
+            handler.process(initializeMsg);
+          } else if (msg instanceof EntryMsg)
+          {
+            EntryMsg entryMsg = (EntryMsg) msg;
+            handler.process(entryMsg);
+          } else if (msg instanceof DoneMsg)
+          {
+            DoneMsg doneMsg = (DoneMsg) msg;
+            handler.process(doneMsg);
+          } else if (msg instanceof ErrorMsg)
+          {
+            ErrorMsg errorMsg = (ErrorMsg) msg;
+            handler.process(errorMsg);
+          } else if (msg instanceof ResetGenerationIdMsg)
+          {
+            ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
+            replicationServerDomain.resetGenerationId(handler, genIdMsg);
+          } else if (msg instanceof WindowProbeMsg)
+          {
+            WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
+            handler.process(windowProbeMsg);
+          } else if (msg instanceof TopologyMsg)
+          {
+            TopologyMsg topoMsg = (TopologyMsg) msg;
+            replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
+              handler, true);
+          } else if (msg instanceof ChangeStatusMsg)
+          {
+            ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
+            replicationServerDomain.processNewStatus(handler, csMsg);
+          } else if (msg instanceof MonitorRequestMsg)
+          {
+            MonitorRequestMsg replServerMonitorRequestMsg =
+              (MonitorRequestMsg) msg;
+            handler.process(replServerMonitorRequestMsg);
+          } else if (msg instanceof MonitorMsg)
+          {
+            MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
+            handler.process(replServerMonitorMsg);
+          } else if (msg == null)
+          {
+            /*
+             * The remote server has sent an unknown message,
+             * close the conenction.
+             */
+            Message message = NOTE_READER_NULL_MSG.get(handler.toString());
+            logError(message);
+            return;
           }
-        } else if (msg instanceof WindowMsg)
+        } catch (NotSupportedOldVersionPDUException e)
         {
-          WindowMsg windowMsg = (WindowMsg) msg;
-          handler.updateWindow(windowMsg);
-        } else if (msg instanceof InitializeRequestMsg)
-        {
-          InitializeRequestMsg initializeMsg =
-            (InitializeRequestMsg) msg;
-          handler.process(initializeMsg);
-        } else if (msg instanceof InitializeTargetMsg)
-        {
-          InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
-          handler.process(initializeMsg);
-        } else if (msg instanceof EntryMsg)
-        {
-          EntryMsg entryMsg = (EntryMsg) msg;
-          handler.process(entryMsg);
-        } else if (msg instanceof DoneMsg)
-        {
-          DoneMsg doneMsg = (DoneMsg) msg;
-          handler.process(doneMsg);
-        } else if (msg instanceof ErrorMsg)
-        {
-          ErrorMsg errorMsg = (ErrorMsg) msg;
-          handler.process(errorMsg);
-        } else if (msg instanceof ResetGenerationIdMsg)
-        {
-          ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
-          replicationServerDomain.resetGenerationId(handler, genIdMsg);
-        } else if (msg instanceof WindowProbeMsg)
-        {
-          WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
-          handler.process(windowProbeMsg);
-        } else if (msg instanceof TopologyMsg)
-        {
-          TopologyMsg topoMsg = (TopologyMsg) msg;
-          replicationServerDomain.receiveTopoInfoFromRS(topoMsg, handler, true);
-        } else if (msg instanceof ChangeStatusMsg)
-        {
-          ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
-          replicationServerDomain.processNewStatus(handler, csMsg);
-        } else if (msg instanceof MonitorRequestMsg)
-        {
-          MonitorRequestMsg replServerMonitorRequestMsg =
-            (MonitorRequestMsg) msg;
-          handler.process(replServerMonitorRequestMsg);
-        } else if (msg instanceof MonitorMsg)
-        {
-          MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
-          handler.process(replServerMonitorMsg);
-        } else if (msg == null)
-        {
-          /*
-           * The remote server has sent an unknown message,
-           * close the conenction.
-           */
-          Message message = NOTE_READER_NULL_MSG.get(handler.toString());
-          logError(message);
-          return;
+          // Received a V1 PDU we do not need to support:
+          // we just trash the message and log the event for debug purpose,
+          // then continue receiving messages.
+          if (debugEnabled())
+            TRACER.debugInfo("In " + replicationServerDomain.
+              getReplicationServer().
+              getMonitorInstanceName() + ":" + e.getMessage());
         }
       }
     } catch (IOException e)
@@ -304,13 +317,6 @@
        */
       Message message = ERR_UNKNOWN_MESSAGE.get(handler.toString());
       logError(message);
-    } catch (NotSupportedOldVersionPDUException e)
-    {
-      // Received a V1 PDU we do not need to support:
-      // we just trash the message and log the event for debug purpose
-      if (debugEnabled())
-      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-        getMonitorInstanceName() + ":" + e.getMessage());
     } catch (Exception e)
     {
       if (debugEnabled())

--
Gitblit v1.10.0