From 4fe72a4bef946169b0f50bc05bd9dc3b4b1131d3 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 14 Aug 2009 12:37:19 +0000
Subject: [PATCH] Support for External change log compatible with draft-good-ldap-changelog-04.txt , March 2003

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  401 ++++++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 322 insertions(+), 79 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3bae821..4f844d4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -63,6 +63,7 @@
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachineEvent;
 import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
 import org.opends.server.replication.protocol.ChangeStatusMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
 import org.opends.server.replication.protocol.MonitorMsg;
@@ -75,9 +76,11 @@
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeBuilder;
 import org.opends.server.types.Attributes;
+import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
 import org.opends.server.util.TimeThread;
+
 import com.sleepycat.je.DatabaseException;
 
 /**
@@ -173,6 +176,8 @@
   // every n number of treated assured messages
   private int assuredTimeoutTimerPurgeCounter = 0;
 
+  ServerState ctHeartbeatState = null;
+
   /**
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
    *
@@ -360,8 +365,7 @@
         if ( (generationId>0) && (generationId != handler.getGenerationId()) )
         {
           if (debugEnabled())
-            TRACER.debugInfo("In RS " +
-              replicationServer.getServerId() +
+            TRACER.debugInfo("In " + this.getName() +
               " for dn " + baseDn + ", update " +
               update.getChangeNumber().toString() +
               " will not be sent to replication server " +
@@ -426,8 +430,7 @@
         if (debugEnabled())
         {
           if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
-            TRACER.debugInfo("In RS " +
-              replicationServer.getServerId() +
+            TRACER.debugInfo("In " + this +
               " for dn " + baseDn + ", update " +
               update.getChangeNumber().toString() +
               " will not be sent to directory server " +
@@ -1024,10 +1027,9 @@
   {
       if (debugEnabled())
         TRACER.debugInfo(
-            "In RS " + this.replicationServer.getMonitorInstanceName() +
-            " domain=" + this +
-            " stopServer(SH)" + handler.getMonitorInstanceName() +
-          " " + stackTraceToSingleLineString(new Exception()));
+            "In " + this.replicationServer.getMonitorInstanceName() +
+            " domain=" + this + " stopServer() on the server handler " +
+            handler.getMonitorInstanceName());
     /*
      * We must prevent deadlock on replication server domain lock, when for
      * instance this code is called from dying ServerReader but also dying
@@ -1119,10 +1121,9 @@
   {
     if (debugEnabled())
       TRACER.debugInfo(
-          "In RS " + this.replicationServer.getMonitorInstanceName() +
-          " domain=" + this +
-          " stopServer(MH)" + handler.getMonitorInstanceName() +
-          " " + stackTraceToSingleLineString(new Exception()));
+          "In " + this.replicationServer.getMonitorInstanceName()
+          + " domain=" + this + " stopServer() on the message handler "
+          + handler.getMonitorInstanceName());
     /*
      * We must prevent deadlock on replication server domain lock, when for
      * instance this code is called from dying ServerReader but also dying
@@ -1363,7 +1364,40 @@
 
     try
     {
-      return handler.generateIterator(changeNumber);
+      ReplicationIterator it = handler.generateIterator(changeNumber);
+      if (it.next()==false)
+      {
+        it.releaseCursor();
+        throw new Exception("no new change");
+      }
+      return it;
+    } catch (Exception e)
+    {
+      return null;
+    }
+  }
+
+  /**
+   * Creates and returns an iterator.
+   * When the iterator is not used anymore, the caller MUST call the
+   * ReplicationIterator.releaseCursor() method to free the resources
+   * and locks used by the ReplicationIterator.
+   *
+   * @param serverId Identifier of the server for which the iterator is created.
+   * @param changeNumber Starting point for the iterator.
+   * @return the created ReplicationIterator. Null when no DB is available
+   * for the provided server Id.
+   */
+  public ReplicationIterator getIterator(short serverId,
+    ChangeNumber changeNumber)
+  {
+    DbHandler handler = sourceDbHandlers.get(serverId);
+    if (handler == null)
+      return null;
+    try
+    {
+      ReplicationIterator it = handler.generateIterator(changeNumber);
+      return it;
     } catch (Exception e)
     {
       return null;
@@ -1955,12 +1989,10 @@
     ResetGenerationIdMsg genIdMsg)
   {
     if (debugEnabled())
-    {
       TRACER.debugInfo(
-        "In RS " + getReplicationServer().getServerId() +
+        "In " + this +
         " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() +
         " for baseDn " + baseDn + ":\n" + genIdMsg);
-    }
 
     try
     {
@@ -1982,14 +2014,12 @@
     {
       // Order to take a gen id we already have, just ignore
       if (debugEnabled())
-      {
         TRACER.debugInfo(
-          "In RS " + getReplicationServer().getServerId()
+          "In " + this
           + " Reset generation id requested for baseDn " + baseDn
           + " but generation id was already " + this.generationId
           + ":\n" + genIdMsg);
       }
-    }
 
     // If we are the first replication server warned,
     // then forwards the reset message to the remote replication servers
@@ -2002,7 +2032,7 @@
         rsHandler.setGenerationId(newGenId);
         if (senderHandler.isDataServer())
         {
-          rsHandler.forwardGenerationIdToRS(genIdMsg);
+          rsHandler.send(genIdMsg);
         }
       } catch (IOException e)
       {
@@ -2158,7 +2188,7 @@
   }
 
   /**
-   * Clears the Db associated with that cache.
+   * Clears the Db associated with that domain.
    */
   public void clearDbs()
   {
@@ -2181,12 +2211,6 @@
         }
       }
       stopDbHandlers();
-
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + this.replicationServer.getMonitorInstanceName() +
-          " baseDN=" + baseDn +
-          " The source db handler has been cleared");
     }
     try
     {
@@ -2471,11 +2495,6 @@
    */
   public void receivesMonitorDataResponse(MonitorMsg msg)
   {
-    if (debugEnabled())
-      TRACER.debugInfo(
-        "In " + this.replicationServer.getMonitorInstanceName() +
-        "Receiving " + msg + " from " + msg.getsenderID());
-
     try
     {
       synchronized (monitorDataLock)
@@ -2543,7 +2562,7 @@
         {
           if (debugEnabled())
             TRACER.debugInfo(
-              "In " + this.replicationServer.getMonitorInstanceName() +
+              "In " + this +
               " baseDn=" + baseDn +
               " Processed msg from " + msg.getsenderID() +
               " New monitor data: " + wrkMonitorData.toString());
@@ -2819,24 +2838,29 @@
    * Return the state that contain for each server the time of eligibility.
    * @return the state.
    */
-  public ServerState getHeartbeatState()
+  public ServerState getChangeTimeHeartbeatState()
   {
-    // TODO:ECL Eligility must be supported
-    return this.getDbServerState();
+    if (ctHeartbeatState == null)
+    {
+      ctHeartbeatState = this.getDbServerState().duplicate();
+    }
+    return ctHeartbeatState;
   }
+
   /**
+   * TODO: code cleaning - remove this method.
    * Computes the change number eligible to the ECL.
    * @return null if the domain does not play in eligibility.
    */
-  public ChangeNumber computeEligibleCN()
+  public ChangeNumber computeEligibleCN2()
   {
-    ChangeNumber elligibleCN = null;
-    ServerState heartbeatState = getHeartbeatState();
+    ChangeNumber eligibleCN = null;
+    ServerState heartbeatState = getChangeTimeHeartbeatState();
 
     if (heartbeatState==null)
       return null;
 
-    // compute elligible CN
+    // compute eligible CN
     ServerState hbState = heartbeatState.duplicate();
 
     Iterator<Short> it = hbState.iterator();
@@ -2850,70 +2874,87 @@
       if (TimeThread.getTime()-storedCN.getTime()>2000)
       {
         if (debugEnabled())
-          TRACER.debugInfo(
-            "For RSD." + this.baseDn + " Server " + sid
+          TRACER.debugInfo("In " + this.getName() +
+            " Server " + sid
             + " is not considered for eligibility ... potentially down");
         continue;
       }
 
-      if ((elligibleCN == null) || (storedCN.older(elligibleCN)))
+      if ((eligibleCN == null) || (storedCN.older(eligibleCN)))
       {
-        elligibleCN = storedCN;
+        eligibleCN = storedCN;
       }
     }
 
     if (debugEnabled())
-      TRACER.debugInfo(
-        "For RSD." + this.baseDn + " ElligibleCN()=" + elligibleCN);
-    return elligibleCN;
+      TRACER.debugInfo("In " + this.getName() +
+        " computeEligibleCN() returns " + eligibleCN);
+    return eligibleCN;
   }
 
   /**
-   * Computes the eligible server state by minimizing the dbServerState and the
-   * elligibleCN.
+   * Computes the eligible server state for the domain.
+   * Consists in taking the most recent change from the dbServerState and the
+   * eligibleCN.
+   * @param eligibleCN The provided eligibleCN.
    * @return The computed eligible server state.
    */
-  public ServerState getCLElligibleState()
+  public ServerState getEligibleState(ChangeNumber eligibleCN)
   {
-    // ChangeNumber elligibleCN = computeEligibleCN();
-    ServerState res = new ServerState();
-    ServerState dbState = this.getDbServerState();
-    res = dbState;
+    ServerState result = new ServerState();
 
-    /* TODO:ECL Eligibility is not yet implemented
-    Iterator<Short> it = dbState.iterator();
-    while (it.hasNext())
+    ServerState dbState = this.getDbServerState();
+
+    result = dbState.duplicate();
+
+    if (eligibleCN != null)
     {
-      Short sid = it.next();
-      DbHandler h = sourceDbHandlers.get(sid);
-      ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
-      try
+      Iterator<Short> it = dbState.iterator();
+      while (it.hasNext())
       {
-        if ((elligibleCN!=null)&&(elligibleCN.older(dbCN)))
+        Short sid = it.next();
+        DbHandler h = sourceDbHandlers.get(sid);
+        ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
+        try
         {
-          // some CN exist in the db newer than elligible CN
-          ReplicationIterator ri = h.generateIterator(elligibleCN);
-          ChangeNumber newCN = ri.getCurrentCN();
-          res.update(newCN);
-          ri.releaseCursor();
+          if (eligibleCN.older(dbCN))
+          {
+            // some CN exist in the db newer than eligible CN
+            // let's get it
+            ReplicationIterator ri = h.generateIterator(eligibleCN);
+            try
+            {
+              if ((ri != null) && (ri.getChange()!=null))
+              {
+                ChangeNumber newCN = ri.getChange().getChangeNumber();
+                result.update(newCN);
+              }
+            }
+            finally
+            {
+              ri.releaseCursor();
+              ri = null;
+            }
+          }
+          else
+          {
+            // no CN exist in the db newer than elligible CN
+            result.update(dbCN);
+          }
         }
-        else
+        catch(Exception e)
         {
-          // no CN exist in the db newer than elligible CN
-          res.update(dbCN);
+          Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
+              " " +  stackTraceToSingleLineString(e));
+          logError(errMessage);
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
         }
       }
-      catch(Exception e)
-      {
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      }
     }
-    */
-
     if (debugEnabled())
-      TRACER.debugInfo("In " + this.getName()
-        + " getCLElligibleState returns:" + res);
-    return res;
+      TRACER.debugInfo("In " + this
+        + " getEligibleState() result is " + result);
+    return result;
   }
 
   /**
@@ -2930,4 +2971,206 @@
     }
     return domainStartState;
   }
+
+  /**
+   * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat
+   * state.
+   * For each DS, take the oldest CN from the changetime hearbeat state
+   * and from the changelog db last CN. Can be null.
+   * @return the eligible CN.
+   */
+  public ChangeNumber getEligibleCN()
+  {
+    ChangeNumber eligibleCN = null;
+
+    for (DbHandler db : sourceDbHandlers.values())
+    {
+      // Consider this producer (DS/db).
+      short sid = db.getServerId();
+
+      ChangeNumber changelogLastCN = db.getLastChange();
+      if (changelogLastCN != null)
+      {
+        if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN)))
+        {
+          eligibleCN = changelogLastCN;
+        }
+      }
+
+      ChangeNumber heartbeatLastDN =
+        getChangeTimeHeartbeatState().getMaxChangeNumber(sid);
+
+      if ((heartbeatLastDN != null) &&
+       ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN))))
+      {
+        eligibleCN = heartbeatLastDN;
+      }
+    }
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "In " + this.getName() + " getEligibleCN() returns result ="
+        + eligibleCN);
+    return eligibleCN;
+  }
+
+  /**
+   * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
+   * value received, and forwarding the message to the other RSes.
+   * @param senderHandler The handler for the server that sent the heartbeat.
+   * @param msg The message to process.
+   */
+  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
+      ChangeTimeHeartbeatMsg msg )
+  {
+    try
+    {
+      // Acquire lock on domain (see more details in comment of start() method
+      // of ServerHandler)
+      lock();
+    } catch (InterruptedException ex)
+    {
+      // Try doing job anyway...
+    }
+
+    try
+    {
+      storeReceivedCTHeartbeat(msg.getChangeNumber());
+
+      // If we are the first replication server warned,
+      // then forwards the reset message to the remote replication servers
+      for (ReplicationServerHandler rsHandler : replicationServers.values())
+      {
+        try
+        {
+          // After we'll have sent the message , the remote RS will adopt
+          // the new genId
+          if (senderHandler.isDataServer())
+          {
+            rsHandler.send(msg);
+          }
+        } catch (IOException e)
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName()));
+          stopServer(rsHandler);
+        }
+      }
+    }
+    finally
+    {
+      release();
+    }
+  }
+
+  /**
+   * Store a change time value received from a data server.
+   * @param cn The provided change time.
+   */
+  public void storeReceivedCTHeartbeat(ChangeNumber cn)
+  {
+    // TODO:May be we can spare processing by only storing CN (timestamp)
+    // instead of a server state.
+    getChangeTimeHeartbeatState().update(cn);
+
+    /*
+    if (debugEnabled())
+    {
+      Set<String> ss = ctHeartbeatState.toStringSet();
+      String dss = "";
+      for (String s : ss)
+      {
+        dss = dss + " \\ " + s;
+      }
+      TRACER.debugInfo("In " + this.getName() + " " + dss);
+    }
+    */
+  }
+
+  /**
+   * This methods count the changes, server by server :
+   * - from a start point (cn taken from the provided startState)
+   * - to an end point (the provided endCN).
+   * @param startState The provided start server state.
+   * @param endCN The provided end change number.
+   * @return The number of changes between startState and endCN.
+   */
+  public long getEligibleCount(ServerState startState, ChangeNumber endCN)
+  {
+    long res = 0;
+    ReplicationIterator ri=null;
+
+    // Parses the dbState of the domain , server by server
+    ServerState dbState = this.getDbServerState();
+    Iterator<Short> it = dbState.iterator();
+    while (it.hasNext())
+    {
+      // for each server
+      Short sid = it.next();
+      DbHandler h = sourceDbHandlers.get(sid);
+
+      try
+      {
+        // Set on the change related to the startState
+        ChangeNumber startCN = null;
+        try
+        {
+          ri = h.generateIterator(startState.getMaxChangeNumber(sid));
+          startCN = ri.getChange().getChangeNumber();
+        }
+        catch(Exception e)
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          // no change found (purge from CL)
+          startCN = null;
+        }
+        finally
+        {
+          if (ri!=null)
+          {
+            ri.releaseCursor();
+            ri = null;
+          }
+        }
+
+        if (startCN != null)
+        {
+          // Set on the change related to the endCN
+          ChangeNumber upperCN;
+          try
+          {
+            // Build a changenumber for this very server, with the timestamp
+            // of the endCN
+            ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid);
+            ri = h.generateIterator(f);
+            upperCN = ri.getChange().getChangeNumber();
+          }
+          catch(Exception e)
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+            // no new change
+            upperCN = h.getLastChange();
+          }
+          finally
+          {
+            if (ri!=null)
+            {
+              ri.releaseCursor();
+              ri = null;
+            }
+          }
+
+          long diff = upperCN.getSeqnum() - startCN.getSeqnum() + 1;
+
+          res += diff;
+        }
+        // TODO:ECL We should compute if changenumber.seqnum has turned !
+      }
+      catch(Exception e)
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
+    }
+    return res;
+  }
 }

--
Gitblit v1.10.0