mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.06.2013 90856376b31a5d0071a17b86c2f0c92950873612
OPENDJ-1190 (CR-2523) Under rare circumstances the DS replication recovery thread (RSUpdater) can spin


This change is linked to the misbehaving RSUpdater thread which (wrongly):
- could be started multiple times
- would not shut down willingly when the server is shutting down
- would try to look for replay operations in the future



LDAPReplicationDomain.java:
Added AtomicReference for RSUpdater thread.
Used DirectoryThread.isShutdownInitiated() for ServerStateFlush and RSUpdater threads.
In RSUpdater, added shutdown field to pass it down to buildAndPublishMissingChanges() + overrode initiateShutdown() to set the shutdown field to true.
In shutdown(), called initiateShutdown() for ServerStateFlush and RSUpdater threads.
In sessionInitiated(), only start the RSUpdaterThread if it does not already exist.
In buildAndPublishMissingChanges(), added early exits in case of shutdown.
Added now() method.

HistoricalCsnOrderingTest.java:
Consequence of the change to LDAPReplicationDomain.
2 files modified
109 ■■■■ changed files
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 83 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java 26 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -34,6 +34,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.DataFormatException;
import org.opends.messages.Category;
@@ -199,6 +200,8 @@
   * is not updated too early.
   */
  private final PendingChanges pendingChanges;
  private final AtomicReference<RSUpdater> rsUpdater =
      new AtomicReference<RSUpdater>(null);
  /**
   * It contains the updates that were done on other servers, transmitted
@@ -351,7 +354,7 @@
    {
      done = false;
      while (!shutdown)
      while (!isShutdownInitiated())
      {
        try
        {
@@ -368,6 +371,7 @@
        catch (InterruptedException e)
        {
          // Thread interrupted: check for shutdown.
          Thread.currentThread().interrupt();
        }
      }
      state.save();
@@ -383,6 +387,11 @@
  private class RSUpdater extends DirectoryThread
  {
    private final CSN startCSN;
    /**
     * Used to communicate that the current thread computation needs to
     * shutdown.
     */
    private AtomicBoolean shutdown = new AtomicBoolean(false);
    protected RSUpdater(CSN replServerMaxCSN)
    {
@@ -400,8 +409,7 @@
    {
      // Replication server is missing some of our changes: let's
      // send them to him.
      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
      logError(message);
      logError(DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get());
      /*
       * Get all the changes that have not been seen by this
@@ -409,10 +417,9 @@
       */
      try
      {
        if (buildAndPublishMissingChanges(startCSN, broker))
        if (buildAndPublishMissingChanges(startCSN, broker, shutdown))
        {
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
          logError(DEBUG_CHANGES_SENT.get());
          synchronized(replayOperations)
          {
            replayOperations.clear();
@@ -427,8 +434,7 @@
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
          logError(message);
          logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
        }
      } catch (Exception e)
      {
@@ -439,14 +445,24 @@
         * Log an error for the repair tool
         * that will need to re-synchronize the servers.
         */
        message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
        logError(message);
        logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
      }
      finally
      {
        broker.setRecoveryRequired(false);
        // RSUpdater thread has finished its work, let's remove it from memory
        // so another RSUpdater thread can be started if needed.
        rsUpdater.compareAndSet(this, null);
      }
    }
    /**
     * {@inheritDoc}
     * <p>
     * In addition to the superclass behaviour, this raises this thread's own
     * {@code shutdown} flag. That flag is the one handed to
     * {@code buildAndPublishMissingChanges()}, which polls it and returns
     * {@code false} early once it is set.
     */
    @Override
    public void initiateShutdown()
    {
      // Raise the local flag first so a running recovery pass can observe it,
      // then delegate to the superclass.
      this.shutdown.set(true);
      super.initiateShutdown();
    }
  }
@@ -2372,10 +2388,16 @@
    if (!shutdown)
    {
      shutdown = true;
      final RSUpdater rsUpdater = this.rsUpdater.get();
      if (rsUpdater != null)
      {
        rsUpdater.initiateShutdown();
      }
      // stop the thread in charge of flushing the ServerState.
      if (flushThread != null)
      {
        flushThread.initiateShutdown();
        synchronized (flushThread)
        {
          flushThread.notify();
@@ -4356,7 +4378,11 @@
        {
          pendingChanges.setRecovering(true);
          broker.setRecoveryRequired(true);
          new RSUpdater(replServerMaxCSN).start();
          final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
          if (this.rsUpdater.compareAndSet(null, rsUpdater))
          {
            rsUpdater.start();
          }
        }
      }
    } catch (Exception e)
@@ -4375,12 +4401,14 @@
   *          The CSN where we need to start the search
   * @param session
   *          The session to use to publish the changes
   * @param shutdown
   *          whether the current run must be stopped
   * @return A boolean indicating the success of the operation.
   * @throws Exception
   *           if an Exception happens during the search.
   */
  public boolean buildAndPublishMissingChanges(CSN startCSN,
      ReplicationBroker session) throws Exception
      ReplicationBroker session, AtomicBoolean shutdown) throws Exception
  {
    // Trim the changes in replayOperations that are older than the startCSN.
    synchronized (replayOperations)
@@ -4388,6 +4416,10 @@
      Iterator<CSN> it = replayOperations.keySet().iterator();
      while (it.hasNext())
      {
        if (shutdown.get())
        {
          return false;
        }
        if (it.next().isNewerThan(startCSN))
        {
          break;
@@ -4401,6 +4433,11 @@
    CSN currentStartCSN = startCSN;
    do
    {
      if (shutdown.get())
      {
        return false;
      }
      lastRetrievedChange = null;
      // We can't do the search in one go because we need to store the results
      // so that we are sure we send the operations in order and because the
@@ -4417,15 +4454,21 @@
      // Publish and remove all the changes from the replayOperations list
      // that are older than the endCSN.
      List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
      final List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
      synchronized (replayOperations)
      {
        Iterator<FakeOperation> itOp = replayOperations.values().iterator();
        while (itOp.hasNext())
        {
          if (shutdown.get())
          {
            return false;
          }
          FakeOperation fakeOp = itOp.next();
          if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
              || !state.cover(fakeOp.getCSN()))
              || !state.cover(fakeOp.getCSN())
              // do not look for replay operations in the future
              || endCSN.isNewerThan(now()))
          {
            break;
          }
@@ -4438,9 +4481,13 @@
      for (FakeOperation opToSend : opsToSend)
      {
        if (shutdown.get())
        {
          return false;
        }
        session.publishRecovery(opToSend.generateMessage());
      }
      opsToSend.clear();
      if (lastRetrievedChange != null)
      {
        currentStartCSN = lastRetrievedChange;
@@ -4449,13 +4496,16 @@
      {
        currentStartCSN = endCSN;
      }
    } while (pendingChanges.recoveryUntil(lastRetrievedChange)
          && op.getResultCode().equals(ResultCode.SUCCESS));
    return op.getResultCode().equals(ResultCode.SUCCESS);
  }
  private static CSN now()
  {
    return new CSN(TimeThread.getTime(), 0, 0);
  }
  /**
   * Search for the changes that happened since fromCSN based on the historical
@@ -4589,6 +4639,7 @@
        catch (InterruptedException e)
        {
          // Thread interrupted: check for shutdown.
          Thread.currentThread().interrupt();
        }
      }
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.assertj.core.api.Assertions;
import org.opends.messages.Category;
@@ -187,10 +188,8 @@
    LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
    TestBroker session = new TestBroker(opList);
    boolean result =
      rd1.buildAndPublishMissingChanges(
          new CSN(startTime, 0, serverId),
          session);
      CSN csn = new CSN(startTime, 0, serverId);
      boolean result = rd1.buildAndPublishMissingChanges(csn, session, new AtomicBoolean());
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations");
    assertTrue(opList.getFirst().getClass().equals(AddMsg.class));
@@ -204,7 +203,7 @@
    opList = new LinkedList<ReplicationMsg>();
    session = new TestBroker(opList);
      result = rd1.buildAndPublishMissingChanges(fromCSN, session);
      result = rd1.buildAndPublishMissingChanges(fromCSN, session, new AtomicBoolean());
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation");
    assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class));
@@ -291,10 +290,8 @@
    // Call the buildAndPublishMissingChanges and check that this method
    // correctly generates the 4 operations in the correct order.
    boolean result =
      rd1.buildAndPublishMissingChanges(
          new CSN(startTime, 0, serverId),
          session);
      CSN csn = new CSN(startTime, 0, serverId);
      boolean result = rd1.buildAndPublishMissingChanges(csn, session, new AtomicBoolean());
    assertTrue(result, "buildAndPublishMissingChanges has failed");
    assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations");
    ReplicationMsg msg = opList.removeFirst();
@@ -336,14 +333,11 @@
  private LDAPReplicationDomain createReplicationDomain(int dsId)
          throws DirectoryException, ConfigException
  {
    DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
    DomainFakeCfg domainConf =
      new DomainFakeCfg(baseDN, dsId, replServers, AssuredType.NOT_ASSURED,
      2, 1, 0, null);
    LDAPReplicationDomain replicationDomain =
      MultimasterReplication.createNewDomain(domainConf);
    final DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
    final DomainFakeCfg domainConf = new DomainFakeCfg(
        baseDN, dsId, replServers, AssuredType.NOT_ASSURED, 2, 1, 0, null);
    LDAPReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf);
    replicationDomain.start();
    return replicationDomain;
  }
}