From d04fb0f282e0fd9a4bc80d3f9d5ee15506a3b83b Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk
---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 313 +++++++++++++++-------------------------------------
1 files changed, 90 insertions(+), 223 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 238a243..df7211a 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -40,7 +40,6 @@
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -67,7 +66,6 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
-import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
@@ -95,16 +93,12 @@
private ProtocolSession session;
private final MsgQueue msgQueue = new MsgQueue();
private MsgQueue lateQueue = new MsgQueue();
- private final Map<ChangeNumber, AckMessageList> waitingAcks =
- new HashMap<ChangeNumber, AckMessageList>();
private ReplicationServerDomain replicationServerDomain = null;
private String serverURL;
private int outCount = 0; // number of update sent to the server
private int inCount = 0; // number of updates received from the server
- private int inAckCount = 0;
- private int outAckCount = 0;
private int maxReceiveQueue = 0;
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
@@ -120,7 +114,7 @@
private ServerState serverState;
private boolean activeWriter = true;
private ServerWriter writer = null;
- private DN baseDn = null;
+ private String baseDn = null;
private int rcvWindow;
private int rcvWindowSizeHalf;
private int maxRcvWindow;
@@ -143,7 +137,7 @@
* Properties filled only if remote server is a DS
*/
- // Status of this DS
+ // Status of this DS (only used if this server handler represents a DS)
private ServerStatus status = ServerStatus.INVALID_STATUS;
// Referrals URLs this DS is exporting
private List<String> refUrls = new ArrayList<String>();
@@ -182,9 +176,6 @@
* Set when ServerHandler is stopping.
*/
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
- private static final Map<ChangeNumber, ReplServerAckMessageList>
- changelogsWaitingAcks =
- new HashMap<ChangeNumber, ReplServerAckMessageList>();
/**
* Creates a new server handler instance with the provided socket.
@@ -266,7 +257,7 @@
* @param replicationServer the ReplicationServer that created this server
* handler.
*/
- public void start(DN baseDn, short replicationServerId,
+ public void start(String baseDn, short replicationServerId,
String replicationServerURL,
int windowSize, boolean sslEncryption,
ReplicationServer replicationServer)
@@ -571,7 +562,7 @@
{
// Timeout
Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
- this.baseDn.toNormalizedString(),
+ this.baseDn,
Short.toString(serverId),
Short.toString(replicationServer.getServerId()));
closeSession(message);
@@ -732,7 +723,7 @@
// gen ID and so won't change without a reset
// then we are just degrading the peer.
Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- this.baseDn.toNormalizedString(),
+ this.baseDn,
Short.toString(serverId),
Long.toString(generationId),
Long.toString(localGenerationId));
@@ -759,7 +750,7 @@
// replicationServerDomain.
// setGenerationId(generationId, false);
Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- this.baseDn.toNormalizedString(),
+ this.baseDn,
Short.toString(serverId),
Long.toString(generationId),
Long.toString(localGenerationId));
@@ -859,7 +850,7 @@
if (generationId != localGenerationId)
{
Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
- this.baseDn.toNormalizedString(),
+ this.baseDn,
Short.toString(serverId),
Long.toString(generationId),
Long.toString(localGenerationId));
@@ -873,7 +864,7 @@
// If the LDAP server has already sent changes
// it is not expected to connect to an empty RS
Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
- this.baseDn.toNormalizedString(),
+ this.baseDn,
Short.toString(serverId),
Long.toString(generationId),
Long.toString(localGenerationId));
@@ -957,7 +948,7 @@
// gen ID and so won't change without a reset
// then we are just degrading the peer.
Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- this.baseDn.toNormalizedString(),
+ this.baseDn,
Short.toString(serverId),
Long.toString(generationId),
Long.toString(localGenerationId));
@@ -984,7 +975,7 @@
// replicationServerDomain.
// setGenerationId(generationId, false);
Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
- this.baseDn.toNormalizedString(),
+ this.baseDn,
Short.toString(serverId),
Long.toString(generationId),
Long.toString(localGenerationId));
@@ -1195,26 +1186,6 @@
}
/**
- * Get the number of Ack received from the server managed by this handler.
- *
- * @return Returns the inAckCount.
- */
- public int getInAckCount()
- {
- return inAckCount;
- }
-
- /**
- * Get the number of Ack sent to the server managed by this handler.
- *
- * @return Returns the outAckCount.
- */
- public int getOutAckCount()
- {
- return outAckCount;
- }
-
- /**
* Check is this server is saturated (this server has already been
* sent a bunch of updates and has not processed them so they are staying
* in the message queue for this server an the size of the queue
@@ -1763,145 +1734,14 @@
}
/**
- * Send the ack to the server that did the original modification.
+ * Sends an ack message to the server represented by this object.
*
- * @param changeNumber The ChangeNumber of the update that is acked.
+ * @param ack The ack message to be sent.
* @throws IOException In case of Exception thrown sending the ack.
*/
- public void sendAck(ChangeNumber changeNumber) throws IOException
+ public void sendAck(AckMsg ack) throws IOException
{
- AckMsg ack = new AckMsg(changeNumber);
session.publish(ack);
- outAckCount++;
- }
-
- /**
- * Do the work when an ack message has been received from another server.
- *
- * @param message The ack message that was received.
- * @param ackingServerId The id of the server that acked the change.
- */
- public void ack(AckMsg message, short ackingServerId)
- {
- ChangeNumber changeNumber = message.getChangeNumber();
- AckMessageList ackList;
- boolean completedFlag;
- synchronized (waitingAcks)
- {
- ackList = waitingAcks.get(changeNumber);
- if (ackList == null)
- return;
- ackList.addAck(ackingServerId);
- completedFlag = ackList.completed();
- if (completedFlag)
- {
- waitingAcks.remove(changeNumber);
- }
- }
- if (completedFlag)
- {
- replicationServerDomain.sendAck(changeNumber, true);
- }
- }
-
- /**
- * Process reception of an for an update that was received from a
- * ReplicationServer.
- *
- * @param message the ack message that was received.
- * @param ackingServerId The id of the server that acked the change.
- */
- public static void ackChangelog(AckMsg message, short ackingServerId)
- {
- ChangeNumber changeNumber = message.getChangeNumber();
- ReplServerAckMessageList ackList;
- boolean completedFlag;
- synchronized (changelogsWaitingAcks)
- {
- ackList = changelogsWaitingAcks.get(changeNumber);
- if (ackList == null)
- return;
- ackList.addAck(ackingServerId);
- completedFlag = ackList.completed();
- if (completedFlag)
- {
- changelogsWaitingAcks.remove(changeNumber);
- }
- }
- if (completedFlag)
- {
- ReplicationServerDomain replicationServerDomain =
- ackList.getChangelogCache();
- replicationServerDomain.sendAck(changeNumber, false,
- ackList.getReplicationServerId());
- }
- }
-
- /**
- * Add an update to the list of update waiting for acks.
- *
- * @param update the update that must be added to the list
- * @param nbWaitedAck The number of ack that must be received before
- * the update is fully acked.
- */
- public void addWaitingAck(UpdateMsg update, int nbWaitedAck)
- {
- AckMessageList ackList = new AckMessageList(update.getChangeNumber(),
- nbWaitedAck);
- synchronized (waitingAcks)
- {
- waitingAcks.put(update.getChangeNumber(), ackList);
- }
- }
-
- /**
- * Add an update to the list of update received from a replicationServer and
- * waiting for acks.
- *
- * @param update The update that must be added to the list.
- * @param ChangelogServerId The identifier of the replicationServer that sent
- * the update.
- * @param replicationServerDomain The ReplicationServerDomain from which the
- * change was processed and to which the ack
- * must later be sent.
- * @param nbWaitedAck The number of ack that must be received before
- * the update is fully acked.
- */
- public static void addWaitingAck(
- UpdateMsg update,
- short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
- int nbWaitedAck)
- {
- ReplServerAckMessageList ackList =
- new ReplServerAckMessageList(update.getChangeNumber(),
- nbWaitedAck,
- ChangelogServerId,
- replicationServerDomain);
- synchronized (changelogsWaitingAcks)
- {
- changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
- }
- }
-
- /**
- * Get the size of the list of update waiting for acks.
- *
- * @return the size of the list of update waiting for acks.
- */
- public int getWaitingAckSize()
- {
- synchronized (waitingAcks)
- {
- return waitingAcks.size();
- }
- }
-
- /**
- * Increment the count of Acks received from this server.
- */
- public void incrementInAckCount()
- {
- inAckCount++;
}
/**
@@ -2001,13 +1841,13 @@
.valueOf(serverId)));
attributes.add(Attributes.create("base-dn", baseDn.toString()));
- if (serverIsLDAPserver)
+ try
{
MonitorData md;
- try
- {
- md = replicationServerDomain.getMonitorData();
+ md = replicationServerDomain.getMonitorData();
+ if (serverIsLDAPserver)
+ {
// Oldest missing update
Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
@@ -2017,7 +1857,7 @@
"approx-older-change-not-synchronized", date.toString()));
attributes.add(Attributes.create(
"approx-older-change-not-synchronized-millis", String
- .valueOf(approxFirstMissingDate)));
+ .valueOf(approxFirstMissingDate)));
}
// Missing changes
@@ -2030,14 +1870,21 @@
attributes.add(Attributes.create("approximate-delay", String
.valueOf(delay)));
}
- catch (Exception e)
+ else
{
- // TODO: improve the log
- // We failed retrieving the remote monitor data.
- attributes.add(Attributes.create("error",
- stackTraceToSingleLineString(e)));
+ // Missing changes
+ long missingChanges = md.getMissingChangesRS(serverId);
+ attributes.add(Attributes.create("missing-changes", String
+ .valueOf(missingChanges)));
}
}
+ catch (Exception e)
+ {
+ // TODO: improve the log
+ // We failed retrieving the remote monitor data.
+ attributes.add(Attributes.create("error",
+ stackTraceToSingleLineString(e)));
+ }
attributes.add(
Attributes.create("queue-size", String.valueOf(msgQueue.count())));
@@ -2056,14 +1903,6 @@
attributes.add(Attributes.create("update-received", String
.valueOf(getInCount())));
- // Deprecated as long as assured is not exposed
- attributes.add(Attributes.create("update-waiting-acks", String
- .valueOf(getWaitingAckSize())));
- attributes.add(Attributes.create("ack-sent", String
- .valueOf(getOutAckCount())));
- attributes.add(Attributes.create("ack-received", String
- .valueOf(getInAckCount())));
-
// Window stats
attributes.add(Attributes.create("max-send-window", String
.valueOf(sendWindowSize)));
@@ -2125,11 +1964,14 @@
/*
* Stop the remote LSHandler
*/
- for (LightweightServerHandler lsh : directoryServers.values())
+ synchronized (directoryServers)
{
- lsh.stopHandler();
+ for (LightweightServerHandler lsh : directoryServers.values())
+ {
+ lsh.stopHandler();
+ }
+ directoryServers.clear();
}
- directoryServers.clear();
/*
* Stop the heartbeat thread.
@@ -2217,7 +2059,6 @@
{
WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
session.publish(msg);
- outAckCount++;
rcvWindow += rcvWindowSizeHalf;
}
}
@@ -2302,23 +2143,26 @@
*/
List<DSInfo> dsInfos = topoMsg.getDsList();
- // Removes the existing structures
- for (LightweightServerHandler lsh : directoryServers.values())
+ synchronized (directoryServers)
{
- lsh.stopHandler();
- }
- directoryServers.clear();
+ // Removes the existing structures
+ for (LightweightServerHandler lsh : directoryServers.values())
+ {
+ lsh.stopHandler();
+ }
+ directoryServers.clear();
- // Creates the new structure according to the message received.
- for (DSInfo dsInfo : dsInfos)
- {
- LightweightServerHandler lsh = new LightweightServerHandler(this,
- serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
- dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
- dsInfo.isAssured(), dsInfo.getAssuredMode(),
- dsInfo.getSafeDataLevel());
- lsh.startHandler();
- directoryServers.put(lsh.getServerId(), lsh);
+ // Creates the new structure according to the message received.
+ for (DSInfo dsInfo : dsInfos)
+ {
+ LightweightServerHandler lsh = new LightweightServerHandler(this,
+ serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
+ dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
+ dsInfo.isAssured(), dsInfo.getAssuredMode(),
+ dsInfo.getSafeDataLevel());
+ lsh.startHandler();
+ directoryServers.put(lsh.getServerId(), lsh);
+ }
}
}
@@ -2445,7 +2289,10 @@
*/
public boolean hasRemoteLDAPServers()
{
- return !directoryServers.isEmpty();
+ synchronized (directoryServers)
+ {
+ return !directoryServers.isEmpty();
+ }
}
/**
@@ -2496,7 +2343,6 @@
// TODO also log an error message.
WindowMsg msg = new WindowMsg(rcvWindow);
session.publish(msg);
- outAckCount++;
} else
{
// Both the LDAP server and the replication server believes that the
@@ -2556,17 +2402,10 @@
*/
public Set<Short> getConnectedDirectoryServerIds()
{
- return directoryServers.keySet();
- }
-
- /**
- * Get the map of connected DSs
- * (to the RS represented by this server handler).
- * @return The map of connected DSs
- */
- public Map<Short, LightweightServerHandler> getConnectedDSs()
- {
- return directoryServers;
+ synchronized (directoryServers)
+ {
+ return directoryServers.keySet();
+ }
}
/**
@@ -2716,4 +2555,32 @@
{
return protocolVersion;
}
+
+ /**
+ * Add the DSinfos of the connected Directory Servers
+ * to the List of DSInfo provided as a parameter.
+ *
+ * @param dsInfos The List of DSInfo that should be updated
+ * with the DSInfo for the directoryServers
+ * connected to this ServerHandler.
+ */
+ public void addDSInfos(List<DSInfo> dsInfos)
+ {
+ synchronized (directoryServers)
+ {
+ for (LightweightServerHandler ls : directoryServers.values())
+ {
+ dsInfos.add(ls.toDSInfo());
+ }
+ }
+ }
+
+ /**
+ * Gets the group id of the server represented by this object.
+ * @return The group id of the server represented by this object.
+ */
+ public byte getGroupId()
+ {
+ return groupId;
+ }
}
--
Gitblit v1.10.0