From 45eb21b1354b6925fc058f834f505a9699d1bbbe Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 10 Jun 2009 08:43:50 +0000
Subject: [PATCH] External Changelog - first step - related issues 495, 519
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 200 ++++++++++++++++++++++++++++++++++++++++++++++---
1 files changed, 187 insertions(+), 13 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 348680a..8c21f77 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,16 +42,18 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.Backend;
@@ -61,13 +63,22 @@
import org.opends.server.api.RestoreTaskListener;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.WorkflowImpl;
+import org.opends.server.core.networkgroups.NetworkGroup;
import org.opends.server.loggers.LogLevel;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ExternalChangeLogSession;
import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ServerStartECLMsg;
+import org.opends.server.replication.protocol.ServerStartMsg;
+import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
@@ -75,7 +86,9 @@
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.LDIFReader;
+import org.opends.server.util.ServerConstants;
import org.opends.server.util.TimeThread;
+import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
@@ -155,6 +168,8 @@
*/
private static final DebugTracer TRACER = getTracer();
+ private String externalChangeLogWorkflowID = "External Changelog Workflow ID";
+ ECLWorkflowElement eclwe;
private static HashSet<Integer> localPorts = new HashSet<Integer>();
/**
@@ -267,9 +282,35 @@
ReplSessionSecurity.HANDSHAKE_TIMEOUT);
if (session == null) // Error, go back to accept
continue;
- ServerHandler handler = new ServerHandler(session, queueSize);
- handler.start(null, serverId, serverURL, rcvWindow,
- false, this);
+
+ ReplicationMsg msg = session.receive();
+
+ if (msg instanceof ServerStartMsg)
+ {
+ DataServerHandler handler = new DataServerHandler(session,
+ queueSize,serverURL,serverId,this,rcvWindow);
+ handler.startFromRemoteDS((ServerStartMsg)msg);
+ }
+ else if (msg instanceof ReplServerStartMsg)
+ {
+ ReplicationServerHandler handler = new ReplicationServerHandler(
+ session,queueSize,serverURL,serverId,this,rcvWindow);
+ handler.startFromRemoteRS((ReplServerStartMsg)msg);
+ }
+ else if (msg instanceof ServerStartECLMsg)
+ {
+ ECLServerHandler handler = new ECLServerHandler(
+ session,queueSize,serverURL,serverId,this,rcvWindow);
+ handler.startFromRemoteServer((ServerStartECLMsg)msg);
+ }
+ else
+ {
+ // We did not recognize the message, close session as what
+ // can happen after is undetermined and we do not want the server to
+ // be disturbed
+ ServerHandler.closeSession(session, null, null);
+ return;
+ }
}
catch (Exception e)
{
@@ -277,6 +318,10 @@
// shutdown or changing the port number process.
// Just log debug information and loop.
// Do not log the message during shutdown.
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
if (shutdown == false) {
Message message =
ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
@@ -379,20 +424,20 @@
/**
* Establish a connection to the server with the address and port.
*
- * @param serverURL The address and port for the server, separated by a
+ * @param remoteServerURL The address and port for the server, separated by a
* colon.
* @param baseDn The baseDn of the connection
*/
- private void connect(String serverURL, String baseDn)
+ private void connect(String remoteServerURL, String baseDn)
{
- int separator = serverURL.lastIndexOf(':');
- String port = serverURL.substring(separator + 1);
- String hostname = serverURL.substring(0, separator);
- boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
+ int separator = remoteServerURL.lastIndexOf(':');
+ String port = remoteServerURL.substring(separator + 1);
+ String hostname = remoteServerURL.substring(0, separator);
+ boolean sslEncryption =replSessionSecurity.isSslEncryption(remoteServerURL);
if (debugEnabled())
TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
- " connects to " + serverURL);
+ " connects to " + remoteServerURL);
try
{
@@ -402,12 +447,25 @@
socket.setTcpNoDelay(true);
socket.connect(ServerAddr, 500);
+ /*
ServerHandler handler = new ServerHandler(
replSessionSecurity.createClientSession(serverURL, socket,
ReplSessionSecurity.HANDSHAKE_TIMEOUT),
queueSize);
handler.start(baseDn, serverId, this.serverURL, rcvWindow,
sslEncryption, this);
+ */
+
+ ReplicationServerHandler handler = new ReplicationServerHandler(
+ replSessionSecurity.createClientSession(remoteServerURL,
+ socket,
+ ReplSessionSecurity.HANDSHAKE_TIMEOUT),
+ queueSize,
+ this.serverURL,
+ serverId,
+ this,
+ rcvWindow);
+ handler.connect(baseDn, sslEncryption);
}
catch (Exception e)
{
@@ -470,6 +528,10 @@
serverId , this);
listenThread.start();
+ // Initialize the External Changelog
+ // FIXME: how is WF creation enabed/disabled in the RS ?
+ initializeECL();
+
if (debugEnabled())
TRACER.debugInfo("RS " +getMonitorInstanceName()+
" successfully initialized");
@@ -493,10 +555,88 @@
Message message =
ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage());
logError(message);
+ } catch (DirectoryException e)
+ {
+ //FIXME:DirectoryException is raised by initializeECL => fix err msg
+ Message message = Message.raw(Category.SYNC, Severity.SEVERE_ERROR,
+ "Directory Exception raised by ECL initialization: " + e.getMessage());
+ logError(message);
}
}
/**
+ * Initializes the ECL access by creating a dedicated workflow element.
+ * @throws DirectoryException
+ */
+ private void initializeECL()
+ throws DirectoryException
+ {
+ WorkflowImpl externalChangeLogWorkflow;
+ if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID)
+ !=null)
+ return;
+
+ ECLWorkflowElement eclwe = new ECLWorkflowElement(this);
+
+ // Create the workflow for the base DN and register the workflow with
+ // the server.
+ externalChangeLogWorkflow = new WorkflowImpl(
+ externalChangeLogWorkflowID,
+ DN.decode(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT),
+ eclwe.getWorkflowElementID(),
+ eclwe);
+ externalChangeLogWorkflow.register();
+
+ NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
+ defaultNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
+
+ // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
+ NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
+ adminNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
+
+ // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
+ NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
+ internalNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
+
+}
+
+ private void finalizeECL()
+ {
+ WorkflowImpl eclwf =
+ (WorkflowImpl)WorkflowImpl.getWorkflow(externalChangeLogWorkflowID);
+
+ // do it only if not already done by another RS (unit test case)
+ // if (DirectoryServer.getWorkflowElement(externalChangeLogWorkflowID)
+ if (eclwf!=null)
+ {
+
+
+ // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
+ NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
+ internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
+
+ // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
+ NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
+ adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
+
+ NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
+ defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
+
+ eclwf.deregister();
+ eclwf.finalizeWorkflow();
+ }
+
+ eclwe = (ECLWorkflowElement)
+ DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
+ if (eclwe!=null)
+ {
+ DirectoryServer.deregisterWorkflowElement(eclwe);
+ eclwe.finalizeWorkflowElement();
+ }
+
+}
+
+ /**
* Get the ReplicationServerDomain associated to the base DN given in
* parameter.
*
@@ -571,6 +711,8 @@
{
dbEnv.shutdown();
}
+
+ finalizeECL();
}
@@ -1220,6 +1362,38 @@
}
/**
+ * Returns the number of domains managed by this replication server.
+ * @return the number of domains managed.
+ */
+ public int getCacheSize()
+ {
+ return baseDNs.size();
+ }
+
+ /**
+ * Create a new session to get the ECL.
+ * @param msg The message that specifies the ECL request.
+ * @return Returns the created session.
+ * @throws DirectoryException When an error occurs.
+ */
+ public ExternalChangeLogSession createECLSession(StartECLSessionMsg msg)
+ throws DirectoryException
+ {
+ ExternalChangeLogSessionImpl session =
+ new ExternalChangeLogSessionImpl(this, msg);
+ return session;
+ }
+
+ /**
+ * Getter on the server URL.
+ * @return the server URL.
+ */
+ public String getServerURL()
+ {
+ return this.serverURL;
+ }
+
+ /**
* This method allows to check if the Replication Server given
* as the parameter is running in the local JVM.
*
--
Gitblit v1.10.0