mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.08.2013 0d88ba4b58e27627ac2cb852d14fe91a0174c6dc
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -48,7 +48,6 @@
import org.opends.server.api.Backend;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.*;
@@ -450,8 +449,7 @@
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          message = ERR_CANNOT_RECOVER_CHANGES.get(
              baseDn.toNormalizedString());
          message = ERR_CANNOT_RECOVER_CHANGES.get(baseDn.toNormalizedString());
          logError(message);
        }
      } catch (Exception e)
@@ -463,8 +461,7 @@
         * Log an error for the repair tool
         * that will need to re-synchronize the servers.
         */
        message = ERR_CANNOT_RECOVER_CHANGES.get(
            baseDn.toNormalizedString());
        message = ERR_CANNOT_RECOVER_CHANGES.get(baseDn.toNormalizedString());
        logError(message);
      }
      finally
@@ -545,7 +542,6 @@
     * the last CSN seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(baseDn, serverId, getServerState());
    flushThread = new ServerStateFlush();
    /*
@@ -557,9 +553,7 @@
     */
    generator = getGenerator();
    pendingChanges =
      new PendingChanges(generator, this);
    pendingChanges = new PendingChanges(generator, this);
    remotePendingChanges = new RemotePendingChanges(getServerState());
    // listen for changes on the configuration
@@ -777,9 +771,7 @@
      return false;
    }
    /*
     * Search the domain root entry that is used to save the generation id
     */
    // Search the domain root entry that is used to save the generation id
    ByteString asn1BaseDn = ByteString.valueOf(baseDn.toString());
    Set<String> attributes = new LinkedHashSet<String>(3);
    attributes.add(REPLICATION_GENERATION_ID);
@@ -933,7 +925,6 @@
     * Compare configuration stored in passed fractional configuration
     * attributes with local variable one
     */
    try
    {
      return FractionalConfig.
@@ -1225,7 +1216,7 @@
    // Create a list of filtered attributes for this entry
    Entry concernedEntry = modifyDNOperation.getOriginalEntry();
    List<String> fractionalConcernedAttributes =
    Set<String> fractionalConcernedAttributes =
      createFractionalConcernedAttrList(fractionalConfig,
      concernedEntry.getObjectClasses().keySet());
@@ -1266,9 +1257,9 @@
      }
      boolean attributeToBeFiltered = (fractionalExclusive && found)
          || (!fractionalExclusive && !found);
      if (attributeToBeFiltered &&
        !newRdn.hasAttributeType(attributeType) &&
        !modifyDNOperation.deleteOldRDN())
      if (attributeToBeFiltered
          && !newRdn.hasAttributeType(attributeType)
          && !modifyDNOperation.deleteOldRDN())
      {
        /*
         * A forbidden attribute is in the old RDN and no more in the new RDN,
@@ -1313,7 +1304,7 @@
     * fractional replication configuration
     */
    List<String> fractionalConcernedAttributes =
    Set<String> fractionalConcernedAttributes =
      createFractionalConcernedAttrList(fractionalConfig, classes.keySet());
    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
    if (fractionalExclusive && fractionalConcernedAttributes.isEmpty())
@@ -1444,7 +1435,7 @@
   }
  private static boolean canRemoveAttribute(AttributeType attributeType,
      boolean fractionalExclusive, List<String> fractionalConcernedAttributes)
      boolean fractionalExclusive, Set<String> fractionalConcernedAttributes)
  {
    String attributeName = attributeType.getPrimaryName();
    String attributeOid = attributeType.getOID();
@@ -1459,7 +1450,7 @@
        || (!foundAttribute && !fractionalExclusive);
  }
  private static boolean contains(List<String> fractionalConcernedAttributes,
  private static boolean contains(Set<String> fractionalConcernedAttributes,
      String attributeName, String attributeOid)
  {
    final boolean foundAttribute =
@@ -1478,17 +1469,18 @@
   * @return The list of attributes of the entry to be excluded/included
   * when the operation will be performed.
   */
  private static List<String> createFractionalConcernedAttrList(
  private static Set<String> createFractionalConcernedAttrList(
    FractionalConfig fractionalConfig, Set<ObjectClass> entryObjectClasses)
  {
    /*
     * Is the concerned entry of a type concerned by fractional replication
     * configuration ? If yes, add the matching attribute names to a list of
     * configuration ? If yes, add the matching attribute names to a set of
     * attributes to take into account for filtering
     * (inclusive or exclusive mode)
     * (inclusive or exclusive mode).
     * Using a Set to avoid duplicate attributes (from 2 inheriting classes for
     * instance)
     */
    List<String> fractionalConcernedAttributes = new ArrayList<String>();
    Set<String> fractionalConcernedAttributes = new HashSet<String>();
    // Get object classes the entry matches
    List<String> fractionalAllClassesAttributes =
@@ -1504,31 +1496,14 @@
      {
        if (entryObjectClass.hasNameOrOID(fractionalClass.toLowerCase()))
        {
          List<String> attrList =
            fractionalSpecificClassesAttributes.get(fractionalClass);
          for(String attr : attrList)
          {
            // Avoid duplicate attributes (from 2 inheriting classes for
            // instance)
            if (!fractionalConcernedAttributes.contains(attr))
            {
              fractionalConcernedAttributes.add(attr);
            }
          }
          fractionalConcernedAttributes.addAll(
              fractionalSpecificClassesAttributes.get(fractionalClass));
        }
      }
    }
    /*
     * Add to the list any attribute which is class independent
     */
    for (String attr : fractionalAllClassesAttributes)
    {
      if (!fractionalConcernedAttributes.contains(attr))
      {
        fractionalConcernedAttributes.add(attr);
      }
    }
    // Add to the set any attribute which is class independent
    fractionalConcernedAttributes.addAll(fractionalAllClassesAttributes);
    return fractionalConcernedAttributes;
  }
@@ -1555,7 +1530,7 @@
     */
    Entry modifiedEntry = modifyOperation.getCurrentEntry();
    List<String> fractionalConcernedAttributes =
    Set<String> fractionalConcernedAttributes =
      createFractionalConcernedAttrList(fractionalConfig,
      modifiedEntry.getObjectClasses().keySet());
    boolean fractionalExclusive = fractionalConfig.isFractionalExclusive();
@@ -1612,16 +1587,14 @@
        // found, return immediately the answer;
        return FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
      }
      else
      // Found a modification to remove, remove it from the list.
      modsIt.remove();
      result = FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
      if (mods.isEmpty())
      {
        // Found a modification to remove, remove it from the list.
        modsIt.remove();
        result = FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES;
        if (mods.isEmpty())
        {
          // This operation must become a no-op as no more modification in it
          return FRACTIONAL_BECOME_NO_OP;
        }
        // This operation must become a no-op as no more modification in it
        return FRACTIONAL_BECOME_NO_OP;
      }
    }
@@ -1854,19 +1827,17 @@
          return new SynchronizationProviderResult.StopProcessing(
              ResultCode.NO_SUCH_OBJECT, null);
        }
        else
        DN entryDN = addOperation.getEntryDN();
        DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
        if (parentDnFromEntryDn != null
            && !parentDnFromCtx.equals(parentDnFromEntryDn))
        {
          DN entryDN = addOperation.getEntryDN();
          DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
          if (parentDnFromEntryDn != null
              && !parentDnFromCtx.equals(parentDnFromEntryDn))
          {
            // parentEntry has been renamed
            // replication name conflict resolution is expected to fix that
            // later in the flow
            return new SynchronizationProviderResult.StopProcessing(
                ResultCode.NO_SUCH_OBJECT, null);
          }
          // parentEntry has been renamed
          // replication name conflict resolution is expected to fix that
          // later in the flow
          return new SynchronizationProviderResult.StopProcessing(
              ResultCode.NO_SUCH_OBJECT, null);
        }
      }
    }
@@ -1971,13 +1942,13 @@
         * another entry.
         * We must not let the change proceed, return a negative
         * result and set the result code to NO_SUCH_OBJECT.
         * When the operation will return, the thread that started the
         * operation will try to find the correct entry and restart a new
         * operation.
         * When the operation will return, the thread that started the operation
         * will try to find the correct entry and restart a new operation.
         */
        return new SynchronizationProviderResult.StopProcessing(
            ResultCode.NO_SUCH_OBJECT, null);
      }
      if (modifyDNOperation.getNewSuperior() != null)
      {
        /*
@@ -1992,6 +1963,7 @@
            ResultCode.NO_SUCH_OBJECT, null);
        }
      }
      /*
       * If the object has been renamed more recently than this
       * operation, cancel the operation.
@@ -2317,10 +2289,7 @@
    LDAPFilter filter = LDAPFilter.createEqualityFilter(DS_SYNC_CONFLICT,
        ByteString.valueOf(freedDN.toString()));
     Set<String> attrs = new LinkedHashSet<String>(1);
     attrs.add(EntryHistorical.HISTORICAL_ATTRIBUTE_NAME);
     attrs.add(EntryHistorical.ENTRYUUID_ATTRIBUTE_NAME);
     attrs.add("*");
     Set<String> attrs = allOperationalAttributes();
     InternalSearchOperation searchOp =  conn.processSearch(
       ByteString.valueOf(baseDn.toString()),
       SearchScope.WHOLE_SUBTREE,
@@ -3606,16 +3575,11 @@
   * @return generationId The retrieved value of generationId
   * @throws DirectoryException When an error occurs.
   */
  private long loadGenerationId()
  throws DirectoryException
  private long loadGenerationId() throws DirectoryException
  {
    long aGenerationId=-1;
    if (debugEnabled())
      TRACER.debugInfo("Attempt to read generation ID from DB " + baseDn);
    ByteString asn1BaseDn = ByteString.valueOf(baseDn.toString());
    boolean found = false;
    LDAPFilter filter;
    try
    {
@@ -3631,6 +3595,7 @@
     * Search the database entry that is used to periodically
     * save the generation id
     */
    ByteString asn1BaseDn = ByteString.valueOf(baseDn.toString());
    Set<String> attributes = new LinkedHashSet<String>(1);
    attributes.add(REPLICATION_GENERATION_ID);
    InternalSearchOperation search = conn.processSearch(asn1BaseDn,
@@ -3639,6 +3604,9 @@
        filter,attributes);
    if (search.getResultCode() == ResultCode.NO_SUCH_OBJECT)
    {
      // FIXME JNR Am I dreaming, or next code is doing exactly the same thing
      // as code before if statement?
      // if the base entry does not exist look for the generationID
      // in the config entry.
      asn1BaseDn = ByteString.valueOf(baseDn.toString());
@@ -3647,6 +3615,9 @@
          DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
          filter,attributes);
    }
    boolean found = false;
    long aGenerationId = -1;
    if (search.getResultCode() != ResultCode.SUCCESS)
    {
      if (search.getResultCode() != ResultCode.NO_SUCH_OBJECT)
@@ -3737,60 +3708,6 @@
   * Total Update >>
   */
  /**
   * Clears all the entries from the JE backend determined by the
   * be id passed into the method.
   *
   * @param  createBaseEntry  Indicate whether to automatically create the base
   *                          entry and add it to the backend.
   * @param beID  The be id to clear.
   * @param dn   The suffix of the backend to create if the the createBaseEntry
   *             boolean is true.
   * @throws Exception  If an unexpected problem occurs.
   */
  public static void clearJEBackend(boolean createBaseEntry, String beID,
      String dn) throws Exception
  {
    BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
    // FIXME Should setBackendEnabled be part of TaskUtils ?
    TaskUtils.disableBackend(beID);
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
      {
        throw new RuntimeException(failureReason.toString());
      }
      try
      {
        backend.clearBackend();
      }
      finally
      {
        LockFileManager.releaseLock(lockFile, failureReason);
      }
    }
    finally
    {
      TaskUtils.enableBackend(beID);
    }
    if (createBaseEntry)
    {
      DN baseDN = DN.decode(dn);
      Entry e = createEntry(baseDN);
      backend = (BackendImpl)DirectoryServer.getBackend(beID);
      backend.addEntry(e, null);
    }
  }
  /**
   * This method trigger an export of the replicated data.
   *
@@ -3817,13 +3734,10 @@
   *
   * @throws DirectoryException when an error occurred
   */
  protected long exportBackend(OutputStream output, boolean checksumOutput)
  throws DirectoryException
  private long exportBackend(OutputStream output, boolean checksumOutput)
      throws DirectoryException
  {
    long genID = 0;
    Backend backend = retrievesBackend(this.baseDn);
    long numberOfEntries = backend.numSubordinates(baseDn, true) + 1;
    long entryCount = Math.min(numberOfEntries, 1000);
    //  Acquire a shared lock for the backend.
    try
@@ -3835,8 +3749,7 @@
        Message message = ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(
            backend.getBackendID(), String.valueOf(failureReason));
        logError(message);
        throw new DirectoryException(
            ResultCode.OTHER, message, null);
        throw new DirectoryException(ResultCode.OTHER, message, null);
      }
    }
    catch (Exception e)
@@ -3845,13 +3758,13 @@
          ERR_LDIFEXPORT_CANNOT_LOCK_BACKEND.get(
                  backend.getBackendID(), e.getLocalizedMessage());
      logError(message);
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
      throw new DirectoryException(ResultCode.OTHER, message, null);
    }
    long numberOfEntries = backend.numSubordinates(baseDn, true) + 1;
    long entryCount = Math.min(numberOfEntries, 1000);
    OutputStream os;
    ReplLDIFOutputStream ros = null;
    if (checksumOutput)
    {
      ros = new ReplLDIFOutputStream(entryCount);
@@ -3869,11 +3782,11 @@
    {
      os = output;
    }
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    // baseDn branch is the only one included in the export
    List<DN> includeBranches = new ArrayList<DN>(1);
    includeBranches.add(this.baseDn);
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    exportConfig.setIncludeBranches(includeBranches);
    // For the checksum computing mode, only consider the 'stable' attributes
@@ -3895,6 +3808,7 @@
    }
    //  Launch the export.
    long genID = 0;
    try
    {
      backend.exportLDIF(exportConfig);
@@ -3907,8 +3821,7 @@
        Message message =
            ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(de.getMessageObject());
        logError(message);
        throw new DirectoryException(
            ResultCode.OTHER, message, null);
        throw new DirectoryException(ResultCode.OTHER, message, null);
      }
    }
    catch (Exception e)
@@ -3916,8 +3829,7 @@
      Message message = ERR_LDIFEXPORT_ERROR_DURING_EXPORT.get(
          stackTraceToSingleLineString(e));
      logError(message);
      throw new DirectoryException(
          ResultCode.OTHER, message, null);
      throw new DirectoryException(ResultCode.OTHER, message, null);
    }
    finally
    {
@@ -3940,8 +3852,7 @@
          Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(
              backend.getBackendID(), String.valueOf(failureReason));
          logError(message);
          throw new DirectoryException(
              ResultCode.OTHER, message, null);
          throw new DirectoryException(ResultCode.OTHER, message, null);
        }
      }
      catch (Exception e)
@@ -3949,8 +3860,7 @@
        Message message = WARN_LDIFEXPORT_CANNOT_UNLOCK_BACKEND.get(
            backend.getBackendID(), stackTraceToSingleLineString(e));
        logError(message);
        throw new DirectoryException(
            ResultCode.OTHER, message, null);
        throw new DirectoryException(ResultCode.OTHER, message, null);
      }
    }
    return genID;
@@ -3973,8 +3883,7 @@
   * @param backend The backend.
   * @throws Exception
   */
  private void preBackendImport(Backend backend)
  throws Exception
  private void preBackendImport(Backend backend) throws Exception
  {
    // Stop saving state
    stateSavingDisabled = true;
@@ -4015,8 +3924,7 @@
        Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
            backend.getBackendID());
        if (ieContext.getException() == null)
          ieContext.setException(new DirectoryException(ResultCode.OTHER,
              message));
          ieContext.setException(new DirectoryException(OTHER, message));
      }
      else
      {
@@ -4133,15 +4041,13 @@
    LDAPReplicationDomain replicationDomain = null;
    // Retrieves the domain
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider<?> provider :
      DirectoryServer.getSynchronizationProviders())
    {
      if (!( provider instanceof MultimasterReplication))
      {
        Message message = ERR_INVALID_PROVIDER.get();
        throw new DirectoryException(ResultCode.OTHER,
            message);
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      // From the domainDN retrieves the replication domain
@@ -4155,8 +4061,7 @@
      {
        // Should never happen
        Message message = ERR_MULTIPLE_MATCHING_DOMAIN.get();
        throw new DirectoryException(ResultCode.OTHER,
            message);
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      replicationDomain = domain;
    }
@@ -4271,10 +4176,8 @@
        configuration.getHeartbeatInterval(),
        (byte)configuration.getGroupId());
    // Read assured configuration and reconnect if needed
    // Read assured + fractional configuration and each time reconnect if needed
    readAssuredConfig(configuration, true);
    // Read fractional configuration and reconnect if needed
    readFractionalConfig(configuration, true);
    solveConflictFlag = isSolveConflict(configuration);
@@ -4574,14 +4477,11 @@
      Iterator<CSN> it = replayOperations.keySet().iterator();
      while (it.hasNext())
      {
        if (it.next().olderOrEqual(startCSN))
        {
          it.remove();
        }
        else
        if (it.next().newer(startCSN))
        {
          break;
        }
        it.remove();
      }
    }
@@ -4591,12 +4491,11 @@
    do
    {
      lastRetrievedChange = null;
      // We can't do the search in one go because we need to
      // store the results so that we are sure we send the operations
      // in order and because the list might be large
      // So we search by interval of 10 seconds
      // and store the results in the replayOperations list
      // so that they are sorted before sending them.
      // We can't do the search in one go because we need to store the results
      // so that we are sure we send the operations in order and because the
      // list might be large.
      // So we search by interval of 10 seconds and store the results in the
      // replayOperations list so that they are sorted before sending them.
      long missingChangesDelta = currentStartCSN.getTime() + 10000;
      CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, serverId);
@@ -4613,17 +4512,15 @@
        while (itOp.hasNext())
        {
          FakeOperation fakeOp = itOp.next();
          if (fakeOp.getCSN().olderOrEqual(endCSN)
              && state.cover(fakeOp.getCSN()))
          {
            lastRetrievedChange = fakeOp.getCSN();
            opsToSend.add(fakeOp);
            itOp.remove();
          }
          else
          if (fakeOp.getCSN().newer(endCSN) // sanity check
              || !state.cover(fakeOp.getCSN()))
          {
            break;
          }
          lastRetrievedChange = fakeOp.getCSN();
          opsToSend.add(fakeOp);
          itOp.remove();
        }
      }
@@ -4665,7 +4562,7 @@
   * @throws Exception
   *           when raised.
   */
  public static InternalSearchOperation searchForChangedEntries(DN baseDn,
  private static InternalSearchOperation searchForChangedEntries(DN baseDn,
      CSN fromCSN, CSN lastCSN, InternalSearchListener resultListener)
      throws Exception
  {
@@ -4688,10 +4585,7 @@
        "(&(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + fromCSN + ")" +
          "(" + HISTORICAL_ATTRIBUTE_NAME + "<=dummy:" + maxValueForId + "))");
    Set<String> attrs = new LinkedHashSet<String>(3);
    attrs.add(HISTORICAL_ATTRIBUTE_NAME);
    attrs.add(ENTRYUUID_ATTRIBUTE_NAME);
    attrs.add("*");
    Set<String> attrs = allOperationalAttributes();
    return conn.processSearch(
      ByteString.valueOf(baseDn.toString()),
      SearchScope.WHOLE_SUBTREE,
@@ -4701,6 +4595,15 @@
      resultListener);
  }
  private static Set<String> allOperationalAttributes()
  {
    Set<String> attrs = new LinkedHashSet<String>(3);
    attrs.add(HISTORICAL_ATTRIBUTE_NAME);
    attrs.add(ENTRYUUID_ATTRIBUTE_NAME);
    attrs.add("*");
    return attrs;
  }
  /**
   * Search for the changes that happened since fromCSN based on the historical
   * attribute. The only changes that will be send will be the one generated on
@@ -5252,11 +5155,9 @@
          throw new ConfigException(
            NOTE_ERR_FRACTIONAL_CONFIG_BOTH_MODES.get());
        }
        else
        {
          fractionalMode = EXCLUSIVE_FRACTIONAL;
          iterator = exclIt;
        }
        fractionalMode = EXCLUSIVE_FRACTIONAL;
        iterator = exclIt;
      }
      else
      {
@@ -5487,10 +5388,7 @@
       // Not possible. We know the filter just above is correct.
     }
     Set<String> attrs = new LinkedHashSet<String>(3);
     attrs.add(HISTORICAL_ATTRIBUTE_NAME);
     attrs.add(ENTRYUUID_ATTRIBUTE_NAME);
     attrs.add("*");
     Set<String> attrs = allOperationalAttributes();
     InternalSearchOperation searchOp =  conn.processSearch(
         ByteString.valueOf(baseDn.toString()),
         SearchScope.WHOLE_SUBTREE,
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -164,9 +164,10 @@
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   *
   * @param baseDn The baseDn associated to the ReplicationServerDomain.
   * @param localReplicationServer the ReplicationServer that created this
   *                          replicationServer cache.
   * @param baseDn
   *          The baseDn associated to the ReplicationServerDomain.
   * @param localReplicationServer
   *          the ReplicationServer that created this instance.
   */
  public ReplicationServerDomain(String baseDn,
      ReplicationServer localReplicationServer)
@@ -1126,12 +1127,14 @@
  /**
   * This method resets the generationId for this domain if there is no LDAP
   * server currently connected in the whole topology on this domain and
   * if the generationId has never been saved.
   *
   * - test emptiness of directoryServers list
   * - traverse replicationServers list and test for each if DS are connected
   * So it strongly relies on the directoryServers list
   * server currently connected in the whole topology on this domain and if the
   * generationId has never been saved.
   * <ul>
   * <li>test emptiness of {@link #connectedDSs} list</li>
   * <li>traverse {@link #connectedRSs} list and test for each if DS are
   * connected</li>
   * </ul>
   * So it strongly relies on the {@link #connectedDSs} list
   */
  private void resetGenerationIdIfPossible()
  {
@@ -2442,21 +2445,25 @@
  /**
   * A synchronization mechanism is created to insure exclusive access to the
   * domain. The goal is to have a consistent view of the topology by locking
   * the structures holding the topology view of the domain: directoryServers
   * and replicationServers. When a connection is established with a peer DS or
   * RS, the lock should be taken before updating these structures, then
   * released. The same mechanism should be used when updating any data related
   * to the view of the topology: for instance if the status of a DS is changed,
   * the lock should be taken before updating the matching server handler and
   * sending the topology messages to peers and released after.... This allows
   * every member of the topology to have a consistent view of the topology and
   * to be sure it will not miss some information.
   * the structures holding the topology view of the domain:
   * {@link #connectedDSs} and {@link #connectedRSs}. When a connection is
   * established with a peer DS or RS, the lock should be taken before updating
   * these structures, then released. The same mechanism should be used when
   * updating any data related to the view of the topology: for instance if the
   * status of a DS is changed, the lock should be taken before updating the
   * matching server handler and sending the topology messages to peers and
   * released after.... This allows every member of the topology to have a
   * consistent view of the topology and to be sure it will not miss some
   * information.
   * <p>
   * So the locking system must be called (not exhaustive list):
   * - when connection established with a DS or RS
   * - when connection ended with a DS or RS
   * - when receiving a TopologyMsg and updating structures
   * - when creating and sending a TopologyMsg
   * - when a DS status is changing (ChangeStatusMsg received or sent)...
   * <ul>
   * <li>when connection established with a DS or RS</li>
   * <li>when connection ended with a DS or RS</li>
   * <li>when receiving a TopologyMsg and updating structures</li>
   * <li>when creating and sending a TopologyMsg</li>
   * <li>when a DS status is changing (ChangeStatusMsg received or sent)...</li>
   * </ul>
   */
  private final ReentrantLock lock = new ReentrantLock();
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -50,6 +50,7 @@
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.LockFileManager;
import org.opends.server.extensions.ConfigFileHandler;
import org.opends.server.loggers.*;
import org.opends.server.loggers.debug.DebugLogger;
@@ -63,6 +64,7 @@
import org.opends.server.protocols.ldap.BindResponseProtocolOp;
import org.opends.server.protocols.ldap.LDAPMessage;
import org.opends.server.protocols.ldap.LDAPReader;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.tools.LDAPModify;
import org.opends.server.tools.dsconfig.DSConfig;
import org.opends.server.types.*;
@@ -951,8 +953,64 @@
  }
  /**
   * This was used to track down which test was trashing the indexes.
   * We left it here because it might be useful again.
   * Clears all the entries from the JE backend determined by the be id passed
   * into the method.
   *
   * @param createBaseEntry
   *          Indicate whether to automatically create the base entry and add it
   *          to the backend.
   * @param beID
   *          The be id to clear.
   * @param dn
   *          The suffix of the backend to create if the the createBaseEntry
   *          boolean is true.
   * @throws Exception
   *           If an unexpected problem occurs.
   */
  public static void clearJEBackend2(boolean createBaseEntry, String beID, String dn)
      throws Exception
  {
    BackendImpl backend = (BackendImpl) DirectoryServer.getBackend(beID);
    // FIXME Should setBackendEnabled be part of TaskUtils ?
    TaskUtils.disableBackend(beID);
    try
    {
      String lockFile = LockFileManager.getBackendLockFileName(backend);
      StringBuilder failureReason = new StringBuilder();
      if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
      {
        throw new RuntimeException(failureReason.toString());
      }
      try
      {
        backend.clearBackend();
      }
      finally
      {
        LockFileManager.releaseLock(lockFile, failureReason);
      }
    }
    finally
    {
      TaskUtils.enableBackend(beID);
    }
    if (createBaseEntry)
    {
      DN baseDN = DN.decode(dn);
      Entry e = createEntry(baseDN);
      backend = (BackendImpl) DirectoryServer.getBackend(beID);
      backend.addEntry(e, null);
    }
  }
  /**
   * This was used to track down which test was trashing the indexes. We left it
   * here because it might be useful again.
   */
  public static void printUntrustedIndexes()
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -93,17 +93,16 @@
   * The tracer object for the debug logger
   */
  private static final DebugTracer TRACER = getTracer();
  private static final int WINDOW_SIZE = 10;
  /**
   * A "person" entry
   */
  protected Entry taskInitFromS2;
  protected Entry taskInitTargetS2;
  protected Entry taskInitTargetAll;
  private Entry taskInitFromS2;
  private Entry taskInitTargetS2;
  private Entry taskInitTargetAll;
  protected String[] updatedEntries;
  private String[] updatedEntries;
  private static final int server1ID = 1;
  private static final int server2ID = 2;
  private static final int server3ID = 3;
@@ -112,19 +111,17 @@
  private static final int changelog3ID = 10;
  private static final String EXAMPLE_DN = "dc=example,dc=com";
  private static int[] replServerPort = new int[20];
  private DN baseDn;
  ReplicationBroker server2 = null;
  ReplicationBroker server3 = null;
  ReplicationServer changelog1 = null;
  ReplicationServer changelog2 = null;
  ReplicationServer changelog3 = null;
  boolean emptyOldChanges = true;
  LDAPReplicationDomain replDomain = null;
  int initWindow = 100;
  private ReplicationBroker server2;
  private ReplicationBroker server3;
  private ReplicationServer changelog1;
  private ReplicationServer changelog2;
  private ReplicationServer changelog3;
  private boolean emptyOldChanges = true;
  private LDAPReplicationDomain replDomain;
  private int initWindow = 100;
  private void log(String s)
  {
@@ -135,7 +132,8 @@
      TRACER.debugInfo(s);
    }
  }
  protected void log(String message, Exception e)
  private void log(String message, Exception e)
  {
    log(message + stackTraceToSingleLineString(e));
  }
@@ -160,9 +158,8 @@
    // This test uses import tasks which do not work with memory backend
    // (like the test backend we use in every tests): backend is disabled then
    // re-enabled and this clears the backend reference and thus the underlying
    // data. So for this particular test, we use a classical backend. Let's
    // clear it.
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    // data. So for this particular test, we use a classical backend.
    TestCaseUtils.clearJEBackend2(false, "userRoot", EXAMPLE_DN);
    // For most tests, a limited number of entries is enough
    updatedEntries = newLDIFEntries(2);
@@ -605,8 +602,7 @@
    + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
    + "ds-cfg-window-size: " + WINDOW_SIZE;
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    TestCaseUtils.clearJEBackend2(false, "userRoot", EXAMPLE_DN);
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
@@ -1457,7 +1453,7 @@
   * Disconnect broker and remove entries from the local DB
   * @param testCase The name of the test case.
   */
  protected void afterTest(String testCase)
  private void afterTest(String testCase)
  {
    // Check that the domain has completed the import/export task.
@@ -1545,8 +1541,7 @@
    callParanoiaCheck = false;
    super.classCleanUp();
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    TestCaseUtils.clearJEBackend2(false, "userRoot", EXAMPLE_DN);
    paranoiaCheck();
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -27,10 +27,6 @@
 */
package org.opends.server.replication;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.testng.Assert.*;
import java.io.File;
import java.util.UUID;
@@ -41,7 +37,6 @@
import org.opends.server.core.AddOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
@@ -49,12 +44,16 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.testng.Assert.*;
/**
 * Test re-synchronization after after backup/restore and LDIF import.
 */
public class ReSyncTest extends ReplicationTestCase
{
  // The tracer object for the debug logger
  /** The tracer object for the debug logger */
  private static final DebugTracer TRACER = getTracer();
  private void debugInfo(String s)
@@ -66,8 +65,7 @@
    }
  }
  protected static final String EXAMPLE_DN = "dc=example,dc=com";
  private static final String EXAMPLE_DN = "dc=example,dc=com";
  private File reSyncTempDir;
 /**
@@ -95,8 +93,7 @@
    // re-enabled and this clears the backend reference and thus the underlying
    // data. So for this particular test, we use a classical backend. Let's
    // clear it and create the root entry
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    TestCaseUtils.clearJEBackend2(false, "userRoot", EXAMPLE_DN);
    addEntry("dn: dc=example,dc=com\n" + "objectClass: top\n"
        + "objectClass: domain\n");
@@ -268,9 +265,7 @@
    callParanoiaCheck = false;
    super.classCleanUp();
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    TestCaseUtils.clearJEBackend2(false, "userRoot", EXAMPLE_DN);
    TestCaseUtils.deleteDirectory(reSyncTempDir);
    paranoiaCheck();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -1006,8 +1006,7 @@
    // We need a backend with a real configuration in cn=config as at import time
    // the real domain will check for backend existence in cn=config. So we use
    // dc=example,dc=com for this particular test.
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", TEST2_ROOT_DN_STRING);
    clearJEBackend2(false, "userRoot", TEST2_ROOT_DN_STRING);
    try
    {
@@ -1245,8 +1244,7 @@
    // We need a backend with a real configuration in cn=config as at import time
    // the real domain will check for backend existence in cn=config. So we use
    // dc=example,dc=com for this particular test.
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", TEST2_ROOT_DN_STRING);
    clearJEBackend2(false, "userRoot", TEST2_ROOT_DN_STRING);
    try
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/StateMachineTest.java
@@ -56,7 +56,6 @@
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.AfterClass;
@@ -66,7 +65,6 @@
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
@@ -84,10 +82,10 @@
  private static final int DS3_ID = 3;
  private static final int RS1_ID = 41;
  private int rs1Port = -1;
  private LDAPReplicationDomain ds1 = null;
  private ReplicationBroker ds2 = null;
  private ReplicationBroker ds3 = null;
  private ReplicationServer rs1 = null;
  private LDAPReplicationDomain ds1;
  private ReplicationBroker ds2;
  private ReplicationBroker ds3;
  private ReplicationServer rs1;
  /** The tracer object for the debug logger */
  private static final DebugTracer TRACER = getTracer();
  private int initWindow = 100;
@@ -110,7 +108,7 @@
    rs1Port = TestCaseUtils.findFreePort();
  }
  private void endTest()
  private void endTest() throws Exception
  {
    if (ds1 != null)
    {
@@ -118,14 +116,8 @@
      ds1 = null;
    }
    try
    {
      // Clear any reference to a domain in synchro plugin
      MultimasterReplication.deleteDomain(DN.decode(EXAMPLE_DN));
    } catch (DirectoryException ex)
    {
      fail("Error deleting reference to domain: " + EXAMPLE_DN);
    }
    // Clear any reference to a domain in synchro plugin
    MultimasterReplication.deleteDomain(DN.decode(EXAMPLE_DN));
    if (ds2 != null)
    {
@@ -151,19 +143,13 @@
    rs1Port = -1;
  }
  private void sleep(long time) throws InterruptedException
  {
    Thread.sleep(time);
  }
  /**
   * Check connection of the provided ds to the
   * replication server. Waits for connection to be ok up to secTimeout seconds
   * before failing.
   */
  private void checkConnection(int secTimeout, int dsId)
  private void checkConnection(int secTimeout, int dsId) throws Exception
  {
    ReplicationBroker rb = null;
    LDAPReplicationDomain rd = null;
    switch (dsId)
@@ -201,22 +187,13 @@
        return;
      }
      // Sleep 1 second
      try
      {
        Thread.sleep(100);
      } catch (InterruptedException ex)
      {
        fail("Error sleeping " + stackTraceToSingleLineString(ex));
      }
      Thread.sleep(100);
      nSec++;
      if (nSec > secTimeout*10)
      {
        // Timeout reached, end with error
        fail("checkConnection: DS " + dsId + " is not connected to the RS after "
      // Timeout reached, end with error
      assertFalse(nSec > secTimeout * 10,
          "checkConnection: DS " + dsId + " is not connected to the RS after "
          + secTimeout + " seconds.");
      }
    }
  }
@@ -390,7 +367,6 @@
      // having sent them to TCP receive queue of DS2.
      bw = new BrokerWriter(ds3, DS3_ID, false);
      bw.followAndPause(11);
      // sleep(1000);
      /**
       * DS3 sends changes (less than threshold): DS2 should still be in normal
@@ -402,7 +378,7 @@
      {
        nChangesSent = thresholdValue - 1;
        bw.followAndPause(nChangesSent);
        sleep(1000); // Be sure status analyzer has time to test
        Thread.sleep(1000); // Be sure status analyzer has time to test
        ReplicationMsg msg = br3.getLastMsg();
        debugInfo(testCase + " Step 1: last message from writer: " + msg);
        assertTrue(msg == null, (msg != null) ? msg.toString() : "null" );
@@ -414,28 +390,7 @@
       */
      bw.followAndPause(thresholdValue - nChangesSent);
      // wait for a status MSG status analyzer to broker 3
      ReplicationMsg lastMsg = null;
      for (int count = 0; count< 50; count++)
      {
        List<DSInfo> dsList = ds3.getDsList();
        DSInfo ds3Info = null;
        if (dsList.size() > 0)
        {
          ds3Info = dsList.get(0);
        }
        if ((ds3Info != null) && (ds3Info.getDsId() == DS2_ID) &&
            (ds3Info.getStatus()== ServerStatus.DEGRADED_STATUS) )
        {
          break;
        }
        else
        {
          if (count < 50)
            sleep(200); // Be sure status analyzer has time to test
          else
            fail("DS2 did not get degraded : " + ds3Info);
        }
      }
      waitForDegradedStatusOnBroker3();
      /**
       * DS3 sends 10 additional changes after threshold value, DS2 should still be
@@ -443,8 +398,8 @@
       */
      bw.followAndPause(10);
      bw.shutdown();
      sleep(1000); // Be sure status analyzer has time to test
      lastMsg = br3.getLastMsg();
      Thread.sleep(1000); // Be sure status analyzer has time to test
      ReplicationMsg lastMsg = br3.getLastMsg();
      ReplicationMsg msg = br3.getLastMsg();
      debugInfo(testCase + " Step 3: last message from writer: " + msg);
      assertTrue(lastMsg == null);
@@ -455,27 +410,7 @@
       */
      br2 = new BrokerReader(ds2, DS2_ID);
      // wait for a status MSG status analyzer to broker 3
      for (int count = 0; count< 50; count++)
      {
        List<DSInfo> dsList = ds3.getDsList();
        DSInfo ds3Info = null;
        if (dsList.size() > 0)
        {
          ds3Info = dsList.get(0);
        }
        if ((ds3Info != null) && (ds3Info.getDsId() == DS2_ID) &&
            (ds3Info.getStatus()== ServerStatus.DEGRADED_STATUS) )
        {
          break;
        }
        else
        {
          if (count < 50)
            sleep(200); // Be sure status analyzer has time to test
          else
            fail("DS2 did not get degraded.");
        }
      }
      waitForDegradedStatusOnBroker3();
    } finally
    {
@@ -486,6 +421,28 @@
    }
  }
  private void waitForDegradedStatusOnBroker3() throws InterruptedException
  {
    for (int count = 0; count< 50; count++)
    {
      List<DSInfo> dsList = ds3.getDsList();
      DSInfo ds3Info = null;
      if (dsList.size() > 0)
      {
        ds3Info = dsList.get(0);
      }
      if (ds3Info != null
          && ds3Info.getDsId() == DS2_ID
          && ds3Info.getStatus() == ServerStatus.DEGRADED_STATUS)
      {
        break;
      }
      assertTrue(count < 50, "DS2 did not get degraded : " + ds3Info);
      Thread.sleep(200); // Be sure status analyzer has time to test
    }
  }
  /**
   * Go through the possible state machine transitions:
   *
@@ -515,24 +472,18 @@
      int DEGRADED_STATUS_THRESHOLD = 1;
      /**
       * RS1 starts with 1 message as degraded status threshold value
       */
      // RS1 starts with 1 message as degraded status threshold value
      rs1 = createReplicationServer(testCase, DEGRADED_STATUS_THRESHOLD);
      /**
       * DS2 starts and connects to RS1
       */
      // DS2 starts and connects to RS1
      ds2 = createReplicationBroker(DS2_ID, new ServerState(), EMPTY_DN_GENID);
      br = new BrokerReader(ds2, DS2_ID);
      checkConnection(30, DS2_ID);
      /**
       * DS2 starts sending a lot of changes
       */
      // DS2 starts sending a lot of changes
      bw = new BrokerWriter(ds2, DS2_ID, false);
      bw.follow();
      sleep(1000); // Let some messages being queued in RS
      Thread.sleep(1000); // Let some messages being queued in RS
      /**
       * DS1 starts and connects to RS1, server state exchange should lead to
@@ -581,7 +532,7 @@
       */
      bw = new BrokerWriter(ds2, DS2_ID, false);
      bw.follow();
      sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      /**
@@ -646,7 +597,7 @@
      bw = new BrokerWriter(ds2, DS2_ID, false);
      br = new BrokerReader(ds2, DS2_ID);
      bw.follow();
      sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      /**
@@ -685,7 +636,7 @@
      bw = new BrokerWriter(ds2, DS2_ID, false);
      br = new BrokerReader(ds2, DS2_ID);
      bw.follow();
      sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      Thread.sleep(8000); // Let some messages being queued in RS, and analyzer see the change
      sleepAssertStatusEquals(30, ds1, ServerStatus.DEGRADED_STATUS);
      /**
@@ -769,9 +720,7 @@
    // going into degraded status, we need to send a lot of updates. This makes
    // the memory test backend crash with OutOfMemoryError. So we prefer here
    // a backend backed up with a file
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    TestCaseUtils.clearJEBackend2(false, "userRoot", EXAMPLE_DN);
  }
@@ -787,8 +736,7 @@
    callParanoiaCheck = false;
    super.classCleanUp();
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
    TestCaseUtils.clearJEBackend2(false, "userRoot", EXAMPLE_DN);
    paranoiaCheck();
  }
@@ -1094,13 +1042,7 @@
      // Wait for all messages sent
      while (!sessionDone.get())
      {
        try
        {
          Thread.sleep(200);
        } catch (InterruptedException ex)
        {
          /* Don't care */
        }
        TestCaseUtils.sleep(200);
      }
    }
@@ -1144,13 +1086,7 @@
      // Wait for all messages sent
      while (!sessionDone.get())
      {
        try
        {
          Thread.sleep(1000);
        } catch (InterruptedException ex)
        {
          /* Don't care */
        }
        TestCaseUtils.sleep(1000);
      }
      careAboutAmountOfChanges = false;
    }
@@ -1182,7 +1118,7 @@
          entryWithUUIDldif);
      } catch (Exception e)
      {
        fail(e.getMessage());
        throw new RuntimeException(e);
      }
      // Create an update message to add an entry.
@@ -1206,10 +1142,10 @@
  private class BrokerReader extends Thread
  {
    private ReplicationBroker rb = null;
    private ReplicationBroker rb;
    private int serverId = -1;
    private boolean shutdown = false;
    private ReplicationMsg lastMsg = null;
    private boolean shutdown;
    private ReplicationMsg lastMsg;
    public BrokerReader(ReplicationBroker rb, int serverId)
    {
@@ -1290,24 +1226,15 @@
   * @param expectedValue The value the tested value should be equal to
   */
  private void sleepAssertStatusEquals(int secTimeout, LDAPReplicationDomain testedValue,
    ServerStatus expectedValue)
    ServerStatus expectedValue) throws Exception
  {
    int nSec = 0;
    if (testedValue == null || expectedValue == null)
      fail("sleepAssertStatusEquals: null parameters");
    assertTrue(testedValue != null && expectedValue != null, "sleepAssertStatusEquals: null parameters");
    // Go out of the loop only if equality is obtained or if timeout occurs
    int nSec = 0;
    while (true)
    {
      // Sleep 1 second
      try
      {
        Thread.sleep(1000);
      } catch (InterruptedException ex)
      {
        fail("Error sleeping " + stackTraceToSingleLineString(ex));
      }
      Thread.sleep(1000);
      nSec++;
      // Test equality of values
@@ -1318,12 +1245,10 @@
        return;
      }
      if (nSec == secTimeout)
      {
        // Timeout reached, end with error
        fail("sleepAssertStatusEquals: got <" + testedValue.getStatus()
          + "> where expected <" + expectedValue + ">");
      }
      // Timeout reached, end with error
      assertTrue(nSec < secTimeout, "sleepAssertStatusEquals: got <"
          + testedValue.getStatus() + "> where expected <" + expectedValue
          + ">");
    }
  }
}