mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.06.2013 3253906b33605684c3e071a6e5c3af0f20c9e375
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -34,6 +34,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException;
import org.opends.messages.Category;
@@ -199,6 +200,8 @@
   * is not updated too early.
   */
  private final PendingChanges pendingChanges;
  private final AtomicReference<RSUpdater> rsUpdater =
      new AtomicReference<RSUpdater>(null);
  /**
   * It contain the updates that were done on other servers, transmitted
@@ -335,7 +338,7 @@
   * The thread that periodically saves the ServerState of this
   * LDAPReplicationDomain in the database.
   */
  private class  ServerStateFlush extends DirectoryThread
  private class ServerStateFlush extends DirectoryThread
  {
    protected ServerStateFlush()
    {
@@ -351,7 +354,7 @@
    {
      done = false;
      while (!shutdown)
      while (!isShutdownInitiated())
      {
        try
        {
@@ -368,6 +371,7 @@
        catch (InterruptedException e)
        {
          // Thread interrupted: check for shutdown.
          Thread.currentThread().interrupt();
        }
      }
      state.save();
@@ -383,6 +387,11 @@
  private class RSUpdater extends DirectoryThread
  {
    private final CSN startCSN;
    /**
     * Used to communicate that the current thread computation needs to
     * shutdown.
     */
    private AtomicBoolean shutdown = new AtomicBoolean(false);
    protected RSUpdater(CSN replServerMaxCSN)
    {
@@ -400,8 +409,7 @@
    {
      // Replication server is missing some of our changes: let's
      // send them to him.
      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
      logError(message);
      logError(DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get());
      /*
       * Get all the changes that have not been seen by this
@@ -409,10 +417,9 @@
       */
      try
      {
        if (buildAndPublishMissingChanges(startCSN, broker))
        if (buildAndPublishMissingChanges(startCSN, broker, shutdown))
        {
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
          logError(DEBUG_CHANGES_SENT.get());
          synchronized(replayOperations)
          {
            replayOperations.clear();
@@ -427,8 +434,7 @@
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
          logError(message);
          logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
        }
      } catch (Exception e)
      {
@@ -439,14 +445,24 @@
         * Log an error for the repair tool
         * that will need to re-synchronize the servers.
         */
        message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
        logError(message);
        logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
      }
      finally
      {
        broker.setRecoveryRequired(false);
        // RSUpdater thread has finished its work, let's remove it from memory
        // so another RSUpdater thread can be started if needed.
        rsUpdater.compareAndSet(this, null);
      }
    }
    /** {@inheritDoc} */
    @Override
    public void initiateShutdown()
    {
      this.shutdown.set(true);
      super.initiateShutdown();
    }
  }
@@ -2372,10 +2388,16 @@
    if (!shutdown)
    {
      shutdown = true;
      final RSUpdater rsUpdater = this.rsUpdater.get();
      if (rsUpdater != null)
      {
        rsUpdater.initiateShutdown();
      }
      // stop the thread in charge of flushing the ServerState.
      if (flushThread != null)
      {
        flushThread.initiateShutdown();
        synchronized (flushThread)
        {
          flushThread.notify();
@@ -4356,7 +4378,11 @@
        {
          pendingChanges.setRecovering(true);
          broker.setRecoveryRequired(true);
          new RSUpdater(replServerMaxCSN).start();
          final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
          if (this.rsUpdater.compareAndSet(null, rsUpdater))
          {
            rsUpdater.start();
          }
        }
      }
    } catch (Exception e)
@@ -4375,12 +4401,14 @@
   *          The CSN where we need to start the search
   * @param session
   *          The session to use to publish the changes
   * @param shutdown
   *          whether the current run must be stopped
   * @return A boolean indicating he success of the operation.
   * @throws Exception
   *           if an Exception happens during the search.
   */
  public boolean buildAndPublishMissingChanges(CSN startCSN,
      ReplicationBroker session) throws Exception
      ReplicationBroker session, AtomicBoolean shutdown) throws Exception
  {
    // Trim the changes in replayOperations that are older than the startCSN.
    synchronized (replayOperations)
@@ -4388,6 +4416,10 @@
      Iterator<CSN> it = replayOperations.keySet().iterator();
      while (it.hasNext())
      {
        if (shutdown.get())
        {
          return false;
        }
        if (it.next().isNewerThan(startCSN))
        {
          break;
@@ -4401,6 +4433,11 @@
    CSN currentStartCSN = startCSN;
    do
    {
      if (shutdown.get())
      {
        return false;
      }
      lastRetrievedChange = null;
      // We can't do the search in one go because we need to store the results
      // so that we are sure we send the operations in order and because the
@@ -4417,15 +4454,21 @@
      // Publish and remove all the changes from the replayOperations list
      // that are older than the endCSN.
      List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
      final List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
      synchronized (replayOperations)
      {
        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
        while (itOp.hasNext())
        {
          if (shutdown.get())
          {
            return false;
          }
          FakeOperation fakeOp = itOp.next();
          if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
              || !state.cover(fakeOp.getCSN()))
              || !state.cover(fakeOp.getCSN())
              // do not look for replay operations in the future
              || endCSN.isNewerThan(now()))
          {
            break;
          }
@@ -4438,9 +4481,13 @@
      for (FakeOperation opToSend : opsToSend)
      {
        if (shutdown.get())
        {
          return false;
        }
        session.publishRecovery(opToSend.generateMessage());
      }
      opsToSend.clear();
      if (lastRetrievedChange != null)
      {
        currentStartCSN = lastRetrievedChange;
@@ -4449,13 +4496,16 @@
      {
        currentStartCSN = endCSN;
      }
    } while (pendingChanges.recoveryUntil(lastRetrievedChange)
          && op.getResultCode().equals(ResultCode.SUCCESS));
    return op.getResultCode().equals(ResultCode.SUCCESS);
  }
  private static CSN now()
  {
    return new CSN(TimeThread.getTime(), 0, 0);
  }
  /**
   * Search for the changes that happened since fromCSN based on the historical
@@ -4589,6 +4639,7 @@
        catch (InterruptedException e)
        {
          // Thread interrupted: check for shutdown.
          Thread.currentThread().interrupt();
        }
      }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.Assertions;
import org.opends.messages.Category;
@@ -187,10 +188,8 @@
    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
    TestBroker session = new TestBroker(opList);
    boolean result =
      rd1.buildAndPublishMissingChanges(
          new CSN(startTime, 0, serverId),
          session);
      CSN csn = new CSN(startTime, 0, serverId);
      boolean result = rd1.buildAndPublishMissingChanges(csn, session, new AtomicBoolean());
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations");
    assertTrue(opList.getFirst().getClass().equals(AddMsg.class));
@@ -204,7 +203,7 @@
    opList = new LinkedList<ReplicationMsg>();
    session = new TestBroker(opList);
      result = rd1.buildAndPublishMissingChanges(fromCSN, session);
      result = rd1.buildAndPublishMissingChanges(fromCSN, session, new AtomicBoolean());
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation");
    assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class));
@@ -278,23 +277,21 @@
        "dn: cn=test2," + baseDN,
        "changetype: modify",
        "add: description",
    "description: foo");
        "description: foo");
    resultCode = TestCaseUtils.applyModifications(false,
        "dn: cn=test1," + baseDN,
        "changetype: modify",
        "add: description",
    "description: foo");
        "description: foo");
    assertEquals(resultCode, 0);
    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
    TestBroker session = new TestBroker(opList);
    // Call the buildAndPublishMissingChanges and check that this method
    // correctly generates the 4 operations in the correct order.
    boolean result =
      rd1.buildAndPublishMissingChanges(
          new CSN(startTime, 0, serverId),
          session);
      // Call the buildAndPublishMissingChanges and check that this method
      // correctly generates the 4 operations in the correct order.
      CSN csn = new CSN(startTime, 0, serverId);
      boolean result = rd1.buildAndPublishMissingChanges(csn, session, new AtomicBoolean());
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations");
    ReplicationMsg msg = opList.removeFirst();
@@ -336,14 +333,11 @@
  private LDAPReplicationDomain createReplicationDomain(int dsId)
          throws DirectoryException, ConfigException
  {
    DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
    DomainFakeCfg domainConf =
      new DomainFakeCfg(baseDN, dsId, replServers, AssuredType.NOT_ASSURED,
      2, 1, 0, null);
    LDAPReplicationDomain replicationDomain =
      MultimasterReplication.createNewDomain(domainConf);
    final DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
    final DomainFakeCfg domainConf = new DomainFakeCfg(
        baseDN, dsId, replServers, AssuredType.NOT_ASSURED, 2, 1, 0, null);
    LDAPReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf);
    replicationDomain.start();
    return replicationDomain;
  }
}