mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.50.2007 0d566a1779183a479c33447ac6b0224b705a33c9
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -170,17 +170,22 @@
  /**
   * This object is used to store the list of update currently being
   * done on the local database.
   * It contain both the update that are done directly on this server
   * and the updates that was done on another server, transmitted
   * by the replication server and that are currently replayed.
   * It is usefull to make sure that dependencies between operations
   * are correctly fullfilled, that the local operations are sent in a
   * Is is usefull to make sure that the local operations are sent in a
   * correct order to the replication server and that the ServerState
   * is not updated too early.
   */
  private PendingChanges pendingChanges;
  /**
   * It contain the updates that were done on other servers, transmitted
   * by the replication server and that are currently replayed.
   * It is usefull to make sure that dependencies between operations
   * are correctly fullfilled and to to make sure that the ServerState is
   * not updated too early.
   */
  private RemotePendingChanges remotePendingChanges;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
@@ -373,10 +378,15 @@
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     */
    ChangeNumberGenerator generator =
      new ChangeNumberGenerator(serverId, state);
    pendingChanges =
      new PendingChanges(new ChangeNumberGenerator(serverId, state),
                         broker, state);
    remotePendingChanges = new RemotePendingChanges(generator, state);
    // listen for changes on the configuration
    configuration.addChangeListener(this);
  }
@@ -679,7 +689,7 @@
   */
  public UpdateMessage receive()
  {
    UpdateMessage update = pendingChanges.getNextUpdate();
    UpdateMessage update = remotePendingChanges.getNextUpdate();
    if (update == null)
    {
@@ -774,7 +784,7 @@
   */
  public void receiveUpdate(UpdateMessage update)
  {
    pendingChanges.putRemoteUpdate(update);
    remotePendingChanges.putRemoteUpdate(update);
    numRcvdUpdates.incrementAndGet();
  }
@@ -847,7 +857,14 @@
    {
      try
      {
        pendingChanges.commit(curChangeNumber, op, msg);
        if (op.isSynchronizationOperation())
        {
          remotePendingChanges.commit(curChangeNumber);
        }
        else
        {
          pendingChanges.commit(curChangeNumber, msg);
        }
      }
      catch  (NoSuchElementException e)
      {
@@ -880,8 +897,11 @@
      }
    }
    int pushedChanges = pendingChanges.pushCommittedChanges();
    numSentUpdates.addAndGet(pushedChanges);
    if (!op.isSynchronizationOperation())
    {
      int pushedChanges = pendingChanges.pushCommittedChanges();
      numSentUpdates.addAndGet(pushedChanges);
    }
    // Wait for acknowledgement of an assured message.
    if (msg != null && isAssured)
@@ -1111,7 +1131,7 @@
          if (op instanceof ModifyOperation)
          {
            ModifyOperation newOp = (ModifyOperation) op;
            dependency = pendingChanges.checkDependencies(newOp);
            dependency = remotePendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, msg);
@@ -1120,7 +1140,7 @@
          else if (op instanceof DeleteOperation)
          {
            DeleteOperation newOp = (DeleteOperation) op;
            dependency = pendingChanges.checkDependencies(newOp);
            dependency = remotePendingChanges.checkDependencies(newOp);
            if ((!dependency) && (!firstTry))
            {
              done = solveNamingConflict(newOp, msg);
@@ -1130,7 +1150,7 @@
          {
            AddOperation newOp = (AddOperation) op;
            AddMsg addMsg = (AddMsg) msg;
            dependency = pendingChanges.checkDependencies(newOp);
            dependency = remotePendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, addMsg);
@@ -1139,7 +1159,7 @@
          else if (op instanceof ModifyDNOperation)
          {
            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
            dependency = pendingChanges.checkDependencies(newMsg);
            dependency = remotePendingChanges.checkDependencies(newMsg);
            if (!dependency)
            {
              ModifyDNOperation newOp = (ModifyDNOperation) op;
@@ -1253,9 +1273,7 @@
   */
  public void updateError(ChangeNumber changeNumber)
  {
    pendingChanges.commit(changeNumber);
    int pushedChanges = pendingChanges.pushCommittedChanges();
    numSentUpdates.addAndGet(pushedChanges);
    remotePendingChanges.commit(changeNumber);
  }
  /**