From 13819a2e81db0422a7c8c186f838c7b243173170 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 03 Sep 2014 06:30:37 +0000
Subject: [PATCH] OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 121 ++++-----------------------------------
1 files changed, 14 insertions(+), 107 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index d1b0187..3f85b22 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -53,7 +53,6 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.HostPort;
-import org.opends.server.util.ServerConstants;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -289,7 +288,7 @@
{
shutdown = false;
this.rcvWindow = getMaxRcvWindow();
- connect();
+ connectAsDataServer();
}
}
@@ -702,19 +701,6 @@
}
}
- private void connect()
- {
- if (getBaseDN().toNormalizedString().equalsIgnoreCase(
- ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
- {
- connectAsECL();
- }
- else
- {
- connectAsDataServer();
- }
- }
-
/**
* Contacts all replication servers to get information from them and being
* able to choose the more suitable.
@@ -728,7 +714,7 @@
for (String serverUrl : getReplicationServerUrls())
{
// Connect to server + get and store info about it
- final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
+ final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false);
final ReplicationServerInfo rsInfo = rs.rsInfo;
if (rsInfo != null)
{
@@ -740,35 +726,6 @@
}
/**
- * Special aspects of connecting as ECL (External Change Log) compared to
- * connecting as data server are :
- * <ul>
- * <li>1 single RS configured</li>
- * <li>so no choice of the preferred RS</li>
- * <li>?? Heartbeat</li>
- * <li>Start handshake is :
- *
- * <pre>
- * Broker ---> StartECLMsg ---> RS
- * <---- ReplServerStartMsg ---
- * ---> StartSessionECLMsg --> RS
- * </pre>
- *
- * </li>
- * </ul>
- */
- private void connectAsECL()
- {
- // FIXME:ECL List of RS to connect is for now limited to one RS only
- final String bestServerURL = getReplicationServerUrls().iterator().next();
- final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
- if (rs.isConnected())
- {
- performECLPhaseTwoHandshake(bestServerURL, rs);
- }
- }
-
- /**
* Connect to a ReplicationServer.
*
* Handshake sequences between a DS and a RS is divided into 2 logical
@@ -844,7 +801,7 @@
+ evals.getBestRS());
final ConnectedRS electedRS = performPhaseOneHandshake(
- evals.getBestRS().getServerURL(), true, false);
+ evals.getBestRS().getServerURL(), true);
final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
if (electedRsInfo != null)
{
@@ -1116,12 +1073,9 @@
* Do we keep session opened or not after handshake. Use true if want
* to perform handshake phase 2 with the same session and keep the
* session to create as the current one.
- * @param isECL
- * Indicates whether or not the an ECL handshake is to be performed.
* @return The answer from the server . Null if could not get an answer.
*/
- private ConnectedRS performPhaseOneHandshake(String serverURL,
- boolean keepSession, boolean isECL)
+ private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession)
{
Session newSession = null;
Socket socket = null;
@@ -1135,8 +1089,7 @@
socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
- socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
- timeoutMS);
+ socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS);
newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
boolean isSslEncryption = replSessionSecurity.isSslEncryption();
@@ -1144,19 +1097,9 @@
final HostPort hp = new HostPort(
socket.getLocalAddress().getHostName(), socket.getLocalPort());
final String url = hp.toString();
- final StartMsg serverStartMsg;
- if (!isECL)
- {
- serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
- getMaxRcvWindow(), config.getHeartbeatInterval(), state,
- getGenerationID(), isSslEncryption, getGroupId());
- }
- else
- {
- serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0,
- getMaxRcvWindow(), config.getHeartbeatInterval(), state,
- getGenerationID(), isSslEncryption, getGroupId());
- }
+ final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
+ getMaxRcvWindow(), config.getHeartbeatInterval(), state,
+ getGenerationID(), isSslEncryption, getGroupId());
newSession.publish(serverStartMsg);
// Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1247,45 +1190,6 @@
return setConnectedRS(ConnectedRS.noConnectedRS());
}
-
-
- /**
- * Performs the second phase handshake for External Change Log (send
- * StartSessionMsg and receive TopologyMsg messages exchange) and return the
- * reply message from the replication server.
- *
- * @param server Server we are connecting with.
- */
- private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
- {
- try
- {
- // Send our Start Session
- final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
- startECLSessionMsg.setOperationId("-1");
- rs.session.publish(startECLSessionMsg);
-
- // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
- if (debugEnabled())
- {
- debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg);
- }
-
- // Alright set the timeout to the desired value
- rs.session.setSoTimeout(timeout);
- setConnectedRS(rs);
- }
- catch (Exception e)
- {
- logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
- getServerId(), server, getBaseDN().toNormalizedString(),
- stackTraceToSingleLineString(e)));
-
- rs.session.close();
- setConnectedRS(ConnectedRS.noConnectedRS());
- }
- }
-
/**
* Performs the second phase handshake (send StartSessionMsg and receive
* TopologyMsg messages exchange) and return the reply message from the
@@ -2288,7 +2192,7 @@
try
{
- connect();
+ connectAsDataServer();
rs = connectedRS.get();
}
catch (Exception e)
@@ -3103,7 +3007,8 @@
Map<Integer, ReplicationServerInfo> previousRsInfos)
{
this.rsServerId = rsServerId;
- this.replicaInfos = dsInfosToKeep;
+ this.replicaInfos = dsInfosToKeep == null
+ ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep;
this.rsInfos = computeRSInfos(dsServerId, newRSInfos,
previousRsInfos, configuredReplicationServerUrls);
}
@@ -3310,7 +3215,9 @@
@Override
public String toString()
{
- return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos
+ return getClass().getSimpleName()
+ + " rsServerId=" + rsServerId
+ + ", replicaInfos=" + replicaInfos.values()
+ ", rsInfos=" + rsInfos.values();
}
}
--
Gitblit v1.10.0