From 4aa594ecf7ff52baf627cecbe9959b500e847367 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 20 Dec 2013 14:43:11 +0000
Subject: [PATCH] OPENDJ-1205 Remove network layer from External ChangeLog implementation 

---
 opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java |   78 +++++++++++++++++++++------------------
 1 files changed, 42 insertions(+), 36 deletions(-)

diff --git a/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java b/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
index c37acd1..e71bd28 100644
--- a/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
+++ b/opends/src/server/org/opends/server/workflowelement/externalchangelog/ECLSearchOperation.java
@@ -39,10 +39,10 @@
 import org.opends.server.core.*;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.ExternalChangeLogSession;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.ECLServerHandler;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.types.*;
 import org.opends.server.types.operation.PostOperationSearchOperation;
@@ -73,11 +73,6 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  /**
-   * The ECL Start Session we'll send to the RS.
-   */
-  private StartECLSessionMsg startECLSessionMsg;
-
   /** The set of supported controls for this WE. */
   private static final Set<String> CHANGELOG_SUPPORTED_CONTROLS =
       new HashSet<String>(Arrays.asList(
@@ -159,7 +154,7 @@
    */
   private SearchFilter filter;
 
-  private ExternalChangeLogSession eclSession;
+  private ECLServerHandler eclServerHandler;
 
   /**
    * A flag to know if the ECLControl has been requested.
@@ -190,7 +185,7 @@
    *           if this operation should be canceled
    */
   void processECLSearch(ECLWorkflowElement wfe)
-  throws CanceledOperationException
+      throws CanceledOperationException
   {
     boolean executePostOpPlugins = false;
 
@@ -205,7 +200,8 @@
     {
       replicationServer  = wfe.getReplicationServer();
       clientConnection   = getClientConnection();
-      startECLSessionMsg = new StartECLSessionMsg();
+
+      final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
 
       // Set default behavior as "from change number".
       // "from cookie" is set only when cookie is provided.
@@ -244,7 +240,7 @@
       // Analyse controls - including the cookie control
       try
       {
-        handleRequestControls();
+        handleRequestControls(startECLSessionMsg);
       }
       catch (DirectoryException de)
       {
@@ -299,7 +295,7 @@
       // Process the search.
       try
       {
-        processSearch();
+        processSearch(startECLSessionMsg);
       }
       catch (DirectoryException de)
       {
@@ -322,7 +318,7 @@
           persistentSearch.cancel();
           setSendResponse(true);
         }
-        close(eclSession);
+        shutdownECLServerHandler();
         throw coe;
       }
       catch (Exception e)
@@ -367,7 +363,8 @@
    * @throws  DirectoryException  If there is a problem with any of the request
    *                              controls.
    */
-  private void handleRequestControls() throws DirectoryException
+  private void handleRequestControls(StartECLSessionMsg startECLSessionMsg)
+      throws DirectoryException
   {
     List<Control> requestControls  = getRequestControls();
     if (requestControls != null && !requestControls.isEmpty())
@@ -392,7 +389,7 @@
           continue;
         }
 
-        if (oid.equals(OID_ECL_COOKIE_EXCHANGE_CONTROL))
+        if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(oid))
         {
           ExternalChangelogRequestControl eclControl =
             getRequestControl(ExternalChangelogRequestControl.DECODER);
@@ -405,7 +402,7 @@
             startECLSessionMsg.setCrossDomainServerState(cookie.toString());
           }
         }
-        else if (oid.equals(OID_LDAP_ASSERTION))
+        else if (OID_LDAP_ASSERTION.equals(oid))
         {
           LDAPAssertionRequestControl assertControl =
             getRequestControl(LDAPAssertionRequestControl.DECODER);
@@ -462,7 +459,7 @@
                     de.getMessageObject()), de);
           }
         }
-        else if (oid.equals(OID_PROXIED_AUTH_V1))
+        else if (OID_PROXIED_AUTH_V1.equals(oid))
         {
           // Log usage of legacy proxy authz V1 control.
           addAdditionalLogItem(AdditionalLogItem.keyOnly(getClass(),
@@ -490,7 +487,7 @@
             setProxiedAuthorizationDN(authorizationEntry.getDN());
           }
         }
-        else if (oid.equals(OID_PROXIED_AUTH_V2))
+        else if (OID_PROXIED_AUTH_V2.equals(oid))
         {
           // The requester must have the PROXIED_AUTH privilege in order to be
           // able to use this control.
@@ -514,7 +511,7 @@
             setProxiedAuthorizationDN(authorizationEntry.getDN());
           }
         }
-        else if (oid.equals(OID_PERSISTENT_SEARCH))
+        else if (OID_PERSISTENT_SEARCH.equals(oid))
         {
           PersistentSearchControl psearchControl =
             getRequestControl(PersistentSearchControl.DECODER);
@@ -532,13 +529,13 @@
             startECLSessionMsg.setPersistent(
                 StartECLSessionMsg.PERSISTENT);
         }
-        else if (oid.equals(OID_LDAP_SUBENTRIES))
+        else if (OID_LDAP_SUBENTRIES.equals(oid))
         {
           SubentriesControl subentriesControl =
                   getRequestControl(SubentriesControl.DECODER);
           setReturnSubentriesOnly(subentriesControl.getVisibility());
         }
-        else if (oid.equals(OID_LDUP_SUBENTRIES))
+        else if (OID_LDUP_SUBENTRIES.equals(oid))
         {
           // Support for legacy draft-ietf-ldup-subentry.
           addAdditionalLogItem(AdditionalLogItem.keyOnly(getClass(),
@@ -546,25 +543,25 @@
 
           setReturnSubentriesOnly(true);
         }
-        else if (oid.equals(OID_MATCHED_VALUES))
+        else if (OID_MATCHED_VALUES.equals(oid))
         {
           MatchedValuesControl matchedValuesControl =
             getRequestControl(MatchedValuesControl.DECODER);
           setMatchedValuesControl(matchedValuesControl);
         }
-        else if (oid.equals(OID_ACCOUNT_USABLE_CONTROL))
+        else if (OID_ACCOUNT_USABLE_CONTROL.equals(oid))
         {
           setIncludeUsableControl(true);
         }
-        else if (oid.equals(OID_REAL_ATTRS_ONLY))
+        else if (OID_REAL_ATTRS_ONLY.equals(oid))
         {
           setRealAttributesOnly(true);
         }
-        else if (oid.equals(OID_VIRTUAL_ATTRS_ONLY))
+        else if (OID_VIRTUAL_ATTRS_ONLY.equals(oid))
         {
           setVirtualAttributesOnly(true);
         }
-        else if (oid.equals(OID_GET_EFFECTIVE_RIGHTS) &&
+        else if (OID_GET_EFFECTIVE_RIGHTS.equals(oid) &&
             DirectoryServer.isSupportedControl(OID_GET_EFFECTIVE_RIGHTS))
         {
           // Do nothing here and let AciHandler deal with it.
@@ -582,8 +579,8 @@
     }
   }
 
-  private void processSearch() throws DirectoryException,
-      CanceledOperationException
+  private void processSearch(StartECLSessionMsg startECLSessionMsg)
+      throws DirectoryException, CanceledOperationException
   {
     if (debugEnabled())
     {
@@ -591,13 +588,14 @@
           + startECLSessionMsg.getOperationId() + "]");
     }
 
-    // Start a specific ECL session
-    eclSession = replicationServer.createECLSession(startECLSessionMsg);
+    // Start a specific ECL server handler
+    eclServerHandler =
+        new ECLServerHandler(replicationServer, startECLSessionMsg);
     boolean abortECLSession = false;
     try
     {
       // Get first update (this is needed to determine hasSubordinates.
-      ECLUpdateMsg update = eclSession.getNextUpdate();
+      ECLUpdateMsg update = eclServerHandler.getNextECLUpdate();
 
       // Return root entry if requested.
       if (CHANGELOG_ROOT_DN.matchesBaseAndScope(baseDN, getScope()))
@@ -632,7 +630,7 @@
           return;
         }
 
-        update = eclSession.getNextUpdate();
+        update = eclServerHandler.getNextECLUpdate();
       }
     }
     catch (CanceledOperationException e)
@@ -649,7 +647,7 @@
     {
       if (persistentSearch == null || abortECLSession)
       {
-        close(eclSession);
+        shutdownECLServerHandler();
       }
     }
   }
@@ -1080,8 +1078,8 @@
   public CancelResult cancel(CancelRequest cancelRequest)
   {
     if (debugEnabled())
-      TRACER.debugInfo(this + " cancel() " + eclSession);
-    close(eclSession);
+      TRACER.debugInfo(this + " cancel() " + eclServerHandler);
+    shutdownECLServerHandler();
     return super.cancel(cancelRequest);
   }
 
@@ -1090,8 +1088,16 @@
   public void abort(CancelRequest cancelRequest)
   {
     if (debugEnabled())
-      TRACER.debugInfo(this + " abort() " + eclSession);
-    close(eclSession);
+      TRACER.debugInfo(this + " abort() " + eclServerHandler);
+    shutdownECLServerHandler();
+  }
+
+  private void shutdownECLServerHandler()
+  {
+    if (eclServerHandler != null)
+    {
+      eclServerHandler.shutdown();
+    }
   }
 
   /**

--
Gitblit v1.10.0