mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
11.19.2007 a714dc56fbe8419a6f0e4e8ffd36384009a89557
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -49,6 +49,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -128,17 +129,14 @@
{
  private ReplicationMonitor monitor;
  private ChangeNumberGenerator changeNumberGenerator;
  private ReplicationBroker broker;
  private List<ListenerThread> synchroThreads =
    new ArrayList<ListenerThread>();
  private final SortedMap<ChangeNumber, PendingChange> pendingChanges =
    new TreeMap<ChangeNumber, PendingChange>();
  private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMessage>();
  private int numRcvdUpdates = 0;
  private int numSentUpdates = 0;
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
  private AtomicInteger numSentUpdates = new AtomicInteger(0);
  private AtomicInteger numProcessedUpdates = new AtomicInteger();
  private int debugCount = 0;
  private PersistentServerState state;
@@ -150,6 +148,19 @@
  private int maxSendDelay = 0;
  /**
   * This object is used to store the list of update currently being
   * done on the local database.
   * It contain both the update that are done directly on this server
   * and the updates that was done on another server, transmitted
   * by the replication server and that are currently replayed.
   * It is usefull to make sure that dependencies between operations
   * are correctly fullfilled, that the local operations are sent in a
   * correct order to the replication server and that the ServerState
   * is not updated too early.
   */
  private PendingChanges pendingChanges;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
@@ -245,7 +256,7 @@
  private ConfigEntry backendConfigEntry;
  private List<DN> branches = new ArrayList<DN>(0);
  private int listenerThreadNumber = 1;
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
  private Collection<String> replicationServers;
@@ -317,12 +328,6 @@
    DirectoryServer.registerMonitorProvider(monitor);
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     */
    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
    /*
     * create the broker object used to publish and receive changes
     */
    try
@@ -348,6 +353,14 @@
      */
    }
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     */
    pendingChanges =
      new PendingChanges(new ChangeNumberGenerator(serverId, state),
                         broker, state);
    // listen for changes on the configuration
    configuration.addChangeListener(this);
  }
@@ -444,23 +457,37 @@
       * of the parent entry
       */
      // There is a potential of perfs improvement here
      // if we could avoid the following parent entry retrieval
      DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
      if (parentDnFromCtx != null)
      String parentUid = ctx.getParentUid();
      // root entry have no parent,
      // there is no need to check for it.
      if (parentUid != null)
      {
        DN entryDN = addOperation.getEntryDN();
        DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
        if ((parentDnFromEntryDn != null)
            && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
        // There is a potential of perfs improvement here
        // if we could avoid the following parent entry retrieval
        DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
        if (parentDnFromCtx == null)
        {
          // parentEntry has been renamed
          // replication name conflict resolution is expected to fix that
          // later in the flow
          // The parent does not exist with the specified unique id
          // stop the operation with NO_SUCH_OBJECT and let the
          // conflict resolution or the dependency resolution solve this.
          addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
          return new SynchronizationProviderResult(false);
        }
        else
        {
          DN entryDN = addOperation.getEntryDN();
          DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
          if ((parentDnFromEntryDn != null)
              && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
          {
            // parentEntry has been renamed
            // replication name conflict resolution is expected to fix that
            // later in the flow
            addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
            return new SynchronizationProviderResult(false);
          }
        }
      }
    }
    return new SynchronizationProviderResult(true);
@@ -633,89 +660,96 @@
   */
  public UpdateMessage receive()
  {
    synchronized (broker)
    UpdateMessage update = pendingChanges.getNextUpdate();
    if (update == null)
    {
      UpdateMessage update = null;
      while (update == null)
      synchronized (broker)
      {
        ReplicationMessage msg;
        try
        while (update == null)
        {
          msg = broker.receive();
          if (msg == null)
          ReplicationMessage msg;
          try
          {
            // The server is in the shutdown process
            return null;
          }
          log("Broker received message :" + msg);
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
            receiveAck(ack);
          }
          else if (msg instanceof UpdateMessage)
          {
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
            try
            msg = broker.receive();
            if (msg == null)
            {
              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                  null);
              // The server is in the shutdown process
              return null;
            }
            catch(DirectoryException de)
            log("Broker received message :" + msg);
            if (msg instanceof AckMessage)
            {
              // Returns an error message to notify the sender
              int msgID = de.getMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              broker.publish(errorMsg);
              AckMessage ack = (AckMessage) msg;
              receiveAck(ack);
            }
          }
          else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            else if (msg instanceof InitializeRequestMessage)
            {
              // Another server requests us to provide entries
              // for a total update
              InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
              try
              {
                initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                                 null);
              }
              catch(DirectoryException de)
              {
                // Returns an error message to notify the sender
                int msgID = de.getMessageID();
                ErrorMessage errorMsg =
                  new ErrorMessage(initMsg.getsenderID(),
                                   msgID, de.getMessage());
                broker.publish(errorMsg);
              }
            }
            else if (msg instanceof InitializeTargetMessage)
            {
              // Another server is exporting its entries to us
              InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            try
            {
              importBackend(initMsg);
              try
              {
                importBackend(initMsg);
              }
              catch(DirectoryException de)
              {
                // Return an error message to notify the sender
                int msgID = de.getMessageID();
                ErrorMessage errorMsg =
                  new ErrorMessage(initMsg.getsenderID(),
                                   msgID, de.getMessage());
                log(getMessage(msgID,
                               backend.getBackendID()) + de.getMessage());
                broker.publish(errorMsg);
              }
            }
            catch(DirectoryException de)
            else if (msg instanceof ErrorMessage)
            {
              // Return an error message to notify the sender
              int msgID = de.getMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
              broker.publish(errorMsg);
              if (ieContext != null)
              {
                // This is an error termination for the 2 following cases :
                // - either during an export
                // - or before an import really started
                //   For example, when we publish a request and the
                //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
            }
            else if (msg instanceof UpdateMessage)
            {
              update = (UpdateMessage) msg;
              receiveUpdate(update);
            }
          }
          else if (msg instanceof ErrorMessage)
          catch (SocketTimeoutException e)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  replicationServer did not find any import source.
              abandonImportExport((ErrorMessage)msg);
            }
            // just retry
          }
        } catch (SocketTimeoutException e)
        {
          // just retry
        }
      }
      return update;
    }
    return update;
  }
  /**
@@ -725,21 +759,8 @@
   */
  public void receiveUpdate(UpdateMessage update)
  {
    ChangeNumber changeNumber = update.getChangeNumber();
    synchronized (pendingChanges)
    {
      if (pendingChanges.containsKey(changeNumber))
      {
        /*
         * This should never happen,
         * TODO log error and throw exception
         */
      }
      pendingChanges.put(changeNumber,
          new PendingChange(changeNumber, null, update));
      numRcvdUpdates++;
    }
    pendingChanges.putRemoteUpdate(update);
    numRcvdUpdates.incrementAndGet();
  }
  /**
@@ -752,10 +773,9 @@
    UpdateMessage update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    synchronized (pendingChanges)
    synchronized (waitingAckMsgs)
    {
      update = waitingAckMsgs.get(changeNumber);
      waitingAckMsgs.remove(changeNumber);
      update = waitingAckMsgs.remove(changeNumber);
    }
    if (update != null)
    {
@@ -798,61 +818,55 @@
         * This is an operation type that we do not know about
         * It should never happen.
         */
        synchronized (pendingChanges)
        {
          pendingChanges.remove(curChangeNumber);
          int    msgID   = MSGID_UNKNOWN_TYPE;
          String message = getMessage(msgID, op.getOperationType().toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          return;
        }
        pendingChanges.remove(curChangeNumber);
        int    msgID   = MSGID_UNKNOWN_TYPE;
        String message = getMessage(msgID, op.getOperationType().toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        return;
      }
    }
    synchronized(pendingChanges)
    if (result == ResultCode.SUCCESS)
    {
      if (result == ResultCode.SUCCESS)
      try
      {
        PendingChange curChange = pendingChanges.get(curChangeNumber);
        if (curChange == null)
        {
          // This should never happen
          int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
          String message = getMessage(msgID, curChangeNumber.toString(),
              op.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          return;
        }
        curChange.setCommitted(true);
        pendingChanges.commit(curChangeNumber, op, msg);
      }
      catch  (NoSuchElementException e)
      {
        int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
        String message = getMessage(msgID, curChangeNumber.toString(),
                                    op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        return;
      }
        if (op.isSynchronizationOperation())
          curChange.setOp(op);
        else
          curChange.setMsg(msg);
        if (msg != null && isAssured)
      if (msg != null && isAssured)
      {
        synchronized (waitingAckMsgs)
        {
          // Add the assured message to the list of those whose acknowledgements
          // we are awaiting.
          // Add the assured message to the list of update that are
          // waiting acknowledgements
          waitingAckMsgs.put(curChangeNumber, msg);
        }
      }
      else if (!op.isSynchronizationOperation())
      {
        // Remove an unsuccessful non-replication operation from the pending
        // changes list.
        if (curChangeNumber != null)
        {
          pendingChanges.remove(curChangeNumber);
        }
      }
      pushCommittedChanges();
    }
    else if (!op.isSynchronizationOperation())
    {
      // Remove an unsuccessful non-replication operation from the pending
      // changes list.
      if (curChangeNumber != null)
      {
        pendingChanges.remove(curChangeNumber);
      }
    }
    int pushedChanges = pendingChanges.pushCommittedChanges();
    numSentUpdates.addAndGet(pushedChanges);
    // Wait for acknowledgement of an assured message.
    if (msg != null && isAssured)
@@ -880,7 +894,7 @@
   */
  public int getNumRcvdUpdates()
  {
    return numRcvdUpdates;
    return numRcvdUpdates.get();
  }
  /**
@@ -890,11 +904,11 @@
   */
  public int getNumSentUpdates()
  {
    return numSentUpdates;
    return numSentUpdates.get();
  }
  /**
   * get the number of updates in the pending list.
   * Get the number of updates in the pending list.
   *
   * @return The number of updates in the pending list
   */
@@ -1052,45 +1066,63 @@
  {
    Operation op = null;
    boolean done = false;
    boolean dependency = false;
    ChangeNumber changeNumber = null;
    int retryCount = 10;
    boolean firstTry = true;
    try
    {
      while (!done && retryCount-- > 0)
      while ((!dependency) && (!done) && (retryCount-- > 0))
      {
        op = msg.createOperation(conn);
        op.setInternalOperation(true);
        op.setSynchronizationOperation(true);
        changeNumber = OperationContext.getChangeNumber(op);
        if (changeNumber != null)
          changeNumberGenerator.adjust(changeNumber);
        op.run();
        ResultCode result = op.getResultCode();
        if (result != ResultCode.SUCCESS)
        {
          if (op instanceof ModifyOperation)
          {
            ModifyOperation newOp = (ModifyOperation) op;
            done = solveNamingConflict(newOp, msg);
            dependency = pendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof DeleteOperation)
          {
            DeleteOperation newOp = (DeleteOperation) op;
            done = solveNamingConflict(newOp, msg);
            dependency = pendingChanges.checkDependencies(newOp);
            if ((!dependency) && (!firstTry))
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof AddOperation)
          {
            AddOperation newOp = (AddOperation) op;
            done = solveNamingConflict(newOp, msg);
          } else if (op instanceof ModifyDNOperation)
            dependency = pendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof ModifyDNOperation)
          {
            ModifyDNOperation newOp = (ModifyDNOperation) op;
            done = solveNamingConflict(newOp, msg);
            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
            dependency = pendingChanges.checkDependencies(newMsg);
            if (!dependency)
            {
              ModifyDNOperation newOp = (ModifyDNOperation) op;
              done = solveNamingConflict(newOp, msg);
            }
          }
          else
          {
@@ -1108,9 +1140,10 @@
        {
          done = true;
        }
        firstTry = false;
      }
      if (!done)
      if (!done && !dependency)
      {
        // Continue with the next change but the servers could now become
        // inconsistent.
@@ -1119,6 +1152,7 @@
        String message = getMessage(msgID, op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
        updateError(changeNumber);
      }
    }
@@ -1177,9 +1211,12 @@
    }
    finally
    {
      if (msg.isAssured())
        ack(msg.getChangeNumber());
      incProcessedUpdates();
      if (!dependency)
      {
        if (msg.isAssured())
          ack(msg.getChangeNumber());
        incProcessedUpdates();
      }
    }
  }
@@ -1193,12 +1230,9 @@
   */
  public void updateError(ChangeNumber changeNumber)
  {
    synchronized (pendingChanges)
    {
      PendingChange change = pendingChanges.get(changeNumber);
      change.setCommitted(true);
      pushCommittedChanges();
    }
    pendingChanges.commit(changeNumber);
    int pushedChanges = pendingChanges.pushCommittedChanges();
    numSentUpdates.addAndGet(pushedChanges);
  }
  /**
@@ -1210,15 +1244,7 @@
   */
  private ChangeNumber generateChangeNumber(Operation operation)
  {
    ChangeNumber changeNumber;
    changeNumber = changeNumberGenerator.NewChangeNumber();
    PendingChange change = new PendingChange(changeNumber, operation, null);
    synchronized(pendingChanges)
    {
      pendingChanges.put(changeNumber, change);
    }
    return changeNumber;
    return pendingChanges.putLocalOperation(operation);
  }
@@ -1604,42 +1630,6 @@
  }
  /**
   * Push all committed local changes to the replicationServer service.
   * PRECONDITION : The pendingChanges lock must be held before calling
   * this method.
   */
  private void pushCommittedChanges()
  {
    if (pendingChanges.isEmpty())
      return;
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if ((firstChange.getOp() != null ) &&
          (firstChange.getOp().isSynchronizationOperation() == false))
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
      }
    }
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.