mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.55.2009 d19acb303c4ff90e48fd98ce2d7ba739ca9ea2db
Fix for Issue 4300: stopping a Replication Server causes an OutOfMemoryError

This problem happens in the following conditions :
- use Directory Servers that do not have Replication Servers in the same JVM
- use only 2 Replication Servers
- apply a heavy load of updates on one Directory Server
- stop the first Replication Server
- wait long enough for millions of changes to be performed
- restart the first Replication Server, which will therefore have millions of
changes to retrieve from the second
- quickly stop the second Replication Server (before it has time to replicate
the missing changes to the first RS)

In such a case, the DS will connect to the first RS, see that it is missing lots of changes, and attempt to regenerate them from the historical information
in the database. Unfortunately, this process needs to fetch all the changes
into memory because it must send them to the RS in ChangeNumber order, and it
therefore currently sorts them in memory before sending them.

This change fixes the problem by searching for changes by interval. This avoids the memory
problem because only a limited number of changes then needs to be sorted at a time, and
they fit in memory.

However, this fix is not enough, because this whole process runs in the replication Listener thread, and this thread is also responsible for managing the replication protocol window.
Unfortunately, while this thread is busy sending a lot of changes to the RS, it cannot also manage the window, which can therefore lead to a deadlock.

So a second level of change is necessary: moving the code into a separate new thread that is
created only when necessary.

This led to the last problem I met: the creation of this new thread caused some concurrency
problems that I had to fix by introducing synchronization between this new thread, the listener thread and the worker thread.
7 files modified
873 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 372 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 72 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 12 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java 18 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java 335 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -114,6 +114,7 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.service.ReplicationMonitor;
import org.opends.server.tasks.TaskUtils;
@@ -174,9 +175,62 @@
 */
public class LDAPReplicationDomain extends ReplicationDomain
       implements ConfigurationChangeListener<ReplicationDomainCfg>,
                  AlertGenerator, InternalSearchListener
                  AlertGenerator
{
  /**
   * This class is used in the session establishment phase
   * when no Replication Server with all the local changes has been found
   * and we therefore need to recover them.
   * A search is then performed on the database using this
   * internalSearchListener.
   */
  private class ScanSearchListener implements InternalSearchListener
  {
    private ChangeNumber startingChangeNumber = null;
    private ChangeNumber endChangeNumber = null;
    public ScanSearchListener(
        ChangeNumber startingChangeNumber,
        ChangeNumber endChangeNumber)
    {
      this.startingChangeNumber = startingChangeNumber;
      this.endChangeNumber = endChangeNumber;
    }
    @Override
    public void handleInternalSearchEntry(
        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
        throws DirectoryException
    {
      // Build the list of Operations that happened on this entry
      // after startingChangeNumber and before endChangeNumber and
      // add them to the replayOperations list
      Iterable<FakeOperation> updates =
        Historical.generateFakeOperations(searchEntry);
      for (FakeOperation op : updates)
      {
        ChangeNumber cn = op.getChangeNumber();
        if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber)))
        {
          synchronized (replayOperations)
          {
            replayOperations.put(cn, op);
          }
        }
      }
    }
    @Override
    public void handleInternalSearchReference(
        InternalSearchOperation searchOperation,
        SearchResultReference searchReference) throws DirectoryException
    {
       // Nothing to do.
    }
  }
  /**
   * The fully-qualified name of this class.
   */
  private static final String CLASS_NAME =
@@ -398,6 +452,80 @@
  }
  /**
   * The thread that is responsible to update the RS to which this domain is
   * connected in case it is late and there is no RS which is up to date.
   */
  private class RSUpdater extends DirectoryThread
  {
    private ChangeNumber startChangeNumber;
    protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
    {
      super("Replication Server Updater for server id " +
            serverId + " and domain " + baseDn.toString());
      this.startChangeNumber = replServerMaxChangeNumber;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void run()
    {
      // Replication server is missing some of our changes: let's
      // send them to him.
      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
      logError(message);
      /*
       * Get all the changes that have not been seen by this
       * replication server and publish them.
       */
      try
      {
        if (buildAndPublishMissingChanges(startChangeNumber, broker))
        {
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
          synchronized(replayOperations)
          {
            replayOperations.clear();
          }
        }
        else
        {
          /*
           * An error happened trying to search for the updates
           * This server will start accepting again new updates but
           * some inconsistencies will stay between servers.
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          message = ERR_CANNOT_RECOVER_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
        }
      } catch (Exception e)
      {
        /*
         * An error happened trying to search for the updates
         * This server will start accepting again new updates but
         * some inconsistencies will stay between servers.
         * Log an error for the repair tool
         * that will need to re-synchronize the servers.
         */
        message = ERR_CANNOT_RECOVER_CHANGES.get(
            baseDn.toNormalizedString());
        logError(message);
      }
      finally
      {
        broker.setRecoveryRequired(false);
      }
    }
  }
  /**
   * Creates a new ReplicationDomain using configuration from configEntry.
   *
   * @param configuration    The configuration of this ReplicationDomain.
@@ -490,9 +618,6 @@
      saveGenerationId(generationId);
    }
    startPublishService(replicationServers, window, heartbeatInterval,
        configuration.getChangetimeHeartbeatInterval());
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on this replication domain.
@@ -505,6 +630,9 @@
    pendingChanges =
      new PendingChanges(generator, this);
    startPublishService(replicationServers, window, heartbeatInterval,
        configuration.getChangetimeHeartbeatInterval());
    remotePendingChanges = new RemotePendingChanges(getServerState());
    // listen for changes on the configuration
@@ -4356,74 +4484,9 @@
        if ((ourMaxChangeNumber != null) &&
            (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
        {
          // Replication server is missing some of our changes: let's
          // send them to him.
          Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
          logError(message);
          /*
           * Get all the changes that have not been seen by this
           * replication server and populate the replayOperations
           * list.
           */
          InternalSearchOperation op = searchForChangedEntries(
              baseDn, replServerMaxChangeNumber, this);
          if (op.getResultCode() != ResultCode.SUCCESS)
          {
            /*
             * An error happened trying to search for the updates
             * This server will start accepting again new updates but
             * some inconsistencies will stay between servers.
             * Log an error for the repair tool
             * that will need to re-synchronize the servers.
             */
            message = ERR_CANNOT_RECOVER_CHANGES.get(
                baseDn.toNormalizedString());
            logError(message);
          } else
          {
            for (FakeOperation replayOp :
              replayOperations.tailMap(replServerMaxChangeNumber).values())
            {
              ChangeNumber cn = replayOp.getChangeNumber();
              /*
               * Because the entry returned by the search operation
               * can contain old historical information, it is
               * possible that some of the FakeOperation are
               * actually older than the last ChangeNumber known by
               * the Replication Server.
               * In such case don't send the operation.
               */
              if (!cn.newer(replServerMaxChangeNumber))
              {
                continue;
              }
              /*
               * Check if the DeleteOperation has been abandoned before
               * being processed. This is necessary because the replayOperation
               *
               */
              if (replayOp instanceof FakeDelOperation)
              {
                FakeDelOperation delOp = (FakeDelOperation) replayOp;
                if (findEntryDN(delOp.getUUID()) != null)
                {
                  continue;
                }
              }
              message =
                DEBUG_SENDING_CHANGE.get(
                    replayOp.getChangeNumber().toString());
              logError(message);
              session.publish(replayOp.generateMessage());
            }
            message = DEBUG_CHANGES_SENT.get();
            logError(message);
          }
          replayOperations.clear();
          pendingChanges.setRecovering(true);
          broker.setRecoveryRequired(true);
          new RSUpdater(replServerMaxChangeNumber).start();
        }
      }
    } catch (Exception e)
@@ -4437,19 +4500,124 @@
  }
  /**
   * Build the list of changes that have been processed by this server
   * after the ChangeNumber given as a parameter and publish them
   * using the given session.
   *
   * @param startingChangeNumber  The ChangeNumber whe we need to start the
   *                              search
   * @param session               The session to use to publish the changes
   *
   * @return                      A boolean indicating he success of the
   *                              operation.
   * @throws Exception            if an Exception happens during the search.
   */
  public boolean buildAndPublishMissingChanges(
      ChangeNumber startingChangeNumber,
      ReplicationBroker session)
      throws Exception
  {
    // Trim the changes in replayOperations that are older than
    // the startingChangeNumber.
    synchronized (replayOperations)
    {
      Iterator<ChangeNumber> it = replayOperations.keySet().iterator();
      while (it.hasNext())
      {
        if (it.next().olderOrEqual(startingChangeNumber))
        {
          it.remove();
        }
        else
        {
          break;
        }
      }
    }
    ChangeNumber lastRetrievedChange = null;
    long missingChangesDelta;
    InternalSearchOperation op;
    ChangeNumber currentStartChangeNumber = startingChangeNumber;
    do
    {
      lastRetrievedChange = null;
      // We can't do the search in one go because we need to
      // store the results so that we are sure we send the operations
      // in order and because the list might be large
      // So we search by interval of 10 seconds
      // and store the results in the replayOperations list
      // so that they are sorted before sending them.
      missingChangesDelta = currentStartChangeNumber.getTime() + 10000;
      ChangeNumber endChangeNumber =
        new ChangeNumber(
            missingChangesDelta, 0xffffffff, serverId);
      ScanSearchListener listener =
        new ScanSearchListener(currentStartChangeNumber, endChangeNumber);
      op = searchForChangedEntries(
          baseDn, currentStartChangeNumber, endChangeNumber, listener);
      // Publish and remove all the changes from the replayOperations list
      // that are older than the endChangeNumber.
      LinkedList<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
      synchronized (replayOperations)
      {
        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
        while (itOp.hasNext())
        {
          FakeOperation fakeOp = itOp.next();
          if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber))
              && state.cover(fakeOp.getChangeNumber()))
          {
            lastRetrievedChange = fakeOp.getChangeNumber();
            opsToSend.add(fakeOp);
            itOp.remove();
          }
          else
          {
            break;
          }
        }
      }
      for (FakeOperation opToSend : opsToSend)
      {
          session.publishRecovery(opToSend.generateMessage());
      }
      opsToSend.clear();
      if (lastRetrievedChange != null)
      {
        currentStartChangeNumber = lastRetrievedChange;
      }
      else
      {
        currentStartChangeNumber = endChangeNumber;
      }
    } while (pendingChanges.RecoveryUntil(lastRetrievedChange) &&
             (op.getResultCode().equals(ResultCode.SUCCESS)));
    return op.getResultCode().equals(ResultCode.SUCCESS);
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @param fromChangeNumber The ChangeNumber from which we want the changes
   * @param lastChangeNumber The max ChangeNumber that the search should return
   * @param resultListener   The listener that will process the entries returned
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    ChangeNumber lastChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
@@ -4457,8 +4625,16 @@
      InternalClientConnection.getRootConnection();
    Integer serverId = fromChangeNumber.getServerId();
    String maxValueForId = "ffffffffffffffff" +
      String.format("%04x", serverId) + "ffffffff";
    String maxValueForId;
    if (lastChangeNumber == null)
    {
      maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
                      + "ffffffff";
    }
    else
    {
      maxValueForId = lastChangeNumber.toString();
    }
    LDAPFilter filter = LDAPFilter.decode(
       "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
@@ -4479,36 +4655,24 @@
  }
  /**
   * {@inheritDoc}
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public void handleInternalSearchEntry(
    InternalSearchOperation searchOperation,
    SearchResultEntry searchEntry)
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
    /*
     * This call back is called at session establishment phase
     * for each entry that has been changed by this server and the changes
     * have not been sent to any Replication Server.
     * The role of this method is to build equivalent operation from
     * the historical information and add them in the replayOperations
     * table.
     */
    Iterable<FakeOperation> updates =
      Historical.generateFakeOperations(searchEntry);
    for (FakeOperation op : updates)
    {
      replayOperations.put(op.getChangeNumber(), op);
    }
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchReference(
    InternalSearchOperation searchOperation,
    SearchResultReference searchReference)
  {
    // TODO to be implemented
    return searchForChangedEntries(
        baseDn, fromChangeNumber, null, resultListener);
  }
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -212,7 +212,14 @@
      {
        numSentUpdates++;
        LDAPUpdateMsg updateMsg = firstChange.getMsg();
        domain.publish(updateMsg);
        if (!recoveringOldChanges)
        {
          domain.publish(updateMsg);
        }
        else
        {
          domain.getServerState().update(updateMsg.getChangeNumber());
        }
      }
      pendingChanges.remove(firstChangeNumber);
@@ -248,4 +255,47 @@
    _commit(changeNumber, msg);
    return _pushCommittedChanges();
  }
  private boolean recoveringOldChanges = false;
  /**
   * Set the PendingChangesList structure in a mode where it is
   * waiting for the RS to receive all the previous changes to
   * be sent before starting to process the changes normally.
   * In this mode, The Domain does not publish the changes from
   * the pendingChanges because there are older changes that
   * need to be published before.
   *
   * @param b The recovering status that must be set.
   */
  public void setRecovering(boolean b)
  {
    recoveringOldChanges = b;
  }
  /**
   * Allows to update the recovery situation by comparing the ChangeNumber of
   * the last change that was sent to the ReplicationServer with the
   * ChangeNumber of the last operation that was taken out of the
   * PendingChanges list.
   * If he two match then the recovery is completed and normal procedure can
   * restart. Otherwise the RSUpdate thread must continue to look for
   * older changes and no changes can be committed from the pendingChanges list.
   *
   * @param recovered  The ChangeNumber of the last change that was published
   *                   to the ReplicationServer.
   *
   * @return           A boolean indicating if the recovery is completed (false)
   *                   or must continue (true).
   */
  public synchronized boolean RecoveryUntil(ChangeNumber recovered)
  {
    ChangeNumber lastLocalChange = domain.getLastLocalChange();
    if ((recovered != null) && (recovered.newerOrEquals(lastLocalChange)))
    {
      recoveringOldChanges = false;
    }
    return recoveringOldChanges;
  }
}
opends/src/server/org/opends/server/replication/plugin/PersistentServerState.java
@@ -123,6 +123,18 @@
  }
  /**
   * Checks that the ChangeNumber given as a parameter is in this ServerState.
   *
   * @param   covered The ChangeNumber that should be checked.
   * @return  A boolean indicating if this ServerState contains the ChangeNumber
   *          given in parameter.
   */
  public boolean cover(ChangeNumber covered)
  {
    return state.cover(covered);
  }
  /**
   * Update the Server State with a ChangeNumber.
   * All operations with smaller CSN and the same serverID must be committed
   * before calling this method.
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -188,6 +188,7 @@
  private long generationID;
  private int updateDoneCount = 0;
  private boolean connectRequiresRecovery = false;
  /**
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
@@ -694,6 +695,21 @@
                rsServerId = serverInfo.getServerId();
                rsServerUrl = bestServer;
                receiveTopo(topologyMsg);
                // Log a message to let the administrator know that the failure
                // was resolved.
                // Wakeup all the thread that were waiting on the window
                // on the previous connection.
                connectionError = false;
                if (sendWindow != null)
                {
                  sendWindow.release(Integer.MAX_VALUE);
                }
                sendWindow = new Semaphore(maxSendWindow);
                rcvWindow = maxRcvWindow;
                connected = true;
                // May have created a broker with null replication domain for
                // unit test purpose.
                if (domain != null)
@@ -703,8 +719,7 @@
                      serverInfo.getGenerationId(),
                      session);
                }
                receiveTopo(topologyMsg);
                connected = true;
                if (getRsGroupId() != groupId)
                {
                 // Connected to replication server with wrong group id:
@@ -766,17 +781,6 @@
      if (connected)
      {
        // Log a message to let the administrator know that the failure was
        // resolved.
        // Wakeup all the thread that were waiting on the window
        // on the previous connection.
        connectionError = false;
        if (sendWindow != null)
        {
          sendWindow.release(Integer.MAX_VALUE);
        }
        sendWindow = new Semaphore(maxSendWindow);
        rcvWindow = maxRcvWindow;
        connectPhaseLock.notify();
        if ((serverInfo.getGenerationId() == this.getGenerationID()) ||
@@ -1786,6 +1790,25 @@
   */
  public void publish(ReplicationMsg msg)
  {
    _publish(msg, false);
  }
  /**
   * Publish a recovery message to the other servers.
   * @param msg the message to publish
   */
  public void publishRecovery(ReplicationMsg msg)
  {
    _publish(msg, true);
  }
  /**
   * Publish a message to the other servers.
   * @param msg the message to publish
   * @param recoveryMsg the message is a recovery Message
   */
  void _publish(ReplicationMsg msg, boolean recoveryMsg)
  {
    boolean done = false;
    while (!done && !shutdown)
@@ -1825,6 +1848,15 @@
          currentWindowSemaphore = sendWindow;
        }
        // If the Replication domain has decided that there is a need to
        // recover some changes then it is not allowed to send this
        // change but it will be the responsibility of the recovery thread to
        // do it.
        if (!recoveryMsg & connectRequiresRecovery)
        {
          return;
        }
        if (msg instanceof UpdateMsg)
        {
          // Acquiring the window credit must be done outside of the
@@ -2548,4 +2580,18 @@
      ctHeartbeatPublisherThread = null;
    }
  }
  /**
   * Set the connectRequiresRecovery to the provided value.
   * This flag is used to indicate if a recovery of Update is necessary
   * after a reconnection to a RS.
   * It is the responsibility of the ReplicationDomain to set it during the
   * sessionInitiated phase.
   *
   * @param b the new value of the connectRequiresRecovery.
   */
  public void setRecoveryRequired(boolean b)
  {
    connectRequiresRecovery = b;
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2909,4 +2909,16 @@
  {
    return eClIncludes;
  }
  /**
   * Returns the ChangeNUmber of the last Change that was fully processed
   * by this ReplicationDomain.
   *
   * @return The ChangeNUmber of the last Change that was fully processed
   *         by this ReplicationDomain.
   */
  public ChangeNumber getLastLocalChange()
  {
    return state.getMaxChangeNumber(serverID);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1231,17 +1231,31 @@
   * Deletes the provided entry from the Directory Server using an
   * internal operation.
   *
   * @param  entry  The entry to be added.
   * @param  entry  The entry to be deleted.
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  public static void deleteEntry(Entry entry)
         throws Exception
  {
    deleteEntry(entry.getDN());
  }
  /**
   * Deletes the provided entry from the Directory Server using an
   * internal operation.
   *
   * @param  dn  The dn of entry to be deleted
   *
   * @throws  Exception  If an unexpected problem occurs.
   */
  public static void deleteEntry(DN dn)
         throws Exception
  {
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    DeleteOperation deleteOperation = conn.processDelete(entry.getDN());
    DeleteOperation deleteOperation = conn.processDelete(dn);
    assertEquals(deleteOperation.getResultCode(), ResultCode.SUCCESS);
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -26,127 +26,71 @@
 */
package org.opends.server.replication.plugin;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchResultEntry;
import org.testng.annotations.BeforeClass;
import org.opends.server.util.TimeThread;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
/**
 * Test the usage of the historical data of the replication.
 */
public class HistoricalCsnOrderingTest
extends ReplicationTestCase
       extends ReplicationTestCase
{
  /**
   * A "person" entry
   */
  protected Entry personEntry;
  private int replServerPort;
  final int serverId = 123;
  /**
   * Set up the environment for performing the tests in this Class.
   *
   * @throws Exception
   *           If the environment could not be set up.
   */
  @BeforeClass
  @Override
  public void setUp() throws Exception
  public class TestBroker extends ReplicationBroker
  {
    super.setUp();
    LinkedList<ReplicationMsg> list = null;
    // Create necessary backend top level entry
    String topEntry = "dn: ou=People," + TEST_ROOT_DN_STRING + "\n"
        + "objectClass: top\n"
        + "objectClass: organizationalUnit\n"
        + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
    addEntry(TestCaseUtils.entryFromLdifString(topEntry));
    public TestBroker(LinkedList<ReplicationMsg> list)
    {
      super(null, null, null, 0, 0, (long) 0, (long) 0, null, (byte) 0, (long) 0);
      this.list = list;
    }
    // find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    replServerPort = socket.getLocalPort();
    socket.close();
    public void publishRecovery(ReplicationMsg msg)
    {
      list.add(msg);
    }
    // replication server
    String replServerLdif =
      "dn: cn=Replication Server, " + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-replication-server\n"
      + "cn: Replication Server\n"
      + "ds-cfg-replication-port: " + replServerPort + "\n"
      + "ds-cfg-replication-db-directory: HistoricalCsnOrderingTestDb\n"
      + "ds-cfg-replication-server-id: 101\n";
    replServerEntry = TestCaseUtils.entryFromLdifString(replServerLdif);
    // suffix synchronized
    String testName = "historicalCsnOrderingTest";
    String synchroServerLdif =
      "dn: cn=" + testName + ", cn=domains, " + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: " + testName + "\n"
      + "ds-cfg-base-dn: ou=People," + TEST_ROOT_DN_STRING + "\n"
      + "ds-cfg-replication-server: localhost:" + replServerPort + "\n"
      + "ds-cfg-server-id: 1\n" + "ds-cfg-receive-status: true\n";
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    String personLdif = "dn: uid=user.1,ou=People," + TEST_ROOT_DN_STRING + "\n"
      + "objectClass: top\n" + "objectClass: person\n"
      + "objectClass: organizationalPerson\n"
      + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
      + "homePhone: 951-245-7634\n"
      + "description: This is the description for Aaccf Amar.\n" + "st: NC\n"
      + "mobile: 027-085-0537\n"
      + "postalAddress: Aaccf Amar$17984 Thirteenth Street"
      + "$Rockford, NC  85762\n" + "mail: user.1@example.com\n"
      + "cn: Aaccf Amar\n" + "l: Rockford\n" + "pager: 508-763-4246\n"
      + "street: 17984 Thirteenth Street\n"
      + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
      + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
      + "userPassword: password\n" + "initials: AA\n";
    personEntry = TestCaseUtils.entryFromLdifString(personLdif);
    configureReplication();
  }
  /**
   * Add an entry in the database
   *
   */
  private void addEntry(Entry entry) throws Exception
  {
    AddOperationBasis addOp = new AddOperationBasis(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection
        .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
        entry.getUserAttributes(), entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    assertNotNull(getEntry(entry.getDN(), 1000, true));
  }
  /**
@@ -182,10 +126,19 @@
   * informations.
   */
  @Test()
  public void changesCmpTest()
  public void buildAndPublishMissingChangesOneEntryTest()
  throws Exception
  {
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    final int serverId = 123;
    final DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
    TestCaseUtils.initializeTestBackend(true);
    ReplicationServer rs = createReplicationServer();
    // Create Replication Server and Domain
    LDAPReplicationDomain rd1 = createReplicationDomain(serverId);
    try
    {
      long startTime = TimeThread.getTime();
    final DN dn1 = DN.decode("cn=test1," + baseDn.toString());
    final AttributeType histType =
      DirectoryServer.getAttributeType(Historical.HISTORICALATTRIBUTENAME);
@@ -246,39 +199,183 @@
          "Second historical value:" + av.getValue().toString()));
    }
    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
    TestBroker session = new TestBroker(opList);
    boolean result =
      rd1.buildAndPublishMissingChanges(
          new ChangeNumber(startTime, 0, serverId),
          session);
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations");
    assertTrue(opList.getFirst().getClass().equals(AddMsg.class));
    // Build a change number from the first modification
    String hv[] = histValue.split(":");
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        hv[1]));
    ChangeNumber fromChangeNumber =
      new ChangeNumber(hv[1]);
    logError(Message.raw(Category.SYNC, Severity.INFORMATION, hv[1]));
    ChangeNumber fromChangeNumber = new ChangeNumber(hv[1]);
    // Retrieves the entries that have changed since the first modification
    InternalSearchOperation op =
      LDAPReplicationDomain.searchForChangedEntries(
          baseDn, fromChangeNumber, null);
    opList = new LinkedList<ReplicationMsg>();
    session = new TestBroker(opList);
    // The expected result is one entry .. the one previously modified
    assertEquals(op.getResultCode(), ResultCode.SUCCESS);
    assertEquals(op.getSearchEntries().size(), 1);
    // From the historical of this entry, rebuild operations
    // Since there have been 2 modifications and 1 add, there should be 3
    // operations rebuild from this state.
    int updatesCnt = 0;
    for (SearchResultEntry searchEntry : op.getSearchEntries())
    {
      logError(Message.raw(Category.SYNC, Severity.INFORMATION,
          searchEntry.toString()));
      Iterable<FakeOperation> updates =
        Historical.generateFakeOperations(searchEntry);
      for (FakeOperation fop : updates)
      {
        logError(Message.raw(Category.SYNC, Severity.INFORMATION,
            fop.generateMessage().toString()));
        updatesCnt++;
      }
    result =
      rd1.buildAndPublishMissingChanges(
          fromChangeNumber,
          session);
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation");
    assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class));
    }
    assertTrue(updatesCnt == 3);
    finally
    {
      MultimasterReplication.deleteDomain(baseDn);
      rs.remove();
    }
  }
  /**
   * Test that we can retrieve the entries that were missed by
   * a replication server and can  re-build operations from the historical
   * informations.
   */
  @Test()
  public void buildAndPublishMissingChangesSeveralEntriesTest()
  throws Exception
  {
    final DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
    TestCaseUtils.initializeTestBackend(true);
    ReplicationServer rs = createReplicationServer();
    // Create Replication Server and Domain
    LDAPReplicationDomain rd1 = createReplicationDomain(serverId);
    long startTime = TimeThread.getTime();
    try
    {
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
    "Starting replication test : changesCmpTest"));
    // Add 3 entries.
    String dnTest1 = "cn=test1," + baseDn.toString();
    String dnTest2 = "cn=test2," + baseDn.toString();
    String dnTest3 = "cn=test3," + baseDn.toString();
    TestCaseUtils.addEntry(
        "dn: " + dnTest3,
        "displayname: Test1",
        "objectClass: top",
        "objectClass: person",
        "objectClass: organizationalPerson",
        "objectClass: inetOrgPerson",
        "cn: test1",
        "sn: test"
    );
    TestCaseUtils.addEntry(
        "dn: " + dnTest1,
        "displayname: Test1",
        "objectClass: top",
        "objectClass: person",
        "objectClass: organizationalPerson",
        "objectClass: inetOrgPerson",
        "cn: test1",
        "sn: test"
    );
    TestCaseUtils.deleteEntry(DN.decode(dnTest3));
    TestCaseUtils.addEntry(
        "dn: " + dnTest2,
        "displayname: Test1",
        "objectClass: top",
        "objectClass: person",
        "objectClass: organizationalPerson",
        "objectClass: inetOrgPerson",
        "cn: test1",
        "sn: test"
    );
    // Perform modifications on the 2 entries
    int resultCode = TestCaseUtils.applyModifications(false,
        "dn: cn=test2," + baseDn.toString(),
        "changetype: modify",
        "add: description",
    "description: foo");
    resultCode = TestCaseUtils.applyModifications(false,
        "dn: cn=test1," + baseDn.toString(),
        "changetype: modify",
        "add: description",
    "description: foo");
    assertEquals(resultCode, 0);
    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
    TestBroker session = new TestBroker(opList);
    // Call the buildAndPublishMissingChanges and check that this method
    // correctly generates the 4 operations in the correct order.
    boolean result =
      rd1.buildAndPublishMissingChanges(
          new ChangeNumber(startTime, 0, serverId),
          session);
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations");
    ReplicationMsg msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(AddMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1);
    msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(DeleteMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest3);
    msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(AddMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2);
    msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(ModifyMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest2);
    msg = opList.removeFirst();
    assertTrue(msg.getClass().equals(ModifyMsg.class));
    assertEquals(((LDAPUpdateMsg) msg).getDn(), dnTest1);
    }
    finally
    {
      MultimasterReplication.deleteDomain(baseDn);
      rs.remove();
    }
  }
  SortedSet<String> replServers = new TreeSet<String>();
  private ReplicationServer createReplicationServer() throws ConfigException
  {
    int rsPort;
    try
    {
      ServerSocket socket1 = TestCaseUtils.bindFreePort();
      rsPort = socket1.getLocalPort();
      socket1.close();
      replServers.add("localhost:" + rsPort);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(rsPort, "HistoricalCsnOrdering",
            0, 1, 0, 100, replServers, 1, 1000, 5000);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      replicationServer.clearDb();
      return replicationServer;
    }
    catch (IOException e)
    {
      fail("Unable to determinate some free ports " +
          stackTraceToSingleLineString(e));
      return null;
    }
  }
  private LDAPReplicationDomain createReplicationDomain(int dsId)
          throws DirectoryException, ConfigException
  {
    DN baseDn = DN.decode(TEST_ROOT_DN_STRING);
    DomainFakeCfg domainConf =
      new DomainFakeCfg(baseDn, dsId, replServers, AssuredType.NOT_ASSURED,
      2, 1, 0, null);
    LDAPReplicationDomain replicationDomain =
      MultimasterReplication.createNewDomain(domainConf);
    replicationDomain.start();
    return replicationDomain;
  }
}