/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006 Sun Microsystems, Inc. */ package org.opends.server.changelog; import static org.opends.server.loggers.Error.logError; import static org.opends.server.messages.MessageHandler.getMessage; import static org.opends.server.synchronization.SynchMessages.*; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; import org.opends.server.api.MonitorProvider; import org.opends.server.config.ConfigEntry; import org.opends.server.config.ConfigException; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeValue; import org.opends.server.types.DN; import org.opends.server.types.ErrorLogCategory; import org.opends.server.types.ErrorLogSeverity; import org.opends.server.core.DirectoryServer; import org.opends.server.core.InitializationException; import org.opends.server.synchronization.AckMessage; import org.opends.server.synchronization.ChangeNumber; import org.opends.server.synchronization.ChangelogStartMessage; import org.opends.server.synchronization.ServerStartMessage; import org.opends.server.synchronization.ServerState; import org.opends.server.synchronization.SynchronizationMessage; import org.opends.server.synchronization.UpdateMessage; import org.opends.server.util.TimeThread; /** * This class defines a server handler, which handles all interaction with a * changelog server. */ public class ServerHandler extends MonitorProvider { private short serverId; private ProtocolSession session; private MsgQueue msgQueue = new MsgQueue(); private MsgQueue lateQueue = new MsgQueue(); private Map waitingAcks = new HashMap();; private ChangelogCache changelogCache = null; private String serverURL; private int outCount = 0; // number of update sent to the server private int inCount = 0; // number of updates received from the server private int inAckCount = 0; private int outAckCount = 0; private int maxReceiveQueue = 0; private int maxSendQueue = 0; private int maxReceiveDelay = 0; private int maxSendDelay = 0; private int restartReceiveQueue; private int restartSendQueue; private int restartReceiveDelay; private int restartSendDelay; private boolean serverIsLDAPserver; private boolean following = false; private ServerState serverState; private boolean active = true; private ServerWriter writer = null; private DN baseDn = null; private static Map changelogsWaitingAcks = new HashMap(); /** * Creates a new server handler instance with the provided socket. * * @param session The ProtocolSession used by the ServerHandler to * communicate with the remote entity. */ public ServerHandler(ProtocolSession session) { super("Server Handler"); this.session = session; } /** * Do the exchange of start messages to know if the remote * server is an LDAP or changelog server and to exchange serverID. * Then create the reader and writer thread. * * @param baseDn baseDn of the ServerHandler when this is an outgoing conn. * null if this is an incoming connection. */ public void start(DN baseDn) { try { if (baseDn != null) { this.baseDn = baseDn; changelogCache = Changelog.getChangelogCache(baseDn); ServerState localServerState = changelogCache.getDbServerState(); ChangelogStartMessage msg = new ChangelogStartMessage(Changelog.getServerId(), Changelog.getServerURL(), baseDn, localServerState); session.publish(msg); } SynchronizationMessage msg = session.receive(); if (msg instanceof ServerStartMessage) { ServerStartMessage receivedMsg = (ServerStartMessage) msg; serverId = receivedMsg.getServerId(); serverURL = receivedMsg.getServerURL(); this.baseDn = receivedMsg.getBaseDn(); this.serverState = receivedMsg.getServerState(); maxReceiveDelay = receivedMsg.getMaxReceiveDelay(); maxReceiveQueue = receivedMsg.getMaxReceiveQueue(); maxSendDelay = receivedMsg.getMaxSendDelay(); maxSendQueue = receivedMsg.getMaxSendQueue(); if (maxReceiveQueue > 0) restartReceiveQueue = (maxReceiveQueue > 1000 ? maxReceiveQueue - 200 : maxReceiveQueue*8/10); else restartReceiveQueue = 0; if (maxSendQueue > 0) restartSendQueue = (maxSendQueue > 1000 ? maxSendQueue - 200 : maxSendQueue*8/10); else restartSendQueue = 0; if (maxReceiveDelay > 0) restartReceiveDelay = (maxReceiveDelay > 10 ? maxReceiveDelay -1 : maxReceiveDelay); else restartReceiveDelay = 0; if (maxSendDelay > 0) restartSendDelay = (maxSendDelay > 10 ? maxSendDelay -1 : maxSendDelay); else restartSendDelay = 0; serverIsLDAPserver = true; changelogCache = Changelog.getChangelogCache(this.baseDn); ServerState localServerState = changelogCache.getDbServerState(); ChangelogStartMessage myStartMsg = new ChangelogStartMessage(Changelog.getServerId(), Changelog.getServerURL(), this.baseDn, localServerState); session.publish(myStartMsg); } else if (msg.getClass() == Class.forName( "org.opends.server.synchronization.ChangelogStartMessage")) { ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg; serverId = receivedMsg.getServerId(); serverURL = receivedMsg.getServerURL(); serverIsLDAPserver = false; this.baseDn = receivedMsg.getBaseDn(); if (baseDn == null) { changelogCache = Changelog.getChangelogCache(this.baseDn); ServerState serverState = changelogCache.getDbServerState(); ChangelogStartMessage outMsg = new ChangelogStartMessage(Changelog.getServerId(), Changelog.getServerURL(), this.baseDn, serverState); session.publish(outMsg); } else this.baseDn = baseDn; this.serverState = receivedMsg.getServerState(); } else { // TODO : log error return; // we did not recognize the message, ignore it } changelogCache = Changelog.getChangelogCache(this.baseDn); if (serverIsLDAPserver) { changelogCache.startServer(this); } else { changelogCache.startChangelog(this); } writer = new ServerWriter(session, serverId, this, changelogCache); ServerReader reader = new ServerReader(session, serverId, this, changelogCache); reader.start(); writer.start(); DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); DirectoryServer.registerMonitorProvider(this); } catch (Exception e) { // some problem happened, reject the connection int msgID = MSGID_CHANGELOG_CONNECTION_ERROR; String message = getMessage(msgID, this.toString()) + stackTraceToSingleLineString(e); logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR, message, msgID); try { session.close(); } catch (IOException e1) { // ignore } } } /** * get the Server Id. * * @return the ID of the server to which this object is linked */ public short getServerId() { return serverId; } /** * Retrieves the URL for this server handler. * * @return The URL for this server handler, in the form of an address and * port separated by a colon. */ public String getServerURL() { return serverURL; } /** * Increase the counter of updates sent to the server. */ public void incrementOutCount() { outCount++; } /** * Increase the counter of update received from the server. */ public void incrementInCount() { inCount++; } /** * Get the count of updates received from the server. * @return the count of update received from the server. */ public int getInCount() { return inCount; } /** * Get the count of updates sent to this server. * @return The count of update sent to this server. */ public int getOutCount() { return outCount; } /** * Get the number of Ack received from the server managed by this handler. * * @return Returns the inAckCount. */ public int getInAckCount() { return inAckCount; } /** * Get the number of Ack sent to the server managed by this handler. * * @return Returns the outAckCount. */ public int getOutAckCount() { return outAckCount; } /** * Check is this server is saturated (this server has already been * sent a bunch of updates and has not processed them so they are staying * in the message queue for this server an the size of the queue * for this server is above the configured limit. * * The limit can be defined in number of updates or with a maximum delay * * @param changeNumber The changenumber to use to make the delay calculations. * @param sourceHandler The ServerHandler which is sending the update. * @return true is saturated false if not saturated. */ public boolean isSaturated(ChangeNumber changeNumber, ServerHandler sourceHandler) { synchronized (msgQueue) { int size = msgQueue.size(); if ((maxReceiveQueue > 0) && (size >= maxReceiveQueue)) return true; if ((sourceHandler.maxSendQueue > 0) && (size >= sourceHandler.maxSendQueue)) return true; if (!msgQueue.isEmpty()) { UpdateMessage firstUpdate = msgQueue.first(); if (firstUpdate != null) { long timeDiff = changeNumber.getTimeSec() - firstUpdate.getChangeNumber().getTimeSec(); if ((maxReceiveDelay > 0) && (timeDiff >= maxReceiveDelay)) return true; if ((sourceHandler.maxSendDelay > 0) && (timeDiff >= sourceHandler.maxSendDelay)) return true; } } return false; } } /** * Check that the size of the Server Handler messages Queue has lowered * below the limit and therefore allowing the reception of messages * from other servers to restart. * @param source The ServerHandler which was sending the update. * can be null. * @return true if the processing can restart */ public boolean restartAfterSaturation(ServerHandler source) { synchronized (msgQueue) { if (msgQueue == null) return true; synchronized (msgQueue) { int queueSize = msgQueue.size(); if ((maxReceiveQueue > 0) && (queueSize >= restartReceiveQueue)) return false; if ((source != null) && (source.maxSendQueue > 0) && (queueSize >= source.restartSendQueue)) return false; if (!msgQueue.isEmpty()) { UpdateMessage firstUpdate = msgQueue.first(); UpdateMessage lastUpdate = msgQueue.last(); if ((firstUpdate != null) && (lastUpdate != null)) { long timeDiff = lastUpdate.getChangeNumber().getTimeSec() - firstUpdate.getChangeNumber().getTimeSec(); if ((maxReceiveDelay > 0) && (timeDiff >= restartReceiveDelay)) return false; if ((source != null) && (source.maxSendDelay > 0) && (timeDiff >= source.restartSendDelay)) return false; } } } return true; } } /** * Check if the server associated to this ServerHandler is a changelog server. * @return true if the server associated to this ServerHandler is a * changelog server. */ public boolean isChangelogServer() { return (!serverIsLDAPserver); } /** * Get the number of message in the receive message queue. * @return Size of the receive message queue. */ public int getRcvMsgQueueSize() { synchronized (msgQueue) { /* * TODO : When the server is not able to follow, the msgQueue * may become too large and therefore won't contain all the * changes. Some changes may only be stored in the backing DB * of the servers. * The calculation should be done by asking to the each dbHandler * how many changes need to be replicated and making the sum * For now just return maxint in this case */ if (isFollowing()) return msgQueue.size(); else return Integer.MAX_VALUE; } } /** * Get an approximation of the delay by looking at the age of the odest * message that has not been sent to this server. * This is an approximation because the age is calculated using the * clock of the servee where the changelog is currently running * while it should be calculated using the clock of the server * that originally processed the change. * * The approximation error is therefore the time difference between * * @return the approximate delay for the connected server. */ public long getApproxDelay() { long olderUpdateTime = getOlderUpdateTime(); if (olderUpdateTime == 0) return 0; long currentTime = TimeThread.getTime(); return ((currentTime - olderUpdateTime)/1000); } /** * Get the age of the older change that has not yet been replicated * to the server handled by this ServerHandler. * * @return The age if the older change has not yet been replicated * to the server handled by this ServerHandler. */ public long getOlderUpdateTime() { synchronized (msgQueue) { if (isFollowing()) { if (msgQueue.isEmpty()) return 0; UpdateMessage msg = msgQueue.first(); return msg.getChangeNumber().getTime(); } else { if (lateQueue.isEmpty()) return 0; UpdateMessage msg = lateQueue.first(); return msg.getChangeNumber().getTime(); } } } /** * Check if the LDAP server can follow the speed of the other servers. * @return true when the server has all the not yet sent changes * in its queue. */ public boolean isFollowing() { return following; } /** * Set the following flag of this server. * @param following the value that should be set. */ public void setFollowing(boolean following) { this.following = following; } /** * Add an update the list of updates that must be sent to the server * managed by this ServerHandler. * * @param update The update that must be added to the list of updates. */ public void add(UpdateMessage update) { synchronized (msgQueue) { /* * If queue was empty the writer thread was probably asleep * waiting for some changes, wake it up */ if (msgQueue.isEmpty()) msgQueue.notify(); msgQueue.add(update); /* TODO : size should be configurable * and larger than max-receive-queue-size */ while (msgQueue.size() > 10000) { following = false; msgQueue.removeFirst(); } } } /** * Select the next update that must be sent to the server managed by this * ServerHandler. * * @return the next update that must be sent to the server managed by this * ServerHandler. */ public UpdateMessage take() { UpdateMessage msg; do { if (following == false) { /* this server is late with regard to some other masters * in the topology or just joined the topology. * In such cases, we can't keep all changes in the queue * without saturating the memory, we therefore use * a lateQueue that is filled with a few changes from the changelogDB * If this server is able to close the gap, it will start using again * the regular msgQueue later. */ if (lateQueue.isEmpty()) { /* * Start from the server State * Loop until the queue high mark or until no more changes * for each known LDAP master * get the next CSN after this last one : * - try to get next from the file * - if not found in the file * - try to get the next from the queue * select the smallest of changes * check if it is in the memory tree * yes : lock memory tree. * check all changes from the list, remove the ones that * are already sent * unlock memory tree * restart as usual * load this change on the delayList * */ ChangelogIteratorComparator comparator = new ChangelogIteratorComparator(); SortedSet iteratorSortedSet = new TreeSet(comparator); /* fill the lateQueue */ for (short serverId : changelogCache.getServers()) { ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); ChangelogIterator iterator = changelogCache.getChangelogIterator(serverId, lastCsn); if ((iterator != null) && (iterator.getChange() != null)) { iteratorSortedSet.add(iterator); } } while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100)) { ChangelogIterator iterator = iteratorSortedSet.first(); iteratorSortedSet.remove(iterator); lateQueue.add(iterator.getChange()); if (iterator.next()) iteratorSortedSet.add(iterator); else iterator.releaseCursor(); } for (ChangelogIterator iterator : iteratorSortedSet) { iterator.releaseCursor(); } /* * Check if the first change in the lateQueue is also on the regular * queue */ if (lateQueue.isEmpty()) { synchronized (msgQueue) { if (msgQueue.size() < 10000) { following = true; } } } else { msg = lateQueue.first(); synchronized (msgQueue) { if (msgQueue.contains(msg)) { /* we finally catched up with the regular queue */ following = true; lateQueue.clear(); UpdateMessage msg1; do { msg1 = msgQueue.removeFirst(); } while (!msg.getChangeNumber().equals(msg1.getChangeNumber())); this.updateServerState(msg); this.incrementOutCount(); return msg; } } } } else { /* get the next change from the lateQueue */ msg = lateQueue.removeFirst(); this.updateServerState(msg); this.incrementOutCount(); return msg; } } synchronized (msgQueue) { if (following == true) { try { while (msgQueue.isEmpty()) { msgQueue.wait(500); if (!active) return null; } } catch (InterruptedException e) { return null; } msg = msgQueue.removeFirst(); if (this.updateServerState(msg)) { /* * Only push the message if it has not yet been seen * by the other server. * Otherwise just loop to select the next message. */ this.incrementOutCount(); return msg; } } } /* * Need to loop because following flag may have gone to false between * the first check at the beginning of this method * and the second check just above. */ } while (active == true); return null; } /** * Update the serverState with the last message sent. * * @param msg the last update sent. * @return boolean indicating if the update was meaningfull. */ public boolean updateServerState(UpdateMessage msg) { return serverState.update(msg.getChangeNumber()); } /** * Stop this server handler processing. */ public void stopHandler() { active = false; synchronized (msgQueue) { /* wake up the writer thread on an empty queue so that it disappear */ msgQueue.clear(); msgQueue.notify(); msgQueue.notifyAll(); } DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); } /** * Send the ack to the server that did the original modification. * * @param changeNumber The ChangeNumber of the update that is acked. * @throws IOException In case of Exception thrown sending the ack. */ public void sendAck(ChangeNumber changeNumber) throws IOException { AckMessage ack = new AckMessage(changeNumber); session.publish(ack); outAckCount++; } /** * Do the work when an ack message has been received from another server. * * @param message The ack message that was received. * @param ackingServerId The id of the server that acked the change. */ public void ack(AckMessage message, short ackingServerId) { ChangeNumber changeNumber = message.getChangeNumber(); AckMessageList ackList; boolean completedFlag; synchronized (waitingAcks) { ackList = waitingAcks.get(changeNumber); if (ackList == null) return; ackList.addAck(ackingServerId); completedFlag = ackList.completed(); if (completedFlag) { waitingAcks.remove(changeNumber); } } if (completedFlag) { changelogCache.sendAck(changeNumber, true); } } /** * Process reception of an for an update that was received from a * Changelog Server. * * @param message the ack message that was received. * @param ackingServerId The id of the server that acked the change. */ public static void ackChangelog(AckMessage message, short ackingServerId) { ChangeNumber changeNumber = message.getChangeNumber(); ChangelogAckMessageList ackList; boolean completedFlag; synchronized (changelogsWaitingAcks) { ackList = changelogsWaitingAcks.get(changeNumber); if (ackList == null) return; ackList.addAck(ackingServerId); completedFlag = ackList.completed(); if (completedFlag) { changelogsWaitingAcks.remove(changeNumber); } } if (completedFlag) { ChangelogCache changelogCache = ackList.getChangelogCache(); changelogCache.sendAck(changeNumber, false, ackList.getChangelogServerId()); } } /** * Add an update to the list of update waiting for acks. * * @param update the update that must be added to the list * @param nbWaitedAck The number of ack that must be received before * the update is fully acked. */ public void addWaitingAck(UpdateMessage update, int nbWaitedAck) { AckMessageList ackList = new AckMessageList(update.getChangeNumber(), nbWaitedAck); synchronized(waitingAcks) { waitingAcks.put(update.getChangeNumber(), ackList); } } /** * Add an update to the list of update received from a changelog server and * waiting for acks. * * @param update The update that must be added to the list. * @param ChangelogServerId The identifier of the changelog that sent the * update. * @param changelogCache The ChangelogCache from which the change was * processed and to which the ack must later be sent. * @param nbWaitedAck The number of ack that must be received before * the update is fully acked. */ public static void addWaitingAck(UpdateMessage update, short ChangelogServerId, ChangelogCache changelogCache, int nbWaitedAck) { ChangelogAckMessageList ackList = new ChangelogAckMessageList(update.getChangeNumber(), nbWaitedAck, ChangelogServerId, changelogCache); synchronized(changelogsWaitingAcks) { changelogsWaitingAcks.put(update.getChangeNumber(), ackList); } } /** * Get the size of the list of update waiting for acks. * * @return the size of the list of update waiting for acks. */ public int getWaitingAckSize() { synchronized (waitingAcks) { return waitingAcks.size(); } } /** * Increment the count of Acks received from this server. */ public void incrementInAckCount() { inAckCount++; } /** * Check type of server handled. * * @return true if the handled server is an LDAP server. * false if the handled server is a changelog server */ public boolean isLDAPserver() { return serverIsLDAPserver; } /** * Initializes this monitor provider based on the information in the provided * configuration entry. * * @param configEntry The configuration entry that contains the information * to use to initialize this monitor provider. * * @throws ConfigException If an unrecoverable problem arises in the * process of performing the initialization. * * @throws InitializationException If a problem occurs during initialization * that is not related to the server * configuration. */ @Override public void initializeMonitorProvider(ConfigEntry configEntry) throws ConfigException,InitializationException { // Nothing to do for now } /** * Retrieves the name of this monitor provider. It should be unique among all * monitor providers, including all instances of the same monitor provider. * * @return The name of this monitor provider. */ @Override public String getMonitorInstanceName() { String str = changelogCache.getBaseDn().toString() + " " + serverURL + " " + String.valueOf(serverId); if (serverIsLDAPserver) return "LDAP Server " + str; else return "Changelog Server " + str; } /** * Retrieves the length of time in milliseconds that should elapse between * calls to the updateMonitorData() method. A negative or zero * return value indicates that the updateMonitorData() method * should not be periodically invoked. * * @return The length of time in milliseconds that should elapse between * calls to the updateMonitorData() method. */ @Override public long getUpdateInterval() { /* we don't wont to do polling on this monitor */ return 0; } /** * Performs any processing periodic processing that may be desired to update * the information associated with this monitor. Note that best-effort * attempts will be made to ensure that calls to this method come * getUpdateInterval() milliseconds apart, but no guarantees will * be made. */ @Override public void updateMonitorData() { // As long as getUpdateInterval() returns 0, this will never get called } /** * Retrieves a set of attributes containing monitor data that should be * returned to the client if the corresponding monitor entry is requested. * * @return A set of attributes containing monitor data that should be * returned to the client if the corresponding monitor entry is * requested. */ @Override public ArrayList getMonitorData() { ArrayList attributes = new ArrayList(); if (serverIsLDAPserver) attributes.add(new Attribute("LDAP-Server", serverURL)); else attributes.add(new Attribute("Changelog-Server", serverURL)); attributes.add(new Attribute("server-id", String.valueOf(serverId))); attributes.add(new Attribute("base-dn", changelogCache.getBaseDn().toString())); attributes.add(new Attribute("waiting-changes", String.valueOf(getRcvMsgQueueSize()))); attributes.add(new Attribute("update-waiting-acks", String.valueOf(getWaitingAckSize()))); attributes.add(new Attribute("update-sent", String.valueOf(getOutCount()))); attributes.add(new Attribute("update-received", String.valueOf(getInCount()))); attributes.add(new Attribute("ack-sent", String.valueOf(getOutAckCount()))); attributes.add(new Attribute("ack-received", String.valueOf(getInAckCount()))); attributes.add(new Attribute("approximate-delay", String.valueOf(getApproxDelay()))); long olderUpdateTime = getOlderUpdateTime(); if (olderUpdateTime != 0) { Date date = new Date(getOlderUpdateTime()); attributes.add(new Attribute("older-change-not-synchronized", String.valueOf(date.toString()))); } /* get the Server State */ final String ATTR_SERVER_STATE = "server-state"; AttributeType type = DirectoryServer.getDefaultAttributeType(ATTR_SERVER_STATE); LinkedHashSet values = new LinkedHashSet(); for (String str : serverState.toStringSet()) { values.add(new AttributeValue(type,str)); } Attribute attr = new Attribute(type, ATTR_SERVER_STATE, values); attributes.add(attr); return attributes; } /** * Shutdown This ServerHandler. */ public void shutdown() { try { session.close(); } catch (IOException e) { // Service is closing. } stopHandler(); } /** * {@inheritDoc} */ @Override public String toString() { String localString; if (serverId != 0) { if (serverIsLDAPserver) localString = "Directory Server "; else localString = "Changelog Server "; localString += serverId + " " + serverURL + " " + baseDn; } else localString = "Unknown server"; return localString; } }