From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |  313 +++++++++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 286 insertions(+), 27 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 858a498..cc97ccf 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -51,6 +51,8 @@
  *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
+
+import org.opends.messages.*;
 import org.opends.messages.MessageBuilder;
 
 import static org.opends.server.loggers.ErrorLogger.logError;
@@ -67,6 +69,7 @@
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.Semaphore;
@@ -143,6 +146,7 @@
   private short replicationServerId;
 
   private short protocolVersion;
+  private long generationId=-1;
 
 
   /**
@@ -189,7 +193,7 @@
    * Then create the reader and writer thread.
    *
    * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
-   *               null if this is an incoming connection.
+   *               null if this is an incoming connection (listen).
    * @param replicationServerId The identifier of the replicationServer that
    *                            creates this server handler.
    * @param replicationServerURL The URL of the replicationServer that creates
@@ -206,22 +210,34 @@
                     int windowSize, boolean sslEncryption,
                     ReplicationServer replicationServer)
   {
+    if (debugEnabled())
+      TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() +
+                " starts a new LS or RS " +
+                ((baseDn == null)?"incoming connection":"outgoing connection"));
+
     this.replicationServerId = replicationServerId;
     rcvWindowSizeHalf = windowSize/2;
     maxRcvWindow = windowSize;
     rcvWindow = windowSize;
+    long localGenerationId=-1;
     try
     {
       if (baseDn != null)
       {
         // This is an outgoing connection. Publish our start message.
         this.baseDn = baseDn;
-        replicationCache = replicationServer.getReplicationCache(baseDn);
+
+        // Get or create the ReplicationCache
+        replicationCache = replicationServer.getReplicationCache(baseDn, true);
+        localGenerationId = replicationCache.getGenerationId();
+
         ServerState localServerState = replicationCache.getDbServerState();
         ReplServerStartMessage msg =
           new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                     baseDn, windowSize, localServerState,
-                                    protocolVersion, sslEncryption);
+                                    protocolVersion, localGenerationId,
+                                    sslEncryption);
+
         session.publish(msg);
       }
 
@@ -229,9 +245,10 @@
       ReplicationMessage msg = session.receive();
       if (msg instanceof ServerStartMessage)
       {
-        // The remote server is an LDAP Server
+        // The remote server is an LDAP Server.
         ServerStartMessage receivedMsg = (ServerStartMessage) msg;
 
+        generationId = receivedMsg.getGenerationId();
         protocolVersion = ProtocolVersion.minWithCurrent(
             receivedMsg.getVersion());
         serverId = receivedMsg.getServerId();
@@ -281,15 +298,69 @@
 
         serverIsLDAPserver = true;
 
-        // This an incoming connection. Publish our start message
-        replicationCache = replicationServer.getReplicationCache(this.baseDn);
+        // Get or Create the ReplicationCache
+        replicationCache = replicationServer.getReplicationCache(this.baseDn,
+            true);
+        localGenerationId = replicationCache.getGenerationId();
+
         ServerState localServerState = replicationCache.getDbServerState();
+        // This is an incoming connection. Publish our start message
         ReplServerStartMessage myStartMsg =
           new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                     this.baseDn, windowSize, localServerState,
-                                    protocolVersion, sslEncryption);
+                                    protocolVersion, localGenerationId,
+                                    sslEncryption);
         session.publish(myStartMsg);
         sendWindowSize = receivedMsg.getWindowSize();
+
+        /* Until here session is encrypted then it depends on the negotiation */
+        if (!sslEncryption)
+        {
+          session.stopEncryption();
+        }
+
+        if (debugEnabled())
+        {
+          Set<String> ss = this.serverState.toStringSet();
+          Set<String> lss = replicationCache.getDbServerState().toStringSet();
+          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+                   getMonitorInstanceName() +
+                   ", SH received START from LS serverId=" + serverId +
+                   " baseDN=" + this.baseDn +
+                   " generationId=" + generationId +
+                   " localGenerationId=" + localGenerationId +
+                   " state=" + ss +
+                   " and sent ReplServerStart with state=" + lss);
+        }
+
+        /*
+         * If we already have a generationID set for the domain
+         * then
+         *   if the connecting replica does not have the same one
+         *   then it is degraded locally and notified by an error message
+         * else
+         *   we set the generationID from the one received
+         *   (not yet on disk; will be set with the first change received)
+         */
+        if (localGenerationId>0)
+        {
+          if (generationId != localGenerationId)
+          {
+            Message message = NOTE_BAD_GENERATION_ID.get(
+                receivedMsg.getBaseDn().toNormalizedString(),
+                Short.toString(receivedMsg.getServerId()),
+                Long.toString(generationId),
+                Long.toString(localGenerationId));
+
+            ErrorMessage errorMsg =
+              new ErrorMessage(replicationServerId, serverId, message);
+            session.publish(errorMsg);
+          }
+        }
+        else
+        {
+          replicationCache.setGenerationId(generationId, false);
+        }
       }
       else if (msg instanceof ReplServerStartMessage)
       {
@@ -297,6 +368,7 @@
         ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
         protocolVersion = ProtocolVersion.minWithCurrent(
             receivedMsg.getVersion());
+        generationId = receivedMsg.getGenerationId();
         serverId = receivedMsg.getServerId();
         serverURL = receivedMsg.getServerURL();
         int separator = serverURL.lastIndexOf(':');
@@ -306,7 +378,10 @@
         this.baseDn = receivedMsg.getBaseDn();
         if (baseDn == null)
         {
-          replicationCache = replicationServer.getReplicationCache(this.baseDn);
+          // Get or create the ReplicationCache
+          replicationCache = replicationServer.getReplicationCache(this.baseDn,
+              true);
+          localGenerationId = replicationCache.getGenerationId();
           ServerState serverState = replicationCache.getDbServerState();
 
           // The session initiator decides whether to use SSL.
@@ -317,7 +392,9 @@
             new ReplServerStartMessage(replicationServerId,
                                        replicationServerURL,
                                        this.baseDn, windowSize, serverState,
-                                       protocolVersion, sslEncryption);
+                                       protocolVersion,
+                                       localGenerationId,
+                                       sslEncryption);
           session.publish(outMsg);
         }
         else
@@ -326,6 +403,107 @@
         }
         this.serverState = receivedMsg.getServerState();
         sendWindowSize = receivedMsg.getWindowSize();
+
+        /* Until here session is encrypted then it depends on the negotiation */
+        if (!sslEncryption)
+        {
+          session.stopEncryption();
+        }
+
+        if (debugEnabled())
+        {
+          Set<String> ss = this.serverState.toStringSet();
+          Set<String> lss = replicationCache.getDbServerState().toStringSet();
+          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+                   getMonitorInstanceName() +
+                   ", SH received START from RS serverId=" + serverId +
+                   " baseDN=" + this.baseDn +
+                   " generationId=" + generationId +
+                   " localGenerationId=" + localGenerationId +
+                   " state=" + ss +
+                   " and sent ReplServerStart with state=" + lss);
+        }
+
+        // if the remote RS and the local RS have the same genID
+        // then it's ok and nothing else to do
+        if (generationId == localGenerationId)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+              getMonitorInstanceName() + " RS with serverID=" + serverId +
+              " is connected with the right generation ID");
+          }
+        }
+        else
+        {
+          if (localGenerationId>0)
+          {
+            // if the local RS is initialized
+            if (generationId>0)
+            {
+              // if the remote RS is initialized
+              if (generationId != localGenerationId)
+              {
+                // if the 2 RS have different generationID
+                if (replicationCache.getGenerationIdSavedStatus())
+                {
+                  // If the present RS has received changes regarding its
+                  //     gen ID, and so won't change it without a reset,
+                  // then we are just degrading the peer.
+                  Message message = NOTE_BAD_GENERATION_ID.get(
+                      this.baseDn.toNormalizedString(),
+                      Short.toString(receivedMsg.getServerId()),
+                      Long.toString(generationId),
+                      Long.toString(localGenerationId));
+
+                  ErrorMessage errorMsg =
+                    new ErrorMessage(replicationServerId, serverId, message);
+                  session.publish(errorMsg);
+                }
+                else
+                {
+                  // The present RS has never received changes regarding its
+                  // gen ID.
+                  //
+                  // Example case:
+                  // - we are in RS1
+                  // - RS2 has genId2 from LS2 (genId2 <=> no data in LS2)
+                  // - RS1 has genId1 from LS1 /genId1 comes from data in suffix
+                  // - we are in RS1 and we receive a START msg from RS2
+                  // - Each RS keeps its genID / is degraded, and once LS2 is
+                  //   populated from LS1 everything will become ok.
+                  //
+                  // Issue:
+                  // FIXME : Would it be a good idea in some cases to just
+                  //         set the gen ID received from the peer RS,
+                  //         especially if the peer has a non-null state and
+                  //         we have a null state ?
+                  // replicationCache.setGenerationId(generationId, false);
+                  Message message = NOTE_BAD_GENERATION_ID.get(
+                      this.baseDn.toNormalizedString(),
+                      Short.toString(receivedMsg.getServerId()),
+                      Long.toString(generationId),
+                      Long.toString(localGenerationId));
+
+                  ErrorMessage errorMsg =
+                    new ErrorMessage(replicationServerId, serverId, message);
+                  session.publish(errorMsg);
+                }
+              }
+            }
+            else
+            {
+              // The remote has no genId. We don't change anything for the
+              // current RS.
+            }
+          }
+          else
+          {
+            // The local RS is not initialized - take the one received
+            replicationCache.setGenerationId(generationId, false);
+          }
+        }
       }
       else
       {
@@ -333,12 +511,9 @@
         return;   // we did not recognize the message, ignore it
       }
 
-      if (!sslEncryption)
-      {
-        session.stopEncryption();
-      }
-
-      replicationCache = replicationServer.getReplicationCache(this.baseDn);
+      // Get or create the ReplicationCache
+      replicationCache = replicationServer.getReplicationCache(this.baseDn,
+          true);
 
       boolean started;
       if (serverIsLDAPserver)
@@ -352,10 +527,11 @@
 
       if (started)
       {
-        writer = new ServerWriter(session, serverId, this, replicationCache);
+        // sendWindow MUST be created before starting the writer
+        sendWindow = new Semaphore(sendWindowSize);
 
-        reader = new ServerReader(session, serverId, this,
-            replicationCache);
+        writer = new ServerWriter(session, serverId, this, replicationCache);
+        reader = new ServerReader(session, serverId, this, replicationCache);
 
         reader.start();
         writer.start();
@@ -377,6 +553,12 @@
         // the connection is not valid, close it.
         try
         {
+          if (debugEnabled())
+          {
+            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+              getMonitorInstanceName() + " RS failed to start locally " +
+              " the connection from serverID="+serverId);
+          }
           session.close();
         } catch (IOException e1)
         {
@@ -388,7 +570,8 @@
     {
       // some problem happened, reject the connection
       MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(this.toString()));
+      mb.append(ERR_CHANGELOG_CONNECTION_ERROR.get(
+          this.getMonitorInstanceName()));
       mb.append(stackTraceToSingleLineString(e));
       logError(mb.toMessage());
       try
@@ -399,7 +582,6 @@
         // ignore
       }
     }
-    sendWindow = new Semaphore(sendWindowSize);
   }
 
   /**
@@ -720,6 +902,21 @@
    */
   public void add(UpdateMessage update, ServerHandler sourceHandler)
   {
+    /*
+     * Ignore updates from a server that is degraded due to
+     * its inconsistent generationId
+     */
+    long referenceGenerationId = replicationCache.getGenerationId();
+    if ((referenceGenerationId>0) &&
+        (referenceGenerationId != generationId))
+    {
+      logError(ERR_IGNORING_UPDATE_TO.get(
+               update.getDn(),
+               this.getMonitorInstanceName()));
+
+      return;
+    }
+
     synchronized (msgQueue)
     {
       /*
@@ -1164,7 +1361,7 @@
     if (serverIsLDAPserver)
       return "Remote LDAP Server " + str;
     else
-      return "Remote Replication Server " + str;
+      return "Remote Repl Server " + str;
   }
 
   /**
@@ -1261,7 +1458,10 @@
     attributes.add(attr);
 
     attributes.add(new Attribute("ssl-encryption",
-                                 String.valueOf(session.isEncrypted())));
+        String.valueOf(session.isEncrypted())));
+
+    attributes.add(new Attribute("generation-id",
+        String.valueOf(generationId)));
 
     return attributes;
   }
@@ -1385,9 +1585,10 @@
   public void process(RoutableMessage msg)
   {
     if (debugEnabled())
-      TRACER.debugInfo("SH(" + replicationServerId + ") receives " +
-                 msg + " from " + serverId);
-
+       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+                 getMonitorInstanceName() +
+                 " SH for remote server " + this.getMonitorInstanceName() +
+                 " processes received msg=" + msg);
     replicationCache.process(msg, this);
   }
 
@@ -1401,6 +1602,12 @@
    public void sendInfo(ReplServerInfoMessage info)
    throws IOException
    {
+     if (debugEnabled())
+       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+           getMonitorInstanceName() +
+           " SH for remote server " + this.getMonitorInstanceName() +
+           " sends message=" + info);
+
      session.publish(info);
    }
 
@@ -1412,7 +1619,13 @@
     */
    public void setReplServerInfo(ReplServerInfoMessage infoMsg)
    {
+     if (debugEnabled())
+       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+           getMonitorInstanceName() +
+           " SH for remote server " + this.getMonitorInstanceName() +
+           " sets replServerInfo " + "<" + infoMsg + ">");
      remoteLDAPservers = infoMsg.getConnectedServers();
+     generationId = infoMsg.getGenerationId();
    }
 
    /**
@@ -1458,8 +1671,10 @@
   public void send(RoutableMessage msg) throws IOException
   {
     if (debugEnabled())
-      TRACER.debugInfo("SH(" + replicationServerId + ") forwards " +
-                 msg + " to " + serverId);
+          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+              getMonitorInstanceName() +
+              " SH for remote server " + this.getMonitorInstanceName() +
+              " sends message=" + msg);
     session.publish(msg);
   }
 
@@ -1492,4 +1707,48 @@
       checkWindow();
     }
   }
+
+  /**
+   * Returns the generationId stored in this handler (initially -1).
+   * @return The generationId currently held by this handler.
+   */
+  public long getGenerationId()
+  {
+    return generationId;
+  }
+
+  /**
+   * Publishes a NOTE_RESET_GENERATION_ID error message on the session.
+   */
+  public void resetGenerationId()
+  {
+    // Tell the peer that it is now invalid regarding the generationId; a
+    // start message carrying a valid generationId is then expected from it.
+    // NOTE(review): id order is reversed vs. the constructor's ErrorMessage.
+    try
+    {
+      Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString());
+      ErrorMessage errorMsg =
+        new ErrorMessage(serverId, replicationServerId, message);
+      session.publish(errorMsg);
+    }
+    catch (Exception e)
+    {
+      // FIXME Log it: a failure to send the reset error message is ignored.
+    }
+  }
+
+  /**
+   * Publishes a ResetGenerationId message on the session of this handler.
+   * The peer is expected to be a replication server.
+   *
+   * @param  msg         The ResetGenerationId message to be sent.
+   * @throws IOException When it occurs while publishing the message.
+   *
+   */
+   public void sendGenerationId(ResetGenerationId msg)
+   throws IOException
+   {
+     session.publish(msg);
+   }
 }

--
Gitblit v1.10.0