mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
18.00.2011 172f9ff26c2a07363b37ea83bdaba4ac6ef70226
Fix issue opendj-92: improve replication thread names
18 files modified
305 ■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java 87 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 18 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 29 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ListenerThread.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 30 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -435,8 +435,9 @@
  {
    protected ServerStateFlush()
    {
      super("Replication State Saver for server id " +
            serverId + " and domain " + baseDn.toString());
      super("Replica DS(" + serverId
          + ") state checkpointer for domain \"" + baseDn.toString()
          + "\"");
    }
    /**
@@ -479,10 +480,14 @@
  private class RSUpdater extends DirectoryThread
  {
    private final ChangeNumber startChangeNumber;
    protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
    {
      super("Replication Server Updater for server id " +
            serverId + " and domain " + baseDn.toString());
      super("Replica DS(" + serverId
          + ") missing change publisher for domain \""
          + baseDn.toString() + "\"");
      this.startChangeNumber = replServerMaxChangeNumber;
    }
opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -67,7 +67,7 @@
   */
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
  {
     super("Replication Replay thread " + count++);
     super("Replica replay thread " + count++);
     this.updateToReplayQueue = updateToReplayQueue;
  }
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -54,12 +55,6 @@
public class ReplSessionSecurity
{
  /**
   * Whether the replication server should listen on a secure port.
   * Set false for test purposes only.
   */
  private static boolean useSSL = true;
  /**
   * Whether replication sessions use SSL encryption.
   */
  private boolean sslEncryption;
@@ -155,7 +150,7 @@
  private boolean isSecurePort(String serverURL)
  {
    // Always true unless changed for test purposes.
    return useSSL;
    return true;
  }
  /**
@@ -236,56 +231,50 @@
  public ProtocolSession createServerSession(Socket socket, int soTimeout)
    throws ConfigException, IOException
  {
    if (useSSL)
    try
    {
      try
      {
        // Create a new SSL context every time to make sure we pick up the
        // latest contents of the trust store.
        CryptoManager cryptoManager = DirectoryConfig.getCryptoManager();
        SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname);
        SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
      // Create a new SSL context every time to make sure we pick up the
      // latest contents of the trust store.
      CryptoManager cryptoManager = DirectoryConfig.getCryptoManager();
      SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname);
      SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
        SSLSocket secureSocket = (SSLSocket)
          sslSocketFactory.createSocket(socket,
      SSLSocket secureSocket = (SSLSocket)
      sslSocketFactory.createSocket(socket,
          socket.getInetAddress().getHostName(),
          socket.getPort(), false);
        secureSocket.setUseClientMode(false);
        secureSocket.setNeedClientAuth(true);
        secureSocket.setSoTimeout(soTimeout);
      secureSocket.setUseClientMode(false);
      secureSocket.setNeedClientAuth(true);
      secureSocket.setSoTimeout(soTimeout);
        if (sslProtocols != null)
        {
          secureSocket.setEnabledProtocols(sslProtocols);
        }
        if (sslCipherSuites != null)
        {
          secureSocket.setEnabledCipherSuites(sslCipherSuites);
        }
        // Force TLS negotiation now.
        secureSocket.startHandshake();
//      SSLSession sslSession = secureSocket.getSession();
//      System.out.println("Peer      = " + sslSession.getPeerHost() + ":" +
//           sslSession.getPeerPort());
//      System.out.println("Principal = " + sslSession.getPeerPrincipal());
        return new TLSSocketSession(socket, secureSocket);
      } catch (SSLException e)
      if (sslProtocols != null)
      {
        // This is probably a connection attempt from an unexpected client
        // log that to warn the administrator.
        InetAddress remHost = socket.getInetAddress();
        Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(remHost.
          getHostName(), remHost.getHostAddress(), e.getLocalizedMessage());
        logError(message);
        return null;
        secureSocket.setEnabledProtocols(sslProtocols);
      }
    } else
      if (sslCipherSuites != null)
      {
        secureSocket.setEnabledCipherSuites(sslCipherSuites);
      }
      // Force TLS negotiation now.
      secureSocket.startHandshake();
      // SSLSession sslSession = secureSocket.getSession();
      // System.out.println("Peer      = " + sslSession.getPeerHost() + ":" +
      //   sslSession.getPeerPort());
      // System.out.println("Principal = " + sslSession.getPeerPrincipal());
      return new TLSSocketSession(socket, secureSocket);
    } catch (SSLException e)
    {
      return new SocketSession(socket);
      // This is probably a connection attempt from an unexpected client
      // log that to warn the administrator.
      InetAddress remHost = socket.getInetAddress();
      Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(remHost.
          getHostName(), remHost.getHostAddress(), e.getLocalizedMessage());
      logError(message);
      return null;
    }
  }
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -668,15 +669,19 @@
  @Override
  public String toString()
  {
    String localString;
    if (serverId != 0)
    {
      localString = "Directory Server ";
      localString += serverId + " " + serverURL + " " + getServiceId();
    } else
      localString = "Unknown server";
    return localString;
      StringBuilder builder = new StringBuilder("Replica DS(");
      builder.append(serverId);
      builder.append(") for domain \"");
      builder.append(replicationServerDomain.getBaseDn());
      builder.append("\"");
      return builder.toString();
    }
    else
    {
      return "Unknown server";
    }
  }
  /**
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -152,9 +153,10 @@
    db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
    thread = new DirectoryThread(this,
      "Replication Server db for DS " + id + " and " + baseDn + " in RS " +
      replicationServer.getServerId());
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + id
        + ") for domain \"" + baseDn + "\"");
    thread.start();
    DirectoryServer.deregisterMonitorProvider(
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
@@ -79,7 +80,7 @@
  public ECLServerWriter(ProtocolSession session, ECLServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
  {
    super(session, -1, handler, replicationServerDomain);
    super(session, handler, replicationServerDomain);
    setName("Replication ECL Writer Thread for operation " +
        handler.getOperationId());
opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -80,9 +80,10 @@
  public MonitoringPublisher(ReplicationServerDomain replicationServerDomain,
    long period)
  {
    super("Monitoring publisher for " +
      replicationServerDomain.getBaseDn() + " in RS " +
      replicationServerDomain.getReplicationServer().getServerId());
    super("Replication server RS("
        + replicationServerDomain.getReplicationServer()
            .getServerId() + ") monitor publisher for domain \""
        + replicationServerDomain.getBaseDn() + "\"");
    this.replicationServerDomain = replicationServerDomain;
    this.period = period;
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -135,7 +135,7 @@
          new ConcurrentHashMap<String, ReplicationServerDomain>();
  private String localURL = "null";
  private boolean shutdown = false;
  private volatile boolean shutdown = false;
  private ReplicationDbEnv dbEnv;
  private int rcvWindow;
  private int queueSize;
@@ -336,11 +336,10 @@
      // Read incoming messages and create LDAP or ReplicationServer listener
      // and Publisher.
      ProtocolSession session;
      Socket newSocket = null;
      try
      {
        ProtocolSession session;
        Socket newSocket = null;
        try
        {
          newSocket = listenSocket.accept();
@@ -586,18 +585,14 @@
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates connect thread");
      connectThread =
        new ReplicationServerConnectThread("Replication Server Connect " +
        serverId , this);
      connectThread = new ReplicationServerConnectThread(this);
      connectThread.start();
      if (debugEnabled())
        TRACER.debugInfo("RS " +getMonitorInstanceName()+
            " creates listen thread");
      listenThread =
        new ReplicationServerListenThread("Replication Server Listener " +
        serverId , this);
      listenThread = new ReplicationServerListenThread(this);
      listenThread.start();
      // Creates the ECL workflow elem so that DS (LDAPReplicationDomain)
@@ -1027,9 +1022,7 @@
        listenSocket = new ServerSocket();
        listenSocket.bind(new InetSocketAddress(replicationPort));
        listenThread =
          new ReplicationServerListenThread(
              "Replication Server Listener", this);
        listenThread = new ReplicationServerListenThread(this);
        listenThread.start();
      }
      catch (IOException e)
opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
@@ -45,15 +45,13 @@
   * Creates a new instance of this directory thread with the
   * specified name.
   *
   * @param  threadName  The human-readable name to use for this
   *                     thread for debugging purposes.
   * @param  server      The ReplicationServer that will be called to
   *                     handle the connections.
   */
  public ReplicationServerConnectThread(
      String threadName, ReplicationServer server)
  public ReplicationServerConnectThread(ReplicationServer server)
  {
    super(threadName);
    super("Replication server RS(" + server.getServerId()
        + ") connector thread");
    this.server = server;
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -202,8 +202,9 @@
  {
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
    this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " +
      baseDn + " in RS " + replicationServer.getServerId(), true);
    this.assuredTimeoutTimer = new Timer("Replication server RS("
        + replicationServer.getServerId()
        + ") assured timer for domain \"" + baseDn + "\"", true);
    DirectoryServer.registerMonitorProvider(this);
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -815,14 +816,19 @@
  @Override
  public String toString()
  {
    String localString;
    if (serverId != 0)
    {
      localString = "Replication Server ";
      localString += serverId + " " + serverURL + " " + getServiceId();
    } else
      localString = "Unknown server";
    return localString;
      StringBuilder builder = new StringBuilder("Replication server RS(");
      builder.append(serverId);
      builder.append(") for domain \"");
      builder.append(replicationServerDomain.getBaseDn());
      builder.append("\"");
      return builder.toString();
    }
    else
    {
      return "Unknown server";
    }
  }
  /**
   * Gets the status of the connected DS.
opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
@@ -31,7 +31,7 @@
/**
 * This Class is used to create a thread that is responsible for listening
 * on the Replication Server thread and accept new incomng connections
 * on the Replication Server thread and accept new incoming connections
 * from other replication servers or from LDAP servers.
 */
public class ReplicationServerListenThread extends DirectoryThread
@@ -45,15 +45,14 @@
   * Creates a new instance of this directory thread with the
   * specified name.
   *
   * @param  threadName  The human-readable name to use for this
   *                     thread for debugging purposes.
   * @param  server      The ReplicationServer that will be called to
   *                     handle the connections.
   */
  public ReplicationServerListenThread(
      String threadName, ReplicationServer server)
  public ReplicationServerListenThread(ReplicationServer server)
  {
    super(threadName);
    super("Replication server RS(" + server.getServerId()
        + ") connection listener on port "
        + server.getReplicationPort());
    this.server = server;
  }
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -375,8 +376,8 @@
      // sendWindow MUST be created before starting the writer
      sendWindow = new Semaphore(sendWindowSize);
      writer = new ServerWriter(session, serverId,
          this, replicationServerDomain);
      writer = new ServerWriter(session, this,
          replicationServerDomain);
      reader = new ServerReader(session, this);
      reader.start();
@@ -385,11 +386,12 @@
      // Create a thread to send heartbeat messages.
      if (heartbeatInterval > 0)
      {
        heartbeatThread = new HeartbeatThread(
            "Replication Heartbeat to " + this +
            " in RS " + replicationServerDomain.getReplicationServer().
            getMonitorInstanceName(),
            session, heartbeatInterval / 3);
        String threadName = "Replication server RS("
            + this.getReplicationServerId()
            + ") heartbeat publisher to " + this.toString() + " at "
            + session.getReadableRemoteAddress();
        heartbeatThread = new HeartbeatThread(threadName, session,
            heartbeatInterval / 3);
        heartbeatThread.start();
      }
    }
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -62,17 +62,21 @@
  private final ProtocolSession session;
  private final ServerHandler handler;
  /**
   * Constructor for the LDAP server reader part of the replicationServer.
   *
   * @param session The ProtocolSession from which to read the data.
   * @param handler The server handler for this server reader.
   * @param session
   *          The ProtocolSession from which to read the data.
   * @param handler
   *          The server handler for this server reader.
   */
  public ServerReader(ProtocolSession session,
      ServerHandler handler)
  public ServerReader(ProtocolSession session, ServerHandler handler)
  {
    super("Replication Reader Thread for RS handler " +
        handler.getMonitorInstanceName());
    super("Replication server RS(" + handler.getReplicationServerId()
        + ") reading from " + handler.toString() + " at "
        + session.getReadableRemoteAddress());
    this.session = session;
    this.handler = handler;
  }
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -63,24 +63,25 @@
  private final ReplicationServerDomain replicationServerDomain;
  private final short protocolVersion;
  /**
   * Create a ServerWriter.
   * Then ServerWriter then waits on the ServerHandler for new updates
   * and forward them to the server
   * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler
   * for new updates and forward them to the server
   *
   * @param session the ProtocolSession that will be used to send updates.
   * @param serverId the Identifier of the server.
   * @param handler handler for which the ServerWriter is created.
   * @param replicationServerDomain The ReplicationServerDomain of this
   *        ServerWriter.
   * @param session
   *          the ProtocolSession that will be used to send updates.
   * @param handler
   *          handler for which the ServerWriter is created.
   * @param replicationServerDomain
   *          The ReplicationServerDomain of this ServerWriter.
   */
  public ServerWriter(ProtocolSession session, int serverId,
                      ServerHandler handler,
                      ReplicationServerDomain replicationServerDomain)
  public ServerWriter(ProtocolSession session, ServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
  {
    super("Replication Writer Thread for handler of " +
        handler.toString() +
        " in " + replicationServerDomain);
    super("Replication server RS(" + handler.getReplicationServerId()
        + ") writing to " + handler.toString() + " at "
        + session.getReadableRemoteAddress());
    this.session = session;
    this.handler = handler;
opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -75,9 +75,10 @@
  public StatusAnalyzer(ReplicationServerDomain replicationServerDomain,
    int degradedStatusThreshold)
  {
    super("Replication Server Status Analyzer for " +
      replicationServerDomain.getBaseDn() + " in RS " +
      replicationServerDomain.getReplicationServer().getServerId());
    super("Replication server RS("
        + replicationServerDomain.getReplicationServer()
            .getServerId() + ") delay monitor for domain \""
        + replicationServerDomain.getBaseDn() + "\"");
    this.replicationServerDomain = replicationServerDomain;
    this.degradedStatusThreshold = degradedStatusThreshold;
opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -61,9 +61,10 @@
   */
  public ListenerThread(ReplicationDomain repDomain)
  {
     super("Replication Listener for server id " + repDomain.getServerId() +
         " and domain " + repDomain.getServiceID());
     this.repDomain = repDomain;
    super("Replica DS(" + repDomain.getServerId()
        + ") listener for domain \"" + repDomain.getServiceID()
        + "\"");
    this.repDomain = repDomain;
  }
  /**
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2247,12 +2247,16 @@
    // Start a heartbeat monitor thread.
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor =
        new HeartbeatMonitor("Replication Heartbeat Monitor on RS " +
        getReplicationServer() + " " + rsServerId + " for " + baseDn +
        " in DS " + serverId,
        session, heartbeatInterval, (protocolVersion >=
        ProtocolVersion.REPLICATION_PROTOCOL_V4));
      String threadName = "Replica DS("
          + this.getServerId() + ") heartbeat monitor for domain \""
          + this.baseDn + "\" from RS(" + this.getRsServerId()
          + ") at " + session.getReadableRemoteAddress();
      heartbeatMonitor = new HeartbeatMonitor(
          threadName,
          session,
          heartbeatInterval,
          (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4));
      heartbeatMonitor.start();
    }
  }
@@ -3142,11 +3146,15 @@
    // Start a CN heartbeat thread.
    if (changeTimeHeartbeatSendInterval > 0)
    {
      ctHeartbeatPublisherThread =
        new CTHeartbeatPublisherThread(
        "Replication CN Heartbeat sender for " +
        baseDn + " with " + getReplicationServer(),
        session, changeTimeHeartbeatSendInterval, serverId);
      String threadName = "Replica DS("
          + this.getServerId()
          + ") change time heartbeat publisher for domain \""
          + this.baseDn + "\" to RS(" + this.getRsServerId()
          + ") at " + session.getReadableRemoteAddress();
      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
          threadName, session, changeTimeHeartbeatSendInterval,
          serverId);
      ctHeartbeatPublisherThread.start();
    } else
    {