From 45eb21b1354b6925fc058f834f505a9699d1bbbe Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 10 Jun 2009 08:43:50 +0000
Subject: [PATCH] External Changelog - first step - related issues 495, 519
---
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 309 +++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 294 insertions(+), 15 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c349289..a53ca20 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -26,15 +26,9 @@
*/
package org.opends.server.replication.service;
-import org.opends.server.replication.protocol.HeartbeatMonitor;
-
-import org.opends.messages.*;
-
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
-import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
@@ -52,13 +46,33 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
+import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.HeartbeatMonitor;
+import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ServerStartECLMsg;
+import org.opends.server.replication.protocol.ServerStartMsg;
+import org.opends.server.replication.protocol.StartECLSessionMsg;
+import org.opends.server.replication.protocol.StartSessionMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.WindowMsg;
+import org.opends.server.replication.protocol.WindowProbeMsg;
+import org.opends.server.util.ServerConstants;
import org.opends.server.replication.server.ReplicationServer;
/**
@@ -300,6 +314,43 @@
}
}
+ private void connect()
+ {
+ if (this.baseDn.compareToIgnoreCase(
+ ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)==0)
+ {
+ connectAsECL();
+ }
+ else
+ {
+ connectAsDataServer();
+ }
+ }
+
+ /**
+ * Special aspects of connecting as ECL compared to connecting as data server
+ * are :
+ * - 1 single RS configured
+ * - so no choice of the prefered RS
+ * - No same groupID polling
+ * - ?? Heartbeat
+ * - Start handshake is :
+ * Broker ---> StartECLMsg ---> RS
+ * <---- ReplServerStartMsg ---
+ * ---> StartSessionECLMsg --> RS
+ */
+ private void connectAsECL()
+ {
+ // FIXME:ECL List of RS to connect is for now limited to one RS only
+ String bestServer = this.servers.iterator().next();
+
+ ReplServerStartMsg inReplServerStartMsg
+ = performECLPhaseOneHandshake(bestServer, true);
+
+ if (inReplServerStartMsg!=null)
+ performECLPhaseTwoHandshake(bestServer);
+ }
+
/**
* Connect to a ReplicationServer.
*
@@ -325,7 +376,7 @@
*
* @throws NumberFormatException address was invalid
*/
- private void connect()
+ private void connectAsDataServer()
{
HashMap<String, ServerInfo> rsInfos = new HashMap<String, ServerInfo>();
@@ -350,6 +401,9 @@
* Connect to each replication server and get their ServerState then find
* out which one is the best to connect to.
*/
+ if (debugEnabled())
+ TRACER.debugInfo("phase 1 : will perform PhaseOneH with each RS in " +
+ " order to elect the prefered one");
for (String server : servers)
{
// Connect to server and get reply message
@@ -375,6 +429,9 @@
serverId, baseDn, groupId);
// Best found, now initialize connection to this one (handshake phase 1)
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "phase 2 : will perform PhaseOneH with the prefered RS.");
replServerStartMsg = performPhaseOneHandshake(bestServer, true);
if (replServerStartMsg != null) // Handshake phase 1 exchange went well
@@ -764,6 +821,8 @@
{
try
{
+ if (debugEnabled())
+ TRACER.debugInfo("In RB, closing session after phase 1");
localSession.close();
} catch (IOException e)
{
@@ -789,6 +848,225 @@
}
/**
+ * Connect to the provided server performing the first phase handshake
+ * (start messages exchange) and return the reply message from the replication
+ * server.
+ *
+ * @param server Server to connect to.
+ * @param keepConnection Do we keep session opened or not after handshake.
+ * Use true if want to perform handshake phase 2 with the same session
+ * and keep the session to create as the current one.
+ * @return The ReplServerStartMsg the server replied. Null if could not
+ * get an answer.
+ */
+ private ReplServerStartMsg performECLPhaseOneHandshake(String server,
+ boolean keepConnection)
+ {
+ ReplServerStartMsg replServerStartMsg = null;
+
+ // Parse server string.
+ int separator = server.lastIndexOf(':');
+ String port = server.substring(separator + 1);
+ String hostname = server.substring(0, separator);
+ ProtocolSession localSession = null;
+
+ boolean error = false;
+ try
+ {
+ /*
+ * Open a socket connection to the next candidate.
+ */
+ int intPort = Integer.parseInt(port);
+ InetSocketAddress serverAddr = new InetSocketAddress(
+ InetAddress.getByName(hostname), intPort);
+ if (keepConnection)
+ tmpReadableServerName = serverAddr.toString();
+ Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
+ socket.setTcpNoDelay(true);
+ socket.connect(serverAddr, 500);
+ localSession = replSessionSecurity.createClientSession(server, socket,
+ ReplSessionSecurity.HANDSHAKE_TIMEOUT);
+ boolean isSslEncryption =
+ replSessionSecurity.isSslEncryption(server);
+
+ // Send our start msg.
+ ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
+ baseDn, 0, 0, 0, 0,
+ maxRcvWindow, heartbeatInterval, state,
+ ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
+ isSslEncryption,
+ groupId);
+ localSession.publish(serverStartECLMsg);
+
+ // Read the ReplServerStartMsg that should come back.
+ replServerStartMsg = (ReplServerStartMsg) localSession.receive();
+
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In RB for " + baseDn +
+ "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
+ "\nAND RECEIVED:\n" + replServerStartMsg.toString());
+ }
+
+ // Sanity check
+ String repDn = replServerStartMsg.getBaseDn();
+ if (!(this.baseDn.equals(repDn)))
+ {
+ Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
+ this.baseDn);
+ logError(message);
+ error = true;
+ }
+
+ /*
+ * We have sent our own protocol version to the replication server.
+ * The replication server will use the same one (or an older one
+ * if it is an old replication server).
+ */
+ if (keepConnection)
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ replServerStartMsg.getVersion());
+
+ if (!isSslEncryption)
+ {
+ localSession.stopEncryption();
+ }
+ } catch (ConnectException e)
+ {
+ /*
+ * There was no server waiting on this host:port
+ * Log a notice and try the next replicationServer in the list
+ */
+ if (!connectionError)
+ {
+ Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
+ if (keepConnection) // Log error message only for final connection
+ {
+ // the error message is only logged once to avoid overflowing
+ // the error log
+ logError(message);
+ } else if (debugEnabled())
+ {
+ TRACER.debugInfo(message.toString());
+ }
+ }
+ error = true;
+ } catch (Exception e)
+ {
+ if ( (e instanceof SocketTimeoutException) && debugEnabled() )
+ {
+ TRACER.debugInfo("Timeout trying to connect to RS " + server +
+ " for dn: " + baseDn);
+ }
+ Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("1",
+ baseDn, server, e.getLocalizedMessage() +
+ stackTraceToSingleLineString(e));
+ if (keepConnection) // Log error message only for final connection
+ {
+ logError(message);
+ } else if (debugEnabled())
+ {
+ TRACER.debugInfo(message.toString());
+ }
+ error = true;
+ }
+
+ // Close session if requested
+ if (!keepConnection || error)
+ {
+ if (localSession != null)
+ {
+ try
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("In RB, closing session after phase 1");
+ localSession.close();
+ } catch (IOException e)
+ {
+ // The session was already closed, just ignore.
+ }
+ localSession = null;
+ }
+ if (error)
+ {
+ replServerStartMsg = null;
+ } // Be sure to return null.
+
+ }
+
+ // If this connection as the one to use for sending and receiving updates,
+ // store it.
+ if (keepConnection)
+ {
+ session = localSession;
+ }
+
+ return replServerStartMsg;
+ }
+
+ /**
+ * Performs the second phase handshake (send StartSessionMsg and receive
+ * TopologyMsg messages exchange) and return the reply message from the
+ * replication server.
+ *
+ * @param server Server we are connecting with.
+ * @param initStatus The status we are starting with
+ * @return The ReplServerStartMsg the server replied. Null if could not
+ * get an answer.
+ */
+ private TopologyMsg performECLPhaseTwoHandshake(String server)
+ {
+ TopologyMsg topologyMsg = null;
+
+ try
+ {
+ // Send our Start Session
+ StartECLSessionMsg startECLSessionMsg = null;
+ startECLSessionMsg = new StartECLSessionMsg();
+ startECLSessionMsg.setOperationId(Short.toString(serverId));
+ session.publish(startECLSessionMsg);
+
+ /* FIXME:ECL In the handshake phase two, should RS send back a topo msg ?
+ * Read the TopologyMsg that should come back.
+ topologyMsg = (TopologyMsg) session.receive();
+ */
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In RB for " + baseDn +
+ "\nRB HANDSHAKE SENT:\n" + startECLSessionMsg.toString());
+ // + "\nAND RECEIVED:\n" + topologyMsg.toString());
+ }
+
+ // Alright set the timeout to the desired value
+ session.setSoTimeout(timeout);
+ connected = true;
+
+ } catch (Exception e)
+ {
+ Message message = ERR_EXCEPTION_STARTING_SESSION_PHASE.get("2",
+ baseDn, server, e.getLocalizedMessage() +
+ stackTraceToSingleLineString(e));
+ logError(message);
+
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ } catch (IOException ex)
+ {
+ // The session was already closed, just ignore.
+ }
+ session = null;
+ }
+ // Be sure to return null.
+ topologyMsg = null;
+ }
+ return topologyMsg;
+ }
+
+ /**
* Performs the second phase handshake (send StartSessionMsg and receive
* TopologyMsg messages exchange) and return the reply message from the
* replication server.
@@ -1720,6 +1998,12 @@
{
boolean done = false;
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("SameGroupIdPoller for: " + baseDn.toString() +
+ " started.");
+ }
+
while ((!done) && (!sameGroupIdPollershutdown))
{
// Sleep some time between checks
@@ -1850,11 +2134,6 @@
*/
public void receiveTopo(TopologyMsg topoMsg)
{
-
- if (debugEnabled())
- TRACER.debugInfo("Replication domain " + baseDn
- + " received topology info update:\n" + topoMsg);
-
// Store new lists
synchronized(getDsList())
{
--
Gitblit v1.10.0