From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication

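Initialize and propagate the replication generationId in ReplicationCache:
keep the in-memory generationId and its saved status, reset the
generationId when no LDAP server of the domain remains connected anywhere
in the topology and it was never saved, forward ResetGenerationId to the
peer replication servers of the domain, include the generationId in
ReplServerInfoMessage, and report peers as degraded when their
generationId differs from the local one.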
---
 opends/src/server/org/opends/server/replication/server/ReplicationCache.java |  346 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 326 insertions(+), 20 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
index d618cf8..990eb1f 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -28,6 +28,8 @@
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import org.opends.server.loggers.debug.DebugTracer;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -47,6 +49,7 @@
 import org.opends.server.replication.protocol.RoutableMessage;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.protocol.ReplServerInfoMessage;
+import org.opends.server.replication.protocol.ResetGenerationId;
 import org.opends.server.types.DN;
 
 import com.sleepycat.je.DatabaseException;
@@ -106,6 +109,15 @@
     new ConcurrentHashMap<Short, DbHandler>();
   private ReplicationServer replicationServer;
 
+  /* GenerationId management */
+  private long generationId = -1;
+  private boolean generationIdSavedStatus = false;
+
+  /**
+   * The tracer object for the debug logger.
+   */
+  private static final DebugTracer TRACER = getTracer();
+
   /**
    * Creates a new ReplicationCache associated to the DN baseDn.
    *
@@ -117,7 +129,13 @@
   {
     this.baseDn = baseDn;
     this.replicationServer = replicationServer;
-  }
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "In " + this.replicationServer.getMonitorInstanceName() +
+        " Created Cache for " + baseDn + " " +
+        stackTraceToSingleLineString(new Exception()));
+  }
 
   /**
    * Add an update that has been received to the list of
@@ -138,6 +156,7 @@
      * other replication server before pushing it to the LDAP servers
      */
 
+    short id = update.getChangeNumber().getServerId();
     sourceHandler.updateServerState(update);
     sourceHandler.incrementInCount();
 
@@ -158,19 +177,21 @@
       }
     }
 
-    // look for the dbHandler that is responsible for the master server which
+    // look for the dbHandler that is responsible for the LDAP server which
     // generated the change.
     DbHandler dbHandler = null;
     synchronized (sourceDbHandlers)
     {
-      short id  = update.getChangeNumber().getServerId();
       dbHandler   = sourceDbHandlers.get(id);
       if (dbHandler == null)
       {
         try
         {
-          dbHandler = replicationServer.newDbHandler(id, baseDn);
-        } catch (DatabaseException e)
+          dbHandler = replicationServer.newDbHandler(id,
+              baseDn, generationId);
+          generationIdSavedStatus = true;
+        }
+        catch (DatabaseException e)
         {
           /*
            * Because of database problem we can't save any more changes
@@ -250,6 +271,15 @@
       }
       connectedServers.put(handler.getServerId(), handler);
 
+      // The server connecting here may be the first server connected
+      // for this domain.
+      // In that case, we will establish the appropriate connections
+      // to the other replication servers for this domain and receive
+      // their ReplServerInfo messages.
+      // FIXME: Is it necessary to complete the processing above BEFORE
+      //        listening to incoming messages for that domain? Doing so
+      //        could cause the connecting replica to raise a read timeout.
+
       // Update the remote replication servers with our list
       // of connected LDAP servers
       sendReplServerInfo();
@@ -265,17 +295,90 @@
    */
   public void stopServer(ServerHandler handler)
   {
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "In RS " + this.replicationServer.getMonitorInstanceName() +
+        " for " + baseDn + " " +
+        " stopServer " + handler.getMonitorInstanceName());
+
     handler.stopHandler();
 
     if (handler.isReplicationServer())
+    {
       replicationServers.remove(handler.getServerId());
+    }
     else
     {
       connectedServers.remove(handler.getServerId());
+    }
 
-      // Update the remote replication servers with our list
-      // of connected LDAP servers
-      sendReplServerInfo();
+    mayResetGenerationId();
+
+    // Update the remote replication servers with our list
+    // of connected LDAP servers
+    sendReplServerInfo();
+  }
+
+  /**
+   * Resets the generationId for this domain if no LDAP server is
+   * currently connected anywhere in the topology and the generationId
+   * has never been saved.
+   */
+  protected void mayResetGenerationId()
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "In RS " + this.replicationServer.getMonitorInstanceName() +
+        " for " + baseDn + " " +
+        " mayResetGenerationId generationIdSavedStatus=" +
+        generationIdSavedStatus);
+
+    // If no LDAP server is connected to this domain anywhere in the
+    // topology and the generationId has never been saved, then we can reset
+    // it; the next LDAP server to connect will become the new reference.
+    boolean lDAPServersConnectedInTheTopology = false;
+    if (connectedServers.isEmpty())
+    {
+      for (ServerHandler rsh : replicationServers.values())
+      {
+        if (generationId != rsh.getGenerationId())
+        {
+          if (debugEnabled())
+            TRACER.debugInfo(
+                "In RS " + this.replicationServer.getMonitorInstanceName() +
+                " for " + baseDn + " " +
+                " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() +
+                " thas different genId");
+        }
+        else
+        {
+          if (!rsh.getRemoteLDAPServers().isEmpty())
+          {
+            lDAPServersConnectedInTheTopology = true;
+
+            if (debugEnabled())
+              TRACER.debugInfo(
+                  "In RS " + this.replicationServer.getMonitorInstanceName() +
+                  " for " + baseDn + " " +
+                  " mayResetGenerationId RS" + rsh.getMonitorInstanceName() +
+              " has servers connected to it - will not reset generationId");
+          }
+        }
+      }
+    }
+    else
+    {
+      lDAPServersConnectedInTheTopology = true;
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In RS " + this.replicationServer.getMonitorInstanceName() +
+          " for " + baseDn + " " +
+          " has servers connected to it - will not reset generationId");
+    }
+
+    if ((!lDAPServersConnectedInTheTopology) && (!this.generationIdSavedStatus))
+    {
+      setGenerationId(-1, false);
     }
   }
 
@@ -321,7 +424,7 @@
       // Update this server with the list of LDAP servers
       // already connected
       handler.sendInfo(
-          new ReplServerInfoMessage(getConnectedLDAPservers()));
+          new ReplServerInfoMessage(getConnectedLDAPservers(), generationId));
 
       return true;
     }
@@ -437,17 +540,20 @@
   }
 
   /**
-   * creates a new ReplicationDB with specified identifier.
-   * @param id the identifier of the new ReplicationDB.
-   * @param db the new db.
+   * Associates the provided DbHandler with the provided serverId.
+   *
+   * @param serverId  the serverId of the server to which the
+   *                  DbHandler is associated.
+   * @param dbHandler the DbHandler associated with the serverId.
    *
    * @throws DatabaseException If a database error happened.
    */
-  public void newDb(short id, DbHandler db) throws DatabaseException
+  public void setDbHandler(short serverId, DbHandler dbHandler)
+  throws DatabaseException
   {
     synchronized (sourceDbHandlers)
     {
-      sourceDbHandlers.put(id , db);
+      sourceDbHandlers.put(serverId, dbHandler);
     }
   }
 
@@ -557,7 +663,8 @@
   }
 
   /**
-   * Process an InitializeRequestMessage.
+   * Processes a message coming from one server in the topology
+   * and potentially forwards it to one or all other servers.
    *
    * @param msg The message received and to be processed.
    * @param senderHandler The server handler of the server that emitted
@@ -565,6 +672,23 @@
    */
   public void process(RoutableMessage msg, ServerHandler senderHandler)
   {
+    // A replication server is not expected to be the destination
+    // of a routable message except for an error message.
+    if (msg.getDestination() == this.replicationServer.getServerId())
+    {
+      if (msg instanceof ErrorMessage)
+      {
+        ErrorMessage errorMsg = (ErrorMessage)msg;
+        logError(ERR_ERROR_MSG_RECEIVED.get(
+                   errorMsg.getDetails()));
+      }
+      else
+      {
+        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
+            msg.getClass().getCanonicalName()));
+      }
+      return;
+    }
 
     List<ServerHandler> servers = getDestinationServers(msg, senderHandler);
 
@@ -572,9 +696,13 @@
     {
       MessageBuilder mb = new MessageBuilder();
       mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get());
-      mb.append("serverID:" + msg.getDestination());
+      mb.append(" unreachable server ID=" + msg.getDestination());
+      mb.append(" unroutable message =" + msg);
       ErrorMessage errMsg = new ErrorMessage(
-              msg.getsenderID(), mb.toMessage());
+          this.replicationServer.getServerId(),
+          msg.getsenderID(),
+          mb.toMessage());
+
       try
       {
         senderHandler.send(errMsg);
@@ -583,8 +711,8 @@
       {
         // TODO Handle error properly (sender timeout in addition)
         /*
-         * An error happened trying the send back an error to this server.
-         * Log an error and close the connection to the sender server.
+         * An error happened trying to send an error msg to this server.
+         * Log an error and close the connection to this server.
          */
         MessageBuilder mb2 = new MessageBuilder();
         mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString()));
@@ -788,7 +916,7 @@
     private void sendReplServerInfo()
     {
       ReplServerInfoMessage info =
-        new ReplServerInfoMessage(getConnectedLDAPservers());
+        new ReplServerInfoMessage(getConnectedLDAPservers(), generationId);
       for (ServerHandler handler : replicationServers.values())
       {
         try
@@ -811,6 +939,26 @@
     }
 
     /**
+     * Get the generationId associated with this domain.
+     *
+     * @return The generationId
+     */
+    public long getGenerationId()
+    {
+      return generationId;
+    }
+
+    /**
+     * Get the generationId saved status.
+     *
+     * @return The generationId saved status.
+     */
+    public boolean getGenerationIdSavedStatus()
+    {
+      return generationIdSavedStatus;
+    }
+
+    /**
      * Sets the replication server informations for the provided
      * handler from the provided ReplServerInfoMessage.
      *
@@ -822,4 +970,162 @@
     {
       handler.setReplServerInfo(infoMsg);
     }
+
+    /**
+     * Sets the provided value as the new in-memory generationId.
+     *
+     * @param generationId The new value of generationId.
+     * @param savedStatus  The saved status of the generationId.
+     */
+    public synchronized void setGenerationId(long generationId,
+        boolean savedStatus)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDN=" + baseDn +
+          " RCache.set GenerationId=" + generationId);
+
+      if (generationId == this.generationId)
+        return;
+
+      if (this.generationId > 0)
+      {
+        for (ServerHandler handler : connectedServers.values())
+        {
+          handler.resetGenerationId();
+        }
+      }
+
+      this.generationId = generationId;
+      this.generationIdSavedStatus = savedStatus;
+
+    }
+
+    /**
+     * Resets the generationId.
+     *
+     * @param senderHandler The handler associated to the server
+     *        that requested to reset the generationId.
+     */
+    public void resetGenerationId(ServerHandler senderHandler)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+          "In " + this.replicationServer.getMonitorInstanceName() +
+          " baseDN=" + baseDn +
+          " RCache.resetGenerationId");
+
+      // Notify the other LDAP servers that from now on
+      // they have a bad generationId.
+      for (ServerHandler handler : connectedServers.values())
+      {
+        handler.resetGenerationId();
+      }
+
+      // Propagate the reset message to the other replication servers
+      // that handle the same domain.
+      if (senderHandler.isLDAPserver())
+      {
+        for (ServerHandler handler : replicationServers.values())
+        {
+          try
+          {
+            handler.sendGenerationId(new ResetGenerationId());
+          }
+          catch (IOException e)
+          {
+            logError(ERR_CHANGELOG_ERROR_SENDING_INFO.
+                get(handler.getMonitorInstanceName()));
+          }
+        }
+      }
+
+      // Reset the local change and state databases for the current domain.
+      synchronized (sourceDbHandlers)
+      {
+        for (DbHandler dbHandler : sourceDbHandlers.values())
+        {
+          try
+          {
+            dbHandler.clear();
+          }
+          catch (Exception e)
+          {
+            // TODO: i18n
+            logError(Message.raw(
+                "Exception caught while clearing dbHandler:" +
+                e.getLocalizedMessage()));
+          }
+        }
+        sourceDbHandlers.clear();
+
+        if (debugEnabled())
+          TRACER.debugInfo(
+              "In " + this.replicationServer.getMonitorInstanceName() +
+              " baseDN=" + baseDn +
+              " The source db handler has been cleared");
+      }
+      try
+      {
+        replicationServer.clearGenerationId(baseDn);
+      }
+      catch (Exception e)
+      {
+        // TODO: i18n
+        logError(Message.raw(
+            "Exception caught while clearing generationId:" +
+            e.getLocalizedMessage()));
+      }
+
+      // Reset the in-memory generationId for this domain.
+      generationId = -1;
+    }
+
+    /**
+     * Returns whether the provided server is in a degraded
+     * state because it has an invalid generationId for this
+     * domain.
+     *
+     * @param serverId The serverId of the server for which we
+     *                 want to know the state.
+     * @return Whether it is degraded or not.
+     */
+
+    public boolean isDegradedDueToGenerationId(short serverId)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDN=" + baseDn +
+            " isDegraded serverId=" + serverId +
+            " given local generation Id=" + this.generationId);
+
+      ServerHandler handler = replicationServers.get(serverId);
+      if (handler == null)
+      {
+        handler = connectedServers.get(serverId);
+        if (handler == null)
+        {
+          return false;
+        }
+      }
+
+      if (debugEnabled())
+        TRACER.debugInfo(
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " baseDN=" + baseDn +
+            " Compute degradation of serverId=" + serverId +
+            " LS server generation Id=" + handler.getGenerationId());
+      return (handler.getGenerationId() != this.generationId);
+    }
+
+    /**
+     * Returns the associated replication server.
+     * @return The replication server.
+     */
+    public ReplicationServer getReplicationServer()
+    {
+      return replicationServer;
+    }
 }

--
Gitblit v1.10.0