opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -435,8 +435,9 @@ { protected ServerStateFlush() { super("Replication State Saver for server id " + serverId + " and domain " + baseDn.toString()); super("Replica DS(" + serverId + ") state checkpointer for domain \"" + baseDn.toString() + "\""); } /** @@ -479,10 +480,14 @@ private class RSUpdater extends DirectoryThread { private final ChangeNumber startChangeNumber; protected RSUpdater(ChangeNumber replServerMaxChangeNumber) { super("Replication Server Updater for server id " + serverId + " and domain " + baseDn.toString()); super("Replica DS(" + serverId + ") missing change publisher for domain \"" + baseDn.toString() + "\""); this.startChangeNumber = replServerMaxChangeNumber; } opends/src/server/org/opends/server/replication/plugin/ReplayThread.java
@@ -67,7 +67,7 @@ */ public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue) { super("Replication Replay thread " + count++); super("Replica replay thread " + count++); this.updateToReplayQueue = updateToReplayQueue; } opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -23,6 +23,7 @@ * * * Copyright 2008 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.protocol; @@ -54,12 +55,6 @@ public class ReplSessionSecurity { /** * Whether the replication server should listen on a secure port. * Set false for test purposes only. */ private static boolean useSSL = true; /** * Whether replication sessions use SSL encryption. */ private boolean sslEncryption; @@ -155,7 +150,7 @@ private boolean isSecurePort(String serverURL) { // Always true unless changed for test purposes. return useSSL; return true; } /** @@ -236,56 +231,50 @@ public ProtocolSession createServerSession(Socket socket, int soTimeout) throws ConfigException, IOException { if (useSSL) try { try { // Create a new SSL context every time to make sure we pick up the // latest contents of the trust store. CryptoManager cryptoManager = DirectoryConfig.getCryptoManager(); SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname); SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); // Create a new SSL context every time to make sure we pick up the // latest contents of the trust store. CryptoManager cryptoManager = DirectoryConfig.getCryptoManager(); SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname); SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory(); SSLSocket secureSocket = (SSLSocket) sslSocketFactory.createSocket(socket, SSLSocket secureSocket = (SSLSocket) sslSocketFactory.createSocket(socket, socket.getInetAddress().getHostName(), socket.getPort(), false); secureSocket.setUseClientMode(false); secureSocket.setNeedClientAuth(true); secureSocket.setSoTimeout(soTimeout); secureSocket.setUseClientMode(false); secureSocket.setNeedClientAuth(true); secureSocket.setSoTimeout(soTimeout); if (sslProtocols != null) { secureSocket.setEnabledProtocols(sslProtocols); } if (sslCipherSuites != null) { secureSocket.setEnabledCipherSuites(sslCipherSuites); } // Force TLS negotiation now. secureSocket.startHandshake(); // SSLSession sslSession = secureSocket.getSession(); // System.out.println("Peer = " + sslSession.getPeerHost() + ":" + // sslSession.getPeerPort()); // System.out.println("Principal = " + sslSession.getPeerPrincipal()); return new TLSSocketSession(socket, secureSocket); } catch (SSLException e) if (sslProtocols != null) { // This is probably a connection attempt from an unexpected client // log that to warn the administrator. InetAddress remHost = socket.getInetAddress(); Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(remHost. getHostName(), remHost.getHostAddress(), e.getLocalizedMessage()); logError(message); return null; secureSocket.setEnabledProtocols(sslProtocols); } } else if (sslCipherSuites != null) { secureSocket.setEnabledCipherSuites(sslCipherSuites); } // Force TLS negotiation now. secureSocket.startHandshake(); // SSLSession sslSession = secureSocket.getSession(); // System.out.println("Peer = " + sslSession.getPeerHost() + ":" + // sslSession.getPeerPort()); // System.out.println("Principal = " + sslSession.getPeerPrincipal()); return new TLSSocketSession(socket, secureSocket); } catch (SSLException e) { return new SocketSession(socket); // This is probably a connection attempt from an unexpected client // log that to warn the administrator. InetAddress remHost = socket.getInetAddress(); Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(remHost. getHostName(), remHost.getHostAddress(), e.getLocalizedMessage()); logError(message); return null; } } opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -668,15 +669,19 @@ @Override public String toString() { String localString; if (serverId != 0) { localString = "Directory Server "; localString += serverId + " " + serverURL + " " + getServiceId(); } else localString = "Unknown server"; return localString; StringBuilder builder = new StringBuilder("Replica DS("); builder.append(serverId); builder.append(") for domain \""); builder.append(replicationServerDomain.getBaseDn()); builder.append("\""); return builder.toString(); } else { return "Unknown server"; } } /** opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; import org.opends.messages.MessageBuilder; @@ -152,9 +153,10 @@ db = new ReplicationDB(id, baseDn, replicationServer, dbenv); firstChange = db.readFirstChange(); lastChange = db.readLastChange(); thread = new DirectoryThread(this, "Replication Server db for DS " + id + " and " + baseDn + " in RS " + replicationServer.getServerId()); thread = new DirectoryThread(this, "Replication server RS(" + replicationServer.getServerId() + ") changelog checkpointer for Replica DS(" + id + ") for domain \"" + baseDn + "\""); thread.start(); DirectoryServer.deregisterMonitorProvider( opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -23,6 +23,7 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; @@ -79,7 +80,7 @@ public ECLServerWriter(ProtocolSession session, ECLServerHandler handler, ReplicationServerDomain replicationServerDomain) { super(session, -1, handler, replicationServerDomain); super(session, handler, replicationServerDomain); setName("Replication ECL Writer Thread for operation " + handler.getOperationId()); opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -80,9 +80,10 @@ public MonitoringPublisher(ReplicationServerDomain replicationServerDomain, long period) { super("Monitoring publisher for " + replicationServerDomain.getBaseDn() + " in RS " + replicationServerDomain.getReplicationServer().getServerId()); super("Replication server RS(" + replicationServerDomain.getReplicationServer() .getServerId() + ") monitor publisher for domain \"" + replicationServerDomain.getBaseDn() + "\""); this.replicationServerDomain = replicationServerDomain; this.period = period; opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -135,7 +135,7 @@ new ConcurrentHashMap<String, ReplicationServerDomain>(); private String localURL = "null"; private boolean shutdown = false; private volatile boolean shutdown = false; private ReplicationDbEnv dbEnv; private int rcvWindow; private int queueSize; @@ -336,11 +336,10 @@ // Read incoming messages and create LDAP or ReplicationServer listener // and Publisher. ProtocolSession session; Socket newSocket = null; try { ProtocolSession session; Socket newSocket = null; try { newSocket = listenSocket.accept(); @@ -586,18 +585,14 @@ if (debugEnabled()) TRACER.debugInfo("RS " +getMonitorInstanceName()+ " creates connect thread"); connectThread = new ReplicationServerConnectThread("Replication Server Connect " + serverId , this); connectThread = new ReplicationServerConnectThread(this); connectThread.start(); if (debugEnabled()) TRACER.debugInfo("RS " +getMonitorInstanceName()+ " creates listen thread"); listenThread = new ReplicationServerListenThread("Replication Server Listener " + serverId , this); listenThread = new ReplicationServerListenThread(this); listenThread.start(); // Creates the ECL workflow elem so that DS (LDAPReplicationDomain) @@ -1027,9 +1022,7 @@ listenSocket = new ServerSocket(); listenSocket.bind(new InetSocketAddress(replicationPort)); listenThread = new ReplicationServerListenThread( "Replication Server Listener", this); listenThread = new ReplicationServerListenThread(this); listenThread.start(); } catch (IOException e) opends/src/server/org/opends/server/replication/server/ReplicationServerConnectThread.java
@@ -45,15 +45,13 @@ * Creates a new instance of this directory thread with the * specified name. * * @param threadName The human-readable name to use for this * thread for debugging purposes. * @param server The ReplicationServer that will be called to * handle the connections. */ public ReplicationServerConnectThread( String threadName, ReplicationServer server) public ReplicationServerConnectThread(ReplicationServer server) { super(threadName); super("Replication server RS(" + server.getServerId() + ") connector thread"); this.server = server; } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -202,8 +202,9 @@ { this.baseDn = baseDn; this.replicationServer = replicationServer; this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " + baseDn + " in RS " + replicationServer.getServerId(), true); this.assuredTimeoutTimer = new Timer("Replication server RS(" + replicationServer.getServerId() + ") assured timer for domain \"" + baseDn + "\"", true); DirectoryServer.registerMonitorProvider(this); } opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -815,14 +816,19 @@ @Override public String toString() { String localString; if (serverId != 0) { localString = "Replication Server "; localString += serverId + " " + serverURL + " " + getServiceId(); } else localString = "Unknown server"; return localString; StringBuilder builder = new StringBuilder("Replication server RS("); builder.append(serverId); builder.append(") for domain \""); builder.append(replicationServerDomain.getBaseDn()); builder.append("\""); return builder.toString(); } else { return "Unknown server"; } } /** * Gets the status of the connected DS. opends/src/server/org/opends/server/replication/server/ReplicationServerListenThread.java
@@ -31,7 +31,7 @@ /** * This Class is used to create a thread that is responsible for listening * on the Replication Server thread and accept new incomng connections * on the Replication Server thread and accept new incoming connections * from other replication servers or from LDAP servers. */ public class ReplicationServerListenThread extends DirectoryThread @@ -45,15 +45,14 @@ * Creates a new instance of this directory thread with the * specified name. * * @param threadName The human-readable name to use for this * thread for debugging purposes. * @param server The ReplicationServer that will be called to * handle the connections. */ public ReplicationServerListenThread( String threadName, ReplicationServer server) public ReplicationServerListenThread(ReplicationServer server) { super(threadName); super("Replication server RS(" + server.getServerId() + ") connection listener on port " + server.getReplicationPort()); this.server = server; } opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -23,6 +23,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011 ForgeRock AS */ package org.opends.server.replication.server; @@ -375,8 +376,8 @@ // sendWindow MUST be created before starting the writer sendWindow = new Semaphore(sendWindowSize); writer = new ServerWriter(session, serverId, this, replicationServerDomain); writer = new ServerWriter(session, this, replicationServerDomain); reader = new ServerReader(session, this); reader.start(); @@ -385,11 +386,12 @@ // Create a thread to send heartbeat messages. if (heartbeatInterval > 0) { heartbeatThread = new HeartbeatThread( "Replication Heartbeat to " + this + " in RS " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName(), session, heartbeatInterval / 3); String threadName = "Replication server RS(" + this.getReplicationServerId() + ") heartbeat publisher to " + this.toString() + " at " + session.getReadableRemoteAddress(); heartbeatThread = new HeartbeatThread(threadName, session, heartbeatInterval / 3); heartbeatThread.start(); } } opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -62,17 +62,21 @@ private final ProtocolSession session; private final ServerHandler handler; /** * Constructor for the LDAP server reader part of the replicationServer. * * @param session The ProtocolSession from which to read the data. * @param handler The server handler for this server reader. * @param session * The ProtocolSession from which to read the data. * @param handler * The server handler for this server reader. */ public ServerReader(ProtocolSession session, ServerHandler handler) public ServerReader(ProtocolSession session, ServerHandler handler) { super("Replication Reader Thread for RS handler " + handler.getMonitorInstanceName()); super("Replication server RS(" + handler.getReplicationServerId() + ") reading from " + handler.toString() + " at " + session.getReadableRemoteAddress()); this.session = session; this.handler = handler; } opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -63,24 +63,25 @@ private final ReplicationServerDomain replicationServerDomain; private final short protocolVersion; /** * Create a ServerWriter. * Then ServerWriter then waits on the ServerHandler for new updates * and forward them to the server * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler * for new updates and forward them to the server * * @param session the ProtocolSession that will be used to send updates. * @param serverId the Identifier of the server. * @param handler handler for which the ServerWriter is created. * @param replicationServerDomain The ReplicationServerDomain of this * ServerWriter. * @param session * the ProtocolSession that will be used to send updates. * @param handler * handler for which the ServerWriter is created. * @param replicationServerDomain * The ReplicationServerDomain of this ServerWriter. */ public ServerWriter(ProtocolSession session, int serverId, ServerHandler handler, ReplicationServerDomain replicationServerDomain) public ServerWriter(ProtocolSession session, ServerHandler handler, ReplicationServerDomain replicationServerDomain) { super("Replication Writer Thread for handler of " + handler.toString() + " in " + replicationServerDomain); super("Replication server RS(" + handler.getReplicationServerId() + ") writing to " + handler.toString() + " at " + session.getReadableRemoteAddress()); this.session = session; this.handler = handler; opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -75,9 +75,10 @@ public StatusAnalyzer(ReplicationServerDomain replicationServerDomain, int degradedStatusThreshold) { super("Replication Server Status Analyzer for " + replicationServerDomain.getBaseDn() + " in RS " + replicationServerDomain.getReplicationServer().getServerId()); super("Replication server RS(" + replicationServerDomain.getReplicationServer() .getServerId() + ") delay monitor for domain \"" + replicationServerDomain.getBaseDn() + "\""); this.replicationServerDomain = replicationServerDomain; this.degradedStatusThreshold = degradedStatusThreshold; opends/src/server/org/opends/server/replication/service/ListenerThread.java
@@ -61,9 +61,10 @@ */ public ListenerThread(ReplicationDomain repDomain) { super("Replication Listener for server id " + repDomain.getServerId() + " and domain " + repDomain.getServiceID()); this.repDomain = repDomain; super("Replica DS(" + repDomain.getServerId() + ") listener for domain \"" + repDomain.getServiceID() + "\""); this.repDomain = repDomain; } /** opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2247,12 +2247,16 @@ // Start a heartbeat monitor thread. if (heartbeatInterval > 0) { heartbeatMonitor = new HeartbeatMonitor("Replication Heartbeat Monitor on RS " + getReplicationServer() + " " + rsServerId + " for " + baseDn + " in DS " + serverId, session, heartbeatInterval, (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)); String threadName = "Replica DS(" + this.getServerId() + ") heartbeat monitor for domain \"" + this.baseDn + "\" from RS(" + this.getRsServerId() + ") at " + session.getReadableRemoteAddress(); heartbeatMonitor = new HeartbeatMonitor( threadName, session, heartbeatInterval, (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)); heartbeatMonitor.start(); } } @@ -3142,11 +3146,15 @@ // Start a CN heartbeat thread. if (changeTimeHeartbeatSendInterval > 0) { ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( "Replication CN Heartbeat sender for " + baseDn + " with " + getReplicationServer(), session, changeTimeHeartbeatSendInterval, serverId); String threadName = "Replica DS(" + this.getServerId() + ") change time heartbeat publisher for domain \"" + this.baseDn + "\" to RS(" + this.getRsServerId() + ") at " + session.getReadableRemoteAddress(); ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread( threadName, session, changeTimeHeartbeatSendInterval, serverId); ctHeartbeatPublisherThread.start(); } else {