mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.28.2014 10fa44e632dec82a2fb6100f2cc383f42a2f3113
Code cleanup:
- hid ReplicationDomain internals
- moved test code out of production code
- removed duplicated test code


ReplicationDomain.java:
In ctor, delegate to other ctor.
In startPublishService(), removed ReplicationDomainCfg parameter and get the config from field instead.
Removed getConfig(), not used anymore.
Moved publish(byte[]) to ReplicationDomainTest.

LDAPReplicationDomain.java, AssuredReplicationServerTest.java, FakeStressReplicationDomain.java:
Consequence of the change to ReplicationDomain.startPublishService().


FakeReplicationDomain.java:
Expanded the code to support what FractionalReplicationTest.FakeReplicationDomain was doing.
Consequence of the change to ReplicationDomain.startPublishService().

FractionalReplicationTest.java:
Removed FakeReplicationDomain inner class, replaced by org.opends.server.replication.service.FakeReplicationDomain.
Removed newConfig(), now unused.

ReplicationDomainTest.java:
Moved ReplicationDomain.publish(byte[]) here.
Extracted method publishRepeatedly().
7 files modified
321 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 63 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java 114 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 19 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java 32 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java 8 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java 79 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -43,7 +43,7 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.AlertGenerator;
@@ -76,7 +76,7 @@
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.*;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
@@ -511,7 +511,7 @@
    // register as an AlertGenerator
    DirectoryServer.registerAlertGenerator(this);
    startPublishService(configuration);
    startPublishService();
  }
  /**
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -70,7 +70,7 @@
 *   The startup phase of the ReplicationDomain subclass,
 *   should read the list of replication servers from the configuration,
 *   instantiate a {@link ServerState} then start the publish service
 *   by calling {@link #startPublishService(ReplicationDomainCfg)}.
 *   by calling {@link #startPublishService()}.
 *   At this point it can start calling the {@link #publish(UpdateMsg)}
 *   method if needed.
 * <p>
@@ -399,11 +399,7 @@
   */
  public ReplicationDomain(ReplicationDomainCfg config, long generationId)
  {
    this.config = config;
    this.assuredConfig = config;
    this.generationId = generationId;
    this.state = new ServerState();
    this.generator = new CSNGenerator(getServerId(), state);
    this(config, generationId, new ServerState());
  }
  /**
@@ -528,16 +524,6 @@
  }
  /**
   * Returns the current config of this ReplicationDomain.
   *
   * @return the config
   */
  protected ReplicationDomainCfg getConfig()
  {
    return config;
  }
  /**
   * Returns the base DN of this ReplicationDomain. All Replication Domain using
   * this baseDN will be connected through the Replication Service.
   *
@@ -2967,13 +2953,10 @@
   * has been called, the publish service can be used by calling the
   * {@link #publish(UpdateMsg)} method.
   *
   * @param config
   *          The configuration that should be used.
   * @throws ConfigException
   *           If the DirectoryServer configuration was incorrect.
   */
  public void startPublishService(ReplicationDomainCfg config)
      throws ConfigException
  public void startPublishService() throws ConfigException
  {
    synchronized (sessionLock)
    {
@@ -2994,7 +2977,7 @@
   * calling the {@link #processUpdate(UpdateMsg)}.
   * <p>
   * This method must be called once and must be called after the
   * {@link #startPublishService(ReplicationDomainCfg)}.
   * {@link #startPublishService()}.
   */
  public void startListenService()
  {
@@ -3104,7 +3087,7 @@
   * <p>
   * The Replication Service will restart from the point indicated by the
   * {@link ServerState} that was given as a parameter to the
   * {@link #startPublishService(ReplicationDomainCfg)} at startup time.
   * {@link #startPublishService()} at startup time.
   * <p>
   * If some data have changed in the repository during the period of time when
   * the Replication Service was disabled, this {@link ServerState} should
@@ -3459,42 +3442,6 @@
  }
  /**
   * Publish information to the Replication Service (not assured mode).
   *
   * @param msg  The byte array containing the information that should
   *             be sent to the remote entities.
   */
  void publish(byte[] msg)
  {
    UpdateMsg update;
    synchronized (this)
    {
      update = new UpdateMsg(generator.newCSN(), msg);
      /*
      If assured replication is configured, this will prepare blocking
      mechanism. If assured replication is disabled, this returns immediately
      */
      prepareWaitForAckIfAssuredEnabled(update);
      publish(update);
    }
    try
    {
      /*
      If assured replication is enabled, this will wait for the matching ack or
      time out. If assured replication is disabled, this returns immediately
      */
      waitForAckIfAssuredEnabled(update);
    } catch (TimeoutException ex)
    {
      // This exception may only be raised if assured replication is enabled
      logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
          Long.toString(getAssuredTimeout()), update.toString()));
    }
  }
  /**
   * This method should return the generationID to use for this
   * ReplicationDomain.
   * This method can be called at any time after the ReplicationDomain
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -26,19 +26,13 @@
 */
package org.opends.server.replication.plugin;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
@@ -47,16 +41,15 @@
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.FakeReplicationDomain;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.types.*;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -104,7 +97,7 @@
  private static final String ENTRY_UUID3 =
    "33333333-3333-3333-3333-333333333333";
  /** Dn of the manipulated entry */
  private static String ENTRY_DN = "uid=1," + TEST_ROOT_DN_STRING;
  private static final String ENTRY_DN = "uid=1," + TEST_ROOT_DN_STRING;
  /**
   * Optional attribute not part of concerned attributes of the fractional
@@ -124,7 +117,7 @@
  /** Second test backend */
  private static final String TEST2_ROOT_DN_STRING = "dc=example,dc=com";
  private static final String TEST2_ORG_DN_STRING = "o=test2," + TEST2_ROOT_DN_STRING;
  private static String ENTRY_DN2 = "uid=1," + TEST2_ORG_DN_STRING;
  private static final String ENTRY_DN2 = "uid=1," + TEST2_ORG_DN_STRING;
  private void debugInfo(String s) {
    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
@@ -539,107 +532,6 @@
    replicationServer = new ReplicationServer(conf);
  }
  private static DomainFakeCfg newConfig(DN baseDN, int serverID,
      SortedSet<String> replicationServers, long heartbeatInterval)
  {
    DomainFakeCfg fakeCfg =
        new DomainFakeCfg(baseDN, serverID, replicationServers);
    fakeCfg.setHeartbeatInterval(heartbeatInterval);
    fakeCfg.setChangetimeHeartbeatInterval(500);
    return fakeCfg;
  }
  /**
   * This class is the minimum implementation of a Concrete ReplicationDomain
   * used to be able to connect to the RS with a known genid. Also to be able
   * to send updates
   */
  private class FakeReplicationDomain extends ReplicationDomain
  {
    /**
     * A blocking queue that is used to receive updates from the Replication
     * Service.
     */
    private BlockingQueue<UpdateMsg> queue =
        new LinkedBlockingQueue<UpdateMsg>();
    /** A string that will be exported should exportBackend be called. */
    private String exportString;
    /**
     * A StringBuilder that will be used to build a new String should the import
     * be called.
     */
    private StringBuilder importString;
    private int exportedEntryCount;
    public FakeReplicationDomain(DN baseDN, int serverID,
        SortedSet<String> replicationServers, long heartbeatInterval,
        long generationId) throws ConfigException
    {
      super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval),
          generationId);
      startPublishService(getConfig());
      startListenService();
    }
    public void initExport(String exportString, int exportedEntryCount)
    {
      this.exportString = exportString;
      this.exportedEntryCount = exportedEntryCount;
    }
    @Override
    public long countEntries() throws DirectoryException
    {
      return exportedEntryCount;
    }
    @Override
    protected void exportBackend(OutputStream output) throws DirectoryException
    {
      try
      {
        output.write(exportString.getBytes());
        output.flush();
        output.close();
      } catch (IOException e)
      {
        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
          ERR_BACKEND_EXPORT_ENTRY.get("", ""));
      }
    }
    @Override
    protected void importBackend(InputStream input) throws DirectoryException
    {
      byte[] buffer = new byte[1000];
      int ret;
      do
      {
        try
        {
          ret = input.read(buffer, 0, 1000);
        } catch (IOException e)
        {
          throw new DirectoryException(
            ResultCode.OPERATIONS_ERROR,
            ERR_BACKEND_EXPORT_ENTRY.get("", ""));
        }
        importString.append(new String(buffer, 0, ret));
      } while (ret >= 0);
    }
    @Override
    public boolean processUpdate(UpdateMsg updateMsg)
    {
      if (queue != null)
        queue.add(updateMsg);
      return true;
    }
  }
  private static final String REPLICATION_GENERATION_ID =
    "ds-sync-generation-id";
  private static final Task NO_INIT_TASK = null;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -133,9 +133,9 @@
  private static final int OTHER_GID_BIS = 3;
  /** Default generation id */
  private static long DEFAULT_GENID = EMPTY_DN_GENID;
  private static final long DEFAULT_GENID = EMPTY_DN_GENID;
  /** Other generation id */
  private static long OTHER_GENID = 500L;
  private static final long OTHER_GENID = 500L;
  /*
   * Definitions for the scenario of the fake DS
@@ -302,7 +302,7 @@
    FakeReplicationDomain fakeReplicationDomain =
        new FakeReplicationDomain(config, generationId, scenario, serverState);
    fakeReplicationDomain.startPublishService(config);
    fakeReplicationDomain.startPublishService();
    if (startListen)
      fakeReplicationDomain.startListenService();
@@ -445,18 +445,17 @@
   * According to the configured scenario, it will answer to updates with acks
   * as the scenario is requesting.
   */
  public class FakeReplicationDomain extends ReplicationDomain
  private class FakeReplicationDomain extends ReplicationDomain
  {
    /** The scenario this DS is expecting */
    private int scenario = -1;
    private final int scenario;
    private CSNGenerator gen;
    private final CSNGenerator gen;
    /** False if a received update had assured parameters not as expected */
    private boolean everyUpdatesAreOk = true;
    /** Number of received updates */
    private int nReceivedUpdates = 0;
    private int nWrongReceivedUpdates = 0;
    /**
@@ -473,7 +472,7 @@
     * behavior upon reception of updates)
     * @throws org.opends.server.config.ConfigException
     */
    public FakeReplicationDomain(ReplicationDomainCfg config,
    private FakeReplicationDomain(ReplicationDomainCfg config,
        long generationId, int scenario, ServerState serverState)
        throws ConfigException
    {
@@ -483,7 +482,7 @@
      gen = new CSNGenerator(config.getServerId(), 0L);
    }
    public boolean receivedUpdatesOk()
    private boolean receivedUpdatesOk()
    {
      return everyUpdatesAreOk;
    }
@@ -591,7 +590,7 @@
     * Sends a new update from this DS
     * @throws TimeoutException If timeout waiting for an assured ack
     */
    public void sendNewFakeUpdate() throws TimeoutException
    private void sendNewFakeUpdate() throws TimeoutException
    {
      // Create a new delete update message (the simplest to create)
      DeleteMsg delMsg = new DeleteMsg(getBaseDN(), gen.newCSN(), UUID.randomUUID().toString());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -58,20 +58,19 @@
  private String exportString;
  /**
   * A StringBuilder that will be used to build a build a new String should the
   * import be called.
   * A StringBuilder that will be used to build a new String should the import
   * be called.
   */
  private StringBuilder importString;
  private int exportedEntryCount;
  private FakeReplicationDomain(DN baseDN, int serverID,
      SortedSet<String> replicationServers, int window, long heartbeatInterval)
      throws ConfigException
      SortedSet<String> replicationServers, int window, long heartbeatInterval,
      long generationId) throws ConfigException
  {
    super(newConfig(baseDN, serverID, replicationServers, window,
        heartbeatInterval), 1);
    startPublishService(getConfig());
    super(newConfig(baseDN, serverID, replicationServers, window, heartbeatInterval), generationId);
    startPublishService();
    startListenService();
  }
@@ -86,24 +85,37 @@
  }
  public FakeReplicationDomain(DN baseDN, int serverID,
      SortedSet<String> replicationServers, long heartbeatInterval,
      long generationId) throws ConfigException
  {
    this(baseDN, serverID, replicationServers, 100, heartbeatInterval, generationId);
  }
  FakeReplicationDomain(DN baseDN, int serverID,
      SortedSet<String> replicationServers, int window, long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    this(baseDN, serverID, replicationServers, window, heartbeatInterval);
    this(baseDN, serverID, replicationServers, window, heartbeatInterval, 1);
    this.queue = queue;
  }
  public FakeReplicationDomain(DN baseDN, int serverID,
  FakeReplicationDomain(DN baseDN, int serverID,
      SortedSet<String> replicationServers, long heartbeatInterval,
      String exportString, StringBuilder importString, int exportedEntryCount)
      throws ConfigException
  {
    this(baseDN, serverID, replicationServers, 100, heartbeatInterval);
    this(baseDN, serverID, replicationServers, 100, heartbeatInterval, 1);
    this.exportString = exportString;
    this.importString = importString;
    this.exportedEntryCount = exportedEntryCount;
  }
  public void initExport(String exportString, int exportedEntryCount)
  {
    this.exportString = exportString;
    this.exportedEntryCount = exportedEntryCount;
  }
  @Override
  public long countEntries() throws DirectoryException
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -46,20 +46,20 @@
 * used to test the Generic Replication Service.
 */
@SuppressWarnings("javadoc")
public class FakeStressReplicationDomain extends ReplicationDomain
class FakeStressReplicationDomain extends ReplicationDomain
{
  /**
   * A blocking queue that is used to send the UpdateMsg received from the
   * Replication Service.
   */
  private BlockingQueue<UpdateMsg> queue;
  private final BlockingQueue<UpdateMsg> queue;
  public FakeStressReplicationDomain(DN baseDN, int serverID,
  FakeStressReplicationDomain(DN baseDN, int serverID,
      SortedSet<String> replicationServers, long heartbeatInterval,
      BlockingQueue<UpdateMsg> queue) throws ConfigException
  {
    super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval), 1);
    startPublishService(getConfig());
    startPublishService();
    startListenService();
    this.queue = queue;
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ReplicationDomainTest.java
@@ -33,6 +33,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.Task;
@@ -44,13 +45,15 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationDomain.*;
import org.opends.server.replication.service.ReplicationDomain.ImportExportContext;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.testng.Assert.*;
/**
@@ -117,7 +120,7 @@
       * Check that domain2 receives it shortly after.
       */
      byte[] test = {1, 2, 3 ,4, 0, 1, 2, 3, 4, 5};
      domain1.publish(test);
      publish(domain1, test);
      UpdateMsg rcvdMsg = rcvQueue2.poll(20, TimeUnit.SECONDS);
      assertNotNull(rcvdMsg);
@@ -185,6 +188,40 @@
    }
  }
  /**
   * Publish information to the Replication Service (not assured mode).
   *
   * @param msg  The byte array containing the information that should
   *             be sent to the remote entities.
   */
  void publish(FakeReplicationDomain domain, byte[] msg)
  {
    UpdateMsg updateMsg;
    synchronized (this)
    {
      updateMsg = new UpdateMsg(domain.getGenerator().newCSN(), msg);
      // If assured replication is configured,
      // this will prepare blocking mechanism.
      // If assured replication is disabled, this returns immediately
      domain.prepareWaitForAckIfAssuredEnabled(updateMsg);
      domain.publish(updateMsg);
    }
    try
    {
      // If assured replication is enabled,
      // this will wait for the matching ack or time out.
      // If assured replication is disabled, this returns immediately
      domain.waitForAckIfAssuredEnabled(updateMsg);
    }
    catch (TimeoutException ex)
    {
      // This exception may only be raised if assured replication is enabled
      logError(NOTE_DS_ACK_TIMEOUT.get(domain.getBaseDNString(), Long
          .toString(domain.getAssuredTimeout()), updateMsg.toString()));
    }
  }
  private void assertExpectedServerStatuses(Map<Integer, DSInfo> dsInfos,
      int domain1ServerId, int domain2ServerId)
  {
@@ -236,29 +273,11 @@
       */
      byte[] test = {1, 2, 3 ,4, 0, 1, 2, 3, 4, 5};
      long timeStart = System.nanoTime();
      for (int i=0; i< 100000; i++)
        domain1.publish(test);
      long timeNow = System.nanoTime();
      System.out.println(timeNow - timeStart);
      timeStart = timeNow;
      for (int i=0; i< 100000; i++)
        domain1.publish(test);
      timeNow = System.nanoTime();
      System.out.println(timeNow - timeStart);
      timeStart = timeNow;
      for (int i=0; i< 100000; i++)
        domain1.publish(test);
      timeNow = System.nanoTime();
      System.out.println(timeNow - timeStart);
      timeStart = timeNow;
      for (int i=0; i< 100000; i++)
        domain1.publish(test);
      timeNow = System.nanoTime();
      System.out.println(timeNow - timeStart);
      timeNow = publishRepeatedly(domain1, test, timeNow);
      timeNow = publishRepeatedly(domain1, test, timeNow);
      timeNow = publishRepeatedly(domain1, test, timeNow);
      timeNow = publishRepeatedly(domain1, test, timeNow);
    }
    finally
    {
@@ -267,6 +286,18 @@
    }
  }
  private long publishRepeatedly(FakeReplicationDomain domain1, byte[] test, long timeNow)
  {
    long timeStart = timeNow;
    for (int i = 0; i < 100000; i++)
    {
      publish(domain1, test);
    }
    timeNow = System.nanoTime();
    System.out.println(timeNow - timeStart);
    return timeNow;
  }
  private ReplicationServer createReplicationServer(int serverId,
      int replicationPort, String dirName, int windowSize,
      String... replServers) throws Exception