From 45eb21b1354b6925fc058f834f505a9699d1bbbe Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 10 Jun 2009 08:43:50 +0000
Subject: [PATCH] External Changelog - first step - related issues 495,  519

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  200 ++++++++++++++++++++++++++++++++++++++++++++++---
 1 files changed, 187 insertions(+), 13 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 348680a..8c21f77 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,16 +42,18 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
+import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
 import org.opends.server.admin.server.ConfigurationChangeListener;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.api.Backend;
@@ -61,13 +63,22 @@
 import org.opends.server.api.RestoreTaskListener;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.WorkflowImpl;
+import org.opends.server.core.networkgroups.NetworkGroup;
 import org.opends.server.loggers.LogLevel;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ExternalChangeLogSession;
 import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ServerStartECLMsg;
+import org.opends.server.replication.protocol.ServerStartMsg;
+import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.types.BackupConfig;
 import org.opends.server.types.ConfigChangeResult;
 import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.Entry;
 import org.opends.server.types.LDIFExportConfig;
@@ -75,7 +86,9 @@
 import org.opends.server.types.RestoreConfig;
 import org.opends.server.types.ResultCode;
 import org.opends.server.util.LDIFReader;
+import org.opends.server.util.ServerConstants;
 import org.opends.server.util.TimeThread;
+import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
 import com.sleepycat.je.DatabaseException;
 
@@ -155,6 +168,8 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
+  private String externalChangeLogWorkflowID = "External Changelog Workflow ID";
+  ECLWorkflowElement eclwe;
   private static HashSet<Integer> localPorts = new HashSet<Integer>();
 
   /**
@@ -267,9 +282,35 @@
              ReplSessionSecurity.HANDSHAKE_TIMEOUT);
         if (session == null) // Error, go back to accept
           continue;
-        ServerHandler handler = new ServerHandler(session, queueSize);
-        handler.start(null, serverId, serverURL, rcvWindow,
-                      false, this);
+
+        ReplicationMsg msg = session.receive();
+
+        if (msg instanceof ServerStartMsg)
+        {
+          DataServerHandler handler = new DataServerHandler(session,
+              queueSize,serverURL,serverId,this,rcvWindow);
+          handler.startFromRemoteDS((ServerStartMsg)msg);
+        }
+        else if (msg instanceof ReplServerStartMsg)
+        {
+          ReplicationServerHandler handler = new ReplicationServerHandler(
+              session,queueSize,serverURL,serverId,this,rcvWindow);
+          handler.startFromRemoteRS((ReplServerStartMsg)msg);
+        }
+        else if (msg instanceof ServerStartECLMsg)
+        {
+          ECLServerHandler handler = new ECLServerHandler(
+              session,queueSize,serverURL,serverId,this,rcvWindow);
+          handler.startFromRemoteServer((ServerStartECLMsg)msg);
+        }
+        else
+        {
+          // We did not recognize the message, close session as what
+          // can happen after is undetermined and we do not want the server to
+          // be disturbed
+          ServerHandler.closeSession(session, null, null);
+          return;
+        }
       }
       catch (Exception e)
       {
@@ -277,6 +318,10 @@
         // shutdown or changing the port number process.
         // Just log debug information and loop.
         // Do not log the message during shutdown.
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
         if (shutdown == false) {
           Message message =
             ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage());
@@ -379,20 +424,20 @@
   /**
    * Establish a connection to the server with the address and port.
    *
-   * @param serverURL  The address and port for the server, separated by a
+   * @param remoteServerURL  The address and port for the server, separated by a
    *                    colon.
    * @param baseDn     The baseDn of the connection
    */
-  private void connect(String serverURL, String baseDn)
+  private void connect(String remoteServerURL, String baseDn)
   {
-    int separator = serverURL.lastIndexOf(':');
-    String port = serverURL.substring(separator + 1);
-    String hostname = serverURL.substring(0, separator);
-    boolean sslEncryption = replSessionSecurity.isSslEncryption(serverURL);
+    int separator = remoteServerURL.lastIndexOf(':');
+    String port = remoteServerURL.substring(separator + 1);
+    String hostname = remoteServerURL.substring(0, separator);
+    boolean sslEncryption =replSessionSecurity.isSslEncryption(remoteServerURL);
 
     if (debugEnabled())
       TRACER.debugInfo("RS " + this.getMonitorInstanceName() +
-               " connects to " + serverURL);
+               " connects to " + remoteServerURL);
 
     try
     {
@@ -402,12 +447,25 @@
       socket.setTcpNoDelay(true);
       socket.connect(ServerAddr, 500);
 
+      /*
       ServerHandler handler = new ServerHandler(
            replSessionSecurity.createClientSession(serverURL, socket,
            ReplSessionSecurity.HANDSHAKE_TIMEOUT),
            queueSize);
       handler.start(baseDn, serverId, this.serverURL, rcvWindow,
                     sslEncryption, this);
+      */
+
+      ReplicationServerHandler handler = new ReplicationServerHandler(
+          replSessionSecurity.createClientSession(remoteServerURL,
+              socket,
+              ReplSessionSecurity.HANDSHAKE_TIMEOUT),
+              queueSize,
+              this.serverURL,
+              serverId,
+              this,
+              rcvWindow);
+      handler.connect(baseDn, sslEncryption);
     }
     catch (Exception e)
     {
@@ -470,6 +528,10 @@
         serverId , this);
       listenThread.start();
 
+      // Initialize the External Changelog
+      // FIXME: how is WF creation enabed/disabled in the RS ?
+      initializeECL();
+
       if (debugEnabled())
         TRACER.debugInfo("RS " +getMonitorInstanceName()+
             " successfully initialized");
@@ -493,10 +555,88 @@
       Message message =
           ERR_COULD_NOT_BIND_CHANGELOG.get(changelogPort, e.getMessage());
       logError(message);
+    } catch (DirectoryException e)
+    {
+      //FIXME:DirectoryException is raised by initializeECL => fix err msg
+      Message message = Message.raw(Category.SYNC, Severity.SEVERE_ERROR,
+        "Directory Exception raised by ECL initialization: " + e.getMessage());
+      logError(message);
     }
   }
 
   /**
+   * Initializes the ECL access by creating a dedicated workflow element.
+   * @throws DirectoryException
+   */
+  private void initializeECL()
+  throws DirectoryException
+  {
+    WorkflowImpl externalChangeLogWorkflow;
+    if (WorkflowImpl.getWorkflow(externalChangeLogWorkflowID)
+        !=null)
+      return;
+
+    ECLWorkflowElement eclwe = new ECLWorkflowElement(this);
+
+    // Create the workflow for the base DN and register the workflow with
+    // the server.
+    externalChangeLogWorkflow = new WorkflowImpl(
+        externalChangeLogWorkflowID,
+        DN.decode(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT),
+        eclwe.getWorkflowElementID(),
+        eclwe);
+    externalChangeLogWorkflow.register();
+
+    NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
+    defaultNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
+
+    // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
+    NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
+    adminNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
+
+    // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
+    NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
+    internalNetworkGroup.registerWorkflow(externalChangeLogWorkflow);
+
+}
+
+  private void finalizeECL()
+  {
+    WorkflowImpl eclwf =
+      (WorkflowImpl)WorkflowImpl.getWorkflow(externalChangeLogWorkflowID);
+
+    // do it only if not already done by another RS (unit test case)
+    // if (DirectoryServer.getWorkflowElement(externalChangeLogWorkflowID)
+    if (eclwf!=null)
+    {
+
+
+    // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
+    NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
+    internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
+
+    // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
+    NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
+    adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
+
+    NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
+    defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
+
+    eclwf.deregister();
+    eclwf.finalizeWorkflow();
+    }
+
+    eclwe = (ECLWorkflowElement)
+    DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
+    if (eclwe!=null)
+    {
+      DirectoryServer.deregisterWorkflowElement(eclwe);
+      eclwe.finalizeWorkflowElement();
+    }
+
+}
+
+  /**
    * Get the ReplicationServerDomain associated to the base DN given in
    * parameter.
    *
@@ -571,6 +711,8 @@
     {
       dbEnv.shutdown();
     }
+
+    finalizeECL();
 }
 
 
@@ -1220,6 +1362,38 @@
   }
 
   /**
+   * Returns the number of domains managed by this replication server.
+   * @return the number of domains managed.
+   */
+  public int getCacheSize()
+  {
+    return baseDNs.size();
+  }
+
+  /**
+   * Create a new session to get the ECL.
+   * @param msg The message that specifies the ECL request.
+   * @return Returns the created session.
+   * @throws DirectoryException When an error occurs.
+   */
+  public ExternalChangeLogSession createECLSession(StartECLSessionMsg msg)
+  throws DirectoryException
+  {
+    ExternalChangeLogSessionImpl session =
+      new ExternalChangeLogSessionImpl(this, msg);
+    return session;
+  }
+
+  /**
+   * Getter on the server URL.
+   * @return the server URL.
+   */
+  public String getServerURL()
+  {
+    return this.serverURL;
+  }
+
+  /**
    * This method allows to check if the Replication Server given
    * as the parameter is running in the local JVM.
    *

--
Gitblit v1.10.0