From 6b9fc13f71a6285b51fd2cb05a3cbbe9fdb9cc96 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 04 Oct 2013 09:31:56 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |   39 ++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java          |  224 ++++++++++++++----------------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java  |   26 +--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java    |   22 --
 4 files changed, 119 insertions(+), 192 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index d576bb7..08e1ce3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -80,15 +80,14 @@
              BackupTaskListener, RestoreTaskListener, ImportTaskListener,
              ExportTaskListener
 {
-  private int serverId;
   private String serverURL;
 
   private ServerSocket listenSocket;
   private Thread listenThread;
   private Thread connectThread;
 
-  /** The list of replication server URLs configured by the administrator. */
-  private Collection<String> replicationServerUrls;
+  /** The current configuration of this replication server. */
+  private ReplicationServerCfg config;
 
   /**
    * This table is used to store the list of dn for which we are currently
@@ -97,18 +96,8 @@
   private final Map<DN, ReplicationServerDomain> baseDNs =
       new HashMap<DN, ReplicationServerDomain>();
 
-  private volatile boolean shutdown = false;
-  private int rcvWindow;
-  private int queueSize;
   private final ChangelogDB changelogDB;
-
-  /**
-   * The delay (in sec) after which the changes must be deleted from the
-   * persistent storage.
-   */
-  private long purgeDelay;
-
-  private int replicationPort;
+  private volatile boolean shutdown = false;
   private boolean stopListen = false;
   private ReplSessionSecurity replSessionSecurity;
 
@@ -120,27 +109,6 @@
   /** ID of the backend. */
   private static final String backendId = "replicationChanges";
 
-  /*
-   * Assured mode properties
-   */
-  /** Timeout (in milliseconds) when waiting for acknowledgments. */
-  private long assuredTimeout = 1000;
-
-  /** Group id. */
-  private byte groupId = 1;
-
-  /**
-   * Number of pending changes for a DS, considered as threshold value to put
-   * the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled.
-   */
-  private int degradedStatusThreshold = 5000;
-
-  /**
-   * Number of milliseconds to wait before sending new monitoring messages. If
-   * value is 0, monitoring publisher is disabled.
-   */
-  private long monitoringPublisherPeriod = 3000;
-
   /**
    * The tracer object for the debug logger.
    */
@@ -163,21 +131,6 @@
   private long domainTicket = 0L;
 
   /**
-   * The weight affected to the replication server.
-   * Each replication server of the topology has a weight. When combined
-   * together, the weights of the replication servers of a same group can be
-   * translated to a percentage that determines the quantity of directory
-   * servers of the topology that should be connected to a replication server.
-   * For instance imagine a topology with 3 replication servers (with the same
-   * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
-   * RS1 should have 25% of the directory servers connected in the topology,
-   * RS2 25%, and RS3 50%. This may be useful if the replication servers of the
-   * topology have a different power and one wants to spread the load between
-   * the replication servers according to their power.
-   */
-  private int weight = 1;
-
-  /**
    * Holds the list of all replication servers instantiated in this VM.
    * This allows to perform clean up of the RS databases in unit tests.
    */
@@ -193,24 +146,11 @@
   public ReplicationServer(ReplicationServerCfg configuration)
     throws ConfigException
   {
-    replicationPort = configuration.getReplicationPort();
-    serverId = configuration.getReplicationServerId();
-    replicationServerUrls = configuration.getReplicationServer();
-    if (replicationServerUrls == null)
-      replicationServerUrls = new ArrayList<String>();
-    queueSize = configuration.getQueueSize();
-    purgeDelay = configuration.getReplicationPurgeDelay();
-    rcvWindow = configuration.getWindowSize();
+    this.config = configuration;
 
     this.changelogDB =
         new JEChangelogDB(this, configuration.getReplicationDBDirectory());
 
-    groupId = (byte)configuration.getGroupId();
-    weight = configuration.getWeight();
-    assuredTimeout = configuration.getAssuredTimeout();
-    degradedStatusThreshold = configuration.getDegradedStatusThreshold();
-    monitoringPublisherPeriod = configuration.getMonitoringPeriod();
-
     replSessionSecurity = new ReplSessionSecurity();
     initialize();
     configuration.addChangeListener(this);
@@ -218,7 +158,7 @@
     {
       backendConfigEntryDN =
          DN.decode("ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config");
-    } catch (Exception e) { /* do nothing */ }
+    } catch (DirectoryException e) { /* do nothing */ }
 
     // Creates the backend associated to this ReplicationServer
     // if it does not exist.
@@ -229,16 +169,16 @@
     DirectoryServer.registerExportTaskListener(this);
     DirectoryServer.registerImportTaskListener(this);
 
-    localPorts.add(replicationPort);
+    localPorts.add(getReplicationPort());
 
     // Keep track of this new instance
     allInstances.add(this);
   }
 
-  private Set<HostPort> toHostPorts(Collection<String> serverAddresses)
+  private Set<HostPort> getConfiguredRSAddresses()
   {
     final Set<HostPort> results = new HashSet<HostPort>();
-    for (String serverAddress : serverAddresses)
+    for (String serverAddress : this.config.getReplicationServer())
     {
       results.add(HostPort.valueOf(serverAddress));
     }
@@ -302,6 +242,8 @@
 
         ReplicationMsg msg = session.receive();
 
+        final int queueSize = this.config.getQueueSize();
+        final int rcvWindow = this.config.getWindowSize();
         if (msg instanceof ServerStartMsg)
         {
           DataServerHandler dsHandler = new DataServerHandler(
@@ -360,7 +302,7 @@
     {
       while (!shutdown)
       {
-        final HostPort localAddress = HostPort.localAddress(replicationPort);
+        HostPort localAddress = HostPort.localAddress(getReplicationPort());
         for (ReplicationServerDomain domain : getReplicationServerDomains())
         {
           /*
@@ -371,7 +313,7 @@
            */
           final Set<HostPort> connectedRSAddresses =
               getConnectedRSAddresses(domain);
-          for (HostPort rsAddress : toHostPorts(replicationServerUrls))
+          for (HostPort rsAddress : getConfiguredRSAddresses())
           {
             if (connectedRSAddresses.contains(rsAddress))
             {
@@ -448,7 +390,7 @@
       session = replSessionSecurity.createClientSession(socket, timeoutMS);
 
       ReplicationServerHandler rsHandler = new ReplicationServerHandler(
-          session, queueSize, this, rcvWindow);
+          session, config.getQueueSize(), this, config.getWindowSize());
       rsHandler.connect(baseDN, sslEncryption);
     }
     catch (Exception e)
@@ -473,7 +415,7 @@
 
       setServerURL();
       listenSocket = new ServerSocket();
-      listenSocket.bind(new InetSocketAddress(replicationPort));
+      listenSocket.bind(new InetSocketAddress(getReplicationPort()));
 
       // creates working threads: we must first connect, then start to listen.
       if (debugEnabled())
@@ -506,8 +448,8 @@
       logError(ERR_UNKNOWN_HOSTNAME.get());
     } catch (IOException e)
     {
-      Message message =
-          ERR_COULD_NOT_BIND_CHANGELOG.get(replicationPort, e.getMessage());
+      Message message = ERR_COULD_NOT_BIND_CHANGELOG.get(
+          getReplicationPort(), e.getMessage());
       logError(message);
     } catch (DirectoryException e)
     {
@@ -721,7 +663,7 @@
    */
   public void shutdown()
   {
-    localPorts.remove(replicationPort);
+    localPorts.remove(getReplicationPort());
 
     if (shutdown)
       return;
@@ -785,7 +727,7 @@
    */
   public long getTrimAge()
   {
-    return purgeDelay * 1000;
+    return this.config.getReplicationPurgeDelay() * 1000;
   }
 
   /**
@@ -828,29 +770,22 @@
     // Some of those properties change don't need specific code.
     // They will be applied for next connections. Some others have immediate
     // effect
+    final Set<HostPort> oldRSAddresses = getConfiguredRSAddresses();
 
-    disconnectRemovedReplicationServers(
-        toHostPorts(configuration.getReplicationServer()));
+    final ReplicationServerCfg oldConfig = this.config;
+    this.config = configuration;
 
-    replicationServerUrls = configuration.getReplicationServer();
-    if (replicationServerUrls == null)
-      replicationServerUrls = new ArrayList<String>();
+    disconnectRemovedReplicationServers(oldRSAddresses);
 
-    queueSize = configuration.getQueueSize();
-    long newPurgeDelay = configuration.getReplicationPurgeDelay();
-    if (newPurgeDelay != purgeDelay)
+    final long newPurgeDelay = config.getReplicationPurgeDelay();
+    if (newPurgeDelay != oldConfig.getReplicationPurgeDelay())
     {
-      purgeDelay = newPurgeDelay;
-      this.changelogDB.setPurgeDelay(purgeDelay * 1000);
+      this.changelogDB.setPurgeDelay(getTrimAge());
     }
 
-    rcvWindow = configuration.getWindowSize();
-    assuredTimeout = configuration.getAssuredTimeout();
-
     // changing the listen port requires to stop the listen thread
     // and restart it.
-    int newPort = configuration.getReplicationPort();
-    if (newPort != replicationPort)
+    if (getReplicationPort() != oldConfig.getReplicationPort())
     {
       stopListen = true;
       try
@@ -859,10 +794,9 @@
         listenThread.join();
         stopListen = false;
 
-        replicationPort = newPort;
         setServerURL();
         listenSocket = new ServerSocket();
-        listenSocket.bind(new InetSocketAddress(replicationPort));
+        listenSocket.bind(new InetSocketAddress(getReplicationPort()));
 
         listenThread = new ReplicationServerListenThread(this);
         listenThread.start();
@@ -877,33 +811,28 @@
       }
     }
 
-    // Update threshold value for status analyzers (stop them if requested
-    // value is 0)
-    if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
+    // Update threshold value for status analyzers
+    final int newThreshold = config.getDegradedStatusThreshold();
+    if (oldConfig.getDegradedStatusThreshold() != newThreshold)
     {
-      degradedStatusThreshold = configuration.getDegradedStatusThreshold();
       for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
-        domain.updateDegradedStatusThreshold(degradedStatusThreshold);
+        domain.updateDegradedStatusThreshold(newThreshold);
       }
     }
 
-    // Update period value for monitoring publishers (stop them if requested
-    // value is 0)
-    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
+    // Update period value for monitoring publishers
+    if (oldConfig.getMonitoringPeriod() != config.getMonitoringPeriod())
     {
-      monitoringPublisherPeriod = configuration.getMonitoringPeriod();
       for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
-        domain.updateMonitoringPeriod(monitoringPublisherPeriod);
+        domain.updateMonitoringPeriod(config.getMonitoringPeriod());
       }
     }
 
     // Changed the group id ?
-    byte newGroupId = (byte) configuration.getGroupId();
-    if (newGroupId != groupId)
+    if (config.getGroupId() != oldConfig.getGroupId())
     {
-      groupId = newGroupId;
       // Have a new group id: Disconnect every servers.
       for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
@@ -912,17 +841,16 @@
     }
 
     // Set a potential new weight
-    if (weight != configuration.getWeight())
+    if (oldConfig.getWeight() != config.getWeight())
     {
-      weight = configuration.getWeight();
       // Broadcast the new weight the the whole topology. This will make some
       // DSs reconnect (if needed) to other RSs according to the new weight of
       // this RS.
       broadcastConfigChange();
     }
 
-    final String newDir = configuration.getReplicationDBDirectory();
-    if (newDir != null && !this.changelogDB.getDBDirectoryName().equals(newDir))
+    final String newDir = config.getReplicationDBDirectory();
+    if (newDir != null && !newDir.equals(oldConfig.getReplicationDBDirectory()))
     {
       return new ConfigChangeResult(ResultCode.SUCCESS, true);
     }
@@ -944,13 +872,14 @@
      * First try the set of configured replication servers to see if one of them
      * is this replication server (this should always be the case).
      */
-    for (HostPort rsAddress : toHostPorts(replicationServerUrls))
+    for (HostPort rsAddress : getConfiguredRSAddresses())
     {
       /*
        * No need validate the string format because the admin framework has
        * already done it.
        */
-      if (rsAddress.getPort() == replicationPort && rsAddress.isLocalAddress())
+      if (rsAddress.getPort() == getReplicationPort()
+          && rsAddress.isLocalAddress())
       {
         serverURL = rsAddress.toString();
         return;
@@ -960,7 +889,7 @@
     // Fall-back to the machine hostname.
     final String host = InetAddress.getLocalHost().getHostName();
     // Ensure correct formatting of IPv6 addresses by using a HostPort instance.
-    serverURL = new HostPort(host, replicationPort).toString();
+    serverURL = new HostPort(host, getReplicationPort()).toString();
   }
 
   /**
@@ -1008,7 +937,7 @@
    */
   public int getServerId()
   {
-    return serverId;
+    return this.config.getReplicationServerId();
   }
 
   /**
@@ -1019,15 +948,14 @@
    */
   public int getQueueSize()
   {
-    return queueSize;
+    return this.config.getQueueSize();
   }
 
   /**
    * Creates the backend associated to this replication server.
    * @throws ConfigException
    */
-  private void createBackend()
-  throws ConfigException
+  private void createBackend() throws ConfigException
   {
     try
     {
@@ -1213,11 +1141,14 @@
 
   /**
    * Get the assured mode timeout.
+   * <p>
+   * It is the Timeout (in milliseconds) when waiting for acknowledgments.
+   *
    * @return The assured mode timeout.
    */
   public long getAssuredTimeout()
   {
-    return assuredTimeout;
+    return this.config.getAssuredTimeout();
   }
 
   /**
@@ -1226,43 +1157,53 @@
    */
   public byte getGroupId()
   {
-    return groupId;
+    return (byte) this.config.getGroupId();
   }
 
   /**
-   * Get the threshold value for status analyzer.
-   * @return The threshold value for status analyzer.
+   * Get the degraded status threshold value for status analyzer.
+   * <p>
+   * The degraded status threshold is the number of pending changes for a DS,
+   * considered as threshold value to put the DS in DEGRADED_STATUS. If value is
+   * 0, status analyzer is disabled.
+   *
+   * @return The degraded status threshold value for status analyzer.
    */
   public int getDegradedStatusThreshold()
   {
-    return degradedStatusThreshold;
+    return this.config.getDegradedStatusThreshold();
   }
 
   /**
    * Get the monitoring publisher period value.
+   * <p>
+   * It is the number of milliseconds to wait before sending new monitoring
+   * messages. If value is 0, monitoring publisher is disabled.
+   *
    * @return the monitoring publisher period value.
    */
   public long getMonitoringPublisherPeriod()
   {
-    return monitoringPublisherPeriod;
+    return this.config.getMonitoringPeriod();
   }
 
   /**
    * Compute the list of replication servers that are not any more connected to
    * this Replication Server and stop the corresponding handlers.
    *
-   * @param newRSAddresses
-   *          the list of addresses of the newly configured replication servers.
+   * @param oldRSAddresses
+   *          the old list of configured replication servers addresses.
    */
-  private void disconnectRemovedReplicationServers(Set<HostPort> newRSAddresses)
+  private void disconnectRemovedReplicationServers(Set<HostPort> oldRSAddresses)
   {
     final Collection<HostPort> serversToDisconnect = new ArrayList<HostPort>();
 
-    for (HostPort rsAddress : toHostPorts(replicationServerUrls))
+    final Set<HostPort> newRSAddresses = getConfiguredRSAddresses();
+    for (HostPort oldRSAddress : oldRSAddresses)
     {
-      if (!newRSAddresses.contains(rsAddress))
+      if (!newRSAddresses.contains(oldRSAddress))
       {
-        serversToDisconnect.add(rsAddress);
+        serversToDisconnect.add(oldRSAddress);
       }
     }
 
@@ -1282,7 +1223,7 @@
    */
   public String getMonitorInstanceName()
   {
-    return "Replication Server " + replicationPort + " " + serverId;
+    return "Replication Server " + getReplicationPort() + " " + getServerId();
   }
 
   /**
@@ -1292,7 +1233,7 @@
    */
   public int getReplicationPort()
   {
-    return replicationPort;
+    return config.getReplicationPort();
   }
 
   /**
@@ -1577,16 +1518,27 @@
   }
 
   /**
-   * Gets the weight.
+   * Gets the weight affected to the replication server.
+   * <p>
+   * Each replication server of the topology has a weight. When combined
+   * together, the weights of the replication servers of a same group can be
+   * translated to a percentage that determines the quantity of directory
+   * servers of the topology that should be connected to a replication server.
+   * <p>
+   * For instance imagine a topology with 3 replication servers (with the same
+   * group id) with the following weights: RS1=1, RS2=1, RS3=2. This means that
+   * RS1 should have 25% of the directory servers connected in the topology, RS2
+   * 25%, and RS3 50%. This may be useful if the replication servers of the
+   * topology have a different power and one wants to spread the load between
+   * the replication servers according to their power.
+   *
    * @return the weight
    */
   public int getWeight()
   {
-    return weight;
+    return this.config.getWeight();
   }
 
-
-
   private Collection<ReplicationServerDomain> getReplicationServerDomains()
   {
     synchronized (baseDNs)
@@ -1609,7 +1561,7 @@
   @Override
   public String toString()
   {
-    return "RS(" + serverId + ") on " + serverURL + ", domains="
+    return "RS(" + getServerId() + ") on " + serverURL + ", domains="
         + baseDNs.keySet();
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 5961572..ebe4ada 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -30,7 +30,6 @@
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.*;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -1721,17 +1720,7 @@
    */
   public ServerState getLatestServerState()
   {
-    return toServerState(changelogDB.getDomainNewestCSNs(baseDN).values());
-  }
-
-  private ServerState toServerState(Collection<CSN> csns)
-  {
-    ServerState serverState = new ServerState();
-    for (CSN csn : csns)
-    {
-      serverState.update(csn);
-    }
-    return serverState;
+    return changelogDB.getDomainNewestCSNs(baseDN);
   }
 
   /**
@@ -2646,7 +2635,7 @@
    */
   public ServerState getStartState()
   {
-    return toServerState(changelogDB.getDomainOldestCSNs(baseDN).values());
+    return changelogDB.getDomainOldestCSNs(baseDN);
   }
 
   /**
@@ -2662,12 +2651,11 @@
   {
     CSN eligibleCSN = null;
 
-    for (Entry<Integer, CSN> entry :
-      changelogDB.getDomainNewestCSNs(baseDN).entrySet())
+    final ServerState newestCSNs = changelogDB.getDomainNewestCSNs(baseDN);
+    for (final int serverId : newestCSNs)
     {
       // Consider this producer (DS/db).
-      final int serverId = entry.getKey();
-      final CSN changelogNewestCSN = entry.getValue();
+      final CSN changelogNewestCSN = newestCSNs.getCSN(serverId);
 
       // Should it be considered for eligibility ?
       CSN heartbeatLastCSN =
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
index cab92e2..34f8644 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -26,10 +26,10 @@
  */
 package org.opends.server.replication.server.changelog.api;
 
-import java.util.Map;
 import java.util.Set;
 
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.DN;
 
@@ -51,14 +51,6 @@
   // DB control methods
 
   /**
-   * Get the replication server database directory. This is used by tests to do
-   * some cleanup.
-   *
-   * @return the database directory name
-   */
-  String getDBDirectoryName();
-
-  /**
    * Initializes the replication database by reading its previous state and
    * building the relevant ReplicaDBs according to the previous state. This
    * method must be called once before using the ChangelogDB.
@@ -109,7 +101,7 @@
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @return a set of integers holding the serverIds
+   * @return an unmodifiable set of integers holding the serverIds
    */
   Set<Integer> getDomainServerIds(DN baseDN);
 
@@ -128,10 +120,11 @@
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @return a {serverId => oldest CSN} Map. If a replica DB is empty or closed,
-   *         the oldest CSN will be null for that replica.
+   * @return a new ServerState object holding the {serverId => oldest CSN}
+   *         mapping. If a replica DB is empty or closed, the oldest CSN will be
+   *         null for that replica. The caller owns the generated ServerState.
    */
-  Map<Integer, CSN> getDomainOldestCSNs(DN baseDN);
+  ServerState getDomainOldestCSNs(DN baseDN);
 
   /**
    * Returns the newest {@link CSN}s of each serverId for the specified
@@ -139,10 +132,11 @@
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @return a {serverId => newest CSN} Map. If a replica DB is empty or closed,
-   *         the newest CSN will be null for that replica.
+   * @return a new ServerState object holding the {serverId => newest CSN} Map.
+   *         If a replica DB is empty or closed, the newest CSN will be null for
+   *         that replica. The caller owns the generated ServerState.
    */
-  Map<Integer, CSN> getDomainNewestCSNs(DN baseDN);
+  ServerState getDomainNewestCSNs(DN baseDN);
 
   /**
    * Retrieves the latest trim date for the specified replication domain.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index dff2b0b..0ec74e6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,13 +27,17 @@
 package org.opends.server.replication.server.changelog.je;
 
 import java.io.File;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.config.ConfigException;
 import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
@@ -369,7 +373,7 @@
   @Override
   public Set<Integer> getDomainServerIds(DN baseDN)
   {
-    return getDomainMap(baseDN).keySet();
+    return Collections.unmodifiableSet(getDomainMap(baseDN).keySet());
   }
 
   /** {@inheritDoc} */
@@ -418,30 +422,26 @@
 
   /** {@inheritDoc} */
   @Override
-  public Map<Integer, CSN> getDomainOldestCSNs(DN baseDN)
+  public ServerState getDomainOldestCSNs(DN baseDN)
   {
-    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
-    final Map<Integer, CSN> results =
-        new HashMap<Integer, CSN>(domainMap.size());
-    for (DbHandler dbHandler : domainMap.values())
+    final ServerState result = new ServerState();
+    for (DbHandler dbHandler : getDomainMap(baseDN).values())
     {
-      results.put(dbHandler.getServerId(), dbHandler.getOldestCSN());
+      result.update(dbHandler.getOldestCSN());
     }
-    return results;
+    return result;
   }
 
   /** {@inheritDoc} */
   @Override
-  public Map<Integer, CSN> getDomainNewestCSNs(DN baseDN)
+  public ServerState getDomainNewestCSNs(DN baseDN)
   {
-    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
-    final Map<Integer, CSN> results =
-        new HashMap<Integer, CSN>(domainMap.size());
-    for (DbHandler dbHandler : domainMap.values())
+    final ServerState result = new ServerState();
+    for (DbHandler dbHandler : getDomainMap(baseDN).values())
     {
-      results.put(dbHandler.getServerId(), dbHandler.getNewestCSN());
+      result.update(dbHandler.getNewestCSN());
     }
-    return results;
+    return result;
   }
 
   /** {@inheritDoc} */
@@ -588,13 +588,6 @@
 
   /** {@inheritDoc} */
   @Override
-  public String getDBDirectoryName()
-  {
-    return this.dbDirectoryName;
-  }
-
-  /** {@inheritDoc} */
-  @Override
   public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId,
       CSN startAfterCSN)
   {

--
Gitblit v1.10.0