mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
04.50.2011 a5131f44a6afa554af8f4c82c7ffd3d4ceac1bd4
OPEN - issue OPENDJ-26: Fix OpenDS issue 4585: ConcurrentModificationException in ReplicationBroker 
https://bugster.forgerock.org/jira/browse/OPENDJ-26

Fix CME in ReplicationBroker and various other potential unsynchronized variable accesses in replication code.
20 files modified
714 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/ServerState.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 47 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java 12 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java 19 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/SocketSession.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java 359 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java 38 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDB.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java 6 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java 42 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java 39 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ListenerThread.java 24 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 35 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 54 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.common;
@@ -48,8 +49,8 @@
 */
public class ServerState implements Iterable<Integer>
{
  private HashMap<Integer, ChangeNumber> list;
  private boolean saved = true;
  private final HashMap<Integer, ChangeNumber> list;
  private volatile boolean saved = true;
  /**
   * Creates a new empty ServerState.
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -191,8 +191,8 @@
   */
  private class ScanSearchListener implements InternalSearchListener
  {
    private ChangeNumber startingChangeNumber = null;
    private ChangeNumber endChangeNumber = null;
    private final ChangeNumber startingChangeNumber;
    private final ChangeNumber endChangeNumber;
    public ScanSearchListener(
        ChangeNumber startingChangeNumber,
@@ -262,8 +262,8 @@
  private final PersistentServerState state;
  private int numReplayedPostOpCalled = 0;
  private long generationId = -1;
  private boolean generationIdSavedStatus = false;
  private volatile long generationId = -1;
  private volatile boolean generationIdSavedStatus = false;
  private final ChangeNumberGenerator generator;
@@ -289,15 +289,15 @@
  private final DN baseDn;
  private boolean shutdown = false;
  private volatile boolean shutdown = false;
  private final InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
  private boolean solveConflictFlag = true;
  private boolean disabled = false;
  private boolean stateSavingDisabled = false;
  private volatile boolean disabled = false;
  private volatile boolean stateSavingDisabled = false;
  // This list is used to temporary store operations that needs
  // to be replayed at session establishment time.
@@ -311,7 +311,7 @@
   * Possible values are accept-updates or deny-updates, but other values
   * may be added in the future.
   */
  private IsolationPolicy isolationpolicy;
  private IsolationPolicy isolationPolicy;
  /**
   * The DN of the configuration entry of this domain.
@@ -323,7 +323,7 @@
   * A boolean indicating if the thread used to save the persistentServerState
   * is terminated.
   */
  private boolean done = true;
  private volatile boolean done = true;
  private ServerStateFlush flushThread;
@@ -374,7 +374,7 @@
   * fractional configuration (i.e with compliant fractional configuration in
   * domain root entry).
   */
  private boolean force_bad_data_set = false;
  private boolean forceBadDataSet = false;
  /**
   * This flag is used by the fractional replication ldif import plugin to
@@ -447,21 +447,24 @@
    {
      done = false;
      while (shutdown  == false)
      while (shutdown == false)
      {
        try
        {
          synchronized (this)
          {
            this.wait(1000);
            if (!disabled && !stateSavingDisabled )
            if (!disabled && !stateSavingDisabled)
            {
              // save the ServerState
              state.save();
            }
          }
        } catch (InterruptedException e)
        { }
        }
        catch (InterruptedException e)
        {
          // Thread interrupted: check for shutdown.
        }
      }
      state.save();
@@ -475,7 +478,7 @@
   */
  private class RSUpdater extends DirectoryThread
  {
    private ChangeNumber startChangeNumber;
    private final ChangeNumber startChangeNumber;
    protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
    {
      super("Replication Server Updater for server id " +
@@ -571,7 +574,7 @@
    this.baseDn = configuration.getBaseDN();
    int window  = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    this.isolationpolicy = configuration.getIsolationPolicy();
    this.isolationPolicy = configuration.getIsolationPolicy();
    this.configDn = configuration.dn();
    this.logChangeNumber = configuration.isLogChangenumber();
    this.updateToReplayQueue = updateToReplayQueue;
@@ -2030,12 +2033,12 @@
   */
  private boolean brokerIsConnected(PreOperationOperation op)
  {
    if (isolationpolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
    if (isolationPolicy.equals(IsolationPolicy.ACCEPT_ALL_UPDATES))
    {
      // this policy imply that we always accept updates.
      return true;
    }
    if (isolationpolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
    if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
    {
      // this isolation policy specifies that the updates are denied
      // when the broker had problems during the connection phase
@@ -4429,7 +4432,7 @@
  public ConfigChangeResult applyConfigurationChange(
         ReplicationDomainCfg configuration)
  {
    isolationpolicy = configuration.getIsolationPolicy();
    isolationPolicy = configuration.getIsolationPolicy();
    logChangeNumber = configuration.isLogChangenumber();
    histPurgeDelayInMilliSec =
      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
@@ -4657,7 +4660,7 @@
  {
    // Check domain fractional configuration consistency with local
    // configuration variables
    force_bad_data_set = !isBackendFractionalConfigConsistent();
    forceBadDataSet = !isBackendFractionalConfigConsistent();
    super.sessionInitiated(
        initStatus, replicationServerState,generationID, session);
@@ -4687,7 +4690,7 @@
    }
    // Now for bad data set status if needed
    if (force_bad_data_set)
    if (forceBadDataSet)
    {
      // Go into bad data set status
      setNewStatus(StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT);
@@ -4950,7 +4953,7 @@
  {
    // Ignore message if fractional configuration is inconcsistent and
    // we have been passed into bad data set status
    if (force_bad_data_set)
    if (forceBadDataSet)
    {
      return false;
    }
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.plugin;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
@@ -54,9 +55,9 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private BlockingQueue<UpdateToReplay> updateToReplayQueue = null;
  private boolean shutdown = false;
  private boolean done = false;
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
  private volatile boolean shutdown = false;
  private volatile boolean done = false;
  private static int count = 0;
  /**
opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -54,25 +55,25 @@
  /**
   * The session on which heartbeats are to be monitored.
   */
  private ProtocolSession session;
  private final ProtocolSession session;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval;
  private final long heartbeatInterval;
  /**
   * Set this to stop the thread.
   */
  private boolean shutdown = false;
  private volatile boolean shutdown = false;
  /**
   * Send StopMsg before session closure or not.
   */
  private boolean sendStopBeforeClose = false;
  private final boolean sendStopBeforeClose;
  /**
@@ -131,7 +132,8 @@
              try
              {
                session.publish(new StopMsg());
              } catch(IOException ioe)
              }
              catch (IOException ioe)
              {
                // Anyway, going to close session, so nothing to do
              }
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -50,25 +51,25 @@
  /**
   * For test purposes only to simulate loss of heartbeats.
   */
  static private boolean heartbeatsDisabled = false;
  private static volatile boolean heartbeatsDisabled = false;
  /**
   * The session on which heartbeats are to be sent.
   */
  private ProtocolSession session;
  private final ProtocolSession session;
  /**
   * The time in milliseconds between heartbeats.
   */
  private long heartbeatInterval;
  private final long heartbeatInterval;
  /**
   * Set this to stop the thread.
   */
  private Boolean shutdown = false;
  private final Object shutdown_lock = new Object();
  private volatile boolean shutdown = false;
  private final Object shutdownLock = new Object();
  /**
@@ -136,11 +137,11 @@
            TRACER.debugVerbose("Heartbeat thread sleeping for %d", sleepTime);
          }
          synchronized (shutdown_lock)
          synchronized (shutdownLock)
          {
            if (!shutdown)
            {
              shutdown_lock.wait(sleepTime);
              shutdownLock.wait(sleepTime);
            }
          }
        }
@@ -174,10 +175,10 @@
   */
  public void shutdown()
  {
    synchronized (shutdown_lock)
    synchronized (shutdownLock)
    {
      shutdown = true;
      shutdown_lock.notifyAll();
      shutdownLock.notifyAll();
      if (debugEnabled())
      {
        TRACER.debugInfo("Going to notify Heartbeat thread.");
opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -59,7 +60,7 @@
  /**
   * The time the last message published to this session.
   */
  private long lastPublishTime = 0;
  private volatile long lastPublishTime = 0;
  /**
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -62,7 +63,7 @@
  /**
   * The time the last message published to this session.
   */
  private long lastPublishTime = 0;
  private volatile long lastPublishTime = 0;
  /**
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -23,16 +23,14 @@
 *
 *
 *      Copyright 2007-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.AssuredMode;
@@ -60,9 +58,9 @@
public class TopologyMsg extends ReplicationMsg
{
  // Information for the DS known in the topology
  private List<DSInfo> dsList = new ArrayList<DSInfo>();
  private final List<DSInfo> dsList;
  // Information for the RS known in the topology
  private List<RSInfo> rsList = new ArrayList<RSInfo>();
  private final List<RSInfo> rsList;
  /**
   * Creates a new changelogInfo message from its encoded form.
@@ -74,7 +72,168 @@
   */
  public TopologyMsg(byte[] in, short version) throws DataFormatException
  {
    decode(in, version);
    try
    {
      /* First byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
      {
        throw new DataFormatException(
          "Input is not a valid " + this.getClass().getCanonicalName());
      }
      int pos = 1;
      /* Read number of following DS info entries */
      byte nDsInfo = in[pos++];
      /* Read the DS info entries */
      List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
      while ( (nDsInfo > 0) && (pos < in.length) )
      {
        /* Read DS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int dsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read RS id */
        length =
          getNextLength(in, pos);
        serverIdString =
          new String(in, pos, length, "UTF-8");
        int rsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        pos += length + 1;
        /* Read DS status */
        ServerStatus status = ServerStatus.valueOf(in[pos++]);
        /* Read DS assured flag */
        boolean assuredFlag;
        if (in[pos++] == 1)
        {
          assuredFlag = true;
        } else
        {
          assuredFlag = false;
        }
        /* Read DS assured mode */
        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
        /* Read DS safe data level */
        byte safeDataLevel = in[pos++];
        /* Read DS group id */
        byte groupId = in[pos++];
        /* Read number of referrals URLs */
        List<String> refUrls = new ArrayList<String>();
        byte nUrls = in[pos++];
        byte nRead = 0;
        /* Read urls until expected number read */
        while ((nRead != nUrls) &&
          (pos < in.length) //security
          )
        {
          length = getNextLength(in, pos);
          String url = new String(in, pos, length, "UTF-8");
          refUrls.add(url);
          pos += length + 1;
          nRead++;
        }
        Set<String> attrs = new HashSet<String>();
        short protocolVersion = -1;
        if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          byte nAttrs = in[pos++];
          nRead = 0;
          /* Read attrs until expected number read */
          while ((nRead != nAttrs) &&
            (pos < in.length) //security
            )
          {
            length = getNextLength(in, pos);
            String attr = new String(in, pos, length, "UTF-8");
            attrs.add(attr);
            pos += length + 1;
            nRead++;
          }
          /* Read Protocol version */
          protocolVersion = Short.valueOf(in[pos++]);
        }
        /* Now create DSInfo and store it in list */
        DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
          protocolVersion);
        dsList.add(dsInfo);
        nDsInfo--;
      }
      /* Read number of following RS info entries */
      byte nRsInfo = in[pos++];
      /* Read the RS info entries */
      List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
      while ( (nRsInfo > 0) && (pos < in.length) )
      {
        /* Read RS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int id = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        pos += length + 1;
        /* Read RS group id */
        byte groupId = in[pos++];
        int weight = 1;
        String serverUrl = null;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          length = getNextLength(in, pos);
          serverUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
          /* Read RS weight */
          length = getNextLength(in, pos);
          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
          pos += length + 1;
        }
        /* Now create RSInfo and store it in list */
        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
          weight);
        rsList.add(rsInfo);
        nRsInfo--;
      }
      this.dsList = Collections.unmodifiableList(dsList);
      this.rsList = Collections.unmodifiableList(rsList);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
@@ -85,10 +244,23 @@
   */
  public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
  {
    if (dsList != null) // null means no info, let empty list from init time
      this.dsList = dsList;
    if (rsList != null) // null means no info, let empty list from init time
      this.rsList = rsList;
    if (dsList == null || dsList.isEmpty())
    {
      this.dsList = Collections.emptyList();
    }
    else
    {
      this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
    }
    if (rsList == null || rsList.isEmpty())
    {
      this.rsList = Collections.emptyList();
    }
    else
    {
      this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
    }
  }
  // ============
@@ -219,172 +391,7 @@
  }
  // ============
  // Msg decoding
  // ============
  private void decode(byte[] in, short version)
  throws DataFormatException
  {
    try
    {
      /* First byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
      {
        throw new DataFormatException(
          "Input is not a valid " + this.getClass().getCanonicalName());
      }
      int pos = 1;
      /* Read number of following DS info entries */
      byte nDsInfo = in[pos++];
      /* Read the DS info entries */
      while ( (nDsInfo > 0) && (pos < in.length) )
      {
        /* Read DS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int dsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read RS id */
        length =
          getNextLength(in, pos);
        serverIdString =
          new String(in, pos, length, "UTF-8");
        int rsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        pos += length + 1;
        /* Read DS status */
        ServerStatus status = ServerStatus.valueOf(in[pos++]);
        /* Read DS assured flag */
        boolean assuredFlag;
        if (in[pos++] == 1)
        {
          assuredFlag = true;
        } else
        {
          assuredFlag = false;
        }
        /* Read DS assured mode */
        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
        /* Read DS safe data level */
        byte safeDataLevel = in[pos++];
        /* Read DS group id */
        byte groupId = in[pos++];
        /* Read number of referrals URLs */
        List<String> refUrls = new ArrayList<String>();
        byte nUrls = in[pos++];
        byte nRead = 0;
        /* Read urls until expected number read */
        while ((nRead != nUrls) &&
          (pos < in.length) //security
          )
        {
          length = getNextLength(in, pos);
          String url = new String(in, pos, length, "UTF-8");
          refUrls.add(url);
          pos += length + 1;
          nRead++;
        }
        Set<String> attrs = new HashSet<String>();
        short protocolVersion = -1;
        if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          byte nAttrs = in[pos++];
          nRead = 0;
          /* Read attrs until expected number read */
          while ((nRead != nAttrs) &&
            (pos < in.length) //security
            )
          {
            length = getNextLength(in, pos);
            String attr = new String(in, pos, length, "UTF-8");
            attrs.add(attr);
            pos += length + 1;
            nRead++;
          }
          /* Read Protocol version */
          protocolVersion = Short.valueOf(in[pos++]);
        }
        /* Now create DSInfo and store it in list */
        DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
          protocolVersion);
        dsList.add(dsInfo);
        nDsInfo--;
      }
      /* Read number of following RS info entries */
      byte nRsInfo = in[pos++];
      /* Read the RS info entries */
      while ( (nRsInfo > 0) && (pos < in.length) )
      {
        /* Read RS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int id = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        pos += length + 1;
        /* Read RS group id */
        byte groupId = in[pos++];
        int weight = 1;
        String serverUrl = null;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          length = getNextLength(in, pos);
          serverUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
          /* Read RS weight */
          length = getNextLength(in, pos);
          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
          pos += length + 1;
        }
        /* Now create RSInfo and store it in list */
        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
          weight);
        rsList.add(rsInfo);
        nRsInfo--;
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -52,22 +53,23 @@
public class MonitoringPublisher extends DirectoryThread
{
  private boolean shutdown = false;
  private volatile boolean shutdown = false;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  // The domain we send monitoring for
  private ReplicationServerDomain replicationServerDomain;
  private final ReplicationServerDomain replicationServerDomain;
  // Sleep time (in ms) before sending new monitoring messages.
  private long period = 3000;
  private volatile long period;
  // Is the thread terminated ?
  private boolean done = false;
  private volatile boolean done = false;
  private final Object sleeper = new Object();
  private final Object shutdownLock = new Object();
  /**
   * Create a monitoring publisher.
@@ -104,9 +106,12 @@
      {
        try
        {
          synchronized (sleeper)
          synchronized (shutdownLock)
          {
            sleeper.wait(period);
            if (!shutdown)
            {
              shutdownLock.wait(period);
            }
          }
        } catch (InterruptedException ex)
        {
@@ -157,16 +162,17 @@
   */
  public void shutdown()
  {
    if (debugEnabled())
    synchronized (shutdownLock)
    {
      TRACER.debugInfo("Shutting down monitoring publisher for dn " +
        replicationServerDomain.getBaseDn().toString() + " in RS " +
        replicationServerDomain.getReplicationServer().getServerId());
    }
    shutdown = true;
    synchronized (sleeper)
    {
      sleeper.notify();
      shutdown = true;
      shutdownLock.notifyAll();
      if (debugEnabled())
      {
        TRACER.debugInfo("Shutting down monitoring publisher for dn " +
          replicationServerDomain.getBaseDn().toString() + " in RS " +
          replicationServerDomain.getReplicationServer().getServerId());
      }
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
import org.opends.messages.*;
@@ -255,13 +256,10 @@
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
      cursor.close();
    }
    catch (DatabaseException dbe)
    finally
    {
      cursor.close();
      throw dbe;
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -38,7 +39,7 @@
  /**
   * The Replication Server that created this thread.
   */
  private ReplicationServer server;
  private final ReplicationServer server;
  /**
   * Creates a new instance of this directory thread with the
opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -38,7 +39,7 @@
  /**
   * The Replication Server that created this thread.
   */
  private ReplicationServer server;
  private final ReplicationServer server;
  /**
   * Creates a new instance of this directory thread with the
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -58,8 +59,8 @@
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private ProtocolSession session;
  private ServerHandler handler;
  private final ProtocolSession session;
  private final ServerHandler handler;
  /**
   * Constructor for the LDAP server reader part of the replicationServer.
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
@@ -57,10 +58,10 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private ProtocolSession session;
  private ServerHandler handler;
  private ReplicationServerDomain replicationServerDomain;
  private short protocolVersion = -1;
  private final ProtocolSession session;
  private final ServerHandler handler;
  private final ReplicationServerDomain replicationServerDomain;
  private final short protocolVersion;
  /**
   * Create a ServerWriter.
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -47,21 +48,22 @@
public class StatusAnalyzer extends DirectoryThread
{
  private boolean finished = false;
  private volatile boolean shutdown = false;
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private ReplicationServerDomain replicationServerDomain;
  private int degradedStatusThreshold = -1;
  private final ReplicationServerDomain replicationServerDomain;
  private volatile int degradedStatusThreshold = -1;
  // Sleep time for the thread, in ms.
  private int STATUS_ANALYZER_SLEEP_TIME = 5000;
  private static final int STATUS_ANALYZER_SLEEP_TIME = 5000;
  private boolean done = false;
  private volatile boolean done = false;
  private Object sleeper = new Object();
  private final Object shutdownLock = new Object();
  /**
   * Create a StatusAnalyzer.
@@ -95,13 +97,16 @@
    }
    boolean interrupted = false;
    while (!finished && !interrupted)
    while (!shutdown && !interrupted)
    {
      try
      {
        synchronized (sleeper)
        synchronized (shutdownLock)
        {
          sleeper.wait(STATUS_ANALYZER_SLEEP_TIME);
          if (!shutdown)
          {
            shutdownLock.wait(STATUS_ANALYZER_SLEEP_TIME);
          }
        }
      } catch (InterruptedException ex)
      {
@@ -192,16 +197,17 @@
   */
  public void shutdown()
  {
    if (debugEnabled())
    synchronized (shutdownLock)
    {
      TRACER.debugInfo("Shutting down status analyzer for dn " +
        replicationServerDomain.getBaseDn().toString() + " in RS " +
        replicationServerDomain.getReplicationServer().getServerId());
    }
    finished = true;
    synchronized (sleeper)
    {
      sleeper.notify();
      shutdown = true;
      shutdownLock.notifyAll();
      if (debugEnabled())
      {
        TRACER.debugInfo("Shutting down status analyzer for dn "
            + replicationServerDomain.getBaseDn().toString() + " in RS "
            + replicationServerDomain.getReplicationServer().getServerId());
      }
    }
  }
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -51,26 +52,21 @@
  private static final DebugTracer TRACER = getTracer();
  /**
   * For test purposes only to simulate loss of heartbeats.
   */
  static private boolean heartbeatsDisabled = false;
  /**
   * The session on which heartbeats are to be sent.
   */
  private ProtocolSession session;
  private final ProtocolSession session;
  /**
   * The time in milliseconds between heartbeats.
   */
  private long heartbeatInterval;
  private int serverId;
  private final long heartbeatInterval;
  private final int serverId;
  /**
   * Set this to stop the thread.
   */
  private Boolean shutdown = false;
  private final Object shutdown_lock = new Object();
  private volatile boolean shutdown = false;
  private final Object shutdownLock = new Object();
  /**
   * Create a heartbeat thread.
@@ -112,10 +108,7 @@
        if (now > session.getLastPublishTime() + heartbeatInterval)
        {
          if (!heartbeatsDisabled)
          {
            session.publish(ctHeartbeatMsg);
          }
          session.publish(ctHeartbeatMsg);
        }
        try
@@ -127,11 +120,11 @@
            sleepTime = heartbeatInterval;
          }
          synchronized (shutdown_lock)
          synchronized (shutdownLock)
          {
            if (!shutdown)
            {
              shutdown_lock.wait(sleepTime);
              shutdownLock.wait(sleepTime);
            }
          }
        }
@@ -166,20 +159,10 @@
   */
  public void shutdown()
  {
    synchronized (shutdown_lock)
    synchronized (shutdownLock)
    {
      shutdown = true;
      shutdown_lock.notifyAll();
      shutdownLock.notifyAll();
    }
  }
  /**
   * For testing purposes only to simulate loss of heartbeats.
   * @param heartbeatsDisabled Set true to prevent heartbeats from being sent.
   */
  public static void setHeartbeatsDisabled(boolean heartbeatsDisabled)
  {
    CTHeartbeatPublisherThread.heartbeatsDisabled = heartbeatsDisabled;
  }
}
opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.service;
import org.opends.messages.Message;
@@ -48,9 +49,9 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private ReplicationDomain repDomain;
  private boolean shutdown = false;
  private boolean done = false;
  private final ReplicationDomain repDomain;
  private volatile boolean shutdown = false;
  private volatile boolean done = false;
  /**
@@ -95,11 +96,17 @@
        while ((!shutdown) && ((updateMsg = repDomain.receive()) != null))
        {
          if (repDomain.processUpdate(updateMsg) == true)
          {
            repDomain.processUpdateDoneSynchronous(updateMsg);
          }
        }
        if (updateMsg == null)
        {
          shutdown = true;
      } catch (Exception e)
        }
      }
      catch (Exception e)
      {
        /*
         * catch all exceptions happening in repDomain.receive so that the
@@ -119,6 +126,8 @@
    }
  }
  /**
   * Wait for the completion of this thread.
   */
@@ -134,12 +143,13 @@
        n++;
        if (n >= FACTOR)
        {
          TRACER.debugInfo("Interrupting listener thread for dn " +
            repDomain.getServiceID() + " in DS " + repDomain.getServerId());
          TRACER.debugInfo("Interrupting listener thread for dn "
              + repDomain.getServiceID() + " in DS " + repDomain.getServerId());
          this.interrupt();
        }
      }
    } catch (InterruptedException e)
    }
    catch (InterruptedException e)
    {
      // exit the loop if this thread is interrupted.
    }
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -51,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -97,11 +98,11 @@
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
  private String replicationServer = "Not connected";
  private ProtocolSession session = null;
  private volatile boolean shutdown = false;
  private volatile Collection<String> servers;
  private volatile boolean connected = false;
  private volatile String replicationServer = "Not connected";
  private volatile ProtocolSession session = null;
  private final ServerState state;
  private final String baseDn;
  private final int serverId;
@@ -156,7 +157,7 @@
   * and to know that it is necessary to print a new message when the broker
   * finally succeed to connect.
   */
  private boolean connectionError = false;
  private volatile boolean connectionError = false;
  private final Object connectPhaseLock = new Object();
  /**
   * The thread that publishes messages to the RS containing the current
@@ -173,18 +174,20 @@
   */
  // Info for other DSs.
  // Warning: does not contain info for us (for our server id)
  private List<DSInfo> dsList = new ArrayList<DSInfo>();
  private long generationID;
  private int updateDoneCount = 0;
  private boolean connectRequiresRecovery = false;
  private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
  private volatile long generationID;
  private volatile int updateDoneCount = 0;
  private volatile boolean connectRequiresRecovery = false;
  /**
   * The map of replication server info initialized at connection time and
   * regularly updated. This is used to decide to which best suitable
   * replication server one wants to connect.
   * Key: replication server id
   * Value: replication server info for the matching replication server id
   * replication server one wants to connect. Key: replication server id Value:
   * replication server info for the matching replication server id
   */
  private Map<Integer, ReplicationServerInfo> replicationServerInfos = null;
  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos
    = null;
  /**
   * This integer defines when the best replication server checking algorithm
   * should be engaged.
@@ -769,7 +772,7 @@
  {
    Map<Integer, ReplicationServerInfo> rsInfos =
      new HashMap<Integer, ReplicationServerInfo>();
      new ConcurrentHashMap<Integer, ReplicationServerInfo>();
    for (String server : servers)
    {
@@ -2535,8 +2538,6 @@
   * called in a single thread or protected by a locking mechanism
   * before being called.
   *
   * @throws SocketTimeoutException if the timeout set by setSoTimeout
   *         has expired
   * @param reconnectToTheBestRS Whether broker will automatically switch
   *                             to the best suitable RS.
   * @param reconnectOnFailure   Whether broker will automatically reconnect
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -124,8 +125,8 @@
 *   must use the {@link #publish(UpdateMsg)} method.
 * <p>
 *   If the Full Initialization process is needed then implementation
 *   for {@link #importBackend(InputStream)} and
 *   {@link #exportBackend(OutputStream)} must be
 *   for {@code importBackend(InputStream)} and
 *   {@code exportBackend(OutputStream)} must be
 *   provided.
 * <p>
 *   Full Initialization of a replica can be triggered by LDAP clients
@@ -1063,19 +1064,27 @@
    private final int serverToInitialize;
    private final int initWindow;
    /**
     * Constructor for the ExportThread.
     *
     * @param serverToInitialize serverId of server that will receive entries
     * @param serverToInitialize
     *          serverId of server that will receive entries
     * @param initWindow
     *          The value of the initialization window for flow control between
     *          the importer and the exporter.
     */
    public ExportThread(int serverToInitialize, int initWindow)
    {
      super("Export thread from serverId=" + serverID
          + " to serverId=" + serverToInitialize);
      super("Export thread from serverId=" + serverID + " to serverId="
          + serverToInitialize);
      this.serverToInitialize = serverToInitialize;
      this.initWindow = initWindow;
    }
    /**
     * Run method for this class.
     */
@@ -1342,11 +1351,8 @@
   * @return The source as a integer value
   * @throws DirectoryException if the string is not valid
   */
  public int decodeTarget(String targetString)
  throws DirectoryException
  public int decodeTarget(String targetString) throws DirectoryException
  {
    int  target = 0;
    Throwable cause;
    if (targetString.equalsIgnoreCase("all"))
    {
      return RoutableMsg.ALL_SERVERS;
@@ -1355,34 +1361,26 @@
    // So should be a serverID
    try
    {
      target = Integer.decode(targetString);
      int target = Integer.decode(targetString);
      if (target >= 0)
      {
        // FIXME Could we check now that it is a know server in the domain ?
      }
      return target;
    }
    catch(Exception e)
    catch (Exception e)
    {
      cause = e;
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_INVALID_EXPORT_TARGET.get();
      throw new DirectoryException(resultCode, message, e);
    }
    ResultCode resultCode = ResultCode.OTHER;
    Message message = ERR_INVALID_EXPORT_TARGET.get();
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, cause);
    else
      throw new DirectoryException(
          resultCode, message);
  }
  /**
   * Initializes a remote server from this server.
   * <p>
   * The {@link #exportBackend(OutputStream)} will therefore be called
   * on this server, and the {@link #importBackend(InputStream)}
   * The {@code exportBackend(OutputStream)} will therefore be called
   * on this server, and the {@code importBackend(InputStream)}
   * will be called on the remote server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
@@ -2138,8 +2136,8 @@
   * When this method is called, a request for initialization will
   * be sent to the source server asking for initialization.
   * <p>
   * The {@link #exportBackend(OutputStream)} will therefore be called
   * on the source server, and the {@link #importBackend(InputStream)}
   * The {@code exportBackend(OutputStream)} will therefore be called
   * on the source server, and the {@code importBackend(InputStream)}
   * will be called on his server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those
@@ -2161,8 +2159,8 @@
  /**
   * Initializes a remote server from this server.
   * <p>
   * The {@link #exportBackend(OutputStream)} will therefore be called
   * on this server, and the {@link #importBackend(InputStream)}
   * The {@code exportBackend(OutputStream)} will therefore be called
   * on this server, and the {@code importBackend(InputStream)}
   * will be called on the remote server.
   * <p>
   * The InputStream and OutpuStream given as a parameter to those