mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

matthew_swift
05.04.2009 9dc10dec2d5d7f61116f7f647b7cf9596ca77be0
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -58,7 +58,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -226,8 +226,8 @@
  // This list is used to temporary store operations that needs
  // to be replayed at session establishment time.
  private final TreeSet<FakeOperation> replayOperations  =
    new TreeSet<FakeOperation>(new FakeOperationComparator());;
  private final TreeMap<ChangeNumber, FakeOperation> replayOperations  =
    new TreeMap<ChangeNumber, FakeOperation>();;
  /**
   * The isolation policy that this domain is going to use.
@@ -542,7 +542,23 @@
      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
      ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
      deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
      synchronized (replayOperations)
      {
        int size = replayOperations.size();
        if (size >= 10000)
        {
          replayOperations.remove(replayOperations.firstKey());
        }
        replayOperations.put(
            changeNumber,
            new FakeDelOperation(
                deleteOperation.getEntryDN().toString(),
                changeNumber,modifiedEntryUUID ));
      }
    }
    return new SynchronizationProviderResult.ContinueProcessing();
  }
@@ -2863,25 +2879,41 @@
          logError(message);
        } else
        {
          for (FakeOperation replayOp : replayOperations)
          for (FakeOperation replayOp :
            replayOperations.tailMap(replServerMaxChangeNumber).values())
          {
            ChangeNumber cn = replayOp.getChangeNumber();
            /*
             * Because the entry returned by the search operation
             * can contain old historical information, it is
             * possible that some of the FakeOperation are
             * actually older than the
             * Only send the Operation if it was newer than
             * the last ChangeNumber known by the Replication Server.
             * actually older than the last ChangeNumber known by
             * the Replication Server.
             * In such case don't send the operation.
             */
            if (cn.newer(replServerMaxChangeNumber))
            if (!cn.newer(replServerMaxChangeNumber))
            {
              message =
                DEBUG_SENDING_CHANGE.get(
                    replayOp.getChangeNumber().toString());
              logError(message);
              session.publish(replayOp.generateMessage());
              continue;
            }
            /*
             * Check if the DeleteOperation has been abandoned before
             * being processed. This is necessary because the replayOperation
             *
             */
            if (replayOp instanceof FakeDelOperation)
            {
              FakeDelOperation delOp = (FakeDelOperation) replayOp;
              if (findEntryDN(delOp.getUUID()) != null)
              {
                continue;
              }
            }
            message =
              DEBUG_SENDING_CHANGE.get(
                  replayOp.getChangeNumber().toString());
            logError(message);
            session.publish(replayOp.generateMessage());
          }
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
@@ -2958,7 +2990,7 @@
      Historical.generateFakeOperations(searchEntry);
    for (FakeOperation op : updates)
    {
      replayOperations.add(op);
      replayOperations.put(op.getChangeNumber(), op);
    }
  }