mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
24.20.2013 8c8415177610baee0fc1c615926f21da621f0836
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,7 +25,6 @@
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
@@ -37,14 +36,12 @@
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.io.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
@@ -63,14 +60,7 @@
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.ModifyOperationBasis;
import org.opends.server.core.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -80,12 +70,7 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
@@ -93,16 +78,7 @@
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.*;
import org.opends.server.types.operation.PluginOperation;
import org.opends.server.types.operation.PostOperationAddOperation;
import org.opends.server.types.operation.PostOperationDeleteOperation;
import org.opends.server.types.operation.PostOperationModifyDNOperation;
import org.opends.server.types.operation.PostOperationModifyOperation;
import org.opends.server.types.operation.PostOperationOperation;
import org.opends.server.types.operation.PreOperationAddOperation;
import org.opends.server.types.operation.PreOperationDeleteOperation;
import org.opends.server.types.operation.PreOperationModifyDNOperation;
import org.opends.server.types.operation.PreOperationModifyOperation;
import org.opends.server.types.operation.*;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -191,8 +167,10 @@
   */
  private static final DebugTracer TRACER = getTracer();
  // The update to replay message queue where the listener thread is going to
  // push incoming update messages.
  /**
   * The update to replay message queue where the listener thread is going to
   * push incoming update messages.
   */
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
  private final AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
  private final AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
@@ -238,9 +216,11 @@
  private volatile boolean disabled = false;
  private volatile boolean stateSavingDisabled = false;
  // This list is used to temporary store operations that needs
  // to be replayed at session establishment time.
  private final TreeMap<ChangeNumber, FakeOperation> replayOperations  =
  /**
   * This list is used to temporary store operations that needs to be replayed
   * at session establishment time.
   */
  private final SortedMap<ChangeNumber, FakeOperation> replayOperations =
    new TreeMap<ChangeNumber, FakeOperation>();
  /**
@@ -288,7 +268,7 @@
   * Fractional replication variables.
   */
  // Holds the fractional configuration for this domain, if any.
  /** Holds the fractional configuration for this domain, if any. */
  private FractionalConfig fractionalConfig = null;
  /**
@@ -341,29 +321,39 @@
   * fractionalFilterOperation(PreOperationModifyOperation
   *  modifyOperation, boolean performFiltering) method
   */
  // The operation contains attributes subject to fractional filtering according
  // to the fractional configuration
  /**
   * The operation contains attributes subject to fractional filtering according
   * to the fractional configuration.
   */
  private static final int FRACTIONAL_HAS_FRACTIONAL_FILTERED_ATTRIBUTES = 1;
  // The operation contains no attributes subject to fractional filtering
  // according to the fractional configuration
  /**
   * The operation contains no attributes subject to fractional filtering
   * according to the fractional configuration.
   */
  private static final int FRACTIONAL_HAS_NO_FRACTIONAL_FILTERED_ATTRIBUTES = 2;
  // The operation should become a no-op
  /** The operation should become a no-op. */
  private static final int FRACTIONAL_BECOME_NO_OP = 3;
  // This configuration boolean indicates if this ReplicationDomain should log
  // ChangeNumbers.
  /**
   * This configuration boolean indicates if this ReplicationDomain should log
   * ChangeNumbers.
   */
  private boolean logChangeNumber = false;
  // This configuration integer indicates the time the domain keeps the
  // historical information necessary to solve conflicts.
  // When a change stored in the historical part of the user entry has a date
  // (from its replication ChangeNumber) older than this delay, it is candidate
  // to be purged.
  /**
   * This configuration integer indicates the time the domain keeps the
   * historical information necessary to solve conflicts.<br>
   * When a change stored in the historical part of the user entry has a date
   * (from its replication ChangeNumber) older than this delay, it is candidate
   * to be purged.
   */
  private long histPurgeDelayInMilliSec = 0;
  // The last change number purged in this domain. Allows to have a continuous
  // purging process from one purge processing (task run) to the next one.
  // Values 0 when the server starts.
  /**
   * The last change number purged in this domain. Allows to have a continuous
   * purging process from one purge processing (task run) to the next one.
   * Values 0 when the server starts.
   */
  private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
  /**
@@ -752,10 +742,8 @@
    if (fractionalConfig.isFractional())
    {
      // Set new fractional configuration values
      if (newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
        fractionalConfig.setFractionalExclusive(true);
      else
        fractionalConfig.setFractionalExclusive(false);
      fractionalConfig.setFractionalExclusive(
          newFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
      fractionalConfig.setFractionalSpecificClassesAttributes(
        newFractionalConfig.getFractionalSpecificClassesAttributes());
      fractionalConfig.setFractionalAllClassesAttributes(
@@ -950,10 +938,8 @@
    // Set stored fractional configuration values
    if (storedFractionalConfig.isFractional())
    {
      if (storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL)
        storedFractionalConfig.setFractionalExclusive(true);
      else
        storedFractionalConfig.setFractionalExclusive(false);
      storedFractionalConfig.setFractionalExclusive(
          storedFractionalMode == FractionalConfig.EXCLUSIVE_FRACTIONAL);
    }
    storedFractionalConfig.setFractionalSpecificClassesAttributes(
      storedFractionalSpecificClassesAttributes);
@@ -1434,12 +1420,10 @@
            AttributeValue rdnAttributeValue =
              entryRdn.getAttributeValue(attributeType);
            List<Attribute> attrList = attributesMap.get(attributeType);
            Iterator<Attribute> attrIt = attrList.iterator();
            AttributeValue sameAttrValue = null;
            //    Locate the attribute value identical to the one in the RDN
            while(attrIt.hasNext())
            for (Attribute attr : attrList)
            {
              Attribute attr = attrIt.next();
              if (attr.contains(rdnAttributeValue))
              {
                for (AttributeValue attrValue : attr) {
@@ -2577,9 +2561,12 @@
  /**
   * Create and replay a synchronized Operation from an UpdateMsg.
   *
   * @param msg The UpdateMsg to be replayed.
   * @param msg
   *          The UpdateMsg to be replayed.
   * @param shutdown
   *          whether the server initiated shutdown
   */
  public void replay(LDAPUpdateMsg msg)
  public void replay(LDAPUpdateMsg msg, AtomicBoolean shutdown)
  {
    Operation op = null;
    boolean replayDone = false;
@@ -2599,6 +2586,11 @@
        while ((!dependency) && (!replayDone) && (retryCount-- > 0))
        {
          if (shutdown.get())
          {
            // shutdown initiated, let's leave
            return;
          }
          // Try replay the operation
          op.setInternalOperation(true);
          op.setSynchronizationOperation(true);
@@ -2622,6 +2614,25 @@
              // renamed by a more recent modify DN.
              replayDone = true;
            }
            else if (result == ResultCode.BUSY)
            {
              /*
               * We probably could not get a lock (OPENDJ-885). Give the server
               * another chance to process this operation immediately.
               */
              Thread.yield();
              continue;
            }
            else if (result == ResultCode.UNAVAILABLE)
            {
              /*
               * It can happen when a rebuild is performed or the backend is
               * offline (OPENDJ-49). Give the server another chance to process
               * this operation after some time.
               */
              Thread.sleep(50);
              continue;
            }
            else if (op instanceof ModifyOperation)
            {
              ModifyOperation newOp = (ModifyOperation) op;
@@ -2693,22 +2704,13 @@
        }
      } catch (ASN1Exception e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
        replayErrorMsg = logDecodingOperationError(msg, e);
      } catch (LDAPException e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
        replayErrorMsg = logDecodingOperationError(msg, e);
      } catch (DataFormatException e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
        replayErrorMsg = logDecodingOperationError(msg, e);
      } catch (Exception e)
      {
        if (changeNumber != null)
@@ -2726,10 +2728,7 @@
          updateError(changeNumber);
        } else
        {
          Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
            String.valueOf(msg) + stackTraceToSingleLineString(e));
          logError(message);
          replayErrorMsg = message.toString();
          replayErrorMsg = logDecodingOperationError(msg, e);
        }
      } finally
      {
@@ -2753,6 +2752,14 @@
    } while (msg != null);
  }
  private String logDecodingOperationError(LDAPUpdateMsg msg, Exception e)
  {
    Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
      String.valueOf(msg) + stackTraceToSingleLineString(e));
    logError(message);
    return message.toString();
  }
  /**
   * This method is called when an error happens while replaying
   * an operation.
@@ -4862,7 +4869,7 @@
   * {@inheritDoc}
   */
  @Override
  public boolean processUpdate(UpdateMsg updateMsg)
  public boolean processUpdate(UpdateMsg updateMsg, AtomicBoolean shutdown)
  {
    // Ignore message if fractional configuration is inconsistent and
    // we have been passed into bad data set status
@@ -4880,8 +4887,23 @@
      // Put update message into the replay queue
      // (block until some place in the queue is available)
      UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
      updateToReplayQueue.offer(updateToReplay);
      final UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
      while (!shutdown.get())
      {
        // loop until we can offer to the queue or shutdown was initiated
        try
        {
          if (updateToReplayQueue.offer(updateToReplay, 1, TimeUnit.SECONDS))
          {
            // successful offer to the queue, let's exit the loop
            break;
          }
        }
        catch (InterruptedException e)
        {
          // Thread interrupted: check for shutdown.
        }
      }
      return false;
    }
@@ -5321,10 +5343,8 @@
        case EXCLUSIVE_FRACTIONAL:
        case INCLUSIVE_FRACTIONAL:
          result.setFractional(true);
          if (newFractionalMode == EXCLUSIVE_FRACTIONAL)
            result.setFractionalExclusive(true);
          else
            result.setFractionalExclusive(false);
          result.setFractionalExclusive(
              newFractionalMode == EXCLUSIVE_FRACTIONAL);
          break;
      }
      result.setFractionalSpecificClassesAttributes(