/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.*; import org.opends.server.loggers.debug.DebugTracer; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.util.List; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.Semaphore; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.MonitorProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.common.ServerState; import 
org.opends.server.replication.protocol.ProtocolVersion; import org.opends.server.replication.protocol.AckMessage; import org.opends.server.replication.protocol.ReplServerStartMessage; import org.opends.server.replication.protocol.HeartbeatThread; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.RoutableMessage; import org.opends.server.replication.protocol.ServerStartMessage; import org.opends.server.replication.protocol.ReplicationMessage; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.protocol.WindowMessage; import org.opends.server.replication.protocol.WindowProbe; import org.opends.server.replication.protocol.ReplServerInfoMessage; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.types.InitializationException; import org.opends.server.util.TimeThread; /** * This class defines a server handler, which handles all interaction with a * replication server. */ public class ServerHandler extends MonitorProvider { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * Time during which the server will wait for existing thread to stop * during the shutdown. 
*/ private static final int SHUTDOWN_JOIN_TIMEOUT = 30000; private short serverId; private ProtocolSession session; private final MsgQueue msgQueue = new MsgQueue(); private MsgQueue lateQueue = new MsgQueue(); private final Map waitingAcks = new HashMap(); private ReplicationCache replicationCache = null; private String serverURL; private int outCount = 0; // number of update sent to the server private int inCount = 0; // number of updates received from the server private int inAckCount = 0; private int outAckCount = 0; private int maxReceiveQueue = 0; private int maxSendQueue = 0; private int maxReceiveDelay = 0; private int maxSendDelay = 0; private int maxQueueSize = 10000; private int restartReceiveQueue; private int restartSendQueue; private int restartReceiveDelay; private int restartSendDelay; private boolean serverIsLDAPserver; private boolean following = false; private ServerState serverState; private boolean active = true; private ServerWriter writer = null; private DN baseDn = null; private String serverAddressURL; private int rcvWindow; private int rcvWindowSizeHalf; private int maxRcvWindow; private ServerReader reader; private Semaphore sendWindow; private int sendWindowSize; private boolean flowControl = false; // indicate that the server is // flow controled and should // be stopped from sending messsages. private int saturationCount = 0; private short replicationServerId; private short protocolVersion; /** * When this Handler is connected to a changelog server this collection * will contain the list of LDAP servers connected to the remote changelog * server. */ private List remoteLDAPservers = new ArrayList(); /** * The time in milliseconds between heartbeats from the replication * server. Zero means heartbeats are off. */ private long heartbeatInterval = 0; /** * The thread that will send heartbeats. 
*/ HeartbeatThread heartbeatThread = null; private static final Map changelogsWaitingAcks = new HashMap(); /** * Creates a new server handler instance with the provided socket. * * @param session The ProtocolSession used by the ServerHandler to * communicate with the remote entity. * @param queueSize The maximum number of update that will be kept * in memory by this ServerHandler. */ public ServerHandler(ProtocolSession session, int queueSize) { super("Server Handler"); this.session = session; this.maxQueueSize = queueSize; this.protocolVersion = ProtocolVersion.currentVersion(); } /** * Do the exchange of start messages to know if the remote * server is an LDAP or replication server and to exchange serverID. * Then create the reader and writer thread. * * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. * null if this is an incoming connection. * @param replicationServerId The identifier of the replicationServer that * creates this server handler. * @param replicationServerURL The URL of the replicationServer that creates * this server handler. * @param windowSize the window size that this server handler must use. * @param replicationServer the ReplicationServer that created this server * handler. */ public void start(DN baseDn, short replicationServerId, String replicationServerURL, int windowSize, ReplicationServer replicationServer) { this.replicationServerId = replicationServerId; rcvWindowSizeHalf = windowSize/2; maxRcvWindow = windowSize; rcvWindow = windowSize; try { if (baseDn != null) { // This is an outgoing connection. Publish our start message. 
this.baseDn = baseDn; replicationCache = replicationServer.getReplicationCache(baseDn); ServerState localServerState = replicationCache.getDbServerState(); ReplServerStartMessage msg = new ReplServerStartMessage(replicationServerId, replicationServerURL, baseDn, windowSize, localServerState, protocolVersion); session.publish(msg); } // Wait and process ServerStart or ReplServerStart ReplicationMessage msg = session.receive(); if (msg instanceof ServerStartMessage) { // The remote server is an LDAP Server ServerStartMessage receivedMsg = (ServerStartMessage) msg; protocolVersion = ProtocolVersion.minWithCurrent( receivedMsg.getVersion()); serverId = receivedMsg.getServerId(); serverURL = receivedMsg.getServerURL(); this.baseDn = receivedMsg.getBaseDn(); this.serverState = receivedMsg.getServerState(); maxReceiveDelay = receivedMsg.getMaxReceiveDelay(); maxReceiveQueue = receivedMsg.getMaxReceiveQueue(); maxSendDelay = receivedMsg.getMaxSendDelay(); maxSendQueue = receivedMsg.getMaxSendQueue(); heartbeatInterval = receivedMsg.getHeartbeatInterval(); if (maxReceiveQueue > 0) restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue - 200 : maxReceiveQueue*8/10); else restartReceiveQueue = 0; if (maxSendQueue > 0) restartSendQueue = (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue*8/10); else restartSendQueue = 0; if (maxReceiveDelay > 0) restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay -1 : maxReceiveDelay); else restartReceiveDelay = 0; if (maxSendDelay > 0) restartSendDelay = (maxSendDelay > 10 ? maxSendDelay -1 : maxSendDelay); else restartSendDelay = 0; if (heartbeatInterval < 0) { heartbeatInterval = 0; } serverIsLDAPserver = true; // This an incoming connection. 
Publish our start message replicationCache = replicationServer.getReplicationCache(this.baseDn); ServerState localServerState = replicationCache.getDbServerState(); ReplServerStartMessage myStartMsg = new ReplServerStartMessage(replicationServerId, replicationServerURL, this.baseDn, windowSize, localServerState, protocolVersion); session.publish(myStartMsg); sendWindowSize = receivedMsg.getWindowSize(); } else if (msg instanceof ReplServerStartMessage) { // The remote server is a replication server ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg; protocolVersion = ProtocolVersion.minWithCurrent( receivedMsg.getVersion()); serverId = receivedMsg.getServerId(); serverURL = receivedMsg.getServerURL(); int separator = serverURL.lastIndexOf(':'); serverAddressURL = session.getRemoteAddress() + ":" + serverURL.substring(separator + 1); serverIsLDAPserver = false; this.baseDn = receivedMsg.getBaseDn(); if (baseDn == null) { replicationCache = replicationServer.getReplicationCache(this.baseDn); ServerState serverState = replicationCache.getDbServerState(); // Publish our start message ReplServerStartMessage outMsg = new ReplServerStartMessage(replicationServerId, replicationServerURL, this.baseDn, windowSize, serverState, protocolVersion); session.publish(outMsg); } else { this.baseDn = baseDn; } this.serverState = receivedMsg.getServerState(); sendWindowSize = receivedMsg.getWindowSize(); } else { // TODO : log error return; // we did not recognize the message, ignore it } replicationCache = replicationServer.getReplicationCache(this.baseDn); boolean started; if (serverIsLDAPserver) { started = replicationCache.startServer(this); } else { started = replicationCache.startReplicationServer(this); } if (started) { writer = new ServerWriter(session, serverId, this, replicationCache); reader = new ServerReader(session, serverId, this, replicationCache); reader.start(); writer.start(); // Create a thread to send heartbeat messages. 
if (heartbeatInterval > 0) { heartbeatThread = new HeartbeatThread("replication Heartbeat", session, heartbeatInterval); heartbeatThread.start(); } DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); DirectoryServer.registerMonitorProvider(this); } else { // the connection is not valid, close it. try { session.close(); } catch (IOException e1) { // ignore } } } catch (Exception e) { // some problem happened, reject the connection int msgID = MSGID_CHANGELOG_CONNECTION_ERROR; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); try { session.close(); } catch (IOException e1) { // ignore } } sendWindow = new Semaphore(sendWindowSize); } /** * get the Server Id. * * @return the ID of the server to which this object is linked */ public short getServerId() { return serverId; } /** * Retrieves the Address URL for this server handler. * * @return The Address URL for this server handler, * in the form of an IP address and port separated by a colon. */ public String getServerAddressURL() { return serverAddressURL; } /** * Retrieves the URL for this server handler. * * @return The URL for this server handler, in the form of an address and * port separated by a colon. */ public String getServerURL() { return serverURL; } /** * Increase the counter of updates sent to the server. */ public void incrementOutCount() { outCount++; } /** * Increase the counter of update received from the server. */ public void incrementInCount() { inCount++; } /** * Get the count of updates received from the server. * @return the count of update received from the server. */ public int getInCount() { return inCount; } /** * Get the count of updates sent to this server. * @return The count of update sent to this server. */ public int getOutCount() { return outCount; } /** * Get the number of Ack received from the server managed by this handler. 
* * @return Returns the inAckCount. */ public int getInAckCount() { return inAckCount; } /** * Get the number of Ack sent to the server managed by this handler. * * @return Returns the outAckCount. */ public int getOutAckCount() { return outAckCount; } /** * Check is this server is saturated (this server has already been * sent a bunch of updates and has not processed them so they are staying * in the message queue for this server an the size of the queue * for this server is above the configured limit. * * The limit can be defined in number of updates or with a maximum delay * * @param changeNumber The changenumber to use to make the delay calculations. * @param sourceHandler The ServerHandler which is sending the update. * @return true is saturated false if not saturated. */ public boolean isSaturated(ChangeNumber changeNumber, ServerHandler sourceHandler) { synchronized (msgQueue) { int size = msgQueue.size(); if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue)) return true; if ((sourceHandler.maxSendQueue > 0) && (size >= sourceHandler.maxSendQueue)) return true; if (!msgQueue.isEmpty()) { UpdateMessage firstUpdate = msgQueue.first(); if (firstUpdate != null) { long timeDiff = changeNumber.getTimeSec() - firstUpdate.getChangeNumber().getTimeSec(); if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay)) return true; if ((sourceHandler.maxSendDelay > 0) && (timeDiff >= sourceHandler.maxSendDelay)) return true; } } return false; } } /** * Check that the size of the Server Handler messages Queue has lowered * below the limit and therefore allowing the reception of messages * from other servers to restart. * @param source The ServerHandler which was sending the update. * can be null. 
* @return true if the processing can restart */ public boolean restartAfterSaturation(ServerHandler source) { synchronized (msgQueue) { int queueSize = msgQueue.size(); if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue)) return false; if ((source != null) && (source.maxSendQueue > 0) && (queueSize >= source.restartSendQueue)) return false; if (!msgQueue.isEmpty()) { UpdateMessage firstUpdate = msgQueue.first(); UpdateMessage lastUpdate = msgQueue.last(); if ((firstUpdate != null) && (lastUpdate != null)) { long timeDiff = lastUpdate.getChangeNumber().getTimeSec() - firstUpdate.getChangeNumber().getTimeSec(); if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay)) return false; if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >= source.restartSendDelay)) return false; } } } return true; } /** * Check if the server associated to this ServerHandler is a replication * server. * @return true if the server associated to this ServerHandler is a * replication server. */ public boolean isReplicationServer() { return (!serverIsLDAPserver); } /** * Get the number of message in the receive message queue. * @return Size of the receive message queue. */ public int getRcvMsgQueueSize() { synchronized (msgQueue) { /* * When the server is up to date or close to be up to date, * the number of updates to be sent is the size of the receive queue. */ if (isFollowing()) return msgQueue.size(); else { /* * When the server is not able to follow, the msgQueue * may become too large and therefore won't contain all the * changes. Some changes may only be stored in the backing DB * of the servers. * The total size of teh receieve queue is calculated by doing * the sum of the number of missing changes for every dbHandler. 
*/ int totalCount = 0; ServerState dbState = replicationCache.getDbServerState(); for (short id : dbState) { int max = dbState.getMaxChangeNumber(id).getSeqnum(); ChangeNumber currentChange = serverState.getMaxChangeNumber(id); if (currentChange != null) { int current = currentChange.getSeqnum(); if (current == max) { } else if (current < max) { totalCount += max - current; } else { totalCount += Integer.MAX_VALUE - (current - max) + 1; } } else { totalCount += max; } } return totalCount; } } } /** * Get an approximation of the delay by looking at the age of the odest * message that has not been sent to this server. * This is an approximation because the age is calculated using the * clock of the servee where the replicationServer is currently running * while it should be calculated using the clock of the server * that originally processed the change. * * The approximation error is therefore the time difference between * * @return the approximate delay for the connected server. */ public long getApproxDelay() { long olderUpdateTime = getOlderUpdateTime(); if (olderUpdateTime == 0) return 0; long currentTime = TimeThread.getTime(); return ((currentTime - olderUpdateTime)/1000); } /** * Get the age of the older change that has not yet been replicated * to the server handled by this ServerHandler. * * @return The age if the older change has not yet been replicated * to the server handled by this ServerHandler. */ public long getOlderUpdateTime() { synchronized (msgQueue) { if (isFollowing()) { if (msgQueue.isEmpty()) return 0; UpdateMessage msg = msgQueue.first(); return msg.getChangeNumber().getTime(); } else { if (lateQueue.isEmpty()) return 0; UpdateMessage msg = lateQueue.first(); return msg.getChangeNumber().getTime(); } } } /** * Check if the LDAP server can follow the speed of the other servers. * @return true when the server has all the not yet sent changes * in its queue. 
*/ public boolean isFollowing() { return following; } /** * Set the following flag of this server. * @param following the value that should be set. */ public void setFollowing(boolean following) { this.following = following; } /** * Add an update the list of updates that must be sent to the server * managed by this ServerHandler. * * @param update The update that must be added to the list of updates. * @param sourceHandler The server that sent the update. */ public void add(UpdateMessage update, ServerHandler sourceHandler) { synchronized (msgQueue) { /* * If queue was empty the writer thread was probably asleep * waiting for some changes, wake it up */ if (msgQueue.isEmpty()) msgQueue.notify(); msgQueue.add(update); /* TODO : size should be configurable * and larger than max-receive-queue-size */ while (msgQueue.size() > maxQueueSize) { following = false; msgQueue.removeFirst(); } } if (isSaturated(update.getChangeNumber(), sourceHandler)) { sourceHandler.setSaturated(true); } } private void setSaturated(boolean value) { flowControl = value; } /** * Select the next update that must be sent to the server managed by this * ServerHandler. * * @return the next update that must be sent to the server managed by this * ServerHandler. */ public UpdateMessage take() { boolean interrupted = true; UpdateMessage msg = getnextMessage(); /* * When we remove a message from the queue we need to check if another * server is waiting in flow control because this queue was too long. * This check might cause a performance penalty an therefore it * is not done for every message removed but only every few messages. 
*/ if (++saturationCount > 10) { saturationCount = 0; try { replicationCache.checkAllSaturation(); } catch (IOException e) { } } do { try { sendWindow.acquire(); interrupted = false; } catch (InterruptedException e) { // loop until not interrupted } } while (interrupted); this.incrementOutCount(); return msg; } /** * Get the next update that must be sent to the server * from the message queue or from the database. * * @return The next update that must be sent to the server. */ private UpdateMessage getnextMessage() { UpdateMessage msg; while (active == true) { if (following == false) { /* this server is late with regard to some other masters * in the topology or just joined the topology. * In such cases, we can't keep all changes in the queue * without saturating the memory, we therefore use * a lateQueue that is filled with a few changes from the changelogDB * If this server is able to close the gap, it will start using again * the regular msgQueue later. */ if (lateQueue.isEmpty()) { /* * Start from the server State * Loop until the queue high mark or until no more changes * for each known LDAP master * get the next CSN after this last one : * - try to get next from the file * - if not found in the file * - try to get the next from the queue * select the smallest of changes * check if it is in the memory tree * yes : lock memory tree. 
* check all changes from the list, remove the ones that * are already sent * unlock memory tree * restart as usual * load this change on the delayList * */ ReplicationIteratorComparator comparator = new ReplicationIteratorComparator(); SortedSet iteratorSortedSet = new TreeSet(comparator); /* fill the lateQueue */ for (short serverId : replicationCache.getServers()) { ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); ReplicationIterator iterator = replicationCache.getChangelogIterator(serverId, lastCsn); if ((iterator != null) && (iterator.getChange() != null)) { iteratorSortedSet.add(iterator); } } while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) { ReplicationIterator iterator = iteratorSortedSet.first(); iteratorSortedSet.remove(iterator); lateQueue.add(iterator.getChange()); if (iterator.next()) iteratorSortedSet.add(iterator); else iterator.releaseCursor(); } for (ReplicationIterator iterator : iteratorSortedSet) { iterator.releaseCursor(); } /* * Check if the first change in the lateQueue is also on the regular * queue */ if (lateQueue.isEmpty()) { synchronized (msgQueue) { if (msgQueue.size() < maxQueueSize) { following = true; } } } else { msg = lateQueue.first(); synchronized (msgQueue) { if (msgQueue.contains(msg)) { /* we finally catched up with the regular queue */ following = true; lateQueue.clear(); UpdateMessage msg1; do { msg1 = msgQueue.removeFirst(); } while (!msg.getChangeNumber().equals(msg1.getChangeNumber())); this.updateServerState(msg); return msg; } } } } else { /* get the next change from the lateQueue */ msg = lateQueue.removeFirst(); this.updateServerState(msg); return msg; } } synchronized (msgQueue) { if (following == true) { try { while (msgQueue.isEmpty()) { msgQueue.wait(500); if (!active) return null; } } catch (InterruptedException e) { return null; } msg = msgQueue.removeFirst(); if (this.updateServerState(msg)) { /* * Only push the message if it has not yet been seen * by the other server. 
* Otherwise just loop to select the next message. */ return msg; } } } /* * Need to loop because following flag may have gone to false between * the first check at the beginning of this method * and the second check just above. */ } return null; } /** * Update the serverState with the last message sent. * * @param msg the last update sent. * @return boolean indicating if the update was meaningfull. */ public boolean updateServerState(UpdateMessage msg) { return serverState.update(msg.getChangeNumber()); } /** * Stop this server handler processing. */ public void stopHandler() { active = false; try { session.close(); } catch (IOException e) { // ignore. } synchronized (msgQueue) { /* wake up the writer thread on an empty queue so that it disappear */ msgQueue.clear(); msgQueue.notify(); msgQueue.notifyAll(); } // Stop the heartbeat thread. if (heartbeatThread != null) { heartbeatThread.shutdown(); } DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); } /** * Send the ack to the server that did the original modification. * * @param changeNumber The ChangeNumber of the update that is acked. * @throws IOException In case of Exception thrown sending the ack. */ public void sendAck(ChangeNumber changeNumber) throws IOException { AckMessage ack = new AckMessage(changeNumber); session.publish(ack); outAckCount++; } /** * Do the work when an ack message has been received from another server. * * @param message The ack message that was received. * @param ackingServerId The id of the server that acked the change. 
*/ public void ack(AckMessage message, short ackingServerId) { ChangeNumber changeNumber = message.getChangeNumber(); AckMessageList ackList; boolean completedFlag; synchronized (waitingAcks) { ackList = waitingAcks.get(changeNumber); if (ackList == null) return; ackList.addAck(ackingServerId); completedFlag = ackList.completed(); if (completedFlag) { waitingAcks.remove(changeNumber); } } if (completedFlag) { replicationCache.sendAck(changeNumber, true); } } /** * Process reception of an for an update that was received from a * ReplicationServer. * * @param message the ack message that was received. * @param ackingServerId The id of the server that acked the change. */ public static void ackChangelog(AckMessage message, short ackingServerId) { ChangeNumber changeNumber = message.getChangeNumber(); ReplServerAckMessageList ackList; boolean completedFlag; synchronized (changelogsWaitingAcks) { ackList = changelogsWaitingAcks.get(changeNumber); if (ackList == null) return; ackList.addAck(ackingServerId); completedFlag = ackList.completed(); if (completedFlag) { changelogsWaitingAcks.remove(changeNumber); } } if (completedFlag) { ReplicationCache replicationCache = ackList.getChangelogCache(); replicationCache.sendAck(changeNumber, false, ackList.getReplicationServerId()); } } /** * Add an update to the list of update waiting for acks. * * @param update the update that must be added to the list * @param nbWaitedAck The number of ack that must be received before * the update is fully acked. */ public void addWaitingAck(UpdateMessage update, int nbWaitedAck) { AckMessageList ackList = new AckMessageList(update.getChangeNumber(), nbWaitedAck); synchronized(waitingAcks) { waitingAcks.put(update.getChangeNumber(), ackList); } } /** * Add an update to the list of update received from a replicationServer and * waiting for acks. * * @param update The update that must be added to the list. 
* @param ChangelogServerId The identifier of the replicationServer that sent * the update. * @param replicationCache The ReplicationCache from which the change was * processed and to which the ack must later be sent. * @param nbWaitedAck The number of ack that must be received before * the update is fully acked. */ public static void addWaitingAck( UpdateMessage update, short ChangelogServerId, ReplicationCache replicationCache, int nbWaitedAck) { ReplServerAckMessageList ackList = new ReplServerAckMessageList(update.getChangeNumber(), nbWaitedAck, ChangelogServerId, replicationCache); synchronized(changelogsWaitingAcks) { changelogsWaitingAcks.put(update.getChangeNumber(), ackList); } } /** * Get the size of the list of update waiting for acks. * * @return the size of the list of update waiting for acks. */ public int getWaitingAckSize() { synchronized (waitingAcks) { return waitingAcks.size(); } } /** * Increment the count of Acks received from this server. */ public void incrementInAckCount() { inAckCount++; } /** * Check type of server handled. * * @return true if the handled server is an LDAP server. * false if the handled server is a replicationServer */ public boolean isLDAPserver() { return serverIsLDAPserver; } /** * {@inheritDoc} */ @Override public void initializeMonitorProvider(MonitorProviderCfg configuration) throws ConfigException,InitializationException { // Nothing to do for now } /** * Retrieves the name of this monitor provider. It should be unique among all * monitor providers, including all instances of the same monitor provider. * * @return The name of this monitor provider. */ @Override public String getMonitorInstanceName() { String str = baseDn.toString() + " " + serverURL + " " + String.valueOf(serverId); if (serverIsLDAPserver) return "Remote LDAP Server " + str; else return "Remote Replication Server " + str; } /** * Retrieves the length of time in milliseconds that should elapse between * calls to the updateMonitorData() method. 
A negative or zero * return value indicates that the updateMonitorData() method * should not be periodically invoked. * * @return The length of time in milliseconds that should elapse between * calls to the updateMonitorData() method. */ @Override public long getUpdateInterval() { /* we don't wont to do polling on this monitor */ return 0; } /** * Performs any processing periodic processing that may be desired to update * the information associated with this monitor. Note that best-effort * attempts will be made to ensure that calls to this method come * getUpdateInterval() milliseconds apart, but no guarantees will * be made. */ @Override public void updateMonitorData() { // As long as getUpdateInterval() returns 0, this will never get called } /** * Retrieves a set of attributes containing monitor data that should be * returned to the client if the corresponding monitor entry is requested. * * @return A set of attributes containing monitor data that should be * returned to the client if the corresponding monitor entry is * requested. 
*/ @Override public ArrayList getMonitorData() { ArrayList attributes = new ArrayList(); if (serverIsLDAPserver) attributes.add(new Attribute("LDAP-Server", serverURL)); else attributes.add(new Attribute("ReplicationServer-Server", serverURL)); attributes.add(new Attribute("server-id", String.valueOf(serverId))); attributes.add(new Attribute("base-dn", baseDn.toString())); attributes.add(new Attribute("waiting-changes", String.valueOf(getRcvMsgQueueSize()))); attributes.add(new Attribute("max-waiting-changes", String.valueOf(maxQueueSize))); attributes.add(new Attribute("update-waiting-acks", String.valueOf(getWaitingAckSize()))); attributes.add(new Attribute("update-sent", String.valueOf(getOutCount()))); attributes.add(new Attribute("update-received", String.valueOf(getInCount()))); attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount()))); attributes.add(new Attribute("ack-received", String.valueOf(getInAckCount()))); attributes.add(new Attribute("approximate-delay", String.valueOf(getApproxDelay()))); attributes.add(new Attribute("max-send-window", String.valueOf(sendWindowSize))); attributes.add(new Attribute("current-send-window", String.valueOf(sendWindow.availablePermits()))); attributes.add(new Attribute("max-rcv-window", String.valueOf(maxRcvWindow))); attributes.add(new Attribute("current-rcv-window", String.valueOf(rcvWindow))); long olderUpdateTime = getOlderUpdateTime(); if (olderUpdateTime != 0) { Date date = new Date(getOlderUpdateTime()); attributes.add(new Attribute("older-change-not-synchronized", String.valueOf(date.toString()))); } /* get the Server State */ final String ATTR_SERVER_STATE = "server-state"; AttributeType type = DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); LinkedHashSet values = new LinkedHashSet(); for (String str : serverState.toStringSet()) { values.add(new AttributeValue(type,str)); } Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values); attributes.add(attr); return attributes; } 
/** * Shutdown This ServerHandler. */ public void shutdown() { try { session.close(); } catch (IOException e) { // Service is closing. } stopHandler(); try { writer.join(SHUTDOWN_JOIN_TIMEOUT); reader.join(SHUTDOWN_JOIN_TIMEOUT); } catch (InterruptedException e) { // don't try anymore to join and return. } } /** * {@inheritDoc} */ @Override public String toString() { String localString; if (serverId != 0) { if (serverIsLDAPserver) localString = "Directory Server "; else localString = "Replication Server "; localString += serverId + " " + serverURL + " " + baseDn; } else localString = "Unknown server"; return localString; } /** * Decrement the protocol window, then check if it is necessary * to send a WindowMessage and send it. * * @throws IOException when the session becomes unavailable. */ public synchronized void decAndCheckWindow() throws IOException { rcvWindow--; checkWindow(); } /** * Check the protocol window and send WindowMessage if necessary. * * @throws IOException when the session becomes unavailable. */ public synchronized void checkWindow() throws IOException { if (rcvWindow < rcvWindowSizeHalf) { if (flowControl) { if (replicationCache.restartAfterSaturation(this)) { flowControl = false; } } if (!flowControl) { WindowMessage msg = new WindowMessage(rcvWindowSizeHalf); session.publish(msg); outAckCount++; rcvWindow += rcvWindowSizeHalf; } } } /** * Update the send window size based on the credit specified in the * given window message. * * @param windowMsg The Window Message containing the information * necessary for updating the window size. */ public void updateWindow(WindowMessage windowMsg) { sendWindow.release(windowMsg.getNumAck()); } /** * Get our heartbeat interval. * @return Our heartbeat interval. */ public long getHeartbeatInterval() { return heartbeatInterval; } /** * Processes a routable message. * * @param msg The message to be processed. 
*/ public void process(RoutableMessage msg) { if (debugEnabled()) TRACER.debugInfo("SH(" + replicationServerId + ") receives " + msg + " from " + serverId); replicationCache.process(msg, this); } /** * Sends the provided ReplServerInfoMessage. * * @param info The ReplServerInfoMessage message to be sent. * @throws IOException When it occurs while sending the message, * */ public void sendInfo(ReplServerInfoMessage info) throws IOException { session.publish(info); } /** * * Sets the replication server from the message provided. * * @param infoMsg The information message. */ public void setReplServerInfo(ReplServerInfoMessage infoMsg) { remoteLDAPservers = infoMsg.getConnectedServers(); } /** * When this handler is connected to a replication server, specifies if * a wanted server is connected to this replication server. * * @param wantedServer The server we want to know if it is connected * to the replication server represented by this handler. * @return boolean True is the wanted server is connected to the server * represented by this handler. */ public boolean isRemoteLDAPServer(short wantedServer) { for (String server : remoteLDAPservers) { if (wantedServer == Short.valueOf(server)) { return true; } } return false; } /** * When the handler is connected to a replication server, specifies the * replication server has remote LDAP servers connected to it. * * @return boolean True is the replication server has remote LDAP servers * connected to it. */ public List getRemoteLDAPServers() { return remoteLDAPservers; } /** * Send an InitializeRequestMessage to the server connected through this * handler. * * @param msg The message to be processed * @throws IOException when raised by the underlying session */ public void send(RoutableMessage msg) throws IOException { if (debugEnabled()) TRACER.debugInfo("SH(" + replicationServerId + ") forwards " + msg + " to " + serverId); session.publish(msg); } /** * Process the reception of a WindowProbe message. 
*
 * @param windowProbeMsg The received probe; its content is not examined
 *                       by this method.
 *
 * @throws IOException When the session becomes unavailable.
 */
public void process(WindowProbe windowProbeMsg) throws IOException
{
  if (rcvWindow <= 0)
  {
    // Both sides agree that the window is closed. Re-evaluate the flow
    // control in case operations can resume now; checkWindow() sends a
    // WindowMessage if one is needed.
    checkWindow();
    return;
  }

  // The LDAP server believes that its window is closed while it is not:
  // something went wrong in the window exchange procedure. Tell the LDAP
  // server what our current window size is and hope that everything will
  // work better in the future.
  // TODO also log an error message.
  WindowMessage currentWindow = new WindowMessage(rcvWindow);
  session.publish(currentWindow);
  outAckCount++;
}
}