From 71ebb3724c79a7d1218c36f080acd6ee162b9cd2 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 26 Apr 2007 06:31:01 +0000
Subject: [PATCH] Rename the class with names containing synchronization or changelog. Replace most of the changelog occurences with replication server. (issue 1090)
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 163 +++++++++++++++++++++++++++++-------------------------
1 files changed, 87 insertions(+), 76 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 0f918a1..386591d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,7 +50,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
-import org.opends.server.replication.protocol.ChangelogStartMessage;
+import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMessage;
@@ -69,7 +69,7 @@
/**
* This class defines a server handler, which handles all interaction with a
- * changelog server.
+ * replication server.
*/
public class ServerHandler extends MonitorProvider
{
@@ -79,7 +79,7 @@
private MsgQueue lateQueue = new MsgQueue();
private final Map<ChangeNumber, AckMessageList> waitingAcks =
new HashMap<ChangeNumber, AckMessageList>();
- private ChangelogCache changelogCache = null;
+ private ReplicationCache replicationCache = null;
private String serverURL;
private int outCount = 0; // number of update sent to the server
private int inCount = 0; // number of updates received from the server
@@ -111,7 +111,7 @@
// flow controled and should
// be stopped from sending messsages.
private int saturationCount = 0;
- private short changelogId;
+ private short replicationServerId;
/**
* The time in milliseconds between heartbeats from the replication
@@ -124,8 +124,9 @@
*/
HeartbeatThread heartbeatThread = null;
- private static final Map<ChangeNumber, ChangelogAckMessageList>
- changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
+ private static final Map<ChangeNumber, ReplServerAckMessageList>
+ changelogsWaitingAcks =
+ new HashMap<ChangeNumber, ReplServerAckMessageList>();
/**
* Creates a new server handler instance with the provided socket.
@@ -144,22 +145,24 @@
/**
* Do the exchange of start messages to know if the remote
- * server is an LDAP or changelog server and to exchange serverID.
+ * server is an LDAP or replication server and to exchange serverID.
* Then create the reader and writer thread.
*
* @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
* null if this is an incoming connection.
- * @param changelogId The identifier of the changelog that creates this
- * server handler.
- * @param changelogURL The URL of the changelog that creates this
- * server handler.
+ * @param replicationServerId The identifier of the replicationServer that
+ * creates this server handler.
+ * @param replicationServerURL The URL of the replicationServer that creates
+ * this server handler.
* @param windowSize the window size that this server handler must use.
- * @param changelog the Changelog that created this server handler.
+ * @param replicationServer the ReplicationServer that created this server
+ * handler.
*/
- public void start(DN baseDn, short changelogId, String changelogURL,
- int windowSize, Changelog changelog)
+ public void start(DN baseDn, short replicationServerId,
+ String replicationServerURL,
+ int windowSize, ReplicationServer replicationServer)
{
- this.changelogId = changelogId;
+ this.replicationServerId = replicationServerId;
rcvWindowSizeHalf = windowSize/2;
maxRcvWindow = windowSize;
rcvWindow = windowSize;
@@ -168,10 +171,10 @@
if (baseDn != null)
{
this.baseDn = baseDn;
- changelogCache = changelog.getChangelogCache(baseDn);
- ServerState localServerState = changelogCache.getDbServerState();
- ChangelogStartMessage msg =
- new ChangelogStartMessage(changelogId, changelogURL,
+ replicationCache = replicationServer.getReplicationCache(baseDn);
+ ServerState localServerState = replicationCache.getDbServerState();
+ ReplServerStartMessage msg =
+ new ReplServerStartMessage(replicationServerId, replicationServerURL,
baseDn, windowSize, localServerState);
session.publish(msg);
@@ -225,17 +228,17 @@
serverIsLDAPserver = true;
- changelogCache = changelog.getChangelogCache(this.baseDn);
- ServerState localServerState = changelogCache.getDbServerState();
- ChangelogStartMessage myStartMsg =
- new ChangelogStartMessage(changelogId, changelogURL,
+ replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ ServerState localServerState = replicationCache.getDbServerState();
+ ReplServerStartMessage myStartMsg =
+ new ReplServerStartMessage(replicationServerId, replicationServerURL,
this.baseDn, windowSize, localServerState);
session.publish(myStartMsg);
sendWindowSize = receivedMsg.getWindowSize();
}
- else if (msg instanceof ChangelogStartMessage)
+ else if (msg instanceof ReplServerStartMessage)
{
- ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
+ ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
serverId = receivedMsg.getServerId();
serverURL = receivedMsg.getServerURL();
String[] splittedURL = serverURL.split(":");
@@ -244,11 +247,12 @@
this.baseDn = receivedMsg.getBaseDn();
if (baseDn == null)
{
- changelogCache = changelog.getChangelogCache(this.baseDn);
- ServerState serverState = changelogCache.getDbServerState();
- ChangelogStartMessage outMsg =
- new ChangelogStartMessage(changelogId, changelogURL,
- this.baseDn, windowSize, serverState);
+ replicationCache = replicationServer.getReplicationCache(this.baseDn);
+ ServerState serverState = replicationCache.getDbServerState();
+ ReplServerStartMessage outMsg =
+ new ReplServerStartMessage(replicationServerId,
+ replicationServerURL,
+ this.baseDn, windowSize, serverState);
session.publish(outMsg);
}
else
@@ -262,21 +266,21 @@
return; // we did not recognize the message, ignore it
}
- changelogCache = changelog.getChangelogCache(this.baseDn);
+ replicationCache = replicationServer.getReplicationCache(this.baseDn);
if (serverIsLDAPserver)
{
- changelogCache.startServer(this);
+ replicationCache.startServer(this);
}
else
{
- changelogCache.startChangelog(this);
+ replicationCache.startReplicationServer(this);
}
- writer = new ServerWriter(session, serverId, this, changelogCache);
+ writer = new ServerWriter(session, serverId, this, replicationCache);
reader = new ServerReader(session, serverId, this,
- changelogCache);
+ replicationCache);
reader.start();
writer.start();
@@ -486,11 +490,12 @@
}
/**
- * Check if the server associated to this ServerHandler is a changelog server.
+ * Check if the server associated to this ServerHandler is a replication
+ * server.
* @return true if the server associated to this ServerHandler is a
- * changelog server.
+ * replication server.
*/
- public boolean isChangelogServer()
+ public boolean isReplicationServer()
{
return (!serverIsLDAPserver);
}
@@ -520,7 +525,7 @@
* the sum of the number of missing changes for every dbHandler.
*/
int totalCount = 0;
- ServerState dbState = changelogCache.getDbServerState();
+ ServerState dbState = replicationCache.getDbServerState();
for (short id : dbState)
{
int max = dbState.getMaxChangeNumber(id).getSeqnum();
@@ -554,7 +559,7 @@
* Get an approximation of the delay by looking at the age of the odest
* message that has not been sent to this server.
* This is an approximation because the age is calculated using the
- * clock of the servee where the changelog is currently running
+ * clock of the servee where the replicationServer is currently running
* while it should be calculated using the clock of the server
* that originally processed the change.
*
@@ -686,7 +691,7 @@
saturationCount = 0;
try
{
- changelogCache.checkAllSaturation();
+ replicationCache.checkAllSaturation();
}
catch (IOException e)
{
@@ -747,16 +752,16 @@
* load this change on the delayList
*
*/
- ChangelogIteratorComparator comparator =
- new ChangelogIteratorComparator();
- SortedSet<ChangelogIterator> iteratorSortedSet =
- new TreeSet<ChangelogIterator>(comparator);
+ ReplicationIteratorComparator comparator =
+ new ReplicationIteratorComparator();
+ SortedSet<ReplicationIterator> iteratorSortedSet =
+ new TreeSet<ReplicationIterator>(comparator);
/* fill the lateQueue */
- for (short serverId : changelogCache.getServers())
+ for (short serverId : replicationCache.getServers())
{
ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
- ChangelogIterator iterator =
- changelogCache.getChangelogIterator(serverId, lastCsn);
+ ReplicationIterator iterator =
+ replicationCache.getChangelogIterator(serverId, lastCsn);
if ((iterator != null) && (iterator.getChange() != null))
{
iteratorSortedSet.add(iterator);
@@ -764,7 +769,7 @@
}
while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
{
- ChangelogIterator iterator = iteratorSortedSet.first();
+ ReplicationIterator iterator = iteratorSortedSet.first();
iteratorSortedSet.remove(iterator);
lateQueue.add(iterator.getChange());
if (iterator.next())
@@ -772,7 +777,7 @@
else
iterator.releaseCursor();
}
- for (ChangelogIterator iterator : iteratorSortedSet)
+ for (ReplicationIterator iterator : iteratorSortedSet)
{
iterator.releaseCursor();
}
@@ -928,13 +933,13 @@
}
if (completedFlag)
{
- changelogCache.sendAck(changeNumber, true);
+ replicationCache.sendAck(changeNumber, true);
}
}
/**
* Process reception of an for an update that was received from a
- * Changelog Server.
+ * ReplicationServer.
*
* @param message the ack message that was received.
* @param ackingServerId The id of the server that acked the change.
@@ -942,7 +947,7 @@
public static void ackChangelog(AckMessage message, short ackingServerId)
{
ChangeNumber changeNumber = message.getChangeNumber();
- ChangelogAckMessageList ackList;
+ ReplServerAckMessageList ackList;
boolean completedFlag;
synchronized (changelogsWaitingAcks)
{
@@ -958,9 +963,9 @@
}
if (completedFlag)
{
- ChangelogCache changelogCache = ackList.getChangelogCache();
- changelogCache.sendAck(changeNumber, false,
- ackList.getChangelogServerId());
+ ReplicationCache replicationCache = ackList.getChangelogCache();
+ replicationCache.sendAck(changeNumber, false,
+ ackList.getReplicationServerId());
}
}
@@ -982,24 +987,26 @@
}
/**
- * Add an update to the list of update received from a changelog server and
+ * Add an update to the list of update received from a replicationServer and
* waiting for acks.
*
* @param update The update that must be added to the list.
- * @param ChangelogServerId The identifier of the changelog that sent the
- * update.
- * @param changelogCache The ChangelogCache from which the change was
- * processed and to which the ack must later be sent.
+ * @param ChangelogServerId The identifier of the replicationServer that sent
+ * the update.
+ * @param replicationCache The ReplicationCache from which the change was
+ * processed and to which the ack must later be sent.
* @param nbWaitedAck The number of ack that must be received before
* the update is fully acked.
*/
- public static void addWaitingAck(UpdateMessage update,
- short ChangelogServerId, ChangelogCache changelogCache, int nbWaitedAck)
+ public static void addWaitingAck(
+ UpdateMessage update,
+ short ChangelogServerId, ReplicationCache replicationCache,
+ int nbWaitedAck)
{
- ChangelogAckMessageList ackList =
- new ChangelogAckMessageList(update.getChangeNumber(),
+ ReplServerAckMessageList ackList =
+ new ReplServerAckMessageList(update.getChangeNumber(),
nbWaitedAck,
- ChangelogServerId, changelogCache);
+ ChangelogServerId, replicationCache);
synchronized(changelogsWaitingAcks)
{
changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
@@ -1031,7 +1038,7 @@
* Check type of server handled.
*
* @return true if the handled server is an LDAP server.
- * false if the handled server is a changelog server
+ * false if the handled server is a replicationServer
*/
public boolean isLDAPserver()
{
@@ -1074,7 +1081,7 @@
if (serverIsLDAPserver)
return "LDAP Server " + str;
else
- return "Changelog Server " + str;
+ return "Replication Server " + str;
}
/**
@@ -1122,7 +1129,7 @@
if (serverIsLDAPserver)
attributes.add(new Attribute("LDAP-Server", serverURL));
else
- attributes.add(new Attribute("Changelog-Server", serverURL));
+ attributes.add(new Attribute("ReplicationServer-Server", serverURL));
attributes.add(new Attribute("server-id",
String.valueOf(serverId)));
attributes.add(new Attribute("base-dn",
@@ -1199,7 +1206,7 @@
if (serverIsLDAPserver)
localString = "Directory Server ";
else
- localString = "Changelog Server ";
+ localString = "Replication Server ";
localString += serverId + " " + serverURL + " " + baseDn;
@@ -1233,7 +1240,7 @@
{
if (flowControl)
{
- if (changelogCache.restartAfterSaturation(this))
+ if (replicationCache.restartAfterSaturation(this))
{
flowControl = false;
}
@@ -1277,13 +1284,15 @@
public void process(RoutableMessage msg)
{
if (debugEnabled())
- debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+ debugInfo("SH(" + replicationServerId + ") forwards " +
+ msg + " to " + serverId);
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
- "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
+ "SH(" + replicationServerId + ") receives " + msg +
+ " from " + serverId, 1);
- changelogCache.process(msg, this);
+ replicationCache.process(msg, this);
}
/**
@@ -1296,11 +1305,13 @@
public void send(RoutableMessage msg) throws IOException
{
if (debugEnabled())
- debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+ debugInfo("SH(" + replicationServerId + ") forwards " +
+ msg + " to " + serverId);
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR,
- "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
+ "SH(" + replicationServerId + ") forwards " +
+ msg + " to " + serverId, 1);
session.publish(msg);
}
--
Gitblit v1.10.0