mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.06.2014 9ca54cae8c40fcd3fd4b85414c4be5aa3c3c77d6
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -30,6 +30,7 @@
import java.net.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
@@ -87,23 +88,24 @@
      new HashMap<DN, ReplicationServerDomain>();
  private final ChangelogDB changelogDB;
  private volatile boolean shutdown = false;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private boolean stopListen = false;
  private ReplSessionSecurity replSessionSecurity;
  private final ReplSessionSecurity replSessionSecurity;
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static String eclWorkflowID =
  private static final String eclWorkflowID =
    "External Changelog Workflow ID";
  private ECLWorkflowElement eclwe;
  private AtomicReference<WorkflowImpl> eclWorkflowImpl =
  private final AtomicReference<WorkflowImpl> eclWorkflowImpl =
      new AtomicReference<WorkflowImpl>();
  /**
   * This is required for unit testing, so that we can keep track of all the
   * replication servers which are running in the VM.
   */
  private static Set<Integer> localPorts = new CopyOnWriteArraySet<Integer>();
  private static final Set<Integer> localPorts =
      new CopyOnWriteArraySet<Integer>();
  // Monitors for synchronizing domain creation with the connect thread.
  private final Object domainTicketLock = new Object();
@@ -114,7 +116,7 @@
   * Holds the list of all replication servers instantiated in this VM.
   * This allows to perform clean up of the RS databases in unit tests.
   */
  private static List<ReplicationServer> allInstances =
  private static final List<ReplicationServer> allInstances =
    new ArrayList<ReplicationServer>();
  /**
@@ -165,7 +167,6 @@
   * ports from other replication servers or from LDAP servers
   * and spawn further thread responsible for handling those connections
   */
  void runListen()
  {
    logger.info(NOTE_REPLICATION_SERVER_LISTENING,
@@ -173,12 +174,11 @@
        listenSocket.getInetAddress().getHostAddress(),
        listenSocket.getLocalPort());
    while (!shutdown && !stopListen)
    while (!shutdown.get() && !stopListen)
    {
      // Wait on the replicationServer port.
      // Read incoming messages and create LDAP or ReplicationServer listener
      // and Publisher.
      try
      {
        Session session;
@@ -192,14 +192,18 @@
          session = replSessionSecurity.createServerSession(newSocket,
              timeoutMS);
          if (session == null) // Error, go back to accept
          {
            continue;
          }
        }
        catch (Exception e)
        {
          // If problems happen during the SSL handshake, it is necessary
          // to close the socket to free the associated resources.
          if (newSocket != null)
          {
            newSocket.close();
          }
          continue;
        }
@@ -241,7 +245,7 @@
        // Just log debug information and loop.
        // Do not log the message during shutdown.
        logger.traceException(e);
        if (!shutdown)
        if (!shutdown.get())
        {
          logger.error(ERR_EXCEPTION_LISTENING, e.getLocalizedMessage());
        }
@@ -259,7 +263,7 @@
  {
    synchronized (connectThreadLock)
    {
      while (!shutdown)
      while (!shutdown.get())
      {
        HostPort localAddress = HostPort.localAddress(getReplicationPort());
        for (ReplicationServerDomain domain : getReplicationServerDomains())
@@ -336,8 +340,10 @@
    boolean sslEncryption = replSessionSecurity.isSslEncryption();
    if (logger.isTraceEnabled())
    {
      logger.trace("RS " + getMonitorInstanceName() + " connects to "
          + remoteServerAddress);
    }
    Socket socket = new Socket();
    Session session = null;
@@ -365,7 +371,7 @@
   */
  private void initialize()
  {
    shutdown = false;
    shutdown.set(false);
    try
    {
@@ -377,14 +383,16 @@
      // creates working threads: we must first connect, then start to listen.
      if (logger.isTraceEnabled())
        logger.trace("RS " +getMonitorInstanceName()+
            " creates connect thread");
      {
        logger.trace("RS " + getMonitorInstanceName() + " creates connect thread");
      }
      connectThread = new ReplicationServerConnectThread(this);
      connectThread.start();
      if (logger.isTraceEnabled())
        logger.trace("RS " +getMonitorInstanceName()+
            " creates listen thread");
      {
        logger.trace("RS " + getMonitorInstanceName() + " creates listen thread");
      }
      listenThread = new ReplicationServerListenThread(this);
      listenThread.start();
@@ -399,8 +407,9 @@
      eclwe = new ECLWorkflowElement(this);
      if (logger.isTraceEnabled())
        logger.trace("RS " +getMonitorInstanceName()+
            " successfully initialized");
      {
        logger.trace("RS " + getMonitorInstanceName() + " successfully initialized");
      }
    } catch (UnknownHostException e)
    {
      logger.error(ERR_UNKNOWN_HOSTNAME);
@@ -577,7 +586,7 @@
  /**
   * Waits for connections to this ReplicationServer.
   */
  public void waitConnections()
  void waitConnections()
  {
    // Acquire a domain ticket and wait for a complete cycle of the connect
    // thread.
@@ -599,7 +608,7 @@
    // Wait until the connect thread has processed next connect phase.
    synchronized (domainTicketLock)
    {
      while (myDomainTicket > domainTicket && !shutdown)
      while (myDomainTicket > domainTicket && !shutdown.get())
      {
        try
        {
@@ -622,10 +631,10 @@
  {
    localPorts.remove(getReplicationPort());
    if (shutdown)
    if (!shutdown.compareAndSet(false, true))
    {
      return;
    shutdown = true;
    }
    // shutdown the connect thread
    if (connectThread != null)
@@ -635,8 +644,6 @@
    // shutdown the listener thread
    close(listenSocket);
    // shutdown the listen thread
    if (listenThread != null)
    {
      listenThread.interrupt();
@@ -705,9 +712,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationChange(
      ReplicationServerCfg configuration)
@@ -856,9 +861,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isConfigurationChangeAcceptable(
      ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
@@ -875,10 +878,8 @@
   */
  public long getGenerationId(DN baseDN)
  {
    ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
    if (rsd!=null)
      return rsd.getGenerationId();
    return -1;
    final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
    return rsd != null ? rsd.getGenerationId() : -1;
  }
  /**
@@ -899,8 +900,9 @@
  public void remove()
  {
    if (logger.isTraceEnabled())
    {
      logger.trace("RS " + getMonitorInstanceName() + " starts removing");
    }
    shutdown();
  }
@@ -983,7 +985,9 @@
    }
    if (serversToDisconnect.isEmpty())
    {
      return;
    }
    for (ReplicationServerDomain domain: getReplicationServerDomains())
    {