mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

fdorson
11.14.2008 2b90d968d139530054b8b0f29924f5bfc49f767d
Fix for 1873 : ServerState should be updated after a server crash

The ServerState is saved to the database periodically,
therefore in case of crash it is possible that is does not contain
the latest changes that have been processed and saved to the
database.
In order to make sure that the replication never looses changes,
when replication starts the server needs to search all the entries
that have been updated after the last write of the ServerState.
This is done by using the HistoricalCsnOrderingMatchingRule.
8 files modified
133 ■■■■ changed files
opends/src/messages/messages/replication.properties 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java 109 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java 6 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java 4 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -274,3 +274,5 @@
for domain %s for replication server %s : %s
NOTICE_IGNORING_REMOTE_MONITOR_DATA_116=Some monitor data have been received \
 from the server with server ID %s too late and are ignored
NOTICE_SERVER_STATE_RECOVERY_117=ServerState recovery for domain %s, \
updated with changeNumber %s
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Iterator;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperationBasis;
@@ -72,6 +73,7 @@
   private InternalClientConnection conn =
       InternalClientConnection.getRootConnection();
   private ASN1OctetString asn1BaseDn;
   private short serverId;
   /**
    * The attribute name used to store the state in the backend.
@@ -81,10 +83,12 @@
  /**
   * create a new ServerState.
   * @param baseDn The baseDN for which the ServerState is created
   *  @param serverId The serverId
   */
  public PersistentServerState(DN baseDn)
  public PersistentServerState(DN baseDn, short serverId)
  {
    this.baseDn = baseDn;
    this.serverId = serverId;
    asn1BaseDn = new ASN1OctetString(baseDn.toString());
    loadState();
  }
@@ -139,16 +143,12 @@
    }
    /*
     * TODO : The ServerState is saved to the database periodically,
     * therefore in case of crash it is possible that is does not contain
     * the latest changes that have been processed and saved to the
     * database.
     * In order to make sure that we don't loose them, search all the entries
     * that have been updated after this entry.
     * This is done by using the HistoricalCsnOrderingMatchingRule
     * and an ordering index for historical attribute
     * In order to make sure that the replication never looses changes,
     * the server needs to search all the entries that have been
     * updated after the last write of the ServerState.
     * Inconsistencies may append after a crash.
     */
    checkAndUpdateServerState();
  }
  /**
@@ -362,4 +362,91 @@
    clearInMemory();
    save();
  }
}
  /**
   * The ServerState is saved to the database periodically,
   * therefore in case of crash it is possible that is does not contain
   * the latest changes that have been processed and saved to the
   * database.
   * In order to make sure that we don't loose them, search all the entries
   * that have been updated after this entry.
   * This is done by using the HistoricalCsnOrderingMatchingRule
   * and an ordering index for historical attribute
   */
  public final void checkAndUpdateServerState() {
    Message message;
    InternalSearchOperation op;
    ChangeNumber serverStateMaxCn;
    ChangeNumber dbMaxCn;
    final AttributeType histType =
      DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
    // Retrieves the entries that have changed since the
    // maxCn stored in the serverState
    synchronized (this)
    {
      serverStateMaxCn = this.getMaxChangeNumber(serverId);
      if (serverStateMaxCn == null)
        return;
      try {
        op = ReplicationBroker.searchForChangedEntries(baseDn,
            serverStateMaxCn, null);
      }
      catch (Exception  e)
      {
        return;
      }
      if (op.getResultCode() != ResultCode.SUCCESS)
      {
        // An error happened trying to search for the updates
        // Log an error
        message = ERR_CANNOT_RECOVER_CHANGES.get(
            baseDn.toNormalizedString());
        logError(message);
      }
      else
      {
        dbMaxCn = serverStateMaxCn;
        for (SearchResultEntry resEntry : op.getSearchEntries())
        {
          List<Attribute> attrs = resEntry.getAttribute(histType);
          Iterator<AttributeValue> iav = attrs.get(0).getValues().iterator();
          try
          {
            while (true)
            {
              AttributeValue attrVal = iav.next();
              HistVal histVal = new HistVal(attrVal.getStringValue());
              ChangeNumber cn = histVal.getCn();
              if ((cn != null) && (cn.getServerId() == serverId))
              {
                // compare the csn regarding the maxCn we know and
                // store the biggest
                if (ChangeNumber.compare(dbMaxCn, cn) < 0)
                {
                  dbMaxCn = cn;
                }
              }
            }
          }
          catch(Exception e)
          {
          }
        }
        if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0)
        {
          // Update the serverState with the new maxCn
          // present in the database
          this.update(dbMaxCn);
          message = NOTE_SERVER_STATE_RECOVERY.get(
              baseDn.toNormalizedString(), dbMaxCn.toString());
          logError(message);
        }
      }
    }
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -276,7 +276,7 @@
               * replication server and populate the replayOperations
               * list.
               */
              InternalSearchOperation op = seachForChangedEntries(
              InternalSearchOperation op = searchForChangedEntries(
                baseDn, replServerMaxChangeNumber, this);
              if (op.getResultCode() != ResultCode.SUCCESS)
              {
@@ -709,7 +709,7 @@
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public static InternalSearchOperation seachForChangedEntries(
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -439,7 +439,7 @@
     * Create a new Persistent Server State that will be used to store
     * the last ChangeNmber seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(baseDN);
    state = new PersistentServerState(baseDN, serverId);
    /*
     * Create a replication monitor object responsible for publishing
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -193,7 +193,7 @@
  {
    ServerState state;
    if (emptyOldChanges)
       state = new PersistentServerState(baseDn);
       state = new PersistentServerState(baseDn, serverId);
    else
       state = new ServerState();
@@ -297,7 +297,7 @@
  {
    ServerState state;
    if (emptyOldChanges)
       state = new PersistentServerState(baseDn);
       state = new PersistentServerState(baseDn, serverId);
    else
       state = new ServerState();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -275,7 +275,7 @@
    // Retrieves the entries that have changed since the first modification
    InternalSearchOperation op =
      ReplicationBroker.seachForChangedEntries(baseDn, fromChangeNumber, null);
      ReplicationBroker.searchForChangedEntries(baseDn, fromChangeNumber, null);
    // The expected result is one entry .. the one previously modified
    assertTrue(op.getResultCode() == ResultCode.SUCCESS);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
@@ -100,7 +100,7 @@
     * 2 ChangeNumbers have been saved in this new PersistentServerState.
     */
    DN baseDn = DN.decode(dn);
    PersistentServerState state = new PersistentServerState(baseDn);
    PersistentServerState state = new PersistentServerState(baseDn, (short) 1);
    ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state);
    ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state);
@@ -112,7 +112,7 @@
    state.save();
    PersistentServerState stateSaved = new PersistentServerState(baseDn);
    PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1);
    ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
    ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);
@@ -122,7 +122,7 @@
        "cn2 has not been saved or loaded correctly for " + dn);
    
    state.clear();
    stateSaved = new PersistentServerState(baseDn);
    stateSaved = new PersistentServerState(baseDn, (short) 1);
    cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
    assertEquals(cn1Saved, null,
        "cn1 has not been saved after clear for " + dn);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
@@ -100,7 +100,7 @@
     * 2 ChangeNumbers have been saved in this new PersistentServerState.
     */
    DN baseDn = DN.decode(dn);
    PersistentServerState state = new PersistentServerState(baseDn);
    PersistentServerState state = new PersistentServerState(baseDn, (short) 1);
    ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state);
    ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state);
@@ -112,7 +112,7 @@
    state.save();
    PersistentServerState stateSaved = new PersistentServerState(baseDn);
    PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1);
    ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
    ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);