mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
05.31.2007 17beeae33bb7d73dee3f1a4f9bdf18e5645717d7
Fix for #2655: Renaming ReplicationCache into ReplicationServerDomain
1 files renamed
12 files modified
405 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java 22 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java 50 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 86 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 14 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 118 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 58 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 20 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/package-info.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 10 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 12 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -344,8 +344,8 @@
  /**
   * Run method for this class.
   * Periodically Flushes the ReplicationCache from memory to the stable storage
   * and trims the old updates.
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   */
  public void run()
  {
opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java
@@ -35,7 +35,7 @@
public class ReplServerAckMessageList extends AckMessageList
{
  private short replicationServerId;
  private ReplicationCache replicationCache;
  private ReplicationServerDomain replicationServerDomain;
  /**
   * Creates a new AckMessageList for a given ChangeNumber.
@@ -45,17 +45,17 @@
   *                        original change.
   * @param replicationServerId The Identifier of the replication server
   *                          from which the change was received.
   * @param replicationCache The ReplicationCache from which he change
   *                         was received.
   * @param replicationServerDomain The ReplicationServerDomain from which he
   *                         change was received.
   */
  public ReplServerAckMessageList(ChangeNumber changeNumber,
                                 int numExpectedAcks,
                                 short replicationServerId,
                                 ReplicationCache replicationCache)
                                ReplicationServerDomain replicationServerDomain)
  {
    super(changeNumber, numExpectedAcks);
    this.replicationServerId = replicationServerId;
    this.replicationCache = replicationCache;
    this.replicationServerDomain = replicationServerDomain;
  }
  /**
@@ -70,14 +70,14 @@
  }
  /**
   * Get the replicationCache of the replication server from which we received
   * the change.
   * @return Returns the replicationCache of the replication server from which
   *         we received the change .
   * Get the replicationServerDomain of the replication server from which we
   * received the change.
   * @return Returns the replicationServerDomain of the replication server from
   *         which we received the change .
   */
  public ReplicationCache getChangelogCache()
  public ReplicationServerDomain getChangelogCache()
  {
    return replicationCache;
    return replicationServerDomain;
  }
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -347,13 +347,13 @@
    //This method only returns the number of actual change entries, the
    //domain and any baseDN entries are not counted.
    long retNum=0;
    Iterator<ReplicationCache> rcachei = server.getCacheIterator();
    Iterator<ReplicationServerDomain> rcachei = server.getCacheIterator();
    if (rcachei != null)
    {
      while (rcachei.hasNext())
      {
        ReplicationCache rc = rcachei.next();
        retNum += rc.getChangesCount();
        ReplicationServerDomain rsd = rcachei.next();
        retNum += rsd.getChangesCount();
      }
    }
    return retNum;
@@ -531,18 +531,18 @@
  {
    List<DN> includeBranches = exportConfig.getIncludeBranches();
    DN baseDN;
    ArrayList<ReplicationCache> exportContainers =
      new ArrayList<ReplicationCache>();
    ArrayList<ReplicationServerDomain> exportContainers =
      new ArrayList<ReplicationServerDomain>();
    if(server == null) {
       Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get();
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,message);
    }
    Iterator<ReplicationCache> rcachei = server.getCacheIterator();
    if (rcachei != null)
    Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
    if (rsdi != null)
    {
      while (rcachei.hasNext())
      while (rsdi.hasNext())
      {
        ReplicationCache rc = rcachei.next();
        ReplicationServerDomain rc = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN);
@@ -598,7 +598,7 @@
    // Iterate through the containers.
    try
    {
      for (ReplicationCache exportContainer : exportContainers)
      for (ReplicationServerDomain exportContainer : exportContainers)
      {
        if (exportConfig.isCancelled())
        {
@@ -642,7 +642,7 @@
  /*
   * Exports the root changes of the export, and one entry by domain.
   */
  private void exportRootChanges(List<ReplicationCache> exportContainers,
  private void exportRootChanges(List<ReplicationServerDomain> exportContainers,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
  {
    Map<AttributeType,List<Attribute>> attributes =
@@ -668,7 +668,7 @@
    }
    catch (Exception e) {}
    for (ReplicationCache exportContainer : exportContainers)
    for (ReplicationServerDomain exportContainer : exportContainers)
    {
      if (exportConfig != null && exportConfig.isCancelled())
      {
@@ -725,21 +725,21 @@
  }
  /**
   * Processes the changes for a given ReplicationCache.
   * Processes the changes for a given ReplicationServerDomain.
   */
  private void processContainer(ReplicationCache rc,
  private void processContainer(ReplicationServerDomain rsd,
      LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
      SearchOperation searchOperation)
  {
    // Walk through the servers
    for (Short serverId : rc.getServers())
    for (Short serverId : rsd.getServers())
    {
      if (exportConfig != null && exportConfig.isCancelled())
      {
        break;
      }
      ReplicationIterator ri = rc.getChangelogIterator(serverId,
      ReplicationIterator ri = rsd.getChangelogIterator(serverId,
          null);
      if (ri != null)
@@ -1139,8 +1139,8 @@
    // Get the base DN, scope, and filter for the search.
    DN           searchBaseDN = searchOperation.getBaseDN();
    DN baseDN;
    ArrayList<ReplicationCache> searchContainers =
      new ArrayList<ReplicationCache>();
    ArrayList<ReplicationServerDomain> searchContainers =
      new ArrayList<ReplicationServerDomain>();
    //This check is for GroupManager initialization. It currently doesn't
    //come into play because the replication server variable is null in
@@ -1202,25 +1202,25 @@
    }
    // Walk through all entries and send the ones that match.
    Iterator<ReplicationCache> rcachei = server.getCacheIterator();
    if (rcachei != null)
    Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
    if (rsdi != null)
    {
      while (rcachei.hasNext())
      while (rsdi.hasNext())
      {
        ReplicationCache rc = rcachei.next();
        ReplicationServerDomain rsd = rsdi.next();
        // Skip containers that are not covered by the include branches.
        baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN);
        baseDN = DN.decode(rsd.getBaseDn().toString() + "," + EXPORT_BASE_DN);
            if (searchBaseDN.isDescendantOf(baseDN) ||
                searchBaseDN.isAncestorOf(baseDN))
            {
              searchContainers.add(rc);
              searchContainers.add(rsd);
            }
      }
    }
    for (ReplicationCache exportContainer : searchContainers)
    for (ReplicationServerDomain exportContainer : searchContainers)
    {
      processContainer(exportContainer, null, null, searchOperation);
    }
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -78,9 +78,10 @@
    this.dbenv = dbenv;
    this.replicationServer = replicationServer;
    // Get or create the associated Replicationcache and Db.
    // Get or create the associated ReplicationServerDomain and Db.
    db = dbenv.getOrAddDb(serverId, baseDn,
        replicationServer.getReplicationCache(baseDn, true).getGenerationId());
        replicationServer.getReplicationServerDomain(baseDn,
        true).getGenerationId());
  }
  /**
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -184,7 +184,7 @@
                " Has read baseDn=" + baseDn
                + " generationId=" + generationId);
            replicationServer.getReplicationCache(baseDn, true).
            replicationServer.getReplicationServerDomain(baseDn, true).
            setGenerationId(generationId, true);
          }
        }
@@ -259,7 +259,7 @@
          DbHandler dbHandler =
            new DbHandler(serverId, baseDn, replicationServer, this, 1);
          replicationServer.getReplicationCache(baseDn, true).
          replicationServer.getReplicationServerDomain(baseDn, true).
          setDbHandler(serverId, dbHandler);
        }
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -88,7 +88,8 @@
 * and publisher objects for
 * connection with LDAP servers and with replication servers
 *
 * It is responsible for creating the replication server cache and managing it
 * It is responsible for creating the replication server replicationServerDomain
 * and managing it
 */
public class ReplicationServer extends MonitorProvider<MonitorProviderCfg>
  implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>,
@@ -108,8 +109,8 @@
  /* This table is used to store the list of dn for which we are currently
   * handling servers.
   */
  private ConcurrentHashMap<DN, ReplicationCache> baseDNs =
          new ConcurrentHashMap<DN, ReplicationCache>();
  private ConcurrentHashMap<DN, ReplicationServerDomain> baseDNs =
          new ConcurrentHashMap<DN, ReplicationServerDomain>();
  private String localURL = "null";
  private boolean shutdown = false;
@@ -279,9 +280,10 @@
       * periodically check that we are connected to all other
       * replication servers and if not establish the connection
       */
      for (ReplicationCache replicationCache: baseDNs.values())
      for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
      {
        Set<String> connectedReplServers = replicationCache.getChangelogs();
        Set<String> connectedReplServers =
                replicationServerDomain.getChangelogs();
        /*
         * check that all replication server in the config are in the connected
         * Set. If not create the connection
@@ -301,7 +303,7 @@
                && (serverAddress.compareTo(this.localURL) != 0)
                && (!connectedReplServers.contains(serverAddress)))
            {
              this.connect(serverURL, replicationCache.getBaseDn());
              this.connect(serverURL, replicationServerDomain.getBaseDn());
            }
          }
          catch (IOException e)
@@ -396,7 +398,7 @@
          this);
      /*
       * create replicationServer cache
       * create replicationServer replicationServerDomain
       */
      serverId = changelogId;
@@ -461,28 +463,32 @@
  }
  /**
   * Get the ReplicationCache associated to the base DN given in parameter.
   * Get the ReplicationServerDomain associated to the base DN given in
   * parameter.
   *
   * @param baseDn The base Dn for which the ReplicationCache must be returned.
   * @param create Specifies whether to create the ReplicationCache if it does
   *        not already exist.
   * @return The ReplicationCache associated to the base DN given in parameter.
   * @param baseDn The base Dn for which the ReplicationServerDomain must be
   * returned.
   * @param create Specifies whether to create the ReplicationServerDomain if
   *        it does not already exist.
   * @return The ReplicationServerDomain associated to the base DN given in
   *         parameter.
   */
  public ReplicationCache getReplicationCache(DN baseDn, boolean create)
  public ReplicationServerDomain getReplicationServerDomain(DN baseDn,
          boolean create)
  {
    ReplicationCache replicationCache;
    ReplicationServerDomain replicationServerDomain;
    synchronized (baseDNs)
    {
      replicationCache = baseDNs.get(baseDn);
      if ((replicationCache == null) && (create))
      replicationServerDomain = baseDNs.get(baseDn);
      if ((replicationServerDomain == null) && (create))
      {
        replicationCache = new ReplicationCache(baseDn, this);
        baseDNs.put(baseDn, replicationCache);
        replicationServerDomain = new ReplicationServerDomain(baseDn, this);
        baseDNs.put(baseDn, replicationServerDomain);
      }
    }
    return replicationCache;
    return replicationServerDomain;
  }
  /**
@@ -520,9 +526,9 @@
    }
    // shutdown all the ChangelogCaches
    for (ReplicationCache replicationCache : baseDNs.values())
    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
    {
      replicationCache.shutdown();
      replicationServerDomain.shutdown();
    }
    if (dbEnv != null)
@@ -539,7 +545,8 @@
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler muste be created.
   * @param generationId The generationId for this server and this domain.
   * @param generationId The generationId for this server and this
   *        replicationServerDomain.
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws DatabaseException in case of underlying database problem.
@@ -551,7 +558,8 @@
  }
  /**
   * Clears the generationId for the domain related to the provided baseDn.
   * Clears the generationId for the replicationServerDomain related to the
   * provided baseDn.
   * @param  baseDn The baseDn for which to delete the generationId.
   * @throws DatabaseException When it occurs.
   */
@@ -755,7 +763,7 @@
    Attribute bases = new Attribute(baseType, "base-dn", baseValues);
    attributes.add(bases);
    // Publish to monitor the generation ID by domain
    // Publish to monitor the generation ID by replicationServerDomain
    AttributeType generationIdType=
      DirectoryServer.getAttributeType("base-dn-generation-id", true);
    LinkedHashSet<AttributeValue> generationIdValues =
@@ -763,9 +771,10 @@
    for (DN base : baseDNs.keySet())
    {
      long generationId=-1;
      ReplicationCache cache = getReplicationCache(base, false);
      if (cache != null)
        generationId = cache.getGenerationId();
      ReplicationServerDomain replicationServerDomain =
              getReplicationServerDomain(base, false);
      if (replicationServerDomain != null)
        generationId = replicationServerDomain.getGenerationId();
      generationIdValues.add(new AttributeValue(generationIdType,
          base.toString() + " " + generationId));
    }
@@ -777,17 +786,18 @@
  }
  /**
   * Get the value of generationId for the replication domain
   * Get the value of generationId for the replication replicationServerDomain
   * associated with the provided baseDN.
   *
   * @param baseDN The baseDN of the domain.
   * @param baseDN The baseDN of the replicationServerDomain.
   * @return The value of the generationID.
   */
  public long getGenerationId(DN baseDN)
  {
    ReplicationCache rc = this.getReplicationCache(baseDN, false);
    if (rc!=null)
      return rc.getGenerationId();
    ReplicationServerDomain rsd =
            this.getReplicationServerDomain(baseDN, false);
    if (rsd!=null)
      return rsd.getGenerationId();
    return -1;
  }
@@ -962,7 +972,7 @@
          " Export starts");
    if (backend.getBackendID().equals(backendId))
    {
      // Retrieves the backend related to this domain
      // Retrieves the backend related to this replicationServerDomain
      // backend =
      ReplicationBackend b =
      (ReplicationBackend)DirectoryServer.getBackend(backendId);
@@ -980,11 +990,11 @@
  }
  /**
   * Returns an iterator on the list of replicationCache.
   * Returns an iterator on the list of replicationServerDomain.
   * Returns null if none.
   * @return the iterator.
   */
  public Iterator<ReplicationCache> getCacheIterator()
  public Iterator<ReplicationServerDomain> getCacheIterator()
  {
    if (!baseDNs.isEmpty())
      return baseDNs.values().iterator();
@@ -997,13 +1007,13 @@
   */
  public void clearDb()
  {
    Iterator<ReplicationCache> rcachei = getCacheIterator();
    Iterator<ReplicationServerDomain> rcachei = getCacheIterator();
    if (rcachei != null)
    {
      while (rcachei.hasNext())
      {
        ReplicationCache rc = rcachei.next();
        rc.clearDbs();
        ReplicationServerDomain rsd = rcachei.next();
        rsd.clearDbs();
      }
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
File was renamed from opends/src/server/org/opends/server/replication/server/ReplicationCache.java
@@ -70,7 +70,7 @@
 * received to the disk and for trimming them
 * Decision to trim can be based on disk space or age of the message
 */
public class ReplicationCache
public class ReplicationServerDomain
{
  private Object flowControlLock = new Object();
  private DN baseDn = null;
@@ -119,13 +119,13 @@
  private static final DebugTracer TRACER = getTracer();
  /**
   * Creates a new ReplicationCache associated to the DN baseDn.
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   *
   * @param baseDn The baseDn associated to the ReplicationCache.
   * @param baseDn The baseDn associated to the ReplicationServerDomain.
   * @param replicationServer the ReplicationServer that created this
   *                          replicationServer cache.
   */
  public ReplicationCache(DN baseDn, ReplicationServer replicationServer)
  public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer)
  {
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
@@ -531,7 +531,7 @@
  }
  /**
   * Returns the change count for that ReplicationCache.
   * Returns the change count for that ReplicationServerDomain.
   *
   * @return the change count.
   */
@@ -842,7 +842,7 @@
    }
    /**
     * Shutdown this ReplicationCache.
     * Shutdown this ReplicationServerDomain.
     */
    public void shutdown()
    {
@@ -890,7 +890,7 @@
    @Override
    public String toString()
    {
      return "ReplicationCache " + baseDn;
      return "ReplicationServerDomain " + baseDn;
    }
    /**
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -111,7 +111,7 @@
  private MsgQueue lateQueue = new MsgQueue();
  private final Map<ChangeNumber, AckMessageList> waitingAcks  =
          new HashMap<ChangeNumber, AckMessageList>();
  private ReplicationCache replicationCache = null;
  private ReplicationServerDomain replicationServerDomain = null;
  private String serverURL;
  private int outCount = 0; // number of update sent to the server
  private int inCount = 0;  // number of updates received from the server
@@ -227,11 +227,13 @@
        // This is an outgoing connection. Publish our start message.
        this.baseDn = baseDn;
        // Get or create the ReplicationCache
        replicationCache = replicationServer.getReplicationCache(baseDn, true);
        localGenerationId = replicationCache.getGenerationId();
        // Get or create the ReplicationServerDomain
        replicationServerDomain =
                replicationServer.getReplicationServerDomain(baseDn, true);
        localGenerationId = replicationServerDomain.getGenerationId();
        ServerState localServerState = replicationCache.getDbServerState();
        ServerState localServerState =
                replicationServerDomain.getDbServerState();
        ReplServerStartMessage msg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    baseDn, windowSize, localServerState,
@@ -298,12 +300,13 @@
        serverIsLDAPserver = true;
        // Get or Create the ReplicationCache
        replicationCache = replicationServer.getReplicationCache(this.baseDn,
            true);
        localGenerationId = replicationCache.getGenerationId();
        // Get or Create the ReplicationServerDomain
        replicationServerDomain =
                replicationServer.getReplicationServerDomain(this.baseDn, true);
        localGenerationId = replicationServerDomain.getGenerationId();
        ServerState localServerState = replicationCache.getDbServerState();
        ServerState localServerState =
                replicationServerDomain.getDbServerState();
        // This an incoming connection. Publish our start message
        ReplServerStartMessage myStartMsg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
@@ -322,9 +325,10 @@
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss = replicationCache.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                   getMonitorInstanceName() +
          Set<String> lss =
                  replicationServerDomain.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationServerDomain.
                   getReplicationServer().getMonitorInstanceName() +
                   ", SH received START from LS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
@@ -376,7 +380,7 @@
          }
          else
          {
            replicationCache.setGenerationId(generationId, false);
            replicationServerDomain.setGenerationId(generationId, false);
          }
        }
      }
@@ -396,11 +400,11 @@
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        {
          // Get or create the ReplicationCache
          replicationCache = replicationServer.getReplicationCache(this.baseDn,
              true);
          localGenerationId = replicationCache.getGenerationId();
          ServerState serverState = replicationCache.getDbServerState();
          // Get or create the ReplicationServerDomain
          replicationServerDomain = replicationServer.
                  getReplicationServerDomain(this.baseDn, true);
          localGenerationId = replicationServerDomain.getGenerationId();
          ServerState serverState = replicationServerDomain.getDbServerState();
          // The session initiator decides whether to use SSL.
          sslEncryption = receivedMsg.getSSLEncryption();
@@ -431,9 +435,10 @@
        if (debugEnabled())
        {
          Set<String> ss = this.serverState.toStringSet();
          Set<String> lss = replicationCache.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
                   getMonitorInstanceName() +
          Set<String> lss =
                  replicationServerDomain.getDbServerState().toStringSet();
          TRACER.debugInfo("In " + replicationServerDomain.
                   getReplicationServer().getMonitorInstanceName() +
                   ", SH received START from RS serverId=" + serverId +
                   " baseDN=" + this.baseDn +
                   " generationId=" + generationId +
@@ -448,7 +453,8 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
            TRACER.debugInfo("In " +
                    replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS with serverID=" + serverId +
              " is connected with the right generation ID");
          }
@@ -464,7 +470,7 @@
              if (generationId != localGenerationId)
              {
                // if the 2 RS have different generationID
                if (replicationCache.getGenerationIdSavedStatus())
                if (replicationServerDomain.getGenerationIdSavedStatus())
                {
                  // it the present RS has received changes regarding its
                  //     gen ID and so won't change without a reset
@@ -497,7 +503,8 @@
                  //         set the gen ID received from the peer RS
                  //         specially if the peer has a non nul state and
                  //         we have a nul state ?
                  // replicationCache.setGenerationId(generationId, false);
                  // replicationServerDomain.
                  // setGenerationId(generationId, false);
                  Message message = NOTE_BAD_GENERATION_ID.get(
                      this.baseDn.toNormalizedString(),
                      Short.toString(receivedMsg.getServerId()),
@@ -519,7 +526,7 @@
          else
          {
            // The local RS is not initialized - take the one received
            replicationCache.setGenerationId(generationId, false);
            replicationServerDomain.setGenerationId(generationId, false);
          }
        }
      }
@@ -529,18 +536,18 @@
        return;   // we did not recognize the message, ignore it
      }
      // Get or create the ReplicationCache
      replicationCache = replicationServer.getReplicationCache(this.baseDn,
          true);
      // Get or create the ReplicationServerDomain
      replicationServerDomain = replicationServer.
              getReplicationServerDomain(this.baseDn,true);
      boolean started;
      if (serverIsLDAPserver)
      {
        started = replicationCache.startServer(this);
        started = replicationServerDomain.startServer(this);
      }
      else
      {
        started = replicationCache.startReplicationServer(this);
        started = replicationServerDomain.startReplicationServer(this);
      }
      if (started)
@@ -548,8 +555,10 @@
        // sendWindow MUST be created before starting the writer
        sendWindow = new Semaphore(sendWindowSize);
        writer = new ServerWriter(session, serverId, this, replicationCache);
        reader = new ServerReader(session, serverId, this, replicationCache);
        writer = new ServerWriter(session, serverId,
                this, replicationServerDomain);
        reader = new ServerReader(session, serverId,
                this, replicationServerDomain);
        reader.start();
        writer.start();
@@ -575,7 +584,8 @@
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationCache.getReplicationServer().
            TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() + " RS failed to start locally " +
              " the connection from serverID="+serverId);
          }
@@ -812,7 +822,7 @@
        * the sum of the number of missing changes for every dbHandler.
        */
       int totalCount = 0;
       ServerState dbState = replicationCache.getDbServerState();
       ServerState dbState = replicationServerDomain.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
@@ -926,7 +936,7 @@
     * Ignore updates from a server that is degraded due to
     * its inconsistent generationId
     */
    long referenceGenerationId = replicationCache.getGenerationId();
    long referenceGenerationId = replicationServerDomain.getGenerationId();
    if ((referenceGenerationId>0) &&
        (referenceGenerationId != generationId))
    {
@@ -993,7 +1003,7 @@
      saturationCount = 0;
      try
      {
        replicationCache.checkAllSaturation();
        replicationServerDomain.checkAllSaturation();
      }
      catch (IOException e)
      {
@@ -1059,11 +1069,11 @@
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          /* fill the lateQueue */
          for (short serverId : replicationCache.getServers())
          for (short serverId : replicationServerDomain.getServers())
          {
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ReplicationIterator iterator =
              replicationCache.getChangelogIterator(serverId, lastCsn);
              replicationServerDomain.getChangelogIterator(serverId, lastCsn);
            if ((iterator != null) && (iterator.getChange() != null))
            {
              iteratorSortedSet.add(iterator);
@@ -1244,7 +1254,7 @@
    }
    if (completedFlag)
    {
      replicationCache.sendAck(changeNumber, true);
      replicationServerDomain.sendAck(changeNumber, true);
    }
  }
@@ -1274,8 +1284,9 @@
    }
    if (completedFlag)
    {
      ReplicationCache replicationCache = ackList.getChangelogCache();
      replicationCache.sendAck(changeNumber, false,
      ReplicationServerDomain replicationServerDomain =
              ackList.getChangelogCache();
      replicationServerDomain.sendAck(changeNumber, false,
                             ackList.getReplicationServerId());
    }
  }
@@ -1304,20 +1315,22 @@
   * @param update The update that must be added to the list.
   * @param ChangelogServerId The identifier of the replicationServer that sent
   *                          the update.
   * @param replicationCache The ReplicationCache from which the change was
   *                         processed and to which the ack must later be sent.
   * @param replicationServerDomain The ReplicationServerDomain from which the
   *                                change was processed and to which the ack
   *                                must later be sent.
   * @param nbWaitedAck The number of ack that must be received before
   *                    the update is fully acked.
   */
  public static void addWaitingAck(
      UpdateMessage update,
      short ChangelogServerId, ReplicationCache replicationCache,
      short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
      int nbWaitedAck)
  {
    ReplServerAckMessageList ackList =
          new ReplServerAckMessageList(update.getChangeNumber(),
                                      nbWaitedAck,
                                      ChangelogServerId, replicationCache);
                                      ChangelogServerId,
                                      replicationServerDomain);
    synchronized(changelogsWaitingAcks)
    {
      changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
@@ -1561,7 +1574,7 @@
    {
      if (flowControl)
      {
        if (replicationCache.restartAfterSaturation(this))
        if (replicationServerDomain.restartAfterSaturation(this))
        {
          flowControl = false;
        }
@@ -1605,11 +1618,11 @@
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
                 getMonitorInstanceName() +
                 " SH for remote server " + this.getMonitorInstanceName() +
                 " processes received msg=" + msg);
    replicationCache.process(msg, this);
    replicationServerDomain.process(msg, this);
  }
  /**
@@ -1623,7 +1636,7 @@
   throws IOException
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sends message=" + info);
@@ -1640,7 +1653,7 @@
   public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
   {
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationCache.getReplicationServer().
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() +
           " SH for remote server " + this.getMonitorInstanceName() +
           " sets replServerInfo " + "<" + infoMsg + ">");
@@ -1691,7 +1704,8 @@
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
          TRACER.debugInfo("In " + replicationCache.getReplicationServer().
          TRACER.debugInfo("In " +
              replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
              " SH for remote server " + this.getMonitorInstanceName() +
              " sends message=" + msg);
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -72,7 +72,7 @@
  private short serverId;
  private ProtocolSession session;
  private ServerHandler handler;
  private ReplicationCache replicationCache;
  private ReplicationServerDomain replicationServerDomain;
  /**
   * Constructor for the LDAP server reader part of the replicationServer.
@@ -80,16 +80,18 @@
   * @param session The ProtocolSession from which to read the data.
   * @param serverId The server ID of the server from which we read messages.
   * @param handler The server handler for this server reader.
   * @param replicationCache The ReplicationCache for this server reader.
   * @param replicationServerDomain The ReplicationServerDomain for this server
   *        reader.
   */
  public ServerReader(ProtocolSession session, short serverId,
                      ServerHandler handler, ReplicationCache replicationCache)
                      ServerHandler handler,
                      ReplicationServerDomain replicationServerDomain)
  {
    super(handler.toString() + " reader");
    this.session = session;
    this.serverId = serverId;
    this.handler = handler;
    this.replicationCache = replicationCache;
    this.replicationServerDomain = replicationServerDomain;
  }
  /**
@@ -100,14 +102,15 @@
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "In RS " + replicationCache.getReplicationServer().
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS ":" LS")+
          " reader starting for serverId=" + serverId);
    }
    /*
     * wait on input stream
     * grab all incoming messages and publish them to the replicationCache
     * grab all incoming messages and publish them to the
     * replicationServerDomain
     */
    try
    {
@@ -118,7 +121,7 @@
        if (debugEnabled())
        {
          TRACER.debugInfo(
              "In RS " + replicationCache.getReplicationServer().
              "In RS " + replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
              (handler.isReplicationServer()?" From RS ":" From LS")+
              " with serverId=" + serverId + " receives " + msg);
@@ -127,13 +130,14 @@
        {
          AckMessage ack = (AckMessage) msg;
          handler.checkWindow();
          replicationCache.ack(ack, serverId);
          replicationServerDomain.ack(ack, serverId);
        }
        else if (msg instanceof UpdateMessage)
        {
          // Ignore update received from a replica with
          // a bad generation ID
          long referenceGenerationId = replicationCache.getGenerationId();
          long referenceGenerationId =
                  replicationServerDomain.getGenerationId();
          if ((referenceGenerationId>0) &&
              (referenceGenerationId != handler.getGenerationId()))
          {
@@ -145,7 +149,7 @@
          {
            UpdateMessage update = (UpdateMessage) msg;
            handler.decAndCheckWindow();
            replicationCache.put(update, handler);
            replicationServerDomain.put(update, handler);
          }
        }
        else if (msg instanceof WindowMessage)
@@ -182,7 +186,7 @@
        else if (msg instanceof ResetGenerationId)
        {
          ResetGenerationId genIdMsg = (ResetGenerationId) msg;
          replicationCache.resetGenerationId(this.handler, genIdMsg);
          replicationServerDomain.resetGenerationId(this.handler, genIdMsg);
        }
        else if (msg instanceof WindowProbe)
        {
@@ -198,19 +202,20 @@
          {
            if (handler.isReplicationServer())
              TRACER.debugInfo(
               "In RS " + replicationCache.getReplicationServer().
               "In RS " + replicationServerDomain.getReplicationServer().
               getServerId() +
               " Receiving replServerInfo from " + handler.getServerId() +
               " baseDn=" + replicationCache.getBaseDn() +
               " baseDn=" + replicationServerDomain.getBaseDn() +
               " genId=" + infoMsg.getGenerationId());
          }
          if (replicationCache.getGenerationId()<0)
          if (replicationServerDomain.getGenerationId()<0)
          {
            // Here is the case where a ReplicationServer receives from
            // another ReplicationServer the generationId for a domain
            // for which the generation ID has never been set.
            replicationCache.setGenerationId(infoMsg.getGenerationId(), false);
            replicationServerDomain.
                    setGenerationId(infoMsg.getGenerationId(),false);
          }
          else
          {
@@ -221,19 +226,20 @@
              // If we have generationId set locally and no server currently
              // connected for that domain in the topology then we may also
              // reset the generationId localy.
              replicationCache.mayResetGenerationId();
              replicationServerDomain.mayResetGenerationId();
            }
            if (replicationCache.getGenerationId() != infoMsg.getGenerationId())
            if (replicationServerDomain.getGenerationId() !=
                    infoMsg.getGenerationId())
            {
              Message message = NOTE_BAD_GENERATION_ID.get(
                  replicationCache.getBaseDn().toNormalizedString(),
                  replicationServerDomain.getBaseDn().toNormalizedString(),
                  Short.toString(handler.getServerId()),
                  Long.toString(infoMsg.getGenerationId()),
                  Long.toString(replicationCache.getGenerationId()));
                  Long.toString(replicationServerDomain.getGenerationId()));
              ErrorMessage errorMsg = new ErrorMessage(
                  replicationCache.getReplicationServer().getServerId(),
                  replicationServerDomain.getReplicationServer().getServerId(),
                  handler.getServerId(),
                  message);
              session.publish(errorMsg);
@@ -260,7 +266,7 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + replicationCache.getReplicationServer().
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION for serverID=" + serverId
          + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
@@ -270,7 +276,7 @@
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader CNF EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
@@ -284,7 +290,7 @@
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationCache.getReplicationServer().
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " server reader EXCEPTION serverID=" + serverId
          + stackTraceToSingleLineString(e));
@@ -304,7 +310,7 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + replicationCache.getReplicationServer().
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " server reader for serverID=" + serverId +
          " is closing the session");
@@ -315,11 +321,11 @@
      {
       // ignore
      }
      replicationCache.stopServer(handler);
      replicationServerDomain.stopServer(handler);
    }
    if (debugEnabled())
      TRACER.debugInfo(
          "In RS " + replicationCache.getReplicationServer().
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          (handler.isReplicationServer()?" RS":" LDAP") +
          " server reader stopped for serverID=" + serverId);
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -56,7 +56,7 @@
  private ProtocolSession session;
  private ServerHandler handler;
  private ReplicationCache replicationCache;
  private ReplicationServerDomain replicationServerDomain;
  private short serverId;
  /**
@@ -67,22 +67,24 @@
   * @param session the ProtocolSession that will be used to send updates.
   * @param serverId the Identifier of the server.
   * @param handler handler for which the ServerWriter is created.
   * @param replicationCache The ReplicationCache of this ServerWriter.
   * @param replicationServerDomain The ReplicationServerDomain of this
   *        ServerWriter.
   */
  public ServerWriter(ProtocolSession session, short serverId,
                      ServerHandler handler, ReplicationCache replicationCache)
                      ServerHandler handler,
                      ReplicationServerDomain replicationServerDomain)
  {
    super(handler.toString() + " writer");
    this.serverId = serverId;
    this.session = session;
    this.handler = handler;
    this.replicationCache = replicationCache;
    this.replicationServerDomain = replicationServerDomain;
  }
  /**
   * Run method for the ServerWriter.
   * Loops waiting for changes from the ReplicationCache and forward them
   * Loops waiting for changes from the ReplicationServerDomain and forward them
   * to the other servers
   */
  public void run()
@@ -102,12 +104,12 @@
    {
      while (true)
      {
        UpdateMessage update = replicationCache.take(this.handler);
        UpdateMessage update = replicationServerDomain.take(this.handler);
        if (update == null)
          return;       /* this connection is closing */
        // Ignore update to be sent to a replica with a bad generation ID
        long referenceGenerationId = replicationCache.getGenerationId();
        long referenceGenerationId = replicationServerDomain.getGenerationId();
        if ((referenceGenerationId != handler.getGenerationId())
            || (referenceGenerationId == -1)
            || (handler.getGenerationId() == -1))
@@ -121,7 +123,7 @@
        if (debugEnabled())
        {
          TRACER.debugInfo(
            "In " + replicationCache.getReplicationServer().
            "In " + replicationServerDomain.getReplicationServer().
              getMonitorInstanceName() +
            ", writer to " + this.handler.getMonitorInstanceName() +
            " publishes msg=" + update.toString() +
@@ -168,7 +170,7 @@
      {
       // Can't do much more : ignore
      }
      replicationCache.stopServer(handler);
      replicationServerDomain.stopServer(handler);
      if (debugEnabled())
      {
opends/src/server/org/opends/server/replication/server/package-info.java
@@ -53,7 +53,7 @@
 * ReplicationMessages objects. This class is used by both the
 * replicationServer and the replication package.
 * </li>
 * <li><A HREF="ReplicationCache.html"><B>ReplicationCache</B></A>
 * <li><A HREF="ReplicationServerDomain.html"><B>ReplicationServerDomain</B></A>
 * implements the multiplexing part of the replication
 * server. It contains method for forwarding all the received messages to
 * the ServerHandler and to the dbHandler objects.<br>
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -914,14 +914,14 @@
      rgenId = replServer1.getGenerationId(baseDn);
      assertEquals(genId, rgenId, "DS and replServer are expected to have same genId.");
      assertTrue(!replServer1.getReplicationCache(baseDn, false).
      assertTrue(!replServer1.getReplicationServerDomain(baseDn, false).
          isDegradedDueToGenerationId(server1ID),
      "Expecting that DS is not degraded since domain genId has been reset");
      assertTrue(replServer1.getReplicationCache(baseDn, false).
      assertTrue(replServer1.getReplicationServerDomain(baseDn, false).
          isDegradedDueToGenerationId(server2ID),
      "Expecting that broker2 is degraded since domain genId has been reset");
      assertTrue(replServer1.getReplicationCache(baseDn, false).
      assertTrue(replServer1.getReplicationServerDomain(baseDn, false).
          isDegradedDueToGenerationId(server3ID),
      "Expecting that broker3 is degraded since domain genId has been reset");
@@ -1106,7 +1106,7 @@
    }
    debugInfo("Expecting that broker2 is not degraded since it has a correct genId");
    assertTrue(!replServer1.getReplicationCache(baseDn, false).
    assertTrue(!replServer1.getReplicationServerDomain(baseDn, false).
        isDegradedDueToGenerationId(server2ID));
    debugInfo("Disconnecting DS from replServer1");
@@ -1132,7 +1132,7 @@
    }
    debugInfo("Expecting that broker3 is degraded since it has a bad genId");
    assertTrue(replServer1.getReplicationCache(baseDn, false).
    assertTrue(replServer1.getReplicationServerDomain(baseDn, false).
        isDegradedDueToGenerationId(server3ID));
    int found = testEntriesInDb();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -1090,25 +1090,25 @@
    // Check that the list of connected LDAP servers is correct
    // in each replication servers
    List<String> l1 = changelog1.getReplicationCache(baseDn, false).
    List<String> l1 = changelog1.getReplicationServerDomain(baseDn, false).
      getConnectedLDAPservers();
    assertEquals(l1.size(), 1);
    assertEquals(l1.get(0), String.valueOf(server1ID));
    
    List<String> l2;
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 2);
    assertTrue(l2.contains(String.valueOf(server2ID)));
    assertTrue(l2.contains(String.valueOf(server3ID)));
        
    List<String> l3;
    l3 = changelog3.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    l3 = changelog3.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
    assertEquals(l3.size(), 0);
    // Test updates
    broker3.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server2ID));
@@ -1116,11 +1116,11 @@
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
    broker2.stop();
    Thread.sleep(1000);
    l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
    l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
    assertEquals(l2.size(), 1);
    assertEquals(l2.get(0), String.valueOf(server3ID));
    // TODO Test ReplicationCache.getDestinationServers method.
    // TODO Test ReplicationServerDomain.getDestinationServers method.
    broker2.stop();
    broker3.stop();