From 4ee9f7e5b2a8ae82e1969b0fc6e29d96c2994a11 Mon Sep 17 00:00:00 2001
From: matthew_swift <matthew_swift@localhost>
Date: Thu, 13 Nov 2008 19:22:03 +0000
Subject: [PATCH] This change relates to issue 3567.

---
 opendj-sdk/opends/src/server/org/opends/server/core/PersistentSearch.java                                    |  137 +++++++++++++++++++++++++--------
 opendj-sdk/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java |    4 
 opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationWrapper.java                              |   16 ----
 opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationBasis.java                                |   27 ------
 opendj-sdk/opends/src/server/org/opends/server/core/SearchOperation.java                                     |   14 ---
 opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java                      |   11 ++
 6 files changed, 113 insertions(+), 96 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/PersistentSearch.java b/opendj-sdk/opends/src/server/org/opends/server/core/PersistentSearch.java
index a40bd0f..5453b3c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/PersistentSearch.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/PersistentSearch.java
@@ -53,6 +53,33 @@
 /**
  * This class defines a data structure that will be used to hold the
  * information necessary for processing a persistent search.
+ * <p>
+ * Work flow element implementations are responsible for managing the
+ * persistent searches that they are currently handling.
+ * <p>
+ * Typically, a work flow element search operation will first decode
+ * the persistent search control and construct a new {@code
+ * PersistentSearch}.
+ * <p>
+ * Once the initial search result set has been returned and no errors
+ * encountered, the work flow element implementation should register a
+ * cancellation callback which will be invoked when the persistent
+ * search is cancelled. This is achieved using
+ * {@link #registerCancellationCallback(CancellationCallback)}. The
+ * callback should make sure that any resources associated with the
+ * {@code PersistentSearch} are released. This may included removing
+ * the {@code PersistentSearch} from a list, or abandoning a
+ * persistent search operation that has been sent to a remote server.
+ * <p>
+ * Finally, the {@code PersistentSearch} should be enabled using
+ * {@link #enable()}. This method will register the {@code
+ * PersistentSearch} with the client connection and notify the
+ * underlying search operation that no result should be sent to the
+ * client.
+ * <p>
+ * Work flow element implementations should {@link #cancel()} active
+ * persistent searches when the work flow element fails or is shut
+ * down.
  */
 public final class PersistentSearch
 {
@@ -81,34 +108,65 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  // Indicates whether entries returned should include the entry
-  // change notification control.
-  private final boolean returnECs;
+
+
+  // Cancel a persistent search.
+  private static synchronized void cancel(PersistentSearch psearch)
+  {
+    if (!psearch.isCancelled)
+    {
+      psearch.isCancelled = true;
+
+      // The persistent search can no longer be cancelled.
+      psearch.searchOperation.getClientConnection().deregisterPersistentSearch(
+          psearch);
+
+      // Notify any cancellation callbacks.
+      for (CancellationCallback callback : psearch.cancellationCallbacks)
+      {
+        try
+        {
+          callback.persistentSearchCancelled(psearch);
+        }
+        catch (Exception e)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.ERROR, e);
+          }
+        }
+      }
+    }
+  }
 
   // The base DN for the search operation.
   private final DN baseDN;
 
-  // The set of change types we want to see.
-  private final Set<PersistentSearchChangeType> changeTypes;
-
-  // The scope for the search operation.
-  private final SearchScope scope;
-
-  // The filter for the search operation.
-  private final SearchFilter filter;
-
-  // The reference to the associated search operation.
-  private final SearchOperation searchOperation;
-
-  // Indicates whether or not this persistent search has already been
-  // aborted.
-  private boolean isCancelled = false;
-
   // Cancellation callbacks which should be run when this persistent
   // search is cancelled.
   private final List<CancellationCallback> cancellationCallbacks =
     new CopyOnWriteArrayList<CancellationCallback>();
 
+  // The set of change types we want to see.
+  private final Set<PersistentSearchChangeType> changeTypes;
+
+  // The filter for the search operation.
+  private final SearchFilter filter;
+
+  // Indicates whether or not this persistent search has already been
+  // aborted.
+  private boolean isCancelled = false;
+
+  // Indicates whether entries returned should include the entry
+  // change notification control.
+  private final boolean returnECs;
+
+  // The scope for the search operation.
+  private final SearchScope scope;
+
+  // The reference to the associated search operation.
+  private final SearchOperation searchOperation;
+
 
 
   /**
@@ -140,7 +198,9 @@
   /**
    * Cancels this persistent search operation. On exit this persistent
    * search will no longer be valid and any resources associated with
-   * it will have been released.
+   * it will have been released. In addition, any other persistent
+   * searches that are associated with this persistent search will
+   * also be canceled.
    *
    * @return The result of the cancellation.
    */
@@ -148,24 +208,18 @@
   {
     if (!isCancelled)
     {
-      isCancelled = true;
+      // Cancel this persistent search.
+      cancel(this);
 
-      // The persistent search can no longer be cancelled.
-      searchOperation.getClientConnection().deregisterPersistentSearch(this);
-
-      // Notify any cancellation callbacks.
-      for (CancellationCallback callback : cancellationCallbacks)
+      // Cancel any other persistent searches which are associated
+      // with this one. For example, a persistent search may be
+      // distributed across multiple proxies.
+      for (PersistentSearch psearch : searchOperation.getClientConnection()
+          .getPersistentSearches())
       {
-        try
+        if (psearch.getMessageID() == getMessageID())
         {
-          callback.persistentSearchCancelled(this);
-        }
-        catch (Exception e)
-        {
-          if (debugEnabled())
-          {
-            TRACER.debugCaught(DebugLogLevel.ERROR, e);
-          }
+          cancel(psearch);
         }
       }
     }
@@ -700,6 +754,19 @@
 
 
   /**
+   * Enable this persistent search. The persistent search will be
+   * registered with the client connection and will be prevented from
+   * sending responses to the client.
+   */
+  public void enable()
+  {
+    searchOperation.getClientConnection().registerPersistentSearch(this);
+    searchOperation.setSendResponse(false);
+  }
+
+
+
+  /**
    * Retrieves a string representation of this persistent search.
    *
    * @return A string representation of this persistent search.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperation.java b/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperation.java
index 17e7a70..5f8bb40 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperation.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperation.java
@@ -334,20 +334,6 @@
   public abstract void setIncludeUsableControl(boolean includeUsableControl);
 
   /**
-   * Register the psearch in the search operation.
-   *
-   * @param psearch - Persistent search associated to that operation
-   */
-  public abstract void setPersistentSearch(PersistentSearch psearch);
-
-  /**
-   * Get the psearch from the search operation.
-   *
-   * @return the psearch, or null if no psearch was registered
-   */
-  public abstract PersistentSearch getPersistentSearch();
-
-  /**
    * Indicates whether the client is able to handle referrals.
    *
    * @return true, if the client is able to handle referrals
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationBasis.java b/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationBasis.java
index 4c2b312..b4aad32 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationBasis.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationBasis.java
@@ -143,9 +143,6 @@
   // The matched values control associated with this search operation.
   private MatchedValuesControl matchedValuesControl;
 
-  // The persistent search associated with this search operation.
-  private PersistentSearch persistentSearch;
-
   // The search filter for the search operation.
   private SearchFilter filter;
 
@@ -249,7 +246,6 @@
     clientAcceptsReferrals = true;
     includeUsableControl   = false;
     responseSent           = new AtomicBoolean(false);
-    persistentSearch       = null;
     returnLDAPSubentries   = false;
     matchedValuesControl   = null;
     realAttributesOnly     = false;
@@ -350,7 +346,6 @@
     clientAcceptsReferrals = true;
     includeUsableControl   = false;
     responseSent           = new AtomicBoolean(false);
-    persistentSearch       = null;
     returnLDAPSubentries   = false;
     matchedValuesControl   = null;
   }
@@ -1274,12 +1269,6 @@
     if(cancelResult == null && this.cancelRequest == null)
     {
       this.cancelRequest = cancelRequest;
-
-      if (persistentSearch != null)
-      {
-        persistentSearch.cancel();
-        persistentSearch = null;
-      }
     }
   }
 
@@ -1346,14 +1335,6 @@
   /**
    * {@inheritDoc}
    */
-  public PersistentSearch getPersistentSearch()
-  {
-    return persistentSearch;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
   public boolean isIncludeUsableControl()
   {
     return includeUsableControl;
@@ -1370,14 +1351,6 @@
   /**
    * {@inheritDoc}
    */
-  public void setPersistentSearch(PersistentSearch psearch)
-  {
-    this.persistentSearch = psearch;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
   public Long getTimeLimitExpiration()
   {
     return timeLimitExpiration;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationWrapper.java b/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationWrapper.java
index 31864f9..c3aa626 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationWrapper.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/core/SearchOperationWrapper.java
@@ -332,22 +332,6 @@
   /**
    * {@inheritDoc}
    */
-  public void setPersistentSearch(PersistentSearch psearch)
-  {
-    search.setPersistentSearch(psearch);
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public PersistentSearch getPersistentSearch()
-  {
-    return search.getPersistentSearch();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
   public Long getTimeLimitExpiration()
   {
     return search.getTimeLimitExpiration();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java b/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
index d7eee43..96809e7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/protocols/ldap/LDAPClientConnection.java
@@ -1287,10 +1287,14 @@
       {
         if (ps.getMessageID() == messageID)
         {
+          // We only need to find the first persistent search
+          // associated with the provided message ID. The persistent
+          // search will ensure that all other related persistent
+          // searches are cancelled.
           CancelResult cancelResult = ps.cancel();
 
           if (keepStats && (cancelResult.getResultCode() ==
-              ResultCode.CANCELED))
+            ResultCode.CANCELED))
           {
             statTracker.updateAbandonedOperation();
           }
@@ -1428,6 +1432,11 @@
 
         for (PersistentSearch persistentSearch : getPersistentSearches())
         {
+          if (persistentSearch.getMessageID() == messageID)
+          {
+            continue;
+          }
+
           persistentSearch.cancel();
           lastCompletionTime.set(TimeThread.getTime());
         }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java b/opendj-sdk/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
index 293cb99..a3334cd 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
@@ -231,8 +231,7 @@
       if (persistentSearch != null)
       {
         wfe.registerPersistentSearch(persistentSearch);
-        clientConnection.registerPersistentSearch(persistentSearch);
-        setSendResponse(false);
+        persistentSearch.enable();
       }
 
 
@@ -536,7 +535,6 @@
           persistentSearch = new PersistentSearch(this,
                                       psearchControl.getChangeTypes(),
                                       psearchControl.getReturnECs());
-          setPersistentSearch(persistentSearch);
 
           // If we're only interested in changes, then we don't actually want
           // to process the search now.

--
Gitblit v1.10.0