mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.55.2009 d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -114,6 +114,7 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.service.ReplicationMonitor;
import org.opends.server.tasks.TaskUtils;
@@ -174,9 +175,62 @@
 */
public class LDAPReplicationDomain extends ReplicationDomain
       implements ConfigurationChangeListener<ReplicationDomainCfg>,
                  AlertGenerator, InternalSearchListener
                  AlertGenerator
{
  /**
   * This class is used in the session establishment phase
   * when no Replication Server with all the local changes has been found
   * and we therefore need to recover them.
   * A search is then performed on the database using this
   * internalSearchListener.
   */
  private class ScanSearchListener implements InternalSearchListener
  {
    private ChangeNumber startingChangeNumber = null;
    private ChangeNumber endChangeNumber = null;
    public ScanSearchListener(
        ChangeNumber startingChangeNumber,
        ChangeNumber endChangeNumber)
    {
      this.startingChangeNumber = startingChangeNumber;
      this.endChangeNumber = endChangeNumber;
    }
    @Override
    public void handleInternalSearchEntry(
        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
        throws DirectoryException
    {
      // Build the list of Operations that happened on this entry
      // after startingChangeNumber and before endChangeNumber and
      // add them to the replayOperations list
      Iterable<FakeOperation> updates =
        Historical.generateFakeOperations(searchEntry);
      for (FakeOperation op : updates)
      {
        ChangeNumber cn = op.getChangeNumber();
        if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber)))
        {
          synchronized (replayOperations)
          {
            replayOperations.put(cn, op);
          }
        }
      }
    }
    @Override
    public void handleInternalSearchReference(
        InternalSearchOperation searchOperation,
        SearchResultReference searchReference) throws DirectoryException
    {
       // Nothing to do.
    }
  }
  /**
   * The fully-qualified name of this class.
   */
  private static final String CLASS_NAME =
@@ -398,6 +452,80 @@
  }
  /**
   * The thread that is responsible to update the RS to which this domain is
   * connected in case it is late and there is no RS which is up to date.
   */
  private class RSUpdater extends DirectoryThread
  {
    private ChangeNumber startChangeNumber;
    protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
    {
      super("Replication Server Updater for server id " +
            serverId + " and domain " + baseDn.toString());
      this.startChangeNumber = replServerMaxChangeNumber;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // Replication server is missing some of our changes: let's
      // send them to him.
      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
      logError(message);
      /*
       * Get all the changes that have not been seen by this
       * replication server and publish them.
       */
      try
      {
        if (buildAndPublishMissingChanges(startChangeNumber, broker))
        {
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
          synchronized(replayOperations)
          {
            replayOperations.clear();
          }
        }
        else
        {
          /*
           * An error happened trying to search for the updates
           * This server will start accepting again new updates but
           * some inconsistencies will stay between servers.
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          message = ERR_CANNOT_RECOVER_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
        }
      } catch (Exception e)
      {
        /*
         * An error happened trying to search for the updates
         * This server will start accepting again new updates but
         * some inconsistencies will stay between servers.
         * Log an error for the repair tool
         * that will need to re-synchronize the servers.
         */
        message = ERR_CANNOT_RECOVER_CHANGES.get(
            baseDn.toNormalizedString());
        logError(message);
      }
      finally
      {
        broker.setRecoveryRequired(false);
      }
    }
  }
  /**
   * Creates a new ReplicationDomain using configuration from configEntry.
   *
   * @param configuration    The configuration of this ReplicationDomain.
@@ -490,9 +618,6 @@
      saveGenerationId(generationId);
    }
    startPublishService(replicationServers, window, heartbeatInterval,
        configuration.getChangetimeHeartbeatInterval());
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on this replication domain.
@@ -505,6 +630,9 @@
    pendingChanges =
      new PendingChanges(generator, this);
    startPublishService(replicationServers, window, heartbeatInterval,
        configuration.getChangetimeHeartbeatInterval());
    remotePendingChanges = new RemotePendingChanges(getServerState());
    // listen for changes on the configuration
@@ -4356,74 +4484,9 @@
        if ((ourMaxChangeNumber != null) &&
            (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
        {
          // Replication server is missing some of our changes: let's
          // send them to him.
          Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
          logError(message);
          /*
           * Get all the changes that have not been seen by this
           * replication server and populate the replayOperations
           * list.
           */
          InternalSearchOperation op = searchForChangedEntries(
              baseDn, replServerMaxChangeNumber, this);
          if (op.getResultCode() != ResultCode.SUCCESS)
          {
            /*
             * An error happened trying to search for the updates
             * This server will start accepting again new updates but
             * some inconsistencies will stay between servers.
             * Log an error for the repair tool
             * that will need to re-synchronize the servers.
             */
            message = ERR_CANNOT_RECOVER_CHANGES.get(
                baseDn.toNormalizedString());
            logError(message);
          } else
          {
            for (FakeOperation replayOp :
              replayOperations.tailMap(replServerMaxChangeNumber).values())
            {
              ChangeNumber cn = replayOp.getChangeNumber();
              /*
               * Because the entry returned by the search operation
               * can contain old historical information, it is
               * possible that some of the FakeOperation are
               * actually older than the last ChangeNumber known by
               * the Replication Server.
               * In such case don't send the operation.
               */
              if (!cn.newer(replServerMaxChangeNumber))
              {
                continue;
              }
              /*
               * Check if the DeleteOperation has been abandoned before
               * being processed. This is necessary because the replayOperation
               *
               */
              if (replayOp instanceof FakeDelOperation)
              {
                FakeDelOperation delOp = (FakeDelOperation) replayOp;
                if (findEntryDN(delOp.getUUID()) != null)
                {
                  continue;
                }
              }
              message =
                DEBUG_SENDING_CHANGE.get(
                    replayOp.getChangeNumber().toString());
              logError(message);
              session.publish(replayOp.generateMessage());
            }
            message = DEBUG_CHANGES_SENT.get();
            logError(message);
          }
          replayOperations.clear();
          pendingChanges.setRecovering(true);
          broker.setRecoveryRequired(true);
          new RSUpdater(replServerMaxChangeNumber).start();
        }
      }
    } catch (Exception e)
@@ -4437,19 +4500,124 @@
  }
  /**
   * Build the list of changes that have been processed by this server
   * after the ChangeNumber given as a parameter and publish them
   * using the given session.
   *
   * @param startingChangeNumber  The ChangeNumber whe we need to start the
   *                              search
   * @param session               The session to use to publish the changes
   *
   * @return                      A boolean indicating he success of the
   *                              operation.
   * @throws Exception            if an Exception happens during the search.
   */
  public boolean buildAndPublishMissingChanges(
      ChangeNumber startingChangeNumber,
      ReplicationBroker session)
      throws Exception
  {
    // Trim the changes in replayOperations that are older than
    // the startingChangeNumber.
    synchronized (replayOperations)
    {
      Iterator<ChangeNumber> it = replayOperations.keySet().iterator();
      while (it.hasNext())
      {
        if (it.next().olderOrEqual(startingChangeNumber))
        {
          it.remove();
        }
        else
        {
          break;
        }
      }
    }
    ChangeNumber lastRetrievedChange = null;
    long missingChangesDelta;
    InternalSearchOperation op;
    ChangeNumber currentStartChangeNumber = startingChangeNumber;
    do
    {
      lastRetrievedChange = null;
      // We can't do the search in one go because we need to
      // store the results so that we are sure we send the operations
      // in order and because the list might be large
      // So we search by interval of 10 seconds
      // and store the results in the replayOperations list
      // so that they are sorted before sending them.
      missingChangesDelta = currentStartChangeNumber.getTime() + 10000;
      ChangeNumber endChangeNumber =
        new ChangeNumber(
            missingChangesDelta, 0xffffffff, serverId);
      ScanSearchListener listener =
        new ScanSearchListener(currentStartChangeNumber, endChangeNumber);
      op = searchForChangedEntries(
          baseDn, currentStartChangeNumber, endChangeNumber, listener);
      // Publish and remove all the changes from the replayOperations list
      // that are older than the endChangeNumber.
      LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
      synchronized (replayOperations)
      {
        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
        while (itOp.hasNext())
        {
          FakeOperation fakeOp = itOp.next();
          if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber))
              && state.cover(fakeOp.getChangeNumber()))
          {
            lastRetrievedChange = fakeOp.getChangeNumber();
            opsToSend.add(fakeOp);
            itOp.remove();
          }
          else
          {
            break;
          }
        }
      }
      for (FakeOperation opToSend : opsToSend)
      {
          session.publishRecovery(opToSend.generateMessage());
      }
      opsToSend.clear();
      if (lastRetrievedChange != null)
      {
        currentStartChangeNumber = lastRetrievedChange;
      }
      else
      {
        currentStartChangeNumber = endChangeNumber;
      }
    } while (pendingChanges.RecoveryUntil(lastRetrievedChange) &&
             (op.getResultCode().equals(ResultCode.SUCCESS)));
    return op.getResultCode().equals(ResultCode.SUCCESS);
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @param fromChangeNumber The ChangeNumber from which we want the changes
   * @param lastChangeNumber The max ChangeNumber that the search should return
   * @param resultListener   The listener that will process the entries returned
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    ChangeNumber lastChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
@@ -4457,8 +4625,16 @@
      InternalClientConnection.getRootConnection();
    Integer serverId = fromChangeNumber.getServerId();
    String maxValueForId = "ffffffffffffffff" +
      String.format("%04x", serverId) + "ffffffff";
    String maxValueForId;
    if (lastChangeNumber == null)
    {
      maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
                      + "ffffffff";
    }
    else
    {
      maxValueForId = lastChangeNumber.toString();
    }
    LDAPFilter filter = LDAPFilter.decode(
       "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
@@ -4479,36 +4655,24 @@
  }
  /**
   * {@inheritDoc}
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public void handleInternalSearchEntry(
    InternalSearchOperation searchOperation,
    SearchResultEntry searchEntry)
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
    /*
     * This call back is called at session establishment phase
     * for each entry that has been changed by this server and the changes
     * have not been sent to any Replication Server.
     * The role of this method is to build equivalent operation from
     * the historical information and add them in the replayOperations
     * table.
     */
    Iterable<FakeOperation> updates =
      Historical.generateFakeOperations(searchEntry);
    for (FakeOperation op : updates)
    {
      replayOperations.put(op.getChangeNumber(), op);
    }
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchReference(
    InternalSearchOperation searchOperation,
    SearchResultReference searchReference)
  {
    // TODO to be implemented
    return searchForChangedEntries(
        baseDn, fromChangeNumber, null, resultListener);
  }
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -212,7 +212,14 @@
      {
        numSentUpdates++;
        LDAPUpdateMsg updateMsg = firstChange.getMsg();
        domain.publish(updateMsg);
        if (!recoveringOldChanges)
        {
          domain.publish(updateMsg);
        }
        else
        {
          domain.getServerState().update(updateMsg.getChangeNumber());
        }
      }
      pendingChanges.remove(firstChangeNumber);
@@ -248,4 +255,47 @@
    _commit(changeNumber, msg);
    return _pushCommittedChanges();
  }
  private boolean recoveringOldChanges = false;
  /**
   * Set the PendingChangesList structure in a mode where it is
   * waiting for the RS to receive all the previous changes to
   * be sent before starting to process the changes normally.
   * In this mode, The Domain does not publish the changes from
   * the pendingChanges because there are older changes that
   * need to be published before.
   *
   * @param b The recovering status that must be set.
   */
  public void setRecovering(boolean b)
  {
    recoveringOldChanges = b;
  }
  /**
   * Allows to update the recovery situation by comparing the ChangeNumber of
   * the last change that was sent to the ReplicationServer with the
   * ChangeNumber of the last operation that was taken out of the
   * PendingChanges list.
   * If he two match then the recovery is completed and normal procedure can
   * restart. Otherwise the RSUpdate thread must continue to look for
   * older changes and no changes can be committed from the pendingChanges list.
   *
   * @param recovered  The ChangeNumber of the last change that was published
   *                   to the ReplicationServer.
   *
   * @return           A boolean indicating if the recovery is completed (false)
   *                   or must continue (true).
   */
  public synchronized boolean RecoveryUntil(ChangeNumber recovered)
  {
    ChangeNumber lastLocalChange = domain.getLastLocalChange();
    if ((recovered != null) && (recovered.newerOrEquals(lastLocalChange)))
    {
      recoveringOldChanges = false;
    }
    return recoveringOldChanges;
  }
}
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -123,6 +123,18 @@
  }
  /**
   * Checks that the ChangeNumber given as a parameter is in this ServerState.
   *
   * @param   covered The ChangeNumber that should be checked.
   * @return  A boolean indicating if this ServerState contains the ChangeNumber
   *          given in parameter.
   */
  public boolean cover(ChangeNumber covered)
  {
    return state.cover(covered);
  }
  /**
   * Update the Server State with a ChangeNumber.
   * All operations with smaller CSN and the same serverID must be committed
   * before calling this method.
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -188,6 +188,7 @@
  private long generationID;
  private int updateDoneCount = 0;
  private boolean connectRequiresRecovery = false;
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -694,6 +695,21 @@
                rsServerId = serverInfo.getServerId();
                rsServerUrl = bestServer;
                receiveTopo(topologyMsg);
                // Log a message to let the administrator know that the failure
                // was resolved.
                // Wakeup all the thread that were waiting on the window
                // on the previous connection.
                connectionError = false;
                if (sendWindow != null)
                {
                  sendWindow.release(Integer.MAX_VALUE);
                }
                sendWindow = new Semaphore(maxSendWindow);
                rcvWindow = maxRcvWindow;
                connected = true;
                // May have created a broker with null replication domain for
                // unit test purpose.
                if (domain != null)
@@ -703,8 +719,7 @@
                      serverInfo.getGenerationId(),
                      session);
                }
                receiveTopo(topologyMsg);
                connected = true;
                if (getRsGroupId() != groupId)
                {
                 // Connected to replication server with wrong group id:
@@ -766,17 +781,6 @@
      if (connected)
      {
        // Log a message to let the administrator know that the failure was
        // resolved.
        // Wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
        {
          sendWindow.release(Integer.MAX_VALUE);
        }
        sendWindow = new Semaphore(maxSendWindow);
        rcvWindow = maxRcvWindow;
        connectPhaseLock.notify();
        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
@@ -1786,6 +1790,25 @@
   */
  public void publish(ReplicationMsg msg)
  {
    _publish(msg, false);
  }
  /**
   * Publish a recovery message to the other servers.
   * @param msg the message to publish
   */
  public void publishRecovery(ReplicationMsg msg)
  {
    _publish(msg, true);
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
   * @param recoveryMsg the message is a recovery Message
   */
  void _publish(ReplicationMsg msg, boolean recoveryMsg)
  {
    boolean done = false;
    while (!done && !shutdown)
@@ -1825,6 +1848,15 @@
          currentWindowSemaphore = sendWindow;
        }
        // If the Replication domain has decided that there is a need to
        // recover some changes then it is not allowed to send this
        // change but it will be the responsibility of the recovery thread to
        // do it.
        if (!recoveryMsg & connectRequiresRecovery)
        {
          return;
        }
        if (msg instanceof UpdateMsg)
        {
          // Acquiring the window credit must be done outside of the
@@ -2548,4 +2580,18 @@
      ctHeartbeatPublisherThread = null;
    }
  }
  /**
   * Set the connectRequiresRecovery to the provided value.
   * This flag is used to indicate if a recovery of Update is necessary
   * after a reconnection to a RS.
   * It is the responsibility of the ReplicationDomain to set it during the
   * sessionInitiated phase.
   *
   * @param b the new value of the connectRequiresRecovery.
   */
  public void setRecoveryRequired(boolean b)
  {
    connectRequiresRecovery = b;
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2909,4 +2909,16 @@
  {
    return eClIncludes;
  }
  /**
   * Returns the ChangeNUmber of the last Change that was fully processed
   * by this ReplicationDomain.
   *
   * @return The ChangeNUmber of the last Change that was fully processed
   *         by this ReplicationDomain.
   */
  public ChangeNumber getLastLocalChange()
  {
    return state.getMaxChangeNumber(serverID);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1231,17 +1231,31 @@
   * Deletes the provided entry from the Directory Server using an
   * internal operation.
   *
   * @param  entry  The entry to be added.
   * @param  entry  The entry to be deleted.
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  public static void deleteEntry(Entry entry)
         throws Exception
  {
    deleteEntry(entry.getDN());
  }
  /**
   * Deletes the provided entry from the Directory Server using an
   * internal operation.
   *
   * @param  dn  The dn of entry to be deleted
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  public static void deleteEntry(DN dn)
         throws Exception
  {
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    DeleteOperation deleteOperation = conn.processDelete(entry.getDN());
    DeleteOperation deleteOperation = conn.processDelete(dn);
    assertEquals(deleteOperation.getResultCode(), ResultCode.SUCCESS);
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -26,127 +26,71 @@
 */
package org.opends.server.replication.plugin;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchResultEntry;
import org.testng.annotations.BeforeClass;
import org.opends.server.util.TimeThread;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
/**
 * Test the usage of the historical data of the replication.
 */
public class HistoricalCsnOrderingTest
extends ReplicationTestCase
       extends ReplicationTestCase
{
  /**
   * A "person" entry
   */
  protected Entry personEntry;
  private int replServerPort;
  final int serverId = 123;
  /**
   * Set up the environment for performing the tests in this Class.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  @Override
  public void setUp() throws Exception
  public class TestBroker extends ReplicationBroker
  {
    super.setUp();
    LinkedList<ReplicationMsg> list = null;
    // Create necessary backend top level entry
    String topEntry = "dn: ou=People," + TEST_ROOT_DN_STRING + "\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
    addEntry(TestCaseUtils.entryFromLdifString(topEntry));
    public TestBroker(LinkedList<ReplicationMsg> list)
    {
      super(null, null, null, 0, 0, (long) 0, (long) 0, null, (byte) 0, (long) 0);
      this.list = list;
    }
    // find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    replServerPort = socket.getLocalPort();
    socket.close();
    public void publishRecovery(ReplicationMsg msg)
    {
      list.add(msg);
    }
    // replication server
    String replServerLdif =
      "dn: cn=Replication Server, " + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-replication-server\n"
      + "cn: Replication Server\n"
      + "ds-cfg-replication-port: " + replServerPort + "\n"
      + "ds-cfg-replication-db-directory: HistoricalCsnOrderingTestDb\n"
      + "ds-cfg-replication-server-id: 101\n";
    replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
    // suffix synchronized
    String testName = "historicalCsnOrderingTest";
    String synchroServerLdif =
      "dn: cn=" + testName + ", cn=domains, " + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: " + testName + "\n"
      + "ds-cfg-base-dn: ou=People," + TEST_ROOT_DN_STRING + "\n"
      + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
      + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    String personLdif = "dn: uid=user.1,ou=People," + TEST_ROOT_DN_STRING + "\n"
      + "objectClass: top\n" + "objectClass: person\n"
      + "objectClass: organizationalPerson\n"
      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
      + "homePhone: 951-245-7634\n"
      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
      + "mobile: 027-085-0537\n"
      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
      + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
      + "street: 17984 Thirteenth Street\n"
      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
      + "userPassword: password\n" + "initials: AA\n";
    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
    configureReplication();
  }
  /**
   * Add an entry in the database
   *
   */
  private void addEntry(Entry entry) throws Exception
  {
    AddOperationBasis addOp = new AddOperationBasis(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
        entry.getUserAttributes(), entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    assertNotNull(getEntry(entry.getDN(), 1000, true));
  }
  /**
@@ -182,10 +126,19 @@
   * informations.
   */
  @Test()
  public void changesCmpTest()
  public void buildAndPublishMissingChangesOneEntryTest()
  throws Exception
  {
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    final int serverId = 123;
    final DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
    TestCaseUtils.initializeTestBackend(true);
    ReplicationServer rs = createReplicationServer();
    // Create Replication Server and Domain
    LDAPReplicationDomain rd1 = createReplicationDomain(serverId);
    try
    {
      long startTime = TimeThread.getTime();
    final DN dn1 = DN.decode("cn=test1," + baseDn.toString());
    final AttributeType histType =
      DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
@@ -246,39 +199,183 @@
          "Second historical value:" + av.getValue().toString()));
    }
    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
    TestBroker session = new TestBroker(opList);
    boolean result =
      rd1.buildAndPublishMissingChanges(
          new ChangeNumber(startTime, 0, serverId),
          session);
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations");
    assertTrue(opList.getFirst().getClass().equals(AddMsg.class));
    // Build a change number from the first modification
    String hv[] = histValue.split(":");
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        hv[1]));
    ChangeNumber fromChangeNumber =
      new ChangeNumber(hv[1]);
    logError(Message.raw(Category.SYNC, Severity.INFORMATION, hv[1]));
    ChangeNumber fromChangeNumber = new ChangeNumber(hv[1]);
    // Retrieves the entries that have changed since the first modification
    InternalSearchOperation op =
      LDAPReplicationDomain.searchForChangedEntries(
          baseDn, fromChangeNumber, null);
    opList = new LinkedList<ReplicationMsg>();
    session = new TestBroker(opList);
    // The expected result is one entry .. the one previously modified
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    assertEquals(op.getSearchEntries().size(), 1);
    // From the historical of this entry, rebuild operations
    // Since there have been 2 modifications and 1 add, there should be 3
    // operations rebuild from this state.
    int updatesCnt = 0;
    for (SearchResultEntry searchEntry : op.getSearchEntries())
    {
      logError(Message.raw(Category.SYNC, Severity.INFORMATION,
          searchEntry.toString()));
      Iterable<FakeOperation> updates =
        Historical.generateFakeOperations(searchEntry);
      for (FakeOperation fop : updates)
      {
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
            fop.generateMessage().toString()));
        updatesCnt++;
      }
    result =
      rd1.buildAndPublishMissingChanges(
          fromChangeNumber,
          session);
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation");
    assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class));
    }
    assertTrue(updatesCnt == 3);
    finally
    {
      MultimasterReplication.deleteDomain(baseDn);
      rs.remove();
    }
  }
  /**
   * Test that we can retrieve the entries that were missed by
   * a replication server and can  re-build operations from the historical
   * informations.
   */
  @Test()
  public void buildAndPublishMissingChangesSeveralEntriesTest()
  throws Exception
  {
    final DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
    TestCaseUtils.initializeTestBackend(true);
    ReplicationServer rs = createReplicationServer();
    // Create Replication Server and Domain
    LDAPReplicationDomain rd1 = createReplicationDomain(serverId);
    long startTime = TimeThread.getTime();
    try
    {
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
    "Starting replication test : changesCmpTest"));
    // Add 3 entries.
    String dnTest1 = "cn=test1," + baseDn.toString();
    String dnTest2 = "cn=test2," + baseDn.toString();
    String dnTest3 = "cn=test3," + baseDn.toString();
    TestCaseUtils.addEntry(
        "dn: " + dnTest3,
        "displayname: Test1",
        "objectClass: top",
        "objectClass: person",
        "objectClass: organizationalPerson",
        "objectClass: inetOrgPerson",
        "cn: test1",
        "sn: test"
    );
    TestCaseUtils.addEntry(
        "dn: " + dnTest1,
        "displayname: Test1",
        "objectClass: top",
        "objectClass: person",
        "objectClass: organizationalPerson",
        "objectClass: inetOrgPerson",
        "cn: test1",
        "sn: test"
    );
    TestCaseUtils.deleteEntry(DN.decode(dnTest3));
    TestCaseUtils.addEntry(
        "dn: " + dnTest2,
        "displayname: Test1",
        "objectClass: top",
        "objectClass: person",
        "objectClass: organizationalPerson",
        "objectClass: inetOrgPerson",
        "cn: test1",
        "sn: test"
    );
    // Perform modifications on the 2 entries
    int resultCode = TestCaseUtils.applyModifications(false,
        "dn: cn=test2," + baseDn.toString(),
        "changetype: modify",
        "add: description",
    "description: foo");
    resultCode = TestCaseUtils.applyModifications(false,
        "dn: cn=test1," + baseDn.toString(),
        "changetype: modify",
        "add: description",
    "description: foo");
    assertEquals(resultCode, 0);
    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
    TestBroker session = new TestBroker(opList);
    // Call the buildAndPublishMissingChanges and check that this method
    // correctly generates the 4 operations in the correct order.
    boolean result =
      rd1.buildAndPublishMissingChanges(
          new ChangeNumber(startTime, 0, serverId),
          session);
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations");
    ReplicationMsg msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(AddMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1);
    msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(DeleteMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest3);
    msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(AddMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2);
    msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(ModifyMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2);
    msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(ModifyMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1);
    }
    finally
    {
      MultimasterReplication.deleteDomain(baseDn);
      rs.remove();
    }
  }
  SortedSet<String> replServers = new TreeSet<String>();
  private ReplicationServer createReplicationServer() throws ConfigException
  {
    int rsPort;
    try
    {
      ServerSocket socket1 = TestCaseUtils.bindFreePort();
      rsPort = socket1.getLocalPort();
      socket1.close();
      replServers.add("localhost:" + rsPort);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering",
            0, 1, 0, 100, replServers, 1, 1000, 5000);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      replicationServer.clearDb();
      return replicationServer;
    }
    catch (IOException e)
    {
      fail("Unable to determinate some free ports " +
          stackTraceToSingleLineString(e));
      return null;
    }
  }
  private LDAPReplicationDomain createReplicationDomain(int dsId)
          throws DirectoryException, ConfigException
  {
    DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
    DomainFakeCfg domainConf =
      new DomainFakeCfg(baseDn, dsId, replServers, AssuredType.NOT_ASSURED,
      2, 1, 0, null);
    LDAPReplicationDomain replicationDomain =
      MultimasterReplication.createNewDomain(domainConf);
    replicationDomain.start();
    return replicationDomain;
  }
}