mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

fdorson
11.14.2008 2b90d968d139530054b8b0f29924f5bfc49f767d
opends/src/messages/messages/replication.properties
@@ -274,3 +274,5 @@
for domain %s for replication server %s : %s
NOTICE_IGNORING_REMOTE_MONITOR_DATA_116=Some monitor data have been received \
 from the server with server ID %s too late and are ignored
NOTICE_SERVER_STATE_RECOVERY_117=ServerState recovery for domain %s, \
updated with changeNumber %s
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -34,6 +34,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Iterator;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperationBasis;
@@ -72,6 +73,7 @@
   private InternalClientConnection conn =
       InternalClientConnection.getRootConnection();
   private ASN1OctetString asn1BaseDn;
   private short serverId;
   /**
    * The attribute name used to store the state in the backend.
@@ -81,10 +83,12 @@
  /**
   * create a new ServerState.
   * @param baseDn The baseDN for which the ServerState is created
   *  @param serverId The serverId
   */
  public PersistentServerState(DN baseDn)
  public PersistentServerState(DN baseDn, short serverId)
  {
    this.baseDn = baseDn;
    this.serverId = serverId;
    asn1BaseDn = new ASN1OctetString(baseDn.toString());
    loadState();
  }
@@ -139,16 +143,12 @@
    }
    /*
     * TODO : The ServerState is saved to the database periodically,
     * therefore in case of crash it is possible that is does not contain
     * the latest changes that have been processed and saved to the
     * database.
     * In order to make sure that we don't loose them, search all the entries
     * that have been updated after this entry.
     * This is done by using the HistoricalCsnOrderingMatchingRule
     * and an ordering index for historical attribute
     * In order to make sure that the replication never looses changes,
     * the server needs to search all the entries that have been
     * updated after the last write of the ServerState.
     * Inconsistencies may append after a crash.
     */
    checkAndUpdateServerState();
  }
  /**
@@ -362,4 +362,91 @@
    clearInMemory();
    save();
  }
}
  /**
   * The ServerState is saved to the database periodically,
   * therefore in case of crash it is possible that is does not contain
   * the latest changes that have been processed and saved to the
   * database.
   * In order to make sure that we don't loose them, search all the entries
   * that have been updated after this entry.
   * This is done by using the HistoricalCsnOrderingMatchingRule
   * and an ordering index for historical attribute
   */
  public final void checkAndUpdateServerState() {
    Message message;
    InternalSearchOperation op;
    ChangeNumber serverStateMaxCn;
    ChangeNumber dbMaxCn;
    final AttributeType histType =
      DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
    // Retrieves the entries that have changed since the
    // maxCn stored in the serverState
    synchronized (this)
    {
      serverStateMaxCn = this.getMaxChangeNumber(serverId);
      if (serverStateMaxCn == null)
        return;
      try {
        op = ReplicationBroker.searchForChangedEntries(baseDn,
            serverStateMaxCn, null);
      }
      catch (Exception  e)
      {
        return;
      }
      if (op.getResultCode() != ResultCode.SUCCESS)
      {
        // An error happened trying to search for the updates
        // Log an error
        message = ERR_CANNOT_RECOVER_CHANGES.get(
            baseDn.toNormalizedString());
        logError(message);
      }
      else
      {
        dbMaxCn = serverStateMaxCn;
        for (SearchResultEntry resEntry : op.getSearchEntries())
        {
          List<Attribute> attrs = resEntry.getAttribute(histType);
          Iterator<AttributeValue> iav = attrs.get(0).getValues().iterator();
          try
          {
            while (true)
            {
              AttributeValue attrVal = iav.next();
              HistVal histVal = new HistVal(attrVal.getStringValue());
              ChangeNumber cn = histVal.getCn();
              if ((cn != null) && (cn.getServerId() == serverId))
              {
                // compare the csn regarding the maxCn we know and
                // store the biggest
                if (ChangeNumber.compare(dbMaxCn, cn) < 0)
                {
                  dbMaxCn = cn;
                }
              }
            }
          }
          catch(Exception e)
          {
          }
        }
        if (ChangeNumber.compare(dbMaxCn, serverStateMaxCn) > 0)
        {
          // Update the serverState with the new maxCn
          // present in the database
          this.update(dbMaxCn);
          message = NOTE_SERVER_STATE_RECOVERY.get(
              baseDn.toNormalizedString(), dbMaxCn.toString());
          logError(message);
        }
      }
    }
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -276,7 +276,7 @@
               * replication server and populate the replayOperations
               * list.
               */
              InternalSearchOperation op = seachForChangedEntries(
              InternalSearchOperation op = searchForChangedEntries(
                baseDn, replServerMaxChangeNumber, this);
              if (op.getResultCode() != ResultCode.SUCCESS)
              {
@@ -709,7 +709,7 @@
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public static InternalSearchOperation seachForChangedEntries(
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -439,7 +439,7 @@
     * Create a new Persistent Server State that will be used to store
     * the last ChangeNmber seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(baseDN);
    state = new PersistentServerState(baseDN, serverId);
    /*
     * Create a replication monitor object responsible for publishing
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -193,7 +193,7 @@
  {
    ServerState state;
    if (emptyOldChanges)
       state = new PersistentServerState(baseDn);
       state = new PersistentServerState(baseDn, serverId);
    else
       state = new ServerState();
@@ -297,7 +297,7 @@
  {
    ServerState state;
    if (emptyOldChanges)
       state = new PersistentServerState(baseDn);
       state = new PersistentServerState(baseDn, serverId);
    else
       state = new ServerState();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -275,7 +275,7 @@
    // Retrieves the entries that have changed since the first modification
    InternalSearchOperation op =
      ReplicationBroker.seachForChangedEntries(baseDn, fromChangeNumber, null);
      ReplicationBroker.searchForChangedEntries(baseDn, fromChangeNumber, null);
    // The expected result is one entry .. the one previously modified
    assertTrue(op.getResultCode() == ResultCode.SUCCESS);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentServerStateTest.java
@@ -100,7 +100,7 @@
     * 2 ChangeNumbers have been saved in this new PersistentServerState.
     */
    DN baseDn = DN.decode(dn);
    PersistentServerState state = new PersistentServerState(baseDn);
    PersistentServerState state = new PersistentServerState(baseDn, (short) 1);
    ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state);
    ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state);
@@ -112,7 +112,7 @@
    state.save();
    PersistentServerState stateSaved = new PersistentServerState(baseDn);
    PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1);
    ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
    ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);
@@ -122,7 +122,7 @@
        "cn2 has not been saved or loaded correctly for " + dn);
    
    state.clear();
    stateSaved = new PersistentServerState(baseDn);
    stateSaved = new PersistentServerState(baseDn, (short) 1);
    cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
    assertEquals(cn1Saved, null,
        "cn1 has not been saved after clear for " + dn);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/PersistentStateTest.java
@@ -100,7 +100,7 @@
     * 2 ChangeNumbers have been saved in this new PersistentServerState.
     */
    DN baseDn = DN.decode(dn);
    PersistentServerState state = new PersistentServerState(baseDn);
    PersistentServerState state = new PersistentServerState(baseDn, (short) 1);
    ChangeNumberGenerator gen1 = new ChangeNumberGenerator((short) 1, state);
    ChangeNumberGenerator gen2 = new ChangeNumberGenerator((short) 2, state);
@@ -112,7 +112,7 @@
    state.save();
    PersistentServerState stateSaved = new PersistentServerState(baseDn);
    PersistentServerState stateSaved = new PersistentServerState(baseDn, (short) 1);
    ChangeNumber cn1Saved = stateSaved.getMaxChangeNumber((short) 1);
    ChangeNumber cn2Saved = stateSaved.getMaxChangeNumber((short) 2);