Fix issue opendj-92: improve replication thread names
| | |
| | | { |
| | | protected ServerStateFlush() |
| | | { |
| | | super("Replication State Saver for server id " + |
| | | serverId + " and domain " + baseDn.toString()); |
| | | super("Replica DS(" + serverId |
| | | + ") state checkpointer for domain \"" + baseDn.toString() |
| | | + "\""); |
| | | } |
| | | |
| | | /** |
| | |
| | | private class RSUpdater extends DirectoryThread |
| | | { |
| | | private final ChangeNumber startChangeNumber; |
| | | |
| | | |
| | | |
| | | protected RSUpdater(ChangeNumber replServerMaxChangeNumber) |
| | | { |
| | | super("Replication Server Updater for server id " + |
| | | serverId + " and domain " + baseDn.toString()); |
| | | super("Replica DS(" + serverId |
| | | + ") missing change publisher for domain \"" |
| | | + baseDn.toString() + "\""); |
| | | this.startChangeNumber = replServerMaxChangeNumber; |
| | | } |
| | | |
| | |
| | | */ |
| | | public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue) |
| | | { |
| | | super("Replication Replay thread " + count++); |
| | | super("Replica replay thread " + count++); |
| | | this.updateToReplayQueue = updateToReplayQueue; |
| | | } |
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2008 Sun Microsystems, Inc. |
| | | * Portions copyright 2011 ForgeRock AS |
| | | */ |
| | | |
| | | package org.opends.server.replication.protocol; |
| | |
| | | public class ReplSessionSecurity |
| | | { |
| | | /** |
| | | * Whether the replication server should listen on a secure port. |
| | | * Set false for test purposes only. |
| | | */ |
| | | private static boolean useSSL = true; |
| | | |
| | | /** |
| | | * Whether replication sessions use SSL encryption. |
| | | */ |
| | | private boolean sslEncryption; |
| | |
| | | private boolean isSecurePort(String serverURL) |
| | | { |
| | | // Always true unless changed for test purposes. |
| | | return useSSL; |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | |
| | | public ProtocolSession createServerSession(Socket socket, int soTimeout) |
| | | throws ConfigException, IOException |
| | | { |
| | | if (useSSL) |
| | | { |
| | | try |
| | | { |
| | | // Create a new SSL context every time to make sure we pick up the |
| | |
| | | logError(message); |
| | | return null; |
| | | } |
| | | } else |
| | | { |
| | | return new SocketSession(socket); |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String localString; |
| | | if (serverId != 0) |
| | | { |
| | | localString = "Directory Server "; |
| | | localString += serverId + " " + serverURL + " " + getServiceId(); |
| | | } else |
| | | localString = "Unknown server"; |
| | | |
| | | return localString; |
| | | StringBuilder builder = new StringBuilder("Replica DS("); |
| | | builder.append(serverId); |
| | | builder.append(") for domain \""); |
| | | builder.append(replicationServerDomain.getBaseDn()); |
| | | builder.append("\""); |
| | | return builder.toString(); |
| | | } |
| | | else |
| | | { |
| | | return "Unknown server"; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import org.opends.messages.MessageBuilder; |
| | |
| | | db = new ReplicationDB(id, baseDn, replicationServer, dbenv); |
| | | firstChange = db.readFirstChange(); |
| | | lastChange = db.readLastChange(); |
| | | thread = new DirectoryThread(this, |
| | | "Replication Server db for DS " + id + " and " + baseDn + " in RS " + |
| | | replicationServer.getServerId()); |
| | | thread = new DirectoryThread(this, "Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + ") changelog checkpointer for Replica DS(" + id |
| | | + ") for domain \"" + baseDn + "\""); |
| | | thread.start(); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider( |
| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions copyright 2011 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | public ECLServerWriter(ProtocolSession session, ECLServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super(session, -1, handler, replicationServerDomain); |
| | | super(session, handler, replicationServerDomain); |
| | | |
| | | setName("Replication ECL Writer Thread for operation " + |
| | | handler.getOperationId()); |
| | |
| | | public MonitoringPublisher(ReplicationServerDomain replicationServerDomain, |
| | | long period) |
| | | { |
| | | super("Monitoring publisher for " + |
| | | replicationServerDomain.getBaseDn() + " in RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId()); |
| | | super("Replication server RS(" |
| | | + replicationServerDomain.getReplicationServer() |
| | | .getServerId() + ") monitor publisher for domain \"" |
| | | + replicationServerDomain.getBaseDn() + "\""); |
| | | |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | this.period = period; |
| | |
| | | new ConcurrentHashMap<String, ReplicationServerDomain>(); |
| | | |
| | | private String localURL = "null"; |
| | | private boolean shutdown = false; |
| | | private volatile boolean shutdown = false; |
| | | private ReplicationDbEnv dbEnv; |
| | | private int rcvWindow; |
| | | private int queueSize; |
| | |
| | | // Read incoming messages and create LDAP or ReplicationServer listener |
| | | // and Publisher. |
| | | |
| | | ProtocolSession session; |
| | | Socket newSocket = null; |
| | | |
| | | try |
| | | { |
| | | ProtocolSession session; |
| | | Socket newSocket = null; |
| | | try |
| | | { |
| | | newSocket = listenSocket.accept(); |
| | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates connect thread"); |
| | | connectThread = |
| | | new ReplicationServerConnectThread("Replication Server Connect " + |
| | | serverId , this); |
| | | connectThread = new ReplicationServerConnectThread(this); |
| | | connectThread.start(); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("RS " +getMonitorInstanceName()+ |
| | | " creates listen thread"); |
| | | |
| | | listenThread = |
| | | new ReplicationServerListenThread("Replication Server Listener " + |
| | | serverId , this); |
| | | listenThread = new ReplicationServerListenThread(this); |
| | | listenThread.start(); |
| | | |
| | | // Creates the ECL workflow elem so that DS (LDAPReplicationDomain) |
| | |
| | | listenSocket = new ServerSocket(); |
| | | listenSocket.bind(new InetSocketAddress(replicationPort)); |
| | | |
| | | listenThread = |
| | | new ReplicationServerListenThread( |
| | | "Replication Server Listener", this); |
| | | listenThread = new ReplicationServerListenThread(this); |
| | | listenThread.start(); |
| | | } |
| | | catch (IOException e) |
| | |
| | | * Creates a new instance of this directory thread with the |
| | | * specified name. |
| | | * |
| | | * @param threadName The human-readable name to use for this |
| | | * thread for debugging purposes. |
| | | * @param server The ReplicationServer that will be called to |
| | | * handle the connections. |
| | | */ |
| | | public ReplicationServerConnectThread( |
| | | String threadName, ReplicationServer server) |
| | | public ReplicationServerConnectThread(ReplicationServer server) |
| | | { |
| | | super(threadName); |
| | | super("Replication server RS(" + server.getServerId() |
| | | + ") connector thread"); |
| | | this.server = server; |
| | | } |
| | | |
| | |
| | | { |
| | | this.baseDn = baseDn; |
| | | this.replicationServer = replicationServer; |
| | | this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " + |
| | | baseDn + " in RS " + replicationServer.getServerId(), true); |
| | | this.assuredTimeoutTimer = new Timer("Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + ") assured timer for domain \"" + baseDn + "\"", true); |
| | | |
| | | DirectoryServer.registerMonitorProvider(this); |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String localString; |
| | | if (serverId != 0) |
| | | { |
| | | localString = "Replication Server "; |
| | | localString += serverId + " " + serverURL + " " + getServiceId(); |
| | | } else |
| | | localString = "Unknown server"; |
| | | return localString; |
| | | StringBuilder builder = new StringBuilder("Replication server RS("); |
| | | builder.append(serverId); |
| | | builder.append(") for domain \""); |
| | | builder.append(replicationServerDomain.getBaseDn()); |
| | | builder.append("\""); |
| | | return builder.toString(); |
| | | } |
| | | else |
| | | { |
| | | return "Unknown server"; |
| | | } |
| | | } |
| | | /** |
| | | * Gets the status of the connected DS. |
| | |
| | | |
| | | /** |
| | | * This Class is used to create a thread that is responsible for listening |
| | | * on the Replication Server thread and accept new incomng connections |
| | | * on the Replication Server thread and accept new incoming connections |
| | | * from other replication servers or from LDAP servers. |
| | | */ |
| | | public class ReplicationServerListenThread extends DirectoryThread |
| | |
| | | * Creates a new instance of this directory thread with the |
| | | * specified name. |
| | | * |
| | | * @param threadName The human-readable name to use for this |
| | | * thread for debugging purposes. |
| | | * @param server The ReplicationServer that will be called to |
| | | * handle the connections. |
| | | */ |
| | | public ReplicationServerListenThread( |
| | | String threadName, ReplicationServer server) |
| | | public ReplicationServerListenThread(ReplicationServer server) |
| | | { |
| | | super(threadName); |
| | | super("Replication server RS(" + server.getServerId() |
| | | + ") connection listener on port " |
| | | + server.getReplicationPort()); |
| | | this.server = server; |
| | | } |
| | | |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | // sendWindow MUST be created before starting the writer |
| | | sendWindow = new Semaphore(sendWindowSize); |
| | | |
| | | writer = new ServerWriter(session, serverId, |
| | | this, replicationServerDomain); |
| | | writer = new ServerWriter(session, this, |
| | | replicationServerDomain); |
| | | reader = new ServerReader(session, this); |
| | | |
| | | reader.start(); |
| | |
| | | // Create a thread to send heartbeat messages. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatThread = new HeartbeatThread( |
| | | "Replication Heartbeat to " + this + |
| | | " in RS " + replicationServerDomain.getReplicationServer(). |
| | | getMonitorInstanceName(), |
| | | session, heartbeatInterval / 3); |
| | | String threadName = "Replication server RS(" |
| | | + this.getReplicationServerId() |
| | | + ") heartbeat publisher to " + this.toString() + " at " |
| | | + session.getReadableRemoteAddress(); |
| | | heartbeatThread = new HeartbeatThread(threadName, session, |
| | | heartbeatInterval / 3); |
| | | heartbeatThread.start(); |
| | | } |
| | | } |
| | |
| | | private final ProtocolSession session; |
| | | private final ServerHandler handler; |
| | | |
| | | |
| | | |
| | | /** |
| | | * Constructor for the LDAP server reader part of the replicationServer. |
| | | * |
| | | * @param session The ProtocolSession from which to read the data. |
| | | * @param handler The server handler for this server reader. |
| | | * @param session |
| | | * The ProtocolSession from which to read the data. |
| | | * @param handler |
| | | * The server handler for this server reader. |
| | | */ |
| | | public ServerReader(ProtocolSession session, |
| | | ServerHandler handler) |
| | | public ServerReader(ProtocolSession session, ServerHandler handler) |
| | | { |
| | | super("Replication Reader Thread for RS handler " + |
| | | handler.getMonitorInstanceName()); |
| | | super("Replication server RS(" + handler.getReplicationServerId() |
| | | + ") reading from " + handler.toString() + " at " |
| | | + session.getReadableRemoteAddress()); |
| | | this.session = session; |
| | | this.handler = handler; |
| | | } |
| | |
| | | private final ReplicationServerDomain replicationServerDomain; |
| | | private final short protocolVersion; |
| | | |
| | | |
| | | |
| | | /** |
| | | * Create a ServerWriter. |
| | | * Then ServerWriter then waits on the ServerHandler for new updates |
| | | * and forward them to the server |
| | | * Create a ServerWriter. The ServerWriter then waits on the ServerHandler |
| | | * for new updates and forwards them to the server. |
| | | * |
| | | * @param session the ProtocolSession that will be used to send updates. |
| | | * @param serverId the Identifier of the server. |
| | | * @param handler handler for which the ServerWriter is created. |
| | | * @param replicationServerDomain The ReplicationServerDomain of this |
| | | * ServerWriter. |
| | | * @param session |
| | | * the ProtocolSession that will be used to send updates. |
| | | * @param handler |
| | | * handler for which the ServerWriter is created. |
| | | * @param replicationServerDomain |
| | | * The ReplicationServerDomain of this ServerWriter. |
| | | */ |
| | | public ServerWriter(ProtocolSession session, int serverId, |
| | | ServerHandler handler, |
| | | public ServerWriter(ProtocolSession session, ServerHandler handler, |
| | | ReplicationServerDomain replicationServerDomain) |
| | | { |
| | | super("Replication Writer Thread for handler of " + |
| | | handler.toString() + |
| | | " in " + replicationServerDomain); |
| | | super("Replication server RS(" + handler.getReplicationServerId() |
| | | + ") writing to " + handler.toString() + " at " |
| | | + session.getReadableRemoteAddress()); |
| | | |
| | | this.session = session; |
| | | this.handler = handler; |
| | |
| | | public StatusAnalyzer(ReplicationServerDomain replicationServerDomain, |
| | | int degradedStatusThreshold) |
| | | { |
| | | super("Replication Server Status Analyzer for " + |
| | | replicationServerDomain.getBaseDn() + " in RS " + |
| | | replicationServerDomain.getReplicationServer().getServerId()); |
| | | super("Replication server RS(" |
| | | + replicationServerDomain.getReplicationServer() |
| | | .getServerId() + ") delay monitor for domain \"" |
| | | + replicationServerDomain.getBaseDn() + "\""); |
| | | |
| | | this.replicationServerDomain = replicationServerDomain; |
| | | this.degradedStatusThreshold = degradedStatusThreshold; |
| | |
| | | */ |
| | | public ListenerThread(ReplicationDomain repDomain) |
| | | { |
| | | super("Replication Listener for server id " + repDomain.getServerId() + |
| | | " and domain " + repDomain.getServiceID()); |
| | | super("Replica DS(" + repDomain.getServerId() |
| | | + ") listener for domain \"" + repDomain.getServiceID() |
| | | + "\""); |
| | | this.repDomain = repDomain; |
| | | } |
| | | |
| | |
| | | // Start a heartbeat monitor thread. |
| | | if (heartbeatInterval > 0) |
| | | { |
| | | heartbeatMonitor = |
| | | new HeartbeatMonitor("Replication Heartbeat Monitor on RS " + |
| | | getReplicationServer() + " " + rsServerId + " for " + baseDn + |
| | | " in DS " + serverId, |
| | | session, heartbeatInterval, (protocolVersion >= |
| | | ProtocolVersion.REPLICATION_PROTOCOL_V4)); |
| | | String threadName = "Replica DS(" |
| | | + this.getServerId() + ") heartbeat monitor for domain \"" |
| | | + this.baseDn + "\" from RS(" + this.getRsServerId() |
| | | + ") at " + session.getReadableRemoteAddress(); |
| | | |
| | | heartbeatMonitor = new HeartbeatMonitor( |
| | | threadName, |
| | | session, |
| | | heartbeatInterval, |
| | | (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)); |
| | | heartbeatMonitor.start(); |
| | | } |
| | | } |
| | |
| | | // Start a CN heartbeat thread. |
| | | if (changeTimeHeartbeatSendInterval > 0) |
| | | { |
| | | ctHeartbeatPublisherThread = |
| | | new CTHeartbeatPublisherThread( |
| | | "Replication CN Heartbeat sender for " + |
| | | baseDn + " with " + getReplicationServer(), |
| | | session, changeTimeHeartbeatSendInterval, serverId); |
| | | String threadName = "Replica DS(" |
| | | + this.getServerId() |
| | | + ") change time heartbeat publisher for domain \"" |
| | | + this.baseDn + "\" to RS(" + this.getRsServerId() |
| | | + ") at " + session.getReadableRemoteAddress(); |
| | | |
| | | ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( |
| | | threadName, session, changeTimeHeartbeatSendInterval, |
| | | serverId); |
| | | ctHeartbeatPublisherThread.start(); |
| | | } else |
| | | { |