mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
02.58.2007 b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,13 +28,13 @@
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import org.opends.server.config.ConfigException;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -42,13 +42,14 @@
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Message;
import org.opends.messages.Category;
import org.opends.messages.Severity;
import org.opends.server.backends.task.TaskState;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperationBasis;
import org.opends.server.core.DirectoryServer;
@@ -58,7 +59,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.types.Attribute;
@@ -136,13 +140,50 @@
  }
  /**
   * Open a replicationServer session to the local ReplicationServer.
   * Retrieves the domain associated to the baseDn, and the value of the generationId
   * of this domain. If the domain does not exist, returns the default hard-coded\
   * value of the generationId corresponding to 'no entry'.
   *
   * @param baseDn The baseDn for which we want the generationId
   * @return The value of the generationId.
   */
  static protected long getGenerationId(DN baseDn)
  {
    // This is the value of the generationId computed by the server when the
    // suffix is empty.
    long genId = 3276850;
    try
    {
      ReplicationDomain replDomain = ReplicationDomain.retrievesReplicationDomain(baseDn);
      genId = replDomain.getGenerationId();
    }
    catch(Exception e) {}
    return genId;
  }
  /**
   * Open a replicationServer session to the local ReplicationServer.
   * The generation is read from the replicationDomain object. If it
   * does not exist, take the 'empty backend' generationID.
   */
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, boolean emptyOldChanges)
          throws Exception
          throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, emptyOldChanges, getGenerationId(baseDn));
  }
  /**
   * Open a replicationServer session to the local ReplicationServer
   * providing the generationId.
   */
  protected ReplicationBroker openReplicationSession(
        final DN baseDn, short serverId, int window_size,
        int port, int timeout, boolean emptyOldChanges,
        long generationId)
  throws Exception, SocketException
  {
    ServerState state;
    if (emptyOldChanges)
@@ -151,8 +192,8 @@
       state = new ServerState();
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
        getReplSessionSecurity());
        state, baseDn, serverId, 0, 0, 0, 0,
        window_size, 0, generationId, getReplSessionSecurity());
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
@@ -170,7 +211,15 @@
      {
        while (true)
        {
          broker.receive();
          ReplicationMessage rMsg = broker.receive();
          if (rMsg instanceof ErrorMessage)
          {
            ErrorMessage eMsg = (ErrorMessage)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
@@ -184,16 +233,30 @@
  }
  /**
   * Open a replicationServer session to the local ReplicationServer
   * with a default value generationId.
   *
   */
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, ServerState state)
    throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, state, getGenerationId(baseDn));
  }
  /**
   * Open a new session to the ReplicationServer
   * starting with a given ServerState.
   */
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, ServerState state)
          throws Exception
      int port, int timeout, ServerState state, long generationId)
          throws Exception, SocketException
  {
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0,
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0, generationId,
        getReplSessionSecurity());
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
@@ -213,7 +276,18 @@
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, int maxSendQueue, int maxRcvQueue,
      boolean emptyOldChanges)
          throws Exception
      throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
        getGenerationId(baseDn));
  }
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, short serverId, int window_size,
        int port, int timeout, int maxSendQueue, int maxRcvQueue,
        boolean emptyOldChanges, long generationId)
            throws Exception, SocketException
  {
    ServerState state;
    if (emptyOldChanges)
@@ -223,7 +297,7 @@
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, maxRcvQueue, 0,
        maxSendQueue, 0, window_size, 0,
        maxSendQueue, 0, window_size, 0, generationId,
        getReplSessionSecurity());
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
@@ -240,7 +314,15 @@
      {
        while (true)
        {
          broker.receive();
          ReplicationMessage rMsg = broker.receive();
          if (rMsg instanceof ErrorMessage)
          {
            ErrorMessage eMsg = (ErrorMessage)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
@@ -264,13 +346,22 @@
      while (true)
      {
        DN dn = configEntryList.removeLast();
             logError(Message.raw(Category.SYNC, Severity.NOTICE,
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
                 "cleaning config entry " + dn));
        op = new DeleteOperationBasis(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
        op.run();
        if ((op.getResultCode() != ResultCode.SUCCESS) &&
            (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
        {
          logError(Message.raw(Category.SYNC, Severity.NOTICE,
                   "ReplicationTestCase/Cleaning config entries" +
                   "DEL " + dn +
                   " failed " + op.getResultCode().getResultCodeName()));
        }
      }
    }
    catch (NoSuchElementException e) {
@@ -293,14 +384,23 @@
      while (true)
      {
        DN dn = entryList.removeLast();
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
            "cleaning entry " + dn));
        op = new DeleteOperationBasis(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
        op = new DeleteOperationBasis(connection,
               InternalClientConnection.nextOperationID(),
               InternalClientConnection.nextMessageID(),
               null,
               dn);
        op.run();
        if ((op.getResultCode() != ResultCode.SUCCESS) &&
            (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
        {
          logError(Message.raw(Category.SYNC, Severity.NOTICE,
                   "ReplicationTestCase/Cleaning entries" +
                   "DEL " + dn +
                   " failed " + op.getResultCode().getResultCodeName()));
        }
      }
    }
    catch (NoSuchElementException e) {