opends/src/messages/messages/replication.properties
@@ -274,3 +274,5 @@ for domain %s for replication server %s : %s NOTICE_IGNORING_REMOTE_MONITOR_DATA_116=Some monitor data have been received \ from the server with server ID %s too late and are ignored NOTICE_SERVER_STATE_RECOVERY_117=ServerState recovery for domain %s, \ updated with changeNumber %s opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -34,6 +34,7 @@ import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Iterator; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyOperationBasis; @@ -72,6 +73,7 @@ private InternalClientConnection conn = InternalClientConnection.getRootConnection(); private ASN1OctetString asn1BaseDn; private short serverId; /** * The attribute name used to store the state in the backend. @@ -81,10 +83,12 @@ /** * create a new ServerState. * @param baseDn The baseDN for which the ServerState is created * @param serverId The serverId */ public PersistentServerState(DN baseDn) public PersistentServerState(DN baseDn, short serverId) { this.baseDn = baseDn; this.serverId = serverId; asn1BaseDn = new ASN1OctetString(baseDn.toString()); loadState(); } @@ -139,16 +143,12 @@ } /* * TODO : The ServerState is saved to the database periodically, * therefore in case of crash it is possible that it does not contain * the latest changes that have been processed and saved to the * database. * In order to make sure that we don't lose them, search all the entries * that have been updated after this entry. * This is done by using the HistoricalCsnOrderingMatchingRule * and an ordering index for historical attribute * In order to make sure that the replication never loses changes, * the server needs to search all the entries that have been * updated after the last write of the ServerState. * Inconsistencies may occur after a crash. */ checkAndUpdateServerState(); } /** @@ -362,4 +362,91 @@ clearInMemory(); save(); } } /** * The ServerState is saved to the database periodically, * therefore in case of crash it is possible that it does not contain * the latest changes that have been processed and saved to the * database. * In order to make sure that we don't lose them, search all the entries * that have been updated after this entry. 
* This is done by using the HistoricalCsnOrderingMatchingRule * and an ordering index for historical attribute */ public final void checkAndUpdateServerState() { Message message; InternalSearchOperation op; ChangeNumber serverStateMaxCn; ChangeNumber dbMaxCn; final AttributeType histType = DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME); // Retrieves the entries that have changed since the // maxCn stored in the serverState synchronized (this) { serverStateMaxCn = this.getMaxChangeNumber(serverId); if (serverStateMaxCn == null) return; try { op = ReplicationBroker.searchForChangedEntries(baseDn, serverStateMaxCn, null); } catch (Exception e) { return; } if (op.getResultCode() != ResultCode.SUCCESS) { // An error happened trying to search for the updates // Log an error message = ERR_CANNOT_RECOVER_CHANGES.get( baseDn.toNormalizedString()); logError(message); } else { dbMaxCn = serverStateMaxCn; for (SearchResultEntry resEntry : op.getSearchEntries()) { List<Attribute> attrs = resEntry.getAttribute(histType); Iterator<AttributeValue> iav = attrs.get(0).getValues().iterator(); try { while (true) { AttributeValue attrVal = iav.next(); HistVal histVal = new HistVal(attrVal.getStringValue()); ChangeNumber cn = histVal.getCn(); if ((cn != null) && (cn.getServerId() == serverId)) { // compare the csn regarding the maxCn we know and // store the biggest if (ChangeNumber.compare(dbMaxCn, cn) < 0) { dbMaxCn = cn; } } } } catch(Exception e) { } } if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0) { // Update the serverState with the new maxCn // present in the database this.update(dbMaxCn); message = NOTE_SERVER_STATE_RECOVERY.get( baseDn.toNormalizedString(), dbMaxCn.toString()); logError(message); } } } } } opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -276,7 +276,7 @@ * replication server and populate the replayOperations * list. */ InternalSearchOperation op = seachForChangedEntries( InternalSearchOperation op = searchForChangedEntries( baseDn, replServerMaxChangeNumber, this); if (op.getResultCode() != ResultCode.SUCCESS) { @@ -709,7 +709,7 @@ * @return the internal search operation * @throws Exception when raised. */ public static InternalSearchOperation seachForChangedEntries( public static InternalSearchOperation searchForChangedEntries( DN baseDn, ChangeNumber fromChangeNumber, InternalSearchListener resultListener) opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -439,7 +439,7 @@ * Create a new Persistent Server State that will be used to store * the last ChangeNmber seen from all LDAP servers in the topology. */ state = new PersistentServerState(baseDN); state = new PersistentServerState(baseDN, serverId); /* * Create a replication monitor object responsible for publishing opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -193,7 +193,7 @@ { ServerState state; if (emptyOldChanges) state = new PersistentServerState(baseDn); state = new PersistentServerState(baseDn, serverId); else state = new ServerState(); @@ -297,7 +297,7 @@ { ServerState state; if (emptyOldChanges) state = new PersistentServerState(baseDn); state = new PersistentServerState(baseDn, serverId); else state = new ServerState(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -275,7 +275,7 @@ // Retrieves the entries that have changed since the first modification InternalSearchOperation op = ReplicationBroker.seachForChangedEntries(baseDn, fromChangeNumber, null); ReplicationBroker.searchForChangedEntries(baseDn, fromChangeNumber, null); // The expected result is one entry .. the one previously modified assertTrue(op.getResultCode() == ResultCode.SUCCESS); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
@@ -100,7 +100,7 @@ * 2 ChangeNumbers have been saved in this new PersistentServerState. */ DN baseDn = DN.decode(dn); PersistentServerState state = new PersistentServerState(baseDn); PersistentServerState state = new PersistentServerState(baseDn, (short) 1); ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state); ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state); @@ -112,7 +112,7 @@ state.save(); PersistentServerState stateSaved = new PersistentServerState(baseDn); PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1); ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1); ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2); @@ -122,7 +122,7 @@ "cn2 has not been saved or loaded correctly for " + dn); state.clear(); stateSaved = new PersistentServerState(baseDn); stateSaved = new PersistentServerState(baseDn, (short) 1); cn1Saved = stateSaved.getMaxChangeNumber((short) 1); assertEquals(cn1Saved, null, "cn1 has not been saved after clear for " + dn); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
@@ -100,7 +100,7 @@ * 2 ChangeNumbers have been saved in this new PersistentServerState. */ DN baseDn = DN.decode(dn); PersistentServerState state = new PersistentServerState(baseDn); PersistentServerState state = new PersistentServerState(baseDn, (short) 1); ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state); ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state); @@ -112,7 +112,7 @@ state.save(); PersistentServerState stateSaved = new PersistentServerState(baseDn); PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1); ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1); ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);