From c63679832734078879a1d45f16de55cab16b3a96 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Wed, 05 Dec 2007 09:31:02 +0000
Subject: [PATCH] Fix for #2655: Renaming ReplicationCache into ReplicationServerDomain
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java | 2
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 118 +++++++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 14 +-
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 58 ++++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java | 50 +++---
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java | 10
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java | 4
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 86 ++++++-----
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java | 20 +-
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java | 22 +-
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 4
13 files changed, 219 insertions(+), 186 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index a373b24..1c1b59a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -344,8 +344,8 @@
/**
* Run method for this class.
- * Periodically Flushes the ReplicationCache from memory to the stable storage
- * and trims the old updates.
+ * Periodically Flushes the ReplicationServerDomain cache from memory to the
+ * stable storage and trims the old updates.
*/
public void run()
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java
index 505d803..2beb0a0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java
@@ -35,7 +35,7 @@
public class ReplServerAckMessageList extends AckMessageList
{
private short replicationServerId;
- private ReplicationCache replicationCache;
+ private ReplicationServerDomain replicationServerDomain;
/**
* Creates a new AckMessageList for a given ChangeNumber.
@@ -45,17 +45,17 @@
* original change.
* @param replicationServerId The Identifier of the replication server
* from which the change was received.
- * @param replicationCache The ReplicationCache from which he change
- * was received.
+ * @param replicationServerDomain The ReplicationServerDomain from which he
+ * change was received.
*/
public ReplServerAckMessageList(ChangeNumber changeNumber,
int numExpectedAcks,
short replicationServerId,
- ReplicationCache replicationCache)
+ ReplicationServerDomain replicationServerDomain)
{
super(changeNumber, numExpectedAcks);
this.replicationServerId = replicationServerId;
- this.replicationCache = replicationCache;
+ this.replicationServerDomain = replicationServerDomain;
}
/**
@@ -70,14 +70,14 @@
}
/**
- * Get the replicationCache of the replication server from which we received
- * the change.
- * @return Returns the replicationCache of the replication server from which
- * we received the change .
+ * Get the replicationServerDomain of the replication server from which we
+ * received the change.
+ * @return Returns the replicationServerDomain of the replication server from
+ * which we received the change .
*/
- public ReplicationCache getChangelogCache()
+ public ReplicationServerDomain getChangelogCache()
{
- return replicationCache;
+ return replicationServerDomain;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index 8be1d81..74b3fd5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -347,13 +347,13 @@
//This method only returns the number of actual change entries, the
//domain and any baseDN entries are not counted.
long retNum=0;
- Iterator<ReplicationCache> rcachei = server.getCacheIterator();
+ Iterator<ReplicationServerDomain> rcachei = server.getCacheIterator();
if (rcachei != null)
{
while (rcachei.hasNext())
{
- ReplicationCache rc = rcachei.next();
- retNum += rc.getChangesCount();
+ ReplicationServerDomain rsd = rcachei.next();
+ retNum += rsd.getChangesCount();
}
}
return retNum;
@@ -531,18 +531,18 @@
{
List<DN> includeBranches = exportConfig.getIncludeBranches();
DN baseDN;
- ArrayList<ReplicationCache> exportContainers =
- new ArrayList<ReplicationCache>();
+ ArrayList<ReplicationServerDomain> exportContainers =
+ new ArrayList<ReplicationServerDomain>();
if(server == null) {
Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get();
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,message);
}
- Iterator<ReplicationCache> rcachei = server.getCacheIterator();
- if (rcachei != null)
+ Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
+ if (rsdi != null)
{
- while (rcachei.hasNext())
+ while (rsdi.hasNext())
{
- ReplicationCache rc = rcachei.next();
+ ReplicationServerDomain rc = rsdi.next();
// Skip containers that are not covered by the include branches.
baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN);
@@ -598,7 +598,7 @@
// Iterate through the containers.
try
{
- for (ReplicationCache exportContainer : exportContainers)
+ for (ReplicationServerDomain exportContainer : exportContainers)
{
if (exportConfig.isCancelled())
{
@@ -642,7 +642,7 @@
/*
* Exports the root changes of the export, and one entry by domain.
*/
- private void exportRootChanges(List<ReplicationCache> exportContainers,
+ private void exportRootChanges(List<ReplicationServerDomain> exportContainers,
LDIFExportConfig exportConfig, LDIFWriter ldifWriter)
{
Map<AttributeType,List<Attribute>> attributes =
@@ -668,7 +668,7 @@
}
catch (Exception e) {}
- for (ReplicationCache exportContainer : exportContainers)
+ for (ReplicationServerDomain exportContainer : exportContainers)
{
if (exportConfig != null && exportConfig.isCancelled())
{
@@ -725,21 +725,21 @@
}
/**
- * Processes the changes for a given ReplicationCache.
+ * Processes the changes for a given ReplicationServerDomain.
*/
- private void processContainer(ReplicationCache rc,
+ private void processContainer(ReplicationServerDomain rsd,
LDIFExportConfig exportConfig, LDIFWriter ldifWriter,
SearchOperation searchOperation)
{
// Walk through the servers
- for (Short serverId : rc.getServers())
+ for (Short serverId : rsd.getServers())
{
if (exportConfig != null && exportConfig.isCancelled())
{
break;
}
- ReplicationIterator ri = rc.getChangelogIterator(serverId,
+ ReplicationIterator ri = rsd.getChangelogIterator(serverId,
null);
if (ri != null)
@@ -1139,8 +1139,8 @@
// Get the base DN, scope, and filter for the search.
DN searchBaseDN = searchOperation.getBaseDN();
DN baseDN;
- ArrayList<ReplicationCache> searchContainers =
- new ArrayList<ReplicationCache>();
+ ArrayList<ReplicationServerDomain> searchContainers =
+ new ArrayList<ReplicationServerDomain>();
//This check is for GroupManager initialization. It currently doesn't
//come into play because the replication server variable is null in
@@ -1202,25 +1202,25 @@
}
// Walk through all entries and send the ones that match.
- Iterator<ReplicationCache> rcachei = server.getCacheIterator();
- if (rcachei != null)
+ Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator();
+ if (rsdi != null)
{
- while (rcachei.hasNext())
+ while (rsdi.hasNext())
{
- ReplicationCache rc = rcachei.next();
+ ReplicationServerDomain rsd = rsdi.next();
// Skip containers that are not covered by the include branches.
- baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN);
+ baseDN = DN.decode(rsd.getBaseDn().toString() + "," + EXPORT_BASE_DN);
if (searchBaseDN.isDescendantOf(baseDN) ||
searchBaseDN.isAncestorOf(baseDN))
{
- searchContainers.add(rc);
+ searchContainers.add(rsd);
}
}
}
- for (ReplicationCache exportContainer : searchContainers)
+ for (ReplicationServerDomain exportContainer : searchContainers)
{
processContainer(exportContainer, null, null, searchOperation);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 4f4ea3c..e003d16 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -78,9 +78,10 @@
this.dbenv = dbenv;
this.replicationServer = replicationServer;
- // Get or create the associated Replicationcache and Db.
+ // Get or create the associated ReplicationServerDomain and Db.
db = dbenv.getOrAddDb(serverId, baseDn,
- replicationServer.getReplicationCache(baseDn, true).getGenerationId());
+ replicationServer.getReplicationServerDomain(baseDn,
+ true).getGenerationId());
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index fc7742c..e1b7373 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -184,7 +184,7 @@
" Has read baseDn=" + baseDn
+ " generationId=" + generationId);
- replicationServer.getReplicationCache(baseDn, true).
+ replicationServer.getReplicationServerDomain(baseDn, true).
setGenerationId(generationId, true);
}
}
@@ -259,7 +259,7 @@
DbHandler dbHandler =
new DbHandler(serverId, baseDn, replicationServer, this, 1);
- replicationServer.getReplicationCache(baseDn, true).
+ replicationServer.getReplicationServerDomain(baseDn, true).
setDbHandler(serverId, dbHandler);
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index ac342b1..397a00a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -88,7 +88,8 @@
* and publisher objects for
* connection with LDAP servers and with replication servers
*
- * It is responsible for creating the replication server cache and managing it
+ * It is responsible for creating the replication server replicationServerDomain
+ * and managing it
*/
public class ReplicationServer extends MonitorProvider<MonitorProviderCfg>
implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>,
@@ -108,8 +109,8 @@
/* This table is used to store the list of dn for which we are currently
* handling servers.
*/
- private ConcurrentHashMap<DN, ReplicationCache> baseDNs =
- new ConcurrentHashMap<DN, ReplicationCache>();
+ private ConcurrentHashMap<DN, ReplicationServerDomain> baseDNs =
+ new ConcurrentHashMap<DN, ReplicationServerDomain>();
private String localURL = "null";
private boolean shutdown = false;
@@ -279,9 +280,10 @@
* periodically check that we are connected to all other
* replication servers and if not establish the connection
*/
- for (ReplicationCache replicationCache: baseDNs.values())
+ for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
{
- Set<String> connectedReplServers = replicationCache.getChangelogs();
+ Set<String> connectedReplServers =
+ replicationServerDomain.getChangelogs();
/*
* check that all replication server in the config are in the connected
* Set. If not create the connection
@@ -301,7 +303,7 @@
&& (serverAddress.compareTo(this.localURL) != 0)
&& (!connectedReplServers.contains(serverAddress)))
{
- this.connect(serverURL, replicationCache.getBaseDn());
+ this.connect(serverURL, replicationServerDomain.getBaseDn());
}
}
catch (IOException e)
@@ -396,7 +398,7 @@
this);
/*
- * create replicationServer cache
+ * create replicationServer replicationServerDomain
*/
serverId = changelogId;
@@ -461,28 +463,32 @@
}
/**
- * Get the ReplicationCache associated to the base DN given in parameter.
+ * Get the ReplicationServerDomain associated to the base DN given in
+ * parameter.
*
- * @param baseDn The base Dn for which the ReplicationCache must be returned.
- * @param create Specifies whether to create the ReplicationCache if it does
- * not already exist.
- * @return The ReplicationCache associated to the base DN given in parameter.
+ * @param baseDn The base Dn for which the ReplicationServerDomain must be
+ * returned.
+ * @param create Specifies whether to create the ReplicationServerDomain if
+ * it does not already exist.
+ * @return The ReplicationServerDomain associated to the base DN given in
+ * parameter.
*/
- public ReplicationCache getReplicationCache(DN baseDn, boolean create)
+ public ReplicationServerDomain getReplicationServerDomain(DN baseDn,
+ boolean create)
{
- ReplicationCache replicationCache;
+ ReplicationServerDomain replicationServerDomain;
synchronized (baseDNs)
{
- replicationCache = baseDNs.get(baseDn);
- if ((replicationCache == null) && (create))
+ replicationServerDomain = baseDNs.get(baseDn);
+ if ((replicationServerDomain == null) && (create))
{
- replicationCache = new ReplicationCache(baseDn, this);
- baseDNs.put(baseDn, replicationCache);
+ replicationServerDomain = new ReplicationServerDomain(baseDn, this);
+ baseDNs.put(baseDn, replicationServerDomain);
}
}
- return replicationCache;
+ return replicationServerDomain;
}
/**
@@ -520,9 +526,9 @@
}
// shutdown all the ChangelogCaches
- for (ReplicationCache replicationCache : baseDNs.values())
+ for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
{
- replicationCache.shutdown();
+ replicationServerDomain.shutdown();
}
if (dbEnv != null)
@@ -539,7 +545,8 @@
*
* @param id The serverId for which the dbHandler must be created.
* @param baseDn The DN for which the dbHandler muste be created.
- * @param generationId The generationId for this server and this domain.
+ * @param generationId The generationId for this server and this
+ * replicationServerDomain.
* @return The new DB handler for this ReplicationServer and the serverId and
* DN given in parameter.
* @throws DatabaseException in case of underlying database problem.
@@ -551,7 +558,8 @@
}
/**
- * Clears the generationId for the domain related to the provided baseDn.
+ * Clears the generationId for the replicationServerDomain related to the
+ * provided baseDn.
* @param baseDn The baseDn for which to delete the generationId.
* @throws DatabaseException When it occurs.
*/
@@ -755,7 +763,7 @@
Attribute bases = new Attribute(baseType, "base-dn", baseValues);
attributes.add(bases);
- // Publish to monitor the generation ID by domain
+ // Publish to monitor the generation ID by replicationServerDomain
AttributeType generationIdType=
DirectoryServer.getAttributeType("base-dn-generation-id", true);
LinkedHashSet<AttributeValue> generationIdValues =
@@ -763,9 +771,10 @@
for (DN base : baseDNs.keySet())
{
long generationId=-1;
- ReplicationCache cache = getReplicationCache(base, false);
- if (cache != null)
- generationId = cache.getGenerationId();
+ ReplicationServerDomain replicationServerDomain =
+ getReplicationServerDomain(base, false);
+ if (replicationServerDomain != null)
+ generationId = replicationServerDomain.getGenerationId();
generationIdValues.add(new AttributeValue(generationIdType,
base.toString() + " " + generationId));
}
@@ -777,17 +786,18 @@
}
/**
- * Get the value of generationId for the replication domain
+ * Get the value of generationId for the replication replicationServerDomain
* associated with the provided baseDN.
*
- * @param baseDN The baseDN of the domain.
+ * @param baseDN The baseDN of the replicationServerDomain.
* @return The value of the generationID.
*/
public long getGenerationId(DN baseDN)
{
- ReplicationCache rc = this.getReplicationCache(baseDN, false);
- if (rc!=null)
- return rc.getGenerationId();
+ ReplicationServerDomain rsd =
+ this.getReplicationServerDomain(baseDN, false);
+ if (rsd!=null)
+ return rsd.getGenerationId();
return -1;
}
@@ -962,7 +972,7 @@
" Export starts");
if (backend.getBackendID().equals(backendId))
{
- // Retrieves the backend related to this domain
+ // Retrieves the backend related to this replicationServerDomain
// backend =
ReplicationBackend b =
(ReplicationBackend)DirectoryServer.getBackend(backendId);
@@ -980,11 +990,11 @@
}
/**
- * Returns an iterator on the list of replicationCache.
+ * Returns an iterator on the list of replicationServerDomain.
* Returns null if none.
* @return the iterator.
*/
- public Iterator<ReplicationCache> getCacheIterator()
+ public Iterator<ReplicationServerDomain> getCacheIterator()
{
if (!baseDNs.isEmpty())
return baseDNs.values().iterator();
@@ -997,13 +1007,13 @@
*/
public void clearDb()
{
- Iterator<ReplicationCache> rcachei = getCacheIterator();
+ Iterator<ReplicationServerDomain> rcachei = getCacheIterator();
if (rcachei != null)
{
while (rcachei.hasNext())
{
- ReplicationCache rc = rcachei.next();
- rc.clearDbs();
+ ReplicationServerDomain rsd = rcachei.next();
+ rsd.clearDbs();
}
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
similarity index 98%
rename from opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
rename to opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 41df388..7eadd3e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationCache.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -70,7 +70,7 @@
* received to the disk and for trimming them
* Decision to trim can be based on disk space or age of the message
*/
-public class ReplicationCache
+public class ReplicationServerDomain
{
private Object flowControlLock = new Object();
private DN baseDn = null;
@@ -119,13 +119,13 @@
private static final DebugTracer TRACER = getTracer();
/**
- * Creates a new ReplicationCache associated to the DN baseDn.
+ * Creates a new ReplicationServerDomain associated to the DN baseDn.
*
- * @param baseDn The baseDn associated to the ReplicationCache.
+ * @param baseDn The baseDn associated to the ReplicationServerDomain.
* @param replicationServer the ReplicationServer that created this
* replicationServer cache.
*/
- public ReplicationCache(DN baseDn, ReplicationServer replicationServer)
+ public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer)
{
this.baseDn = baseDn;
this.replicationServer = replicationServer;
@@ -531,7 +531,7 @@
}
/**
- * Returns the change count for that ReplicationCache.
+ * Returns the change count for that ReplicationServerDomain.
*
* @return the change count.
*/
@@ -842,7 +842,7 @@
}
/**
- * Shutdown this ReplicationCache.
+ * Shutdown this ReplicationServerDomain.
*/
public void shutdown()
{
@@ -890,7 +890,7 @@
@Override
public String toString()
{
- return "ReplicationCache " + baseDn;
+ return "ReplicationServerDomain " + baseDn;
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index a6b3e74..174eba0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -111,7 +111,7 @@
private MsgQueue lateQueue = new MsgQueue();
private final Map<ChangeNumber, AckMessageList> waitingAcks =
new HashMap<ChangeNumber, AckMessageList>();
- private ReplicationCache replicationCache = null;
+ private ReplicationServerDomain replicationServerDomain = null;
private String serverURL;
private int outCount = 0; // number of update sent to the server
private int inCount = 0; // number of updates received from the server
@@ -227,11 +227,13 @@
// This is an outgoing connection. Publish our start message.
this.baseDn = baseDn;
- // Get or create the ReplicationCache
- replicationCache = replicationServer.getReplicationCache(baseDn, true);
- localGenerationId = replicationCache.getGenerationId();
+ // Get or create the ReplicationServerDomain
+ replicationServerDomain =
+ replicationServer.getReplicationServerDomain(baseDn, true);
+ localGenerationId = replicationServerDomain.getGenerationId();
- ServerState localServerState = replicationCache.getDbServerState();
+ ServerState localServerState =
+ replicationServerDomain.getDbServerState();
ReplServerStartMessage msg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
baseDn, windowSize, localServerState,
@@ -298,12 +300,13 @@
serverIsLDAPserver = true;
- // Get or Create the ReplicationCache
- replicationCache = replicationServer.getReplicationCache(this.baseDn,
- true);
- localGenerationId = replicationCache.getGenerationId();
+ // Get or Create the ReplicationServerDomain
+ replicationServerDomain =
+ replicationServer.getReplicationServerDomain(this.baseDn, true);
+ localGenerationId = replicationServerDomain.getGenerationId();
- ServerState localServerState = replicationCache.getDbServerState();
+ ServerState localServerState =
+ replicationServerDomain.getDbServerState();
// This an incoming connection. Publish our start message
ReplServerStartMessage myStartMsg =
new ReplServerStartMessage(replicationServerId, replicationServerURL,
@@ -322,9 +325,10 @@
if (debugEnabled())
{
Set<String> ss = this.serverState.toStringSet();
- Set<String> lss = replicationCache.getDbServerState().toStringSet();
- TRACER.debugInfo("In " + replicationCache.getReplicationServer().
- getMonitorInstanceName() +
+ Set<String> lss =
+ replicationServerDomain.getDbServerState().toStringSet();
+ TRACER.debugInfo("In " + replicationServerDomain.
+ getReplicationServer().getMonitorInstanceName() +
", SH received START from LS serverId=" + serverId +
" baseDN=" + this.baseDn +
" generationId=" + generationId +
@@ -376,7 +380,7 @@
}
else
{
- replicationCache.setGenerationId(generationId, false);
+ replicationServerDomain.setGenerationId(generationId, false);
}
}
}
@@ -396,11 +400,11 @@
this.baseDn = receivedMsg.getBaseDn();
if (baseDn == null)
{
- // Get or create the ReplicationCache
- replicationCache = replicationServer.getReplicationCache(this.baseDn,
- true);
- localGenerationId = replicationCache.getGenerationId();
- ServerState serverState = replicationCache.getDbServerState();
+ // Get or create the ReplicationServerDomain
+ replicationServerDomain = replicationServer.
+ getReplicationServerDomain(this.baseDn, true);
+ localGenerationId = replicationServerDomain.getGenerationId();
+ ServerState serverState = replicationServerDomain.getDbServerState();
// The session initiator decides whether to use SSL.
sslEncryption = receivedMsg.getSSLEncryption();
@@ -431,9 +435,10 @@
if (debugEnabled())
{
Set<String> ss = this.serverState.toStringSet();
- Set<String> lss = replicationCache.getDbServerState().toStringSet();
- TRACER.debugInfo("In " + replicationCache.getReplicationServer().
- getMonitorInstanceName() +
+ Set<String> lss =
+ replicationServerDomain.getDbServerState().toStringSet();
+ TRACER.debugInfo("In " + replicationServerDomain.
+ getReplicationServer().getMonitorInstanceName() +
", SH received START from RS serverId=" + serverId +
" baseDN=" + this.baseDn +
" generationId=" + generationId +
@@ -448,7 +453,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
getMonitorInstanceName() + " RS with serverID=" + serverId +
" is connected with the right generation ID");
}
@@ -464,7 +470,7 @@
if (generationId != localGenerationId)
{
// if the 2 RS have different generationID
- if (replicationCache.getGenerationIdSavedStatus())
+ if (replicationServerDomain.getGenerationIdSavedStatus())
{
// it the present RS has received changes regarding its
// gen ID and so won't change without a reset
@@ -497,7 +503,8 @@
// set the gen ID received from the peer RS
// specially if the peer has a non nul state and
// we have a nul state ?
- // replicationCache.setGenerationId(generationId, false);
+ // replicationServerDomain.
+ // setGenerationId(generationId, false);
Message message = NOTE_BAD_GENERATION_ID.get(
this.baseDn.toNormalizedString(),
Short.toString(receivedMsg.getServerId()),
@@ -519,7 +526,7 @@
else
{
// The local RS is not initialized - take the one received
- replicationCache.setGenerationId(generationId, false);
+ replicationServerDomain.setGenerationId(generationId, false);
}
}
}
@@ -529,18 +536,18 @@
return; // we did not recognize the message, ignore it
}
- // Get or create the ReplicationCache
- replicationCache = replicationServer.getReplicationCache(this.baseDn,
- true);
+ // Get or create the ReplicationServerDomain
+ replicationServerDomain = replicationServer.
+ getReplicationServerDomain(this.baseDn,true);
boolean started;
if (serverIsLDAPserver)
{
- started = replicationCache.startServer(this);
+ started = replicationServerDomain.startServer(this);
}
else
{
- started = replicationCache.startReplicationServer(this);
+ started = replicationServerDomain.startReplicationServer(this);
}
if (started)
@@ -548,8 +555,10 @@
// sendWindow MUST be created before starting the writer
sendWindow = new Semaphore(sendWindowSize);
- writer = new ServerWriter(session, serverId, this, replicationCache);
- reader = new ServerReader(session, serverId, this, replicationCache);
+ writer = new ServerWriter(session, serverId,
+ this, replicationServerDomain);
+ reader = new ServerReader(session, serverId,
+ this, replicationServerDomain);
reader.start();
writer.start();
@@ -575,7 +584,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
getMonitorInstanceName() + " RS failed to start locally " +
" the connection from serverID="+serverId);
}
@@ -812,7 +822,7 @@
* the sum of the number of missing changes for every dbHandler.
*/
int totalCount = 0;
- ServerState dbState = replicationCache.getDbServerState();
+ ServerState dbState = replicationServerDomain.getDbServerState();
for (short id : dbState)
{
int max = dbState.getMaxChangeNumber(id).getSeqnum();
@@ -926,7 +936,7 @@
* Ignore updates from a server that is degraded due to
* its inconsistent generationId
*/
- long referenceGenerationId = replicationCache.getGenerationId();
+ long referenceGenerationId = replicationServerDomain.getGenerationId();
if ((referenceGenerationId>0) &&
(referenceGenerationId != generationId))
{
@@ -993,7 +1003,7 @@
saturationCount = 0;
try
{
- replicationCache.checkAllSaturation();
+ replicationServerDomain.checkAllSaturation();
}
catch (IOException e)
{
@@ -1059,11 +1069,11 @@
SortedSet<ReplicationIterator> iteratorSortedSet =
new TreeSet<ReplicationIterator>(comparator);
/* fill the lateQueue */
- for (short serverId : replicationCache.getServers())
+ for (short serverId : replicationServerDomain.getServers())
{
ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
ReplicationIterator iterator =
- replicationCache.getChangelogIterator(serverId, lastCsn);
+ replicationServerDomain.getChangelogIterator(serverId, lastCsn);
if ((iterator != null) && (iterator.getChange() != null))
{
iteratorSortedSet.add(iterator);
@@ -1244,7 +1254,7 @@
}
if (completedFlag)
{
- replicationCache.sendAck(changeNumber, true);
+ replicationServerDomain.sendAck(changeNumber, true);
}
}
@@ -1274,8 +1284,9 @@
}
if (completedFlag)
{
- ReplicationCache replicationCache = ackList.getChangelogCache();
- replicationCache.sendAck(changeNumber, false,
+ ReplicationServerDomain replicationServerDomain =
+ ackList.getChangelogCache();
+ replicationServerDomain.sendAck(changeNumber, false,
ackList.getReplicationServerId());
}
}
@@ -1304,20 +1315,22 @@
* @param update The update that must be added to the list.
* @param ChangelogServerId The identifier of the replicationServer that sent
* the update.
- * @param replicationCache The ReplicationCache from which the change was
- * processed and to which the ack must later be sent.
+ * @param replicationServerDomain The ReplicationServerDomain from which the
+ * change was processed and to which the ack
+ * must later be sent.
* @param nbWaitedAck The number of ack that must be received before
* the update is fully acked.
*/
public static void addWaitingAck(
UpdateMessage update,
- short ChangelogServerId, ReplicationCache replicationCache,
+ short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
int nbWaitedAck)
{
ReplServerAckMessageList ackList =
new ReplServerAckMessageList(update.getChangeNumber(),
nbWaitedAck,
- ChangelogServerId, replicationCache);
+ ChangelogServerId,
+ replicationServerDomain);
synchronized(changelogsWaitingAcks)
{
changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
@@ -1561,7 +1574,7 @@
{
if (flowControl)
{
- if (replicationCache.restartAfterSaturation(this))
+ if (replicationServerDomain.restartAfterSaturation(this))
{
flowControl = false;
}
@@ -1605,11 +1618,11 @@
public void process(RoutableMessage msg)
{
if (debugEnabled())
- TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
" SH for remote server " + this.getMonitorInstanceName() +
" processes received msg=" + msg);
- replicationCache.process(msg, this);
+ replicationServerDomain.process(msg, this);
}
/**
@@ -1623,7 +1636,7 @@
throws IOException
{
if (debugEnabled())
- TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
" SH for remote server " + this.getMonitorInstanceName() +
" sends message=" + info);
@@ -1640,7 +1653,7 @@
public void receiveReplServerInfo(ReplServerInfoMessage infoMsg)
{
if (debugEnabled())
- TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
" SH for remote server " + this.getMonitorInstanceName() +
" sets replServerInfo " + "<" + infoMsg + ">");
@@ -1691,7 +1704,8 @@
public void send(RoutableMessage msg) throws IOException
{
if (debugEnabled())
- TRACER.debugInfo("In " + replicationCache.getReplicationServer().
+ TRACER.debugInfo("In " +
+ replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
" SH for remote server " + this.getMonitorInstanceName() +
" sends message=" + msg);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 69d911d..b1071ff 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -72,7 +72,7 @@
private short serverId;
private ProtocolSession session;
private ServerHandler handler;
- private ReplicationCache replicationCache;
+ private ReplicationServerDomain replicationServerDomain;
/**
* Constructor for the LDAP server reader part of the replicationServer.
@@ -80,16 +80,18 @@
* @param session The ProtocolSession from which to read the data.
* @param serverId The server ID of the server from which we read messages.
* @param handler The server handler for this server reader.
- * @param replicationCache The ReplicationCache for this server reader.
+ * @param replicationServerDomain The ReplicationServerDomain for this server
+ * reader.
*/
public ServerReader(ProtocolSession session, short serverId,
- ServerHandler handler, ReplicationCache replicationCache)
+ ServerHandler handler,
+ ReplicationServerDomain replicationServerDomain)
{
super(handler.toString() + " reader");
this.session = session;
this.serverId = serverId;
this.handler = handler;
- this.replicationCache = replicationCache;
+ this.replicationServerDomain = replicationServerDomain;
}
/**
@@ -100,14 +102,15 @@
if (debugEnabled())
{
TRACER.debugInfo(
- "In RS " + replicationCache.getReplicationServer().
+ "In RS " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
(handler.isReplicationServer()?" RS ":" LS")+
" reader starting for serverId=" + serverId);
}
/*
* wait on input stream
- * grab all incoming messages and publish them to the replicationCache
+ * grab all incoming messages and publish them to the
+ * replicationServerDomain
*/
try
{
@@ -118,7 +121,7 @@
if (debugEnabled())
{
TRACER.debugInfo(
- "In RS " + replicationCache.getReplicationServer().
+ "In RS " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
(handler.isReplicationServer()?" From RS ":" From LS")+
" with serverId=" + serverId + " receives " + msg);
@@ -127,13 +130,14 @@
{
AckMessage ack = (AckMessage) msg;
handler.checkWindow();
- replicationCache.ack(ack, serverId);
+ replicationServerDomain.ack(ack, serverId);
}
else if (msg instanceof UpdateMessage)
{
// Ignore update received from a replica with
// a bad generation ID
- long referenceGenerationId = replicationCache.getGenerationId();
+ long referenceGenerationId =
+ replicationServerDomain.getGenerationId();
if ((referenceGenerationId>0) &&
(referenceGenerationId != handler.getGenerationId()))
{
@@ -145,7 +149,7 @@
{
UpdateMessage update = (UpdateMessage) msg;
handler.decAndCheckWindow();
- replicationCache.put(update, handler);
+ replicationServerDomain.put(update, handler);
}
}
else if (msg instanceof WindowMessage)
@@ -182,7 +186,7 @@
else if (msg instanceof ResetGenerationId)
{
ResetGenerationId genIdMsg = (ResetGenerationId) msg;
- replicationCache.resetGenerationId(this.handler, genIdMsg);
+ replicationServerDomain.resetGenerationId(this.handler, genIdMsg);
}
else if (msg instanceof WindowProbe)
{
@@ -198,19 +202,20 @@
{
if (handler.isReplicationServer())
TRACER.debugInfo(
- "In RS " + replicationCache.getReplicationServer().
+ "In RS " + replicationServerDomain.getReplicationServer().
getServerId() +
" Receiving replServerInfo from " + handler.getServerId() +
- " baseDn=" + replicationCache.getBaseDn() +
+ " baseDn=" + replicationServerDomain.getBaseDn() +
" genId=" + infoMsg.getGenerationId());
}
- if (replicationCache.getGenerationId()<0)
+ if (replicationServerDomain.getGenerationId()<0)
{
// Here is the case where a ReplicationServer receives from
// another ReplicationServer the generationId for a domain
// for which the generation ID has never been set.
- replicationCache.setGenerationId(infoMsg.getGenerationId(), false);
+ replicationServerDomain.
+ setGenerationId(infoMsg.getGenerationId(),false);
}
else
{
@@ -221,19 +226,20 @@
// If we have generationId set locally and no server currently
// connected for that domain in the topology then we may also
// reset the generationId localy.
- replicationCache.mayResetGenerationId();
+ replicationServerDomain.mayResetGenerationId();
}
- if (replicationCache.getGenerationId() != infoMsg.getGenerationId())
+ if (replicationServerDomain.getGenerationId() !=
+ infoMsg.getGenerationId())
{
Message message = NOTE_BAD_GENERATION_ID.get(
- replicationCache.getBaseDn().toNormalizedString(),
+ replicationServerDomain.getBaseDn().toNormalizedString(),
Short.toString(handler.getServerId()),
Long.toString(infoMsg.getGenerationId()),
- Long.toString(replicationCache.getGenerationId()));
+ Long.toString(replicationServerDomain.getGenerationId()));
ErrorMessage errorMsg = new ErrorMessage(
- replicationCache.getReplicationServer().getServerId(),
+ replicationServerDomain.getReplicationServer().getServerId(),
handler.getServerId(),
message);
session.publish(errorMsg);
@@ -260,7 +266,7 @@
*/
if (debugEnabled())
TRACER.debugInfo(
- "In RS " + replicationCache.getReplicationServer().
+ "In RS " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
" reader IO EXCEPTION for serverID=" + serverId
+ stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
@@ -270,7 +276,7 @@
{
if (debugEnabled())
TRACER.debugInfo(
- "In RS <" + replicationCache.getReplicationServer().
+ "In RS <" + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
" reader CNF EXCEPTION serverID=" + serverId
+ stackTraceToSingleLineString(e));
@@ -284,7 +290,7 @@
{
if (debugEnabled())
TRACER.debugInfo(
- "In RS <" + replicationCache.getReplicationServer().
+ "In RS <" + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
" server reader EXCEPTION serverID=" + serverId
+ stackTraceToSingleLineString(e));
@@ -304,7 +310,7 @@
*/
if (debugEnabled())
TRACER.debugInfo(
- "In RS " + replicationCache.getReplicationServer().
+ "In RS " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
" server reader for serverID=" + serverId +
" is closing the session");
@@ -315,11 +321,11 @@
{
// ignore
}
- replicationCache.stopServer(handler);
+ replicationServerDomain.stopServer(handler);
}
if (debugEnabled())
TRACER.debugInfo(
- "In RS " + replicationCache.getReplicationServer().
+ "In RS " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
(handler.isReplicationServer()?" RS":" LDAP") +
" server reader stopped for serverID=" + serverId);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 18d7e8b..fca9f9f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -56,7 +56,7 @@
private ProtocolSession session;
private ServerHandler handler;
- private ReplicationCache replicationCache;
+ private ReplicationServerDomain replicationServerDomain;
private short serverId;
/**
@@ -67,22 +67,24 @@
* @param session the ProtocolSession that will be used to send updates.
* @param serverId the Identifier of the server.
* @param handler handler for which the ServerWriter is created.
- * @param replicationCache The ReplicationCache of this ServerWriter.
+ * @param replicationServerDomain The ReplicationServerDomain of this
+ * ServerWriter.
*/
public ServerWriter(ProtocolSession session, short serverId,
- ServerHandler handler, ReplicationCache replicationCache)
+ ServerHandler handler,
+ ReplicationServerDomain replicationServerDomain)
{
super(handler.toString() + " writer");
this.serverId = serverId;
this.session = session;
this.handler = handler;
- this.replicationCache = replicationCache;
+ this.replicationServerDomain = replicationServerDomain;
}
/**
* Run method for the ServerWriter.
- * Loops waiting for changes from the ReplicationCache and forward them
+ * Loops waiting for changes from the ReplicationServerDomain and forward them
* to the other servers
*/
public void run()
@@ -102,12 +104,12 @@
{
while (true)
{
- UpdateMessage update = replicationCache.take(this.handler);
+ UpdateMessage update = replicationServerDomain.take(this.handler);
if (update == null)
return; /* this connection is closing */
// Ignore update to be sent to a replica with a bad generation ID
- long referenceGenerationId = replicationCache.getGenerationId();
+ long referenceGenerationId = replicationServerDomain.getGenerationId();
if ((referenceGenerationId != handler.getGenerationId())
|| (referenceGenerationId == -1)
|| (handler.getGenerationId() == -1))
@@ -121,7 +123,7 @@
if (debugEnabled())
{
TRACER.debugInfo(
- "In " + replicationCache.getReplicationServer().
+ "In " + replicationServerDomain.getReplicationServer().
getMonitorInstanceName() +
", writer to " + this.handler.getMonitorInstanceName() +
" publishes msg=" + update.toString() +
@@ -168,7 +170,7 @@
{
// Can't do much more : ignore
}
- replicationCache.stopServer(handler);
+ replicationServerDomain.stopServer(handler);
if (debugEnabled())
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java
index d19bc6a..e9eed4e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java
@@ -53,7 +53,7 @@
* ReplicationMessages objects. This class is used by both the
* replicationServer and the replication package.
* </li>
- * <li><A HREF="ReplicationCache.html"><B>ReplicationCache</B></A>
+ * <li><A HREF="ReplicationServerDomain.html"><B>ReplicationServerDomain</B></A>
* implements the multiplexing part of the replication
* server. It contains method for forwarding all the received messages to
* the ServerHandler and to the dbHandler objects.<br>
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
index bc9b980..8a20d02 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -914,14 +914,14 @@
rgenId = replServer1.getGenerationId(baseDn);
assertEquals(genId, rgenId, "DS and replServer are expected to have same genId.");
- assertTrue(!replServer1.getReplicationCache(baseDn, false).
+ assertTrue(!replServer1.getReplicationServerDomain(baseDn, false).
isDegradedDueToGenerationId(server1ID),
"Expecting that DS is not degraded since domain genId has been reset");
- assertTrue(replServer1.getReplicationCache(baseDn, false).
+ assertTrue(replServer1.getReplicationServerDomain(baseDn, false).
isDegradedDueToGenerationId(server2ID),
"Expecting that broker2 is degraded since domain genId has been reset");
- assertTrue(replServer1.getReplicationCache(baseDn, false).
+ assertTrue(replServer1.getReplicationServerDomain(baseDn, false).
isDegradedDueToGenerationId(server3ID),
"Expecting that broker3 is degraded since domain genId has been reset");
@@ -1106,7 +1106,7 @@
}
debugInfo("Expecting that broker2 is not degraded since it has a correct genId");
- assertTrue(!replServer1.getReplicationCache(baseDn, false).
+ assertTrue(!replServer1.getReplicationServerDomain(baseDn, false).
isDegradedDueToGenerationId(server2ID));
debugInfo("Disconnecting DS from replServer1");
@@ -1132,7 +1132,7 @@
}
debugInfo("Expecting that broker3 is degraded since it has a bad genId");
- assertTrue(replServer1.getReplicationCache(baseDn, false).
+ assertTrue(replServer1.getReplicationServerDomain(baseDn, false).
isDegradedDueToGenerationId(server3ID));
int found = testEntriesInDb();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index b10dd33..c76195b 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -1090,25 +1090,25 @@
// Check that the list of connected LDAP servers is correct
// in each replication servers
- List<String> l1 = changelog1.getReplicationCache(baseDn, false).
+ List<String> l1 = changelog1.getReplicationServerDomain(baseDn, false).
getConnectedLDAPservers();
assertEquals(l1.size(), 1);
assertEquals(l1.get(0), String.valueOf(server1ID));
List<String> l2;
- l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
+ l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
assertEquals(l2.size(), 2);
assertTrue(l2.contains(String.valueOf(server2ID)));
assertTrue(l2.contains(String.valueOf(server3ID)));
List<String> l3;
- l3 = changelog3.getReplicationCache(baseDn, false).getConnectedLDAPservers();
+ l3 = changelog3.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
assertEquals(l3.size(), 0);
// Test updates
broker3.stop();
Thread.sleep(1000);
- l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
+ l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
assertEquals(l2.size(), 1);
assertEquals(l2.get(0), String.valueOf(server2ID));
@@ -1116,11 +1116,11 @@
server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
broker2.stop();
Thread.sleep(1000);
- l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers();
+ l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers();
assertEquals(l2.size(), 1);
assertEquals(l2.get(0), String.valueOf(server3ID));
- // TODO Test ReplicationCache.getDestinationServers method.
+ // TODO Test ReplicationServerDomain.getDestinationServers method.
broker2.stop();
broker3.stop();
--
Gitblit v1.10.0