mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
16.22.2014 e5e4ea1dfa436ac42413a4d9b3b1279354b7cc3b
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -30,13 +30,14 @@
import java.net.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.*;
import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
@@ -55,6 +56,7 @@
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.StaticUtils;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ReplicationMessages.*;
@@ -89,24 +91,25 @@
      new HashMap<DN, ReplicationServerDomain>();
  private final ChangelogDB changelogDB;
  private volatile boolean shutdown = false;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private boolean stopListen = false;
  private ReplSessionSecurity replSessionSecurity;
  private final ReplSessionSecurity replSessionSecurity;
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
  private static String eclWorkflowID =
  private static final String eclWorkflowID =
    "External Changelog Workflow ID";
  private ECLWorkflowElement eclwe;
  private AtomicReference<WorkflowImpl> eclWorkflowImpl =
  private final AtomicReference<WorkflowImpl> eclWorkflowImpl =
      new AtomicReference<WorkflowImpl>();
  /**
   * This is required for unit testing, so that we can keep track of all the
   * replication servers which are running in the VM.
   */
  private static Set<Integer> localPorts = new CopyOnWriteArraySet<Integer>();
  private static final Set<Integer> localPorts =
      new CopyOnWriteArraySet<Integer>();
  // Monitors for synchronizing domain creation with the connect thread.
  private final Object domainTicketLock = new Object();
@@ -117,7 +120,7 @@
   * Holds the list of all replication servers instantiated in this VM.
   * This allows to perform clean up of the RS databases in unit tests.
   */
  private static List<ReplicationServer> allInstances =
  private static final List<ReplicationServer> allInstances =
    new ArrayList<ReplicationServer>();
  /**
@@ -184,21 +187,18 @@
   * ports from other replication servers or from LDAP servers
   * and spawn further thread responsible for handling those connections
   */
  void runListen()
  {
    Message listenMsg = NOTE_REPLICATION_SERVER_LISTENING.get(
    logError(NOTE_REPLICATION_SERVER_LISTENING.get(
        getServerId(),
        listenSocket.getInetAddress().getHostAddress(),
        listenSocket.getLocalPort());
    logError(listenMsg);
        listenSocket.getLocalPort()));
    while (!shutdown && !stopListen)
    while (!shutdown.get() && !stopListen)
    {
      // Wait on the replicationServer port.
      // Read incoming messages and create LDAP or ReplicationServer listener
      // and Publisher.
      try
      {
        Session session;
@@ -212,14 +212,18 @@
          session = replSessionSecurity.createServerSession(newSocket,
              timeoutMS);
          if (session == null) // Error, go back to accept
          {
            continue;
          }
        }
        catch (Exception e)
        {
          // If problems happen during the SSL handshake, it is necessary
          // to close the socket to free the associated resources.
          if (newSocket != null)
          {
            newSocket.close();
          }
          continue;
        }
@@ -264,7 +268,7 @@
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        if (!shutdown)
        if (!shutdown.get())
        {
          logError(ERR_EXCEPTION_LISTENING.get(e.getLocalizedMessage()));
        }
@@ -282,7 +286,7 @@
  {
    synchronized (connectThreadLock)
    {
      while (!shutdown)
      while (!shutdown.get())
      {
        HostPort localAddress = HostPort.localAddress(getReplicationPort());
        for (ReplicationServerDomain domain : getReplicationServerDomains())
@@ -359,8 +363,10 @@
    boolean sslEncryption = replSessionSecurity.isSslEncryption();
    if (debugEnabled())
    {
      TRACER.debugInfo("RS " + getMonitorInstanceName() + " connects to "
          + remoteServerAddress);
    }
    Socket socket = new Socket();
    Session session = null;
@@ -378,7 +384,9 @@
    catch (Exception e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      close(session);
      close(socket);
    }
@@ -389,7 +397,7 @@
   */
  private void initialize()
  {
    shutdown = false;
    shutdown.set(false);
    try
    {
@@ -401,14 +409,18 @@
      // creates working threads: we must first connect, then start to listen.
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates connect thread");
      {
        TRACER.debugInfo("RS " + getMonitorInstanceName()
            + " creates connect thread");
      }
      connectThread = new ReplicationServerConnectThread(this);
      connectThread.start();
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates listen thread");
      {
        TRACER.debugInfo("RS " + getMonitorInstanceName()
            + " creates listen thread");
      }
      listenThread = new ReplicationServerListenThread(this);
      listenThread.start();
@@ -423,8 +435,10 @@
      eclwe = new ECLWorkflowElement(this);
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " successfully initialized");
      {
        TRACER.debugInfo("RS " + getMonitorInstanceName()
            + " successfully initialized");
      }
    } catch (UnknownHostException e)
    {
      logError(ERR_UNKNOWN_HOSTNAME.get());
@@ -604,7 +618,7 @@
  /**
   * Waits for connections to this ReplicationServer.
   */
  public void waitConnections()
  void waitConnections()
  {
    // Acquire a domain ticket and wait for a complete cycle of the connect
    // thread.
@@ -626,7 +640,7 @@
    // Wait until the connect thread has processed next connect phase.
    synchronized (domainTicketLock)
    {
      while (myDomainTicket > domainTicket && !shutdown)
      while (myDomainTicket > domainTicket && !shutdown.get())
      {
        try
        {
@@ -649,10 +663,10 @@
  {
    localPorts.remove(getReplicationPort());
    if (shutdown)
    if (!shutdown.compareAndSet(false, true))
    {
      return;
    shutdown = true;
    }
    // shutdown the connect thread
    if (connectThread != null)
@@ -660,19 +674,8 @@
      connectThread.interrupt();
    }
    // shutdown the listener thread
    try
    {
      if (listenSocket != null)
      {
        listenSocket.close();
      }
    } catch (IOException e)
    {
      // replication Server service is closing anyway.
    }
    // shutdown the listen thread
    StaticUtils.close(listenSocket);
    if (listenThread != null)
    {
      listenThread.interrupt();
@@ -744,9 +747,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public ConfigChangeResult applyConfigurationChange(
      ReplicationServerCfg configuration)
@@ -779,7 +780,9 @@
      catch (ChangelogException e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        resultCode = ResultCode.OPERATIONS_ERROR;
      }
    }
@@ -805,13 +808,17 @@
      catch (IOException e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(ERR_COULD_NOT_CLOSE_THE_SOCKET.get(e.toString()));
      }
      catch (InterruptedException e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        logError(ERR_COULD_NOT_STOP_LISTEN_THREAD.get(e.toString()));
      }
    }
@@ -898,9 +905,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isConfigurationChangeAcceptable(
      ReplicationServerCfg configuration, List<Message> unacceptableReasons)
@@ -917,10 +922,8 @@
   */
  public long getGenerationId(DN baseDN)
  {
    ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
    if (rsd!=null)
      return rsd.getGenerationId();
    return -1;
    final ReplicationServerDomain rsd = getReplicationServerDomain(baseDN);
    return rsd != null ? rsd.getGenerationId() : -1;
  }
  /**
@@ -941,8 +944,9 @@
  public void remove()
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("RS " + getMonitorInstanceName() + " starts removing");
    }
    shutdown();
  }
@@ -1025,7 +1029,9 @@
    }
    if (serversToDisconnect.isEmpty())
    {
      return;
    }
    for (ReplicationServerDomain domain: getReplicationServerDomains())
    {