mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

ludovicp
30.11.2010 08f9243b55eb5f0f8030eebe976a0d85ffbd44c2
Implements a configurable limit in the number of persistent searches a server can handle.
15 files modified
308 ■■■■■ changed files
opends/resource/schema/02-config.ldif 6 ●●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/WorkQueueConfiguration.xml 31 ●●●●● patch | view | raw | blame | history
opends/src/admin/messages/ParallelWorkQueueCfgDefn.properties 3 ●●●●● patch | view | raw | blame | history
opends/src/admin/messages/TraditionalWorkQueueCfgDefn.properties 3 ●●●●● patch | view | raw | blame | history
opends/src/admin/messages/WorkQueueCfgDefn.properties 3 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/config.properties 5 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/core.properties 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/api/WorkQueue.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/DirectoryServer.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/core/PersistentSearch.java 7 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java 41 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java 41 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tools/LDAPSearch.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java 11 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/controls/PersistentSearchControlTest.java 88 ●●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif
@@ -2485,6 +2485,11 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.7
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.613
  NAME 'ds-cfg-max-psearches'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler'
  SUP top
@@ -3247,6 +3252,7 @@
  STRUCTURAL
  MUST ( cn $
         ds-cfg-java-class )
 MAY  ds-cfg-max-psearches
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.72
  NAME 'ds-cfg-traditional-work-queue'
opends/src/admin/defn/org/opends/server/admin/std/WorkQueueConfiguration.xml
@@ -23,7 +23,7 @@
  ! CDDL HEADER END
  !
  !
  !      Copyright 2007-2008 Sun Microsystems, Inc.
  !      Copyright 2007-2010 Sun Microsystems, Inc.
  ! -->
<adm:managed-object name="work-queue" plural-name="work-queues"
  package="org.opends.server.admin.std"
@@ -33,7 +33,7 @@
  <adm:synopsis>
    The
    <adm:user-friendly-name />
    provides the configuration for the server work queue and
    provides the configuration for the server work queue and
    is responsible for ensuring that requests received from clients are
    processed in a timely manner.
  </adm:synopsis>
@@ -75,4 +75,31 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="max-psearches">
    <adm:synopsis>
      Defines the maximum number of concurrent persistent searches that
      can be performed on Directory Server
    </adm:synopsis>
    <adm:description>
      The persistent search mechanism provides an active channel through which entries that change,
      and information about the changes that occur, can be communicated. Because each persistent search
      operation uses one thread, limiting the number of simultaneous persistent searches prevents certain
      kinds of denial of service attacks. A value of 0 indicates that there is no limit on the persistent searches.
    </adm:description>
    <adm:default-behavior>
      <adm:alias>
        <adm:synopsis>
          Let the server decide.
        </adm:synopsis>
      </adm:alias>
    </adm:default-behavior>
    <adm:syntax>
      <adm:integer lower-limit="0" upper-limit="2147483647" />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-max-psearches</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
</adm:managed-object>
opends/src/admin/messages/ParallelWorkQueueCfgDefn.properties
@@ -3,6 +3,9 @@
synopsis=The Parallel Work Queue is a type of work queue that uses a number of worker threads that watch a queue and pick up an operation to process whenever one becomes available.
description=The parallel work queue is a FIFO queue serviced by a fixed number of worker threads. This fixed number of threads can be changed on the fly, with the change taking effect as soon as it is made. This work queue implementation is unbound ie it does not block after reaching certain queue size and as such should only be used on a very well tuned server configuration to avoid potential out of memory errors.
property.java-class.synopsis=Specifies the fully-qualified name of the Java class that provides the Parallel Work Queue implementation.
property.max-psearches.synopsis=Defines the maximum number of concurrent persistent searches that can be performed on Directory Server
property.max-psearches.description=The persistent search mechanism provides an active channel through which entries that change, and information about the changes that occur, can be communicated. Because each persistent search operation uses one thread, limiting the number of simultaneous persistent searches prevents certain kinds of denial of service attacks. A value of 0 indicates that there is no limit on the persistent searches.
property.max-psearches.default-behavior.alias.synopsis=Let the server decide.
property.num-worker-threads.synopsis=Specifies the number of worker threads to be used for processing operations placed in the queue.
property.num-worker-threads.description=If the value is increased, the additional worker threads are created immediately. If the value is reduced, the appropriate number of threads are destroyed as operations complete processing.
property.num-worker-threads.default-behavior.alias.synopsis=Let the server decide.
opends/src/admin/messages/TraditionalWorkQueueCfgDefn.properties
@@ -3,6 +3,9 @@
synopsis=The Traditional Work Queue is a type of work queue that uses a number of worker threads that watch a queue and pick up an operation to process whenever one becomes available.
description=The traditional work queue is a FIFO queue serviced by a fixed number of worker threads. This fixed number of threads can be changed on the fly, with the change taking effect as soon as it is made. You can limit the size of the work queue to a specified number of operations. When this many operations are in the queue, waiting to be picked up by threads, any new requests are rejected with an error message.
property.java-class.synopsis=Specifies the fully-qualified name of the Java class that provides the Traditional Work Queue implementation.
property.max-psearches.synopsis=Defines the maximum number of concurrent persistent searches that can be performed on Directory Server
property.max-psearches.description=The persistent search mechanism provides an active channel through which entries that change, and information about the changes that occur, can be communicated. Because each persistent search operation uses one thread, limiting the number of simultaneous persistent searches prevents certain kinds of denial of service attacks. A value of 0 indicates that there is no limit on the persistent searches.
property.max-psearches.default-behavior.alias.synopsis=Let the server decide.
property.max-work-queue-capacity.synopsis=Specifies the maximum number of queued operations that can be in the work queue at any given time.
property.max-work-queue-capacity.description=If the work queue is already full and additional requests are received by the server, the requests are rejected. A value of zero indicates that there is no limit to the size of the queue.
property.max-work-queue-capacity.default-behavior.alias.synopsis=The work queue does not impose any limit on the number of operations that can be enqueued at any one time.
opends/src/admin/messages/WorkQueueCfgDefn.properties
@@ -3,3 +3,6 @@
synopsis=The Work Queue provides the configuration for the server work queue and is responsible for ensuring that requests received from clients are processed in a timely manner.
description=Only a single work queue can be defined in the server. Whenever a connection handler receives a client request, it should place the request in the work queue to be processed appropriately.
property.java-class.synopsis=Specifies the fully-qualified name of the Java class that provides the Work Queue implementation.
property.max-psearches.synopsis=Defines the maximum number of concurrent persistent searches that can be performed on Directory Server
property.max-psearches.description=The persistent search mechanism provides an active channel through which entries that change, and information about the changes that occur, can be communicated. Because each persistent search operation uses one thread, limiting the number of simultaneous persistent searches prevents certain kinds of denial of service attacks. A value of 0 indicates that there is no limit on the persistent searches.
property.max-psearches.default-behavior.alias.synopsis=Let the server decide.
opends/src/messages/messages/config.properties
@@ -20,7 +20,7 @@
#
# CDDL HEADER END
#
#      Copyright 2006-2009 Sun Microsystems, Inc.
#      Copyright 2006-2010 Sun Microsystems, Inc.
@@ -2160,3 +2160,6 @@
SEVERE_WARN_CONFIG_SCHEMA_CONFLICTING_LDAP_SYNTAX_724=An ldap syntax read \
 from schema configuration file %s conflicts with another ldap syntax already \
 read into the schema:  %s.  The later ldap syntax description will be used
MILD_ERR_CONFIG_CORE_INVALID_MAX_PSEARCH_LIMIT_725=The provided maximum allowed \
 simultaneous persistent searches '%d' is invalid.  The maximum allowed \
 value must be between 0 and '%d' where '%d' is the number of worker threads
opends/src/messages/messages/core.properties
@@ -1839,3 +1839,5 @@
 allowed cryptography strength "%d" in jurisdiction policy files
MILD_ERR_DISK_SPACE_MONITOR_UPDATE_FAILED_729=Failed to update free disk space \
 for directory %s: %s
MILD_ERR_MAX_PSEARCH_LIMIT_EXCEEDED_730=The directory server is not accepting  \
 a new persistent search request because the server has already reached its limit
opends/src/server/org/opends/server/api/WorkQueue.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.api;
import org.opends.messages.Message;
@@ -165,5 +165,16 @@
    return false;
  }
 /**
   * Specifies the maximum number of simultaneous persistent
   * searches that are allowed.
   *
   * @return   The maximum number of simultaneous persistent
  *                      searches that are allowed.
   */
  public abstract int getMaxPersistentSearchLimit();
}
opends/src/server/org/opends/server/core/DirectoryServer.java
@@ -60,6 +60,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
@@ -612,6 +613,9 @@
  // a search.
  private int lookthroughLimit;
  // The current active persistent searches.
  private AtomicInteger activePSearches = new AtomicInteger(0);
  // Whether to use collect operation processing times in nanosecond resolution
  private boolean useNanoTime;
@@ -8868,6 +8872,54 @@
  /**
   *  Registers a new persistent search by increasing the count
   *  of active persistent searches. After receiving a persistent
   *  search request, a Local or Remote WFE must call this method to
   *  let the core server manage the count of concurrent persistent
   *  searches.
   */
  public static void registerPersistentSearch()
  {
    directoryServer.activePSearches.incrementAndGet();
  }
  /**
   * Deregisters a canceled persistent search.  After a persistent
   * search is canceled, the handler must call this method to let
   * the core server manage the count of concurrent persistent
   *  searches.
   */
  public static void deregisterPersistentSearch()
  {
    directoryServer.activePSearches.decrementAndGet();
  }
  /**
   * Indicates whether a new persistent search is allowed.
   *
   * @return <CODE>true</CODE>if a new persistent search is allowed
   *          or <CODE>false</CODE>f if not.
   */
  public static boolean allowNewPersistentSearch()
  {
    //0 indicates that there is no limit.
    int maxAllowedPSearch = getWorkQueue().getMaxPersistentSearchLimit();
    if(maxAllowedPSearch ==0 ||
            directoryServer.activePSearches.get() <
            maxAllowedPSearch)
    {
      return true;
    }
    return false;
  }
  /**
   * Retrieves the default maximum length of time in seconds that should be
   * allowed when processing a search.
   *
opends/src/server/org/opends/server/core/PersistentSearch.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.core;
@@ -121,6 +121,9 @@
      psearch.searchOperation.getClientConnection().deregisterPersistentSearch(
          psearch);
      //Decrement of psearch count maintained by the server.
      DirectoryServer.deregisterPersistentSearch();
      // Notify any cancellation callbacks.
      for (CancellationCallback callback : psearch.cancellationCallbacks)
      {
@@ -774,6 +777,8 @@
  {
    searchOperation.getClientConnection().registerPersistentSearch(this);
    searchOperation.setSendResponse(false);
    //Register itself with the Core.
    DirectoryServer.registerPersistentSearch();
  }
opends/src/server/org/opends/server/extensions/ParallelWorkQueue.java
@@ -104,6 +104,9 @@
  // a configuration change has not been completely applied).
  private int numWorkerThreads;
   // The number of maximum allowed persistent searches.
  private int maxPSearches;
  // The queue that will be used to actually hold the pending operations.
  private ConcurrentLinkedQueue<AbstractOperation> opQueue;
@@ -142,6 +145,17 @@
    // Get the necessary configuration from the provided entry.
    numWorkerThreads = getNumWorkerThreads(configuration);
    //Check the value of the maximum persistent searches attribute.
    //We don't allow a value greater than the number of threads.
    maxPSearches = configuration.getMaxPsearches()==null?
      numWorkerThreads:configuration.getMaxPsearches();
    if(maxPSearches >  numWorkerThreads)
    {
      Message message = ERR_CONFIG_CORE_INVALID_MAX_PSEARCH_LIMIT.get(
              maxPSearches, numWorkerThreads, numWorkerThreads);
      throw new ConfigException(message);
    }
    // Create the actual work queue.
    opQueue = new ConcurrentLinkedQueue<AbstractOperation>();
@@ -498,7 +512,19 @@
                      ParallelWorkQueueCfg configuration,
                      List<Message> unacceptableReasons)
  {
    // The provided configuration will always be acceptable.
    //Check if the max persistent search value is under limit.
    if(configuration.getMaxPsearches() !=null)
    {
      int nPSearches = configuration.getMaxPsearches();
      int nWorkerThreads = getNumWorkerThreads(configuration);
      if(nPSearches >  nWorkerThreads)
      {
        Message message = ERR_CONFIG_CORE_INVALID_MAX_PSEARCH_LIMIT.get(
                nPSearches, nWorkerThreads, nWorkerThreads);
        unacceptableReasons.add(message);
        return false;
      }
    }
    return true;
  }
@@ -550,6 +576,9 @@
        }
      }
    }
     //Get the new maximum psearch value.
     maxPSearches = configuration.getMaxPsearches()==null?
      numWorkerThreads:configuration.getMaxPsearches();
    return new ConfigChangeResult(ResultCode.SUCCESS, false, resultMessages);
  }
@@ -582,6 +611,16 @@
  /**
   * {@inheritDoc}
   */
  public int getMaxPersistentSearchLimit()
  {
    return maxPSearches;
  }
  // Determine the number of worker threads.
  private int getNumWorkerThreads(ParallelWorkQueueCfg configuration)
  {
opends/src/server/org/opends/server/extensions/TraditionalWorkQueue.java
@@ -113,6 +113,9 @@
  // a configuration change has not been completely applied).
  private int numWorkerThreads;
  //The maximum number of concurrent persistent searches.
  private int maxPSearches;
  // The queue that will be used to actually hold the pending operations.
  private LinkedBlockingQueue<AbstractOperation> opQueue;
@@ -152,6 +155,17 @@
    // Get the necessary configuration from the provided entry.
    numWorkerThreads = getNumWorkerThreads(configuration);
    //Check the value of the maximum persistent searches attribute.
    //We don't allow a value greater than the number of threads.
    maxPSearches = configuration.getMaxPsearches()==null?
      numWorkerThreads:configuration.getMaxPsearches();
    if(maxPSearches >  numWorkerThreads)
    {
      Message message = ERR_CONFIG_CORE_INVALID_MAX_PSEARCH_LIMIT.get(
              maxPSearches, numWorkerThreads, numWorkerThreads);
      throw new ConfigException(message);
    }
    maxCapacity      = configuration.getMaxWorkQueueCapacity();
@@ -551,7 +565,19 @@
                      TraditionalWorkQueueCfg configuration,
                      List<Message> unacceptableReasons)
  {
    // The provided configuration will always be acceptable.
    //Check if the max persistent search value is under limit.
    if(configuration.getMaxPsearches() !=null)
    {
      int nPSearches = configuration.getMaxPsearches();
      int nWorkerThreads = getNumWorkerThreads(configuration);
      if(nPSearches >  nWorkerThreads)
      {
        Message message = ERR_CONFIG_CORE_INVALID_MAX_PSEARCH_LIMIT.get(
                nPSearches, nWorkerThreads, nWorkerThreads);
        unacceptableReasons.add(message);
        return false;
      }
    }
    return true;
  }
@@ -607,6 +633,10 @@
    }
   //Get the new maximum psearch value.
    maxPSearches = configuration.getMaxPsearches()==null?
      numWorkerThreads:configuration.getMaxPsearches();
    // Apply a change to the maximum capacity if appropriate.  Since we can't
    // change capacity on the fly, then we'll have to create a new queue and
    // transfer any remaining items into it.  Any thread that is waiting on the
@@ -708,6 +738,15 @@
  }
  /**
   * {@inheritDoc}
   */
  public int getMaxPersistentSearchLimit()
  {
    return maxPSearches;
  }
  // Determine the number of worker threads.
  private int getNumWorkerThreads(TraditionalWorkQueueCfg configuration)
opends/src/server/org/opends/server/tools/LDAPSearch.java
@@ -1771,7 +1771,7 @@
      AtomicInteger nextMessageID = new AtomicInteger(1);
      connection = new LDAPConnection(hostNameValue, portNumber,
                                      connectionOptions, out, err);
      int timeout = connectTimeout.getIntValue();
      int timeout = pSearchInfo.isPresent()?0:connectTimeout.getIntValue();
      connection.connectToHost(bindDNValue, bindPasswordValue, nextMessageID,
          timeout);
opends/src/server/org/opends/server/workflowelement/localbackend/LocalBackendSearchOperation.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 */
package org.opends.server.workflowelement.localbackend;
@@ -251,6 +251,15 @@
      // If there's a persistent search, then register it with the server.
      if (persistentSearch != null)
      {
        //The Core server maintains the count of concurrent persistent searches
        //so that all the backends (Remote and Local)  are aware of it. Verify
        //with the core if we have already reached the threshold.
        if(!DirectoryServer.allowNewPersistentSearch())
        {
          setResultCode(ResultCode.ADMIN_LIMIT_EXCEEDED);
          appendErrorMessage(ERR_MAX_PSEARCH_LIMIT_EXCEEDED.get());
          break searchProcessing;
        }
        wfe.registerPersistentSearch(persistentSearch);
        persistentSearch.enable();
      }
opends/tests/unit-tests-testng/src/server/org/opends/server/controls/PersistentSearchControlTest.java
@@ -22,14 +22,21 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 */
package org.opends.server.controls;
import java.util.ArrayList;
import static org.opends.server.util.ServerConstants.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.opends.messages.Message;
import org.opends.server.TestCaseUtils;
import org.opends.server.core.ModifyOperation;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -39,8 +46,13 @@
import org.opends.server.types.*;
import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPReader;
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.tools.LDAPSearch;
/**
 * Test ChangeNumber and ChangeNumberGenerator
@@ -523,4 +535,78 @@
    toString = toString +")";
    assertEquals(toString, ecnc.toString());
  }
  /**
   * Tests the maximum persistent search limit imposed by the server.
   */
  @Test()
  public void testMaxPSearch() throws Exception
  {
    TestCaseUtils.initializeTestBackend(true);
    //Modify the configuration to allow only 1 concurrent persistent search.
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    ArrayList<ByteString> values = new ArrayList<ByteString>();
    values.add(ByteString.valueOf("1"));
    LDAPAttribute attr = new LDAPAttribute("ds-cfg-max-psearches", values);
    ArrayList<RawModification> mods = new ArrayList<RawModification>();
    mods.add(new LDAPModification(ModificationType.REPLACE, attr));
    ModifyOperation modifyOperation =
         conn.processModify(ByteString.valueOf("cn=Work Queue,cn=config"), mods);
    assertEquals(modifyOperation.getResultCode(), ResultCode.SUCCESS);
    //Create a persistent search request.
    LinkedHashSet<String> attributes = new LinkedHashSet<String>();
    attributes.add("cn");
    List<Control> controls = new LinkedList<Control>();
          // Creates psearch control
    HashSet<PersistentSearchChangeType> changeTypes =
      new HashSet<PersistentSearchChangeType>();
    changeTypes.add(PersistentSearchChangeType.ADD);
    changeTypes.add(PersistentSearchChangeType.DELETE);
    changeTypes.add(PersistentSearchChangeType.MODIFY);
    changeTypes.add(PersistentSearchChangeType.MODIFY_DN);
    PersistentSearchControl persSearchControl = new PersistentSearchControl(
          changeTypes, true, true);
      controls.add(persSearchControl);
    final InternalSearchOperation search =
        conn.processSearch("o=test", SearchScope.BASE_OBJECT,
            DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit
            0, // Time limit
            true, // Types only
            "(objectClass=*)", attributes, controls, null);
    Thread t = new Thread(new Runnable() {
      public void run() {
        try {
          search.run();
        }
        catch(Exception ex) {}
      }
    },"Persistent Search Test");
    t.start();
    t.join(2000);
     //Create a persistent search request.
    final String[] args =
     {
     "-D", "cn=Directory Manager",
     "-w", "password",
      "-h", "127.0.0.1",
      "-p", String.valueOf(TestCaseUtils.getServerLdapPort()),
      "-b", "o=test",
      "-s", "sub",
      "-C","ps:add:true:true",
      "--noPropertiesFile",
      "(objectClass=*)"
    };
    assertEquals(LDAPSearch.mainSearch(args, false,
                  true, null, System.err),11);
    //cancel the persisting persistent search.
    search.cancel(new CancelRequest(true,Message.EMPTY));
  }
}