From a592fe71c4c2e29a136f9700a2981f3dcbd7e114 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 22 Sep 2014 19:47:33 +0000
Subject: [PATCH] OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation
---
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java | 120 +++++++-----------------------------------------------------
1 files changed, 14 insertions(+), 106 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c37bdda..eecfc65 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -49,7 +49,6 @@
import org.opends.server.replication.protocol.*;
import org.opends.server.types.DN;
import org.opends.server.types.HostPort;
-import org.opends.server.util.ServerConstants;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
@@ -279,7 +278,7 @@
{
shutdown = false;
this.rcvWindow = getMaxRcvWindow();
- connect();
+ connectAsDataServer();
}
}
@@ -692,19 +691,6 @@
}
}
- private void connect()
- {
- if (getBaseDN().toNormalizedString().equalsIgnoreCase(
- ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
- {
- connectAsECL();
- }
- else
- {
- connectAsDataServer();
- }
- }
-
/**
* Contacts all replication servers to get information from them and being
* able to choose the more suitable.
@@ -718,7 +704,7 @@
for (String serverUrl : getReplicationServerUrls())
{
// Connect to server + get and store info about it
- final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
+ final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
final ReplicationServerInfo rsInfo = rs.rsInfo;
if (rsInfo != null)
{
@@ -730,35 +716,6 @@
}
/**
- * Special aspects of connecting as ECL (External Change Log) compared to
- * connecting as data server are :
- * <ul>
- * <li>1 single RS configured</li>
- * <li>so no choice of the preferred RS</li>
- * <li>?? Heartbeat</li>
- * <li>Start handshake is :
- *
- * <pre>
- * Broker ---> StartECLMsg ---> RS
- * <---- ReplServerStartMsg ---
- * ---> StartSessionECLMsg --> RS
- * </pre>
- *
- * </li>
- * </ul>
- */
- private void connectAsECL()
- {
- // FIXME:ECL List of RS to connect is for now limited to one RS only
- final String bestServerURL = getReplicationServerUrls().iterator().next();
- final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
- if (rs.isConnected())
- {
- performECLPhaseTwoHandshake(bestServerURL, rs);
- }
- }
-
- /**
* Connect to a ReplicationServer.
*
* Handshake sequences between a DS and a RS is divided into 2 logical
@@ -834,7 +791,7 @@
+ evals.getBestRS());
final ConnectedRS electedRS = performPhaseOneHandshake(
- evals.getBestRS().getServerURL(), true, false);
+ evals.getBestRS().getServerURL(), true);
final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
if (electedRsInfo != null)
{
@@ -1100,12 +1057,9 @@
* Do we keep session opened or not after handshake. Use true if want
* to perform handshake phase 2 with the same session and keep the
* session to create as the current one.
- * @param isECL
- * Indicates whether or not the an ECL handshake is to be performed.
* @return The answer from the server . Null if could not get an answer.
*/
- private ConnectedRS performPhaseOneHandshake(String serverURL,
- boolean keepSession, boolean isECL)
+ private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
{
Session newSession = null;
Socket socket = null;
@@ -1124,8 +1078,7 @@
socket.bind(local);
}
int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
- socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
- timeoutMS);
+ socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
boolean isSslEncryption = replSessionSecurity.isSslEncryption();
@@ -1133,19 +1086,9 @@
final HostPort hp = new HostPort(
socket.getLocalAddress().getHostName(), socket.getLocalPort());
final String url = hp.toString();
- final StartMsg serverStartMsg;
- if (!isECL)
- {
- serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
- getMaxRcvWindow(), config.getHeartbeatInterval(), state,
- getGenerationID(), isSslEncryption, getGroupId());
- }
- else
- {
- serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
- getMaxRcvWindow(), config.getHeartbeatInterval(), state,
- getGenerationID(), isSslEncryption, getGroupId());
- }
+ final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
+ getMaxRcvWindow(), config.getHeartbeatInterval(), state,
+ getGenerationID(), isSslEncryption, getGroupId());
newSession.publish(serverStartMsg);
// Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1233,44 +1176,6 @@
return setConnectedRS(ConnectedRS.noConnectedRS());
}
-
-
- /**
- * Performs the second phase handshake for External Change Log (send
- * StartSessionMsg and receive TopologyMsg messages exchange) and return the
- * reply message from the replication server.
- *
- * @param server Server we are connecting with.
- */
- private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
- {
- try
- {
- // Send our Start Session
- final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
- startECLSessionMsg.setOperationId("-1");
- rs.session.publish(startECLSessionMsg);
-
- // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
- if (logger.isTraceEnabled())
- {
- debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
- }
-
- // Alright set the timeout to the desired value
- rs.session.setSoTimeout(timeout);
- setConnectedRS(rs);
- }
- catch (Exception e)
- {
- logger.warn(WARN_EXCEPTION_STARTING_SESSION_PHASE, getServerId(), server, getBaseDN().toNormalizedString(),
- stackTraceToSingleLineString(e));
-
- rs.session.close();
- setConnectedRS(ConnectedRS.noConnectedRS());
- }
- }
-
/**
* Performs the second phase handshake (send StartSessionMsg and receive
* TopologyMsg messages exchange) and return the reply message from the
@@ -2273,7 +2178,7 @@
try
{
- connect();
+ connectAsDataServer();
rs = connectedRS.get();
}
catch (Exception e)
@@ -3077,7 +2982,8 @@
Map<Integer, ReplicationServerInfo> previousRsInfos)
{
this.rsServerId = rsServerId;
- this.replicaInfos = dsInfosToKeep;
+ this.replicaInfos = dsInfosToKeep == null
+ ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
previousRsInfos, configuredReplicationServerUrls);
}
@@ -3284,7 +3190,9 @@
@Override
public String toString()
{
- return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
+ return getClass().getSimpleName()
+ + " rsServerId=" + rsServerId
+ + ", replicaInfos=" + replicaInfos.values()
+ ", rsInfos=" + rsInfos.values();
}
}
--
Gitblit v1.10.0