From 2b90d968d139530054b8b0f29924f5bfc49f767d Mon Sep 17 00:00:00 2001
From: fdorson <fdorson@localhost>
Date: Tue, 11 Mar 2008 14:14:54 +0000
Subject: [PATCH] Fix for 1873 : ServerState should be updated after a server crash
---
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java | 6 +-
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java | 4
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java | 109 ++++++++++++++++++++++++++++++++---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java | 2
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 2
opends/src/messages/messages/replication.properties | 2
8 files changed, 111 insertions(+), 22 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 3019188..86bd3d6 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -274,3 +274,5 @@
for domain %s for replication server %s : %s
NOTICE_IGNORING_REMOTE_MONITOR_DATA_116=Some monitor data have been received \
from the server with server ID %s too late and are ignored
+NOTICE_SERVER_STATE_RECOVERY_117=ServerState recovery for domain %s, \
+updated with changeNumber %s
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
index 4145672..53f3bd5 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.Iterator;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperationBasis;
@@ -72,6 +73,7 @@
private InternalClientConnection conn =
InternalClientConnection.getRootConnection();
private ASN1OctetString asn1BaseDn;
+ private short serverId;
/**
* The attribute name used to store the state in the backend.
@@ -81,10 +83,12 @@
/**
* create a new ServerState.
* @param baseDn The baseDN for which the ServerState is created
+ * @param serverId The serverId
*/
- public PersistentServerState(DN baseDn)
+ public PersistentServerState(DN baseDn, short serverId)
{
this.baseDn = baseDn;
+ this.serverId = serverId;
asn1BaseDn = new ASN1OctetString(baseDn.toString());
loadState();
}
@@ -139,16 +143,12 @@
}
/*
- * TODO : The ServerState is saved to the database periodically,
- * therefore in case of crash it is possible that is does not contain
- * the latest changes that have been processed and saved to the
- * database.
- * In order to make sure that we don't loose them, search all the entries
- * that have been updated after this entry.
- * This is done by using the HistoricalCsnOrderingMatchingRule
- * and an ordering index for historical attribute
+ * In order to make sure that the replication never looses changes,
+ * the server needs to search all the entries that have been
+ * updated after the last write of the ServerState.
+ * Inconsistencies may append after a crash.
*/
-
+ checkAndUpdateServerState();
}
/**
@@ -362,4 +362,91 @@
clearInMemory();
save();
}
-}
+
+ /**
+ * The ServerState is saved to the database periodically,
+ * therefore in case of crash it is possible that is does not contain
+ * the latest changes that have been processed and saved to the
+ * database.
+ * In order to make sure that we don't loose them, search all the entries
+ * that have been updated after this entry.
+ * This is done by using the HistoricalCsnOrderingMatchingRule
+ * and an ordering index for historical attribute
+ */
+ public final void checkAndUpdateServerState() {
+ Message message;
+ InternalSearchOperation op;
+ ChangeNumber serverStateMaxCn;
+ ChangeNumber dbMaxCn;
+ final AttributeType histType =
+ DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
+
+ // Retrieves the entries that have changed since the
+ // maxCn stored in the serverState
+ synchronized (this)
+ {
+ serverStateMaxCn = this.getMaxChangeNumber(serverId);
+
+ if (serverStateMaxCn == null)
+ return;
+
+ try {
+ op = ReplicationBroker.searchForChangedEntries(baseDn,
+ serverStateMaxCn, null);
+ }
+ catch (Exception e)
+ {
+ return;
+ }
+ if (op.getResultCode() != ResultCode.SUCCESS)
+ {
+ // An error happened trying to search for the updates
+ // Log an error
+ message = ERR_CANNOT_RECOVER_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ }
+ else
+ {
+ dbMaxCn = serverStateMaxCn;
+ for (SearchResultEntry resEntry : op.getSearchEntries())
+ {
+ List<Attribute> attrs = resEntry.getAttribute(histType);
+ Iterator<AttributeValue> iav = attrs.get(0).getValues().iterator();
+ try
+ {
+ while (true)
+ {
+ AttributeValue attrVal = iav.next();
+ HistVal histVal = new HistVal(attrVal.getStringValue());
+ ChangeNumber cn = histVal.getCn();
+
+ if ((cn != null) && (cn.getServerId() == serverId))
+ {
+ // compare the csn regarding the maxCn we know and
+ // store the biggest
+ if (ChangeNumber.compare(dbMaxCn, cn) < 0)
+ {
+ dbMaxCn = cn;
+ }
+ }
+ }
+ }
+ catch(Exception e)
+ {
+ }
+ }
+
+ if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0)
+ {
+ // Update the serverState with the new maxCn
+ // present in the database
+ this.update(dbMaxCn);
+ message = NOTE_SERVER_STATE_RECOVERY.get(
+ baseDn.toNormalizedString(), dbMaxCn.toString());
+ logError(message);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index c74a0e8..0bd9ee2 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -276,7 +276,7 @@
* replication server and populate the replayOperations
* list.
*/
- InternalSearchOperation op = seachForChangedEntries(
+ InternalSearchOperation op = searchForChangedEntries(
baseDn, replServerMaxChangeNumber, this);
if (op.getResultCode() != ResultCode.SUCCESS)
{
@@ -709,7 +709,7 @@
* @return the internal search operation
* @throws Exception when raised.
*/
- public static InternalSearchOperation seachForChangedEntries(
+ public static InternalSearchOperation searchForChangedEntries(
DN baseDn,
ChangeNumber fromChangeNumber,
InternalSearchListener resultListener)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 57e665e..6b248c7 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -439,7 +439,7 @@
* Create a new Persistent Server State that will be used to store
* the last ChangeNmber seen from all LDAP servers in the topology.
*/
- state = new PersistentServerState(baseDN);
+ state = new PersistentServerState(baseDN, serverId);
/*
* Create a replication monitor object responsible for publishing
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index 949e982..4fba9f1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -193,7 +193,7 @@
{
ServerState state;
if (emptyOldChanges)
- state = new PersistentServerState(baseDn);
+ state = new PersistentServerState(baseDn, serverId);
else
state = new ServerState();
@@ -297,7 +297,7 @@
{
ServerState state;
if (emptyOldChanges)
- state = new PersistentServerState(baseDn);
+ state = new PersistentServerState(baseDn, serverId);
else
state = new ServerState();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
index 65370f8..a9e3151 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -275,7 +275,7 @@
// Retrieves the entries that have changed since the first modification
InternalSearchOperation op =
- ReplicationBroker.seachForChangedEntries(baseDn, fromChangeNumber, null);
+ ReplicationBroker.searchForChangedEntries(baseDn, fromChangeNumber, null);
// The expected result is one entry .. the one previously modified
assertTrue(op.getResultCode() == ResultCode.SUCCESS);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
index 2f6b617..c96d920 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
@@ -100,7 +100,7 @@
* 2 ChangeNumbers have been saved in this new PersistentServerState.
*/
DN baseDn = DN.decode(dn);
- PersistentServerState state = new PersistentServerState(baseDn);
+ PersistentServerState state = new PersistentServerState(baseDn, (short) 1);
ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state);
ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state);
@@ -112,7 +112,7 @@
state.save();
- PersistentServerState stateSaved = new PersistentServerState(baseDn);
+ PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1);
ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);
@@ -122,7 +122,7 @@
"cn2 has not been saved or loaded correctly for " + dn);
state.clear();
- stateSaved = new PersistentServerState(baseDn);
+ stateSaved = new PersistentServerState(baseDn, (short) 1);
cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
assertEquals(cn1Saved, null,
"cn1 has not been saved after clear for " + dn);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
index 81c1732..6145e74 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
@@ -100,7 +100,7 @@
* 2 ChangeNumbers have been saved in this new PersistentServerState.
*/
DN baseDn = DN.decode(dn);
- PersistentServerState state = new PersistentServerState(baseDn);
+ PersistentServerState state = new PersistentServerState(baseDn, (short) 1);
ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state);
ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state);
@@ -112,7 +112,7 @@
state.save();
- PersistentServerState stateSaved = new PersistentServerState(baseDn);
+ PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1);
ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);
--
Gitblit v1.10.0