From 2b90d968d139530054b8b0f29924f5bfc49f767d Mon Sep 17 00:00:00 2001
From: fdorson <fdorson@localhost>
Date: Tue, 11 Mar 2008 14:14:54 +0000
Subject: [PATCH] Fix for 1873 : ServerState should be updated after a server crash

---
 opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java                                 |    4 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java              |    4 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java |    6 +-
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java       |    4 
 opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java                             |  109 ++++++++++++++++++++++++++++++++---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java |    2 
 opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java                                 |    2 
 opends/src/messages/messages/replication.properties                                                           |    2 
 8 files changed, 111 insertions(+), 22 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 3019188..86bd3d6 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -274,3 +274,5 @@
 for domain %s for replication server %s : %s
 NOTICE_IGNORING_REMOTE_MONITOR_DATA_116=Some monitor data have been received \
  from the server with server ID %s too late and are ignored
+NOTICE_SERVER_STATE_RECOVERY_117=ServerState recovery for domain %s, \
+updated with changeNumber %s 
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
index 4145672..53f3bd5 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -34,6 +34,7 @@
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Iterator;
 
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.ModifyOperationBasis;
@@ -72,6 +73,7 @@
    private InternalClientConnection conn =
        InternalClientConnection.getRootConnection();
    private ASN1OctetString asn1BaseDn;
+   private short serverId;
 
    /**
     * The attribute name used to store the state in the backend.
@@ -81,10 +83,12 @@
   /**
    * create a new ServerState.
    * @param baseDn The baseDN for which the ServerState is created
+   *  @param serverId The serverId
    */
-  public PersistentServerState(DN baseDn)
+  public PersistentServerState(DN baseDn, short serverId)
   {
     this.baseDn = baseDn;
+    this.serverId = serverId;
     asn1BaseDn = new ASN1OctetString(baseDn.toString());
     loadState();
   }
@@ -139,16 +143,12 @@
     }
 
     /*
-     * TODO : The ServerState is saved to the database periodically,
-     * therefore in case of crash it is possible that is does not contain
-     * the latest changes that have been processed and saved to the
-     * database.
-     * In order to make sure that we don't loose them, search all the entries
-     * that have been updated after this entry.
-     * This is done by using the HistoricalCsnOrderingMatchingRule
-     * and an ordering index for historical attribute
+     * In order to make sure that the replication never looses changes,
+     * the server needs to search all the entries that have been
+     * updated after the last write of the ServerState.
+     * Inconsistencies may append after a crash.
      */
-
+    checkAndUpdateServerState();
   }
 
   /**
@@ -362,4 +362,91 @@
     clearInMemory();
     save();
   }
-}
+
+  /**
+   * The ServerState is saved to the database periodically,
+   * therefore in case of crash it is possible that is does not contain
+   * the latest changes that have been processed and saved to the
+   * database.
+   * In order to make sure that we don't loose them, search all the entries
+   * that have been updated after this entry.
+   * This is done by using the HistoricalCsnOrderingMatchingRule
+   * and an ordering index for historical attribute
+   */
+  public final void checkAndUpdateServerState() {
+    Message message;
+    InternalSearchOperation op;
+    ChangeNumber serverStateMaxCn;
+    ChangeNumber dbMaxCn;
+    final AttributeType histType =
+      DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
+
+    // Retrieves the entries that have changed since the
+    // maxCn stored in the serverState
+    synchronized (this)
+    {
+      serverStateMaxCn = this.getMaxChangeNumber(serverId);
+
+      if (serverStateMaxCn == null)
+        return;
+
+      try {
+        op = ReplicationBroker.searchForChangedEntries(baseDn,
+            serverStateMaxCn, null);
+      }
+      catch (Exception  e)
+      {
+        return;
+      }
+      if (op.getResultCode() != ResultCode.SUCCESS)
+      {
+        // An error happened trying to search for the updates
+        // Log an error
+        message = ERR_CANNOT_RECOVER_CHANGES.get(
+            baseDn.toNormalizedString());
+        logError(message);
+      }
+      else
+      {
+        dbMaxCn = serverStateMaxCn;
+        for (SearchResultEntry resEntry : op.getSearchEntries())
+        {
+          List<Attribute> attrs = resEntry.getAttribute(histType);
+          Iterator<AttributeValue> iav = attrs.get(0).getValues().iterator();
+          try
+          {
+            while (true)
+            {
+              AttributeValue attrVal = iav.next();
+              HistVal histVal = new HistVal(attrVal.getStringValue());
+              ChangeNumber cn = histVal.getCn();
+
+              if ((cn != null) && (cn.getServerId() == serverId))
+              {
+                // compare the csn regarding the maxCn we know and
+                // store the biggest
+                if (ChangeNumber.compare(dbMaxCn, cn) < 0)
+                {
+                  dbMaxCn = cn;
+                }
+              }
+            }
+          }
+          catch(Exception e)
+          {
+          }
+        }
+
+        if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0)
+        {
+          // Update the serverState with the new maxCn
+          // present in the database
+          this.update(dbMaxCn);
+          message = NOTE_SERVER_STATE_RECOVERY.get(
+              baseDn.toNormalizedString(), dbMaxCn.toString());
+          logError(message);
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index c74a0e8..0bd9ee2 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -276,7 +276,7 @@
                * replication server and populate the replayOperations
                * list.
                */
-              InternalSearchOperation op = seachForChangedEntries(
+              InternalSearchOperation op = searchForChangedEntries(
                 baseDn, replServerMaxChangeNumber, this);
               if (op.getResultCode() != ResultCode.SUCCESS)
               {
@@ -709,7 +709,7 @@
    * @return the internal search operation
    * @throws Exception when raised.
    */
-  public static InternalSearchOperation seachForChangedEntries(
+  public static InternalSearchOperation searchForChangedEntries(
     DN baseDn,
     ChangeNumber fromChangeNumber,
     InternalSearchListener resultListener)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 57e665e..6b248c7 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -439,7 +439,7 @@
      * Create a new Persistent Server State that will be used to store
      * the last ChangeNmber seen from all LDAP servers in the topology.
      */
-    state = new PersistentServerState(baseDN);
+    state = new PersistentServerState(baseDN, serverId);
 
     /*
      * Create a replication monitor object responsible for publishing
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 949e982..4fba9f1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -193,7 +193,7 @@
   {
     ServerState state;
     if (emptyOldChanges)
-       state = new PersistentServerState(baseDn);
+       state = new PersistentServerState(baseDn, serverId);
     else
        state = new ServerState();
 
@@ -297,7 +297,7 @@
   {
     ServerState state;
     if (emptyOldChanges)
-       state = new PersistentServerState(baseDn);
+       state = new PersistentServerState(baseDn, serverId);
     else
        state = new ServerState();
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index 65370f8..a9e3151 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -275,7 +275,7 @@
 
     // Retrieves the entries that have changed since the first modification
     InternalSearchOperation op =
-      ReplicationBroker.seachForChangedEntries(baseDn, fromChangeNumber, null);
+      ReplicationBroker.searchForChangedEntries(baseDn, fromChangeNumber, null);
 
     // The expected result is one entry .. the one previously modified
     assertTrue(op.getResultCode() == ResultCode.SUCCESS);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
index 2f6b617..c96d920 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
@@ -100,7 +100,7 @@
      * 2 ChangeNumbers have been saved in this new PersistentServerState.
      */
     DN baseDn = DN.decode(dn);
-    PersistentServerState state = new PersistentServerState(baseDn);
+    PersistentServerState state = new PersistentServerState(baseDn, (short) 1);
     ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state);
     ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state);
 
@@ -112,7 +112,7 @@
 
     state.save();
 
-    PersistentServerState stateSaved = new PersistentServerState(baseDn);
+    PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1);
     ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
     ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);
 
@@ -122,7 +122,7 @@
         "cn2 has not been saved or loaded correctly for " + dn);
     
     state.clear();
-    stateSaved = new PersistentServerState(baseDn);
+    stateSaved = new PersistentServerState(baseDn, (short) 1);
     cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
     assertEquals(cn1Saved, null,
         "cn1 has not been saved after clear for " + dn);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
index 81c1732..6145e74 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
@@ -100,7 +100,7 @@
      * 2 ChangeNumbers have been saved in this new PersistentServerState.
      */
     DN baseDn = DN.decode(dn);
-    PersistentServerState state = new PersistentServerState(baseDn);
+    PersistentServerState state = new PersistentServerState(baseDn, (short) 1);
     ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state);
     ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state);
 
@@ -112,7 +112,7 @@
 
     state.save();
 
-    PersistentServerState stateSaved = new PersistentServerState(baseDn);
+    PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1);
     ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
     ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);
 

--
Gitblit v1.10.0