From 0a9135e3444bbefde6188f456b9c9772a816096d Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 18 Sep 2013 15:17:14 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 166 +++++++++++++++++++++---------------------------------
1 files changed, 65 insertions(+), 101 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 2195c79..e1a7a00 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -45,6 +45,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
+import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
@@ -79,7 +80,7 @@
private volatile String replicationServer = NO_CONNECTED_SERVER;
private volatile Session session = null;
private final ServerState state;
- private final String baseDn;
+ private final DN baseDN;
private final int serverId;
private Semaphore sendWindow;
private int maxSendWindow;
@@ -192,9 +193,9 @@
* @param replicationDomain The replication domain that is creating us.
* @param state The ServerState that should be used by this broker
* when negotiating the session with the replicationServer.
- * @param baseDn The base DN that should be used by this broker
+ * @param baseDN The base DN that should be used by this broker
* when negotiating the session with the replicationServer.
- * @param serverID2 The server ID that should be used by this broker
+ * @param serverId The server ID that should be used by this broker
* when negotiating the session with the replicationServer.
* @param window The size of the send and receive window to use.
* @param generationId The generationId for the server associated to the
@@ -208,14 +209,14 @@
* or zero if no CSN heartbeat should be sent.
*/
public ReplicationBroker(ReplicationDomain replicationDomain,
- ServerState state, String baseDn, int serverID2, int window,
+ ServerState state, DN baseDN, int serverId, int window,
long generationId, long heartbeatInterval,
ReplSessionSecurity replSessionSecurity, byte groupId,
long changeTimeHeartbeatInterval)
{
this.domain = replicationDomain;
- this.baseDn = baseDn;
- this.serverId = serverID2;
+ this.baseDN = baseDN;
+ this.serverId = serverId;
this.state = state;
this.protocolVersion = ProtocolVersion.getCurrentVersion();
this.replSessionSecurity = replSessionSecurity;
@@ -245,7 +246,7 @@
{
shutdown = false;
this.rcvWindow = this.maxRcvWindow;
- this.connect();
+ connect();
}
}
@@ -269,7 +270,7 @@
}
this.rcvWindow = this.maxRcvWindow;
- this.connect();
+ connect();
}
}
@@ -779,8 +780,8 @@
private void connect()
{
- if (this.baseDn.compareToIgnoreCase(
- ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT) == 0)
+ if (this.baseDN.toNormalizedString().equalsIgnoreCase(
+ ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
{
connectAsECL();
} else
@@ -964,14 +965,14 @@
|| (electedRsInfo.getGenerationId() == -1))
{
Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
- .get(serverId, rsServerId, baseDn,
+ .get(serverId, rsServerId, baseDN.toNormalizedString(),
session.getReadableRemoteAddress(),
getGenerationID());
logError(message);
} else
{
Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
- .get(serverId, rsServerId, baseDn,
+ .get(serverId, rsServerId, baseDN.toNormalizedString(),
session.getReadableRemoteAddress(),
getGenerationID(),
electedRsInfo.getGenerationId());
@@ -995,15 +996,14 @@
{
Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
serverId,
- baseDn,
- collectionToString(replicationServerInfos.keySet(),
- ", "));
+ baseDN.toNormalizedString(),
+ collectionToString(replicationServerInfos.keySet(), ", "));
logError(message);
}
else
{
Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
- serverId, baseDn);
+ serverId, baseDN.toNormalizedString());
logError(message);
}
}
@@ -1082,11 +1082,10 @@
warn user and start heartbeat monitor to recover when a server
with the right group id shows up.
*/
- Message message =
- WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(Byte
- .toString(groupId), Integer.toString(rsServerId), rsInfo
- .getServerURL(), Byte.toString(getRsGroupId()), baseDn, Integer
- .toString(serverId));
+ Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
+ Byte.toString(groupId), Integer.toString(rsServerId),
+ rsInfo.getServerURL(), Byte.toString(getRsGroupId()),
+ baseDN.toNormalizedString(), Integer.toString(serverId));
logError(message);
}
startRSHeartBeatMonitoring();
@@ -1098,10 +1097,9 @@
}
catch (Exception e)
{
- Message message =
- ERR_COMPUTING_FAKE_OPS.get(baseDn, rsInfo.getServerURL(), e
- .getLocalizedMessage()
- + stackTraceToSingleLineString(e));
+ Message message = ERR_COMPUTING_FAKE_OPS.get(
+ baseDN.toNormalizedString(), rsInfo.getServerURL(),
+ e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
logError(message);
}
finally
@@ -1149,7 +1147,7 @@
if (debugEnabled())
{
- TRACER.debugInfo("RB for dn " + baseDn + " and with server id "
+ TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
+ serverId + " computed " + nChanges + " changes late.");
}
@@ -1211,6 +1209,8 @@
String port = server.substring(separator + 1);
String hostname = server.substring(0, separator);
+ final String baseDn = this.baseDN.toNormalizedString();
+
Session localSession = null;
Socket socket = null;
boolean hasConnected = false;
@@ -1218,9 +1218,7 @@
try
{
- /*
- * Open a socket connection to the next candidate.
- */
+ // Open a socket connection to the next candidate.
int intPort = Integer.parseInt(port);
InetSocketAddress serverAddr = new InetSocketAddress(
InetAddress.getByName(hostname), intPort);
@@ -1239,15 +1237,15 @@
StartMsg serverStartMsg;
if (!isECL)
{
- serverStartMsg = new ServerStartMsg(serverId, url, baseDn,
- maxRcvWindow, heartbeatInterval, state,
- this.getGenerationID(), isSslEncryption, groupId);
+ serverStartMsg = new ServerStartMsg(serverId, url,
+ baseDN.toNormalizedString(), maxRcvWindow, heartbeatInterval, state,
+ getGenerationID(), isSslEncryption, groupId);
}
else
{
serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
maxRcvWindow, heartbeatInterval, state,
- this.getGenerationID(), isSslEncryption, groupId);
+ getGenerationID(), isSslEncryption, groupId);
}
localSession.publish(serverStartMsg);
@@ -1256,7 +1254,7 @@
ReplicationMsg msg = localSession.receive();
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+ TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
+ serverStartMsg + "\nAND RECEIVED:\n" + msg);
}
@@ -1266,10 +1264,9 @@
// Sanity check
String repDn = replServerInfo.getBaseDn();
- if (!this.baseDn.equals(repDn))
+ if (!baseDn.equals(repDn))
{
- errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn,
- this.baseDn);
+ errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(repDn, baseDn);
return null;
}
@@ -1324,22 +1321,8 @@
{
if (!hasConnected || !keepConnection)
{
- if (localSession != null)
- {
- localSession.close();
- }
-
- if (socket != null)
- {
- try
- {
- socket.close();
- }
- catch (IOException e)
- {
- // Ignore.
- }
- }
+ close(localSession);
+ close(socket);
}
if (!hasConnected && errorMessage != null)
@@ -1372,13 +1355,9 @@
* reply message from the replication server.
*
* @param server Server we are connecting with.
- * @return The ReplServerStartMsg the server replied. Null if could not
- * get an answer.
*/
- private TopologyMsg performECLPhaseTwoHandshake(String server)
+ private void performECLPhaseTwoHandshake(String server)
{
- TopologyMsg topologyMsg = null;
-
try
{
// Send our Start Session
@@ -1386,32 +1365,24 @@
startECLSessionMsg.setOperationId("-1");
session.publish(startECLSessionMsg);
- /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
- * Read the TopologyMsg that should come back.
- topologyMsg = (TopologyMsg) session.receive();
- */
+ // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+ TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
+ startECLSessionMsg);
}
// Alright set the timeout to the desired value
session.setSoTimeout(timeout);
connected = true;
-
} catch (Exception e)
{
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
- server, baseDn, stackTraceToSingleLineString(e));
+ server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
logError(message);
setSession(null);
-
- // Be sure to return null.
- topologyMsg = null;
}
- return topologyMsg;
}
/**
@@ -1464,7 +1435,7 @@
if (debugEnabled())
{
- TRACER.debugInfo("In RB for " + baseDn + "\nRB HANDSHAKE SENT:\n"
+ TRACER.debugInfo("In RB for " + baseDN + "\nRB HANDSHAKE SENT:\n"
+ startSessionMsg + "\nAND RECEIVED:\n" + topologyMsg);
}
@@ -1474,7 +1445,7 @@
} catch (Exception e)
{
Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
- server, baseDn, stackTraceToSingleLineString(e));
+ server, baseDN.toNormalizedString(), stackTraceToSingleLineString(e));
logError(message);
setSession(null);
@@ -2118,8 +2089,8 @@
// Start a heartbeat monitor thread.
if (heartbeatInterval > 0)
{
- heartbeatMonitor = new HeartbeatMonitor(getServerId(),
- getRsServerId(), baseDn, session, heartbeatInterval);
+ heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
+ baseDN.toNormalizedString(), session, heartbeatInterval);
heartbeatMonitor.start();
}
}
@@ -2185,8 +2156,8 @@
catch (Exception e)
{
MessageBuilder mb = new MessageBuilder();
- mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(baseDn,
- e.getLocalizedMessage()));
+ mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
+ baseDN.toNormalizedString(), e.getLocalizedMessage()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
@@ -2210,7 +2181,7 @@
if (debugEnabled())
{
TRACER.debugInfo(this + " end restart : connected=" + connected
- + " with RSid=" + this.getRsServerId() + " genid=" + this.generationID);
+ + " with RSid=" + getRsServerId() + " genid=" + this.generationID);
}
}
@@ -2476,17 +2447,14 @@
}
else if (msg instanceof StopMsg)
{
- /*
- * RS performs a proper disconnection
- */
- Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED
- .get(replicationServerID,
- savedSession.getReadableRemoteAddress(),
- serverId, baseDn);
+ // RS performs a proper disconnection
+ Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
+ replicationServerID, savedSession.getReadableRemoteAddress(),
+ serverId, baseDN.toNormalizedString());
logError(message);
// Try to find a suitable RS
- this.reStart(savedSession, true);
+ reStart(savedSession, true);
}
else if (msg instanceof MonitorMsg)
{
@@ -2547,14 +2515,15 @@
message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
serverId, replicationServerID,
savedSession.getReadableRemoteAddress(),
- baseDn);
+ baseDN.toNormalizedString());
}
else
{
message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
serverId, replicationServerID,
savedSession.getReadableRemoteAddress(),
- bestServerInfo.getServerId(), baseDn);
+ bestServerInfo.getServerId(),
+ baseDN.toNormalizedString());
}
logError(message);
reStart(true);
@@ -2586,12 +2555,10 @@
final Session tmpSession = session;
if (tmpSession == null || !tmpSession.closeInitiated())
{
- /*
- * We did not initiate the close on our side, log an error message.
- */
- Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED
- .get(serverId, baseDn, replicationServerID,
- savedSession.getReadableRemoteAddress());
+ // We did not initiate the close on our side, log an error message.
+ Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
+ serverId, baseDN.toNormalizedString(), replicationServerID,
+ savedSession.getReadableRemoteAddress());
logError(message);
}
@@ -2678,7 +2645,7 @@
if (debugEnabled())
TRACER.debugInfo("ReplicationBroker " + serverId + " is stopping and will"
+ " close the connection to replication server " + rsServerId + " for"
- + " domain " + baseDn);
+ + " domain " + baseDN);
synchronized (startStopLock)
{
@@ -2767,10 +2734,8 @@
if (connected)
{
return sendWindow.availablePermits();
- } else
- {
- return 0;
}
+ return 0;
}
/**
@@ -2864,9 +2829,9 @@
} catch (IOException ex)
{
Message message = ERR_EXCEPTION_SENDING_CS.get(
- baseDn,
+ baseDN.toNormalizedString(),
Integer.toString(serverId),
- ex.getLocalizedMessage() + stackTraceToSingleLineString(ex));
+ ex.getLocalizedMessage() + " " + stackTraceToSingleLineString(ex));
logError(message);
}
}
@@ -3022,10 +2987,9 @@
// Start a CSN heartbeat thread.
if (changeTimeHeartbeatSendInterval > 0)
{
- String threadName = "Replica DS("
- + this.getServerId()
+ String threadName = "Replica DS(" + getServerId()
+ ") change time heartbeat publisher for domain \""
- + this.baseDn + "\" to RS(" + this.getRsServerId()
+ + this.baseDN + "\" to RS(" + getRsServerId()
+ ") at " + session.getReadableRemoteAddress();
ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
--
Gitblit v1.10.0