mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.06.2013 3253906b33605684c3e071a6e5c3af0f20c9e375
OPENDJ-1190 (CR-2523) Under rare circumstances the DS replication recovery thread (RSUpdater) can spin


This change fixes the misbehaving RSUpdater thread, which (wrongly):
- could be started multiple times
- would not shut down when the server was shutting down
- would look for replay operations in the future



LDAPReplicationDomain.java:
Added an AtomicReference holding the RSUpdater thread.
Used DirectoryThread.isShutdownInitiated() in the ServerStateFlush and RSUpdater run loops.
In RSUpdater, added a shutdown field that is passed down to buildAndPublishMissingChanges(), and overrode initiateShutdown() to set it to true.
In shutdown(), called initiateShutdown() on the RSUpdater and ServerStateFlush threads.
In sessionInitiated(), only start an RSUpdater thread if none is already running.
In buildAndPublishMissingChanges(), added early exits in case of shutdown.
Added a now() method that returns a CSN for the current time.
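
Taken together, these changes give the recovery thread a single-owner lifecycle: start it only by winning a compareAndSet, stop it cooperatively through a flag, and free the slot from the thread itself when it finishes. A minimal sketch of the pattern, using hypothetical names (RecoveryCoordinator, Worker) rather than the actual OpenDJ classes:

  import java.util.concurrent.atomic.AtomicBoolean;
  import java.util.concurrent.atomic.AtomicReference;

  /** Sketch: at most one recovery worker at a time, with cooperative shutdown. */
  public class RecoveryCoordinator
  {
    /** Holds the running worker, or null when none is running. */
    private final AtomicReference<Worker> worker =
        new AtomicReference<Worker>(null);

    private class Worker extends Thread
    {
      /** Set to true to ask this worker to stop at its next checkpoint. */
      private final AtomicBoolean shutdown = new AtomicBoolean(false);

      @Override
      public void run()
      {
        try
        {
          for (int i = 0; i < 1000 && !shutdown.get(); i++)
          {
            // one bounded unit of recovery work would go here
          }
        }
        finally
        {
          // This worker is done: clear the slot so a new one can be started.
          worker.compareAndSet(this, null);
        }
      }

      void initiateShutdown()
      {
        shutdown.set(true);
      }
    }

    /** Starts a worker only if none is currently registered. */
    public void startRecovery()
    {
      final Worker w = new Worker();
      if (worker.compareAndSet(null, w))
      {
        w.start();
      }
      // otherwise a worker is already running; never start a second one
    }

    /** Asks the running worker, if any, to stop. */
    public void shutdown()
    {
      final Worker w = worker.get();
      if (w != null)
      {
        w.initiateShutdown();
      }
    }
  }

The compareAndSet(this, null) in the finally block is what closes both races: the slot is freed only by the worker that owns it, so a stale reference can neither block a needed restart nor allow two workers to run at once.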

HistoricalCsnOrderingTest.java:
Adapted the tests to the new buildAndPublishMissingChanges() signature.
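
Cancellation is cooperative on the caller's side: the tests pass a fresh AtomicBoolean that is never set, while RSUpdater flips its own flag from initiateShutdown(). An illustrative call, assuming rd1, session, startTime and serverId as set up in the test:

  AtomicBoolean shutdown = new AtomicBoolean(false);
  boolean result = rd1.buildAndPublishMissingChanges(
      new CSN(startTime, 0, serverId), session, shutdown);
  // shutdown.set(true) from another thread makes the method return false
  // at its next checkpoint instead of publishing further changes.
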
2 files changed, 119 lines:
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java (85 lines)
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java (34 lines)
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.DataFormatException;
 
 import org.opends.messages.Category;
@@ -199,6 +200,8 @@
    * is not updated too early.
    */
   private final PendingChanges pendingChanges;
+  private final AtomicReference<RSUpdater> rsUpdater =
+      new AtomicReference<RSUpdater>(null);
 
   /**
    * It contain the updates that were done on other servers, transmitted
@@ -335,7 +338,7 @@
    * The thread that periodically saves the ServerState of this
    * LDAPReplicationDomain in the database.
    */
-  private class  ServerStateFlush extends DirectoryThread
+  private class ServerStateFlush extends DirectoryThread
   {
     protected ServerStateFlush()
     {
@@ -351,7 +354,7 @@
     {
       done = false;
 
-      while (!shutdown)
+      while (!isShutdownInitiated())
       {
         try
         {
@@ -368,6 +371,7 @@
         catch (InterruptedException e)
         {
           // Thread interrupted: check for shutdown.
+          Thread.currentThread().interrupt();
         }
       }
       state.save();
@@ -383,6 +387,11 @@
   private class RSUpdater extends DirectoryThread
   {
     private final CSN startCSN;
+    /**
+     * Used to communicate that the current thread computation needs to
+     * shutdown.
+     */
+    private AtomicBoolean shutdown = new AtomicBoolean(false);
 
     protected RSUpdater(CSN replServerMaxCSN)
     {
@@ -400,8 +409,7 @@
     {
       // Replication server is missing some of our changes: let's
       // send them to him.
-      Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
-      logError(message);
+      logError(DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get());
 
       /*
        * Get all the changes that have not been seen by this
@@ -409,10 +417,9 @@
        */
       try
       {
-        if (buildAndPublishMissingChanges(startCSN, broker))
+        if (buildAndPublishMissingChanges(startCSN, broker, shutdown))
         {
-          message = DEBUG_CHANGES_SENT.get();
-          logError(message);
+          logError(DEBUG_CHANGES_SENT.get());
           synchronized(replayOperations)
           {
             replayOperations.clear();
@@ -427,8 +434,7 @@
            * Log an error for the repair tool
            * that will need to re-synchronize the servers.
            */
-          message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
-          logError(message);
+          logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
         }
       } catch (Exception e)
       {
@@ -439,14 +445,24 @@
          * Log an error for the repair tool
          * that will need to re-synchronize the servers.
          */
-        message = ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString());
-        logError(message);
+        logError(ERR_CANNOT_RECOVER_CHANGES.get(getBaseDNString()));
       }
       finally
       {
         broker.setRecoveryRequired(false);
+        // RSUpdater thread has finished its work, let's remove it from memory
+        // so another RSUpdater thread can be started if needed.
+        rsUpdater.compareAndSet(this, null);
       }
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public void initiateShutdown()
+    {
+      this.shutdown.set(true);
+      super.initiateShutdown();
+    }
   }
@@ -2372,10 +2388,16 @@
     if (!shutdown)
     {
       shutdown = true;
+      final RSUpdater rsUpdater = this.rsUpdater.get();
+      if (rsUpdater != null)
+      {
+        rsUpdater.initiateShutdown();
+      }
+
       // stop the thread in charge of flushing the ServerState.
       if (flushThread != null)
       {
         flushThread.initiateShutdown();
         synchronized (flushThread)
         {
           flushThread.notify();
@@ -4356,7 +4378,11 @@
         {
           pendingChanges.setRecovering(true);
           broker.setRecoveryRequired(true);
-          new RSUpdater(replServerMaxCSN).start();
+          final RSUpdater rsUpdater = new RSUpdater(replServerMaxCSN);
+          if (this.rsUpdater.compareAndSet(null, rsUpdater))
+          {
+            rsUpdater.start();
+          }
         }
       }
     } catch (Exception e)
@@ -4375,12 +4401,14 @@
    *          The CSN where we need to start the search
    * @param session
    *          The session to use to publish the changes
+   * @param shutdown
+   *          whether the current run must be stopped
    * @return A boolean indicating he success of the operation.
    * @throws Exception
    *           if an Exception happens during the search.
    */
   public boolean buildAndPublishMissingChanges(CSN startCSN,
-      ReplicationBroker session) throws Exception
+      ReplicationBroker session, AtomicBoolean shutdown) throws Exception
   {
     // Trim the changes in replayOperations that are older than the startCSN.
     synchronized (replayOperations)
@@ -4388,6 +4416,10 @@
       Iterator<CSN> it = replayOperations.keySet().iterator();
       while (it.hasNext())
       {
+        if (shutdown.get())
+        {
+          return false;
+        }
         if (it.next().isNewerThan(startCSN))
         {
           break;
@@ -4401,6 +4433,11 @@
     CSN currentStartCSN = startCSN;
     do
     {
+      if (shutdown.get())
+      {
+        return false;
+      }
+
       lastRetrievedChange = null;
       // We can't do the search in one go because we need to store the results
       // so that we are sure we send the operations in order and because the
@@ -4417,15 +4454,21 @@
 
       // Publish and remove all the changes from the replayOperations list
       // that are older than the endCSN.
-      List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
+      final List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
       synchronized (replayOperations)
       {
         Iterator<FakeOperation> itOp = replayOperations.values().iterator();
         while (itOp.hasNext())
         {
+          if (shutdown.get())
+          {
+            return false;
+          }
           FakeOperation fakeOp = itOp.next();
           if (fakeOp.getCSN().isNewerThan(endCSN) // sanity check
-              || !state.cover(fakeOp.getCSN()))
+              || !state.cover(fakeOp.getCSN())
+              // do not look for replay operations in the future
+              || endCSN.isNewerThan(now()))
           {
             break;
           }
@@ -4438,9 +4481,13 @@
       for (FakeOperation opToSend : opsToSend)
       {
+        if (shutdown.get())
+        {
+          return false;
+        }
         session.publishRecovery(opToSend.generateMessage());
       }
       opsToSend.clear();
 
       if (lastRetrievedChange != null)
       {
         currentStartCSN = lastRetrievedChange;
@@ -4449,13 +4496,16 @@
       {
         currentStartCSN = endCSN;
       }
     } while (pendingChanges.recoveryUntil(lastRetrievedChange)
           && op.getResultCode().equals(ResultCode.SUCCESS));
 
     return op.getResultCode().equals(ResultCode.SUCCESS);
   }
+
+  private static CSN now()
+  {
+    return new CSN(TimeThread.getTime(), 0, 0);
+  }
 
   /**
    * Search for the changes that happened since fromCSN based on the historical
@@ -4589,6 +4639,7 @@
         catch (InterruptedException e)
         {
           // Thread interrupted: check for shutdown.
+          Thread.currentThread().interrupt();
         }
       }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalCsnOrderingTest.java
@@ -31,6 +31,7 @@
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.assertj.core.api.Assertions;
 import org.opends.messages.Category;
@@ -187,10 +188,8 @@
     LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
     TestBroker session = new TestBroker(opList);
 
-    boolean result =
-      rd1.buildAndPublishMissingChanges(
-          new CSN(startTime, 0, serverId),
-          session);
+    CSN csn = new CSN(startTime, 0, serverId);
+    boolean result = rd1.buildAndPublishMissingChanges(csn, session, new AtomicBoolean());
     assertTrue(result, "buildAndPublishMissingChanges has failed");
     assertEquals(opList.size(), 3, "buildAndPublishMissingChanges should return 3 operations");
     assertTrue(opList.getFirst().getClass().equals(AddMsg.class));
@@ -204,7 +203,7 @@
     opList = new LinkedList<ReplicationMsg>();
     session = new TestBroker(opList);
 
-    result = rd1.buildAndPublishMissingChanges(fromCSN, session);
+    result = rd1.buildAndPublishMissingChanges(fromCSN, session, new AtomicBoolean());
     assertTrue(result, "buildAndPublishMissingChanges has failed");
     assertEquals(opList.size(), 1, "buildAndPublishMissingChanges should return 1 operation");
     assertTrue(opList.getFirst().getClass().equals(ModifyMsg.class));
@@ -278,23 +277,21 @@
         "dn: cn=test2," + baseDN,
         "changetype: modify",
         "add: description",
-    "description: foo");
+        "description: foo");
     resultCode = TestCaseUtils.applyModifications(false,
         "dn: cn=test1," + baseDN,
         "changetype: modify",
         "add: description",
-    "description: foo");
+        "description: foo");
     assertEquals(resultCode, 0);
 
     LinkedList<ReplicationMsg> opList = new LinkedList<ReplicationMsg>();
     TestBroker session = new TestBroker(opList);
 
-    // Call the buildAndPublishMissingChanges and check that this method
-    // correctly generates the 4 operations in the correct order.
-    boolean result =
-      rd1.buildAndPublishMissingChanges(
-          new CSN(startTime, 0, serverId),
-          session);
+    // Call the buildAndPublishMissingChanges and check that this method
+    // correctly generates the 4 operations in the correct order.
+    CSN csn = new CSN(startTime, 0, serverId);
+    boolean result = rd1.buildAndPublishMissingChanges(csn, session, new AtomicBoolean());
     assertTrue(result, "buildAndPublishMissingChanges has failed");
     assertEquals(opList.size(), 5, "buildAndPublishMissingChanges should return 5 operations");
     ReplicationMsg msg = opList.removeFirst();
@@ -336,14 +333,11 @@
   private LDAPReplicationDomain createReplicationDomain(int dsId)
           throws DirectoryException, ConfigException
   {
-    DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
-    DomainFakeCfg domainConf =
-      new DomainFakeCfg(baseDN, dsId, replServers, AssuredType.NOT_ASSURED,
-      2, 1, 0, null);
-    LDAPReplicationDomain replicationDomain =
-      MultimasterReplication.createNewDomain(domainConf);
+    final DN baseDN = DN.decode(TEST_ROOT_DN_STRING);
+    final DomainFakeCfg domainConf = new DomainFakeCfg(
+        baseDN, dsId, replServers, AssuredType.NOT_ASSURED, 2, 1, 0, null);
+    LDAPReplicationDomain replicationDomain = MultimasterReplication.createNewDomain(domainConf);
     replicationDomain.start();
     return replicationDomain;
   }
 }