| | |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashSet; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.AssuredMode; |
| | | import org.opends.server.replication.common.ChangeNumber; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | | import org.opends.server.replication.protocol.MonitorRequestMsg; |
| | | import org.opends.server.replication.protocol.ProtocolVersion; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.AttributeBuilder; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.TimeThread; |
| | | import com.sleepycat.je.DatabaseException; |
| | | |
| | | /** |
| | | * This class defines an in-memory cache that will be used to store |
| | |
| | | * to this replication server. |
| | | * |
| | | */ |
| | | private final Map<Short, DataServerHandler> directoryServers = |
| | | new ConcurrentHashMap<Short, DataServerHandler>(); |
| | | |
| | | /* |
| | | * This map contains one ReplicationServerHandler for each replication server |
| | |
| | | * We add a new entry in the map when a new replication server registers |
| | | * to this replication server. |
| | | */ |
| | | private final Map<Short, ReplicationServerHandler> replicationServers = |
| | | new ConcurrentHashMap<Short, ReplicationServerHandler>(); |
| | | |
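| | | /* |
| | | * Handlers, other than the connected DSs and RSs, that have subscribed to |
| | | * the changes of this domain (see registerHandler() and unRegisterHandler() |
| | | * below); updates are also pushed to them. |
| | | */ |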
| | | private final ConcurrentLinkedQueue<MessageHandler> otherHandlers = |
| | | new ConcurrentLinkedQueue<MessageHandler>(); |
| | | |
| | | /* |
| | | * This map contains the List of updates received from each |
| | |
| | | new ConcurrentHashMap<Short, DbHandler>(); |
| | | private ReplicationServer replicationServer; |
| | | |
| | | // GenerationId management |
| | | private long generationId = -1; |
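| | | // Whether the generationId has been saved; while it has not been saved and |
| | | // no DS is connected in the topology, mayResetGenerationId() may reset it. |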
| | | private boolean generationIdSavedStatus = false; |
| | | |
| | | // The tracer object for the debug logger. |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // Monitor data management |
| | | /** |
| | | * The monitor data consolidated over the topology. |
| | | */ |
| | |
| | | /* |
| | | * Push the message to the replication servers |
| | | */ |
| | | if (sourceHandler.isDataServer()) |
| | | { |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | /** |
| | | * Ignore updates to RS with bad gen id |
| | |
| | | /* |
| | | * Push the message to the LDAP servers |
| | | */ |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (handler == sourceHandler) |
| | |
| | | handler.add(update, sourceHandler); |
| | | } |
| | | } |
| | | |
| | | // Push the message to the other subscribing handlers |
| | | Iterator<MessageHandler> otherIter = otherHandlers.iterator(); |
| | | while (otherIter.hasNext()) |
| | | { |
| | | MessageHandler handler = otherIter.next(); |
| | | handler.add(update, sourceHandler); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | if (sourceGroupId == groupId) |
| | | // Assured feature does not cross different group ids |
| | | { |
| | | if (sourceHandler.isDataServer()) |
| | | { |
| | | // Look for RS eligible for assured |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId) |
| | | // No ack expected from a RS with different group id |
| | |
| | | } |
| | | |
| | | // Look for DS eligible for assured |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | // Don't forward the change to the server that just sent it |
| | | if (handler == sourceHandler) |
| | |
| | | (generationId == sourceHandler.getGenerationId())) |
| | | // Ignore assured updates from wrong generationId servers |
| | | { |
| | | if (sourceHandler.isDataServer()) |
| | | { |
| | | if (safeDataLevel == (byte) 1) |
| | | { |
| | |
| | | List<Short> expectedServers = new ArrayList<Short>(); |
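| | | // Compute the list of servers from which an ack is expected for this |
| | | // assured update (only servers with the same group id are eligible). |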
| | | if (interestedInAcks) |
| | | { |
| | | if (sourceHandler.isDataServer()) |
| | | { |
| | | // Look for RS eligible for assured |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId) |
| | | // No ack expected from a RS with different group id |
| | |
| | | origServer.incrementAssuredSrReceivedUpdatesTimeout(); |
| | | } else |
| | | { |
| | | if (origServer.isDataServer()) |
| | | { |
| | | origServer.incrementAssuredSdReceivedUpdatesTimeout(); |
| | | } |
| | |
| | | */ |
| | | public void stopReplicationServers(Collection<String> replServers) |
| | | { |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (replServers.contains(handler.getServerAddressURL())) |
| | | stopServer(handler); |
| | |
| | | public void stopAllServers() |
| | | { |
| | | // Close session with other replication servers |
| | | for (ReplicationServerHandler serverHandler : replicationServers.values()) |
| | | { |
| | | stopServer(serverHandler); |
| | | } |
| | | |
| | | // Close session with other LDAP servers |
| | | for (DataServerHandler serverHandler : directoryServers.values()) |
| | | { |
| | | stopServer(serverHandler); |
| | | } |
| | |
| | | * @param handler the DS we want to check |
| | | * @return true if this is not a duplicate server |
| | | */ |
| | | public boolean checkForDuplicateDS(DataServerHandler handler) |
| | | { |
| | | DataServerHandler oldHandler = directoryServers.get(handler.getServerId()); |
| | | |
| | | if (directoryServers.containsKey(handler.getServerId())) |
| | | { |
| | | // looks like two LDAP servers have the same serverId |
| | | // log an error message and drop this connection. |
| | | Message message = ERR_DUPLICATE_SERVER_ID.get( |
| | | replicationServer.getMonitorInstanceName(), oldHandler.toString(), |
| | | handler.toString(), handler.getServerId()); |
| | |
| | | */ |
| | | public void stopServer(ServerHandler handler) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " domain=" + this + |
| | | " stopServer(SH)" + handler.getMonitorInstanceName() + |
| | | " " + stackTraceToSingleLineString(new Exception())); |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | | * instance this code is called from dying ServerReader but also dying |
| | |
| | | * or not (is already being processed or not). |
| | | */ |
| | | if (!handler.engageShutdown()) |
| | | // Only do this once (prevent other threads from entering here again) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " for " + baseDn + " " + |
| | | " stopServer " + handler.getMonitorInstanceName()); |
| | | try |
| | | { |
| | | try |
| | | { |
| | | // Acquire lock on domain (see more details in comment of start() |
| | | // method of ServerHandler) |
| | | lock(); |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Try doing job anyway... |
| | | } |
| | | |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | if (replicationServers.containsValue(handler)) |
| | | { |
| | | unregisterServerHandler(handler); |
| | | handler.shutdown(); |
| | | |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | | // Warn our DSs that a RS or DS has quit (does not use this |
| | | // handler as already removed from list) |
| | | buildAndSendTopoInfoToDSs(null); |
| | | } |
| | | } else |
| | | { |
| | | if (directoryServers.containsValue(handler)) |
| | | { |
| | | // If this is the last DS for the domain, |
| | | // shutdown the status analyzer |
| | | if (directoryServers.size() == 1) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + |
| | | replicationServer.getMonitorInstanceName() + |
| | | " remote server " + handler.getMonitorInstanceName() + |
| | | " is the last DS to be stopped: stopping status analyzer"); |
| | | stopStatusAnalyzer(); |
| | | } |
| | | |
| | | unregisterServerHandler(handler); |
| | | handler.shutdown(); |
| | | |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | | // Update the remote replication servers with our list |
| | | // of connected LDAP servers |
| | | buildAndSendTopoInfoToRSs(); |
| | | // Warn our DSs that a RS or DS has quit (does not use this |
| | | // handler as already removed from list) |
| | | buildAndSendTopoInfoToDSs(null); |
| | | } |
| | | else if (otherHandlers.contains(handler)) |
| | | { |
| | | unRegisterHandler(handler); |
| | | handler.shutdown(); |
| | | } |
| | | } |
| | | |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | stackTraceToSingleLineString(e))); |
| | | } |
| | | finally |
| | | { |
| | | release(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stop the handler. |
| | | * @param handler The handler to stop. |
| | | */ |
| | | public void stopServer(MessageHandler handler) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " domain=" + this + |
| | | " stopServer(MH)" + handler.getMonitorInstanceName() + |
| | | " " + stackTraceToSingleLineString(new Exception())); |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | | * instance this code is called from dying ServerReader but also dying |
| | | * ServerWriter at the same time, or from a thread that wants to shut down |
| | | * the handler. So use a thread safe flag to know if the job must be done |
| | | * or not (is already being processed or not). |
| | | */ |
| | | if (!handler.engageShutdown()) |
| | | // Only do this once (prevent other threads from entering here again) |
| | | { |
| | | try |
| | | { |
| | | // Acquire lock on domain (see more details in comment of start() |
| | |
| | | { |
| | | // Try doing job anyway... |
| | | } |
| | | |
| | | if (otherHandlers.contains(handler)) |
| | | { |
| | | unRegisterHandler(handler); |
| | | handler.shutdown(); |
| | | } |
| | | release(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Unregister this handler from the list of handlers registered to this |
| | | * domain. |
| | | * @param handler the provided handler to unregister. |
| | | */ |
| | | protected void unregisterServerHandler(ServerHandler handler) |
| | | { |
| | | if (handler.isReplicationServer()) |
| | | { |
| | | replicationServers.remove(handler.getServerId()); |
| | | } |
| | | else |
| | | { |
| | | directoryServers.remove(handler.getServerId()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This method resets the generationId for this domain if there is no LDAP |
| | | * server currently connected in the whole topology on this domain and |
| | | * if the generationId has never been saved. |
| | | * |
| | | * - test emptiness of the directoryServers list |
| | | * - traverse the replicationServers list and test for each if DSs are connected |
| | | * So it strongly relies on the directoryServers list. |
| | | */ |
| | | protected void mayResetGenerationId() |
| | | { |
| | |
| | | boolean lDAPServersConnectedInTheTopology = false; |
| | | if (directoryServers.isEmpty()) |
| | | { |
| | | for (ReplicationServerHandler rsh : replicationServers.values()) |
| | | { |
| | | if (generationId != rsh.getGenerationId()) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Checks that a remote RS is not already connected to this hosting RS. |
| | | * @param handler The handler for the remote RS. |
| | | * @return true if the remote RS is not already connected (not a duplicate). |
| | | * @throws DirectoryException when a problem occurs. |
| | | */ |
| | | public boolean checkForDuplicateRS(ReplicationServerHandler handler) |
| | | throws DirectoryException |
| | | { |
| | | ReplicationServerHandler oldHandler = |
| | | replicationServers.get(handler.getServerId()); |
| | | if ((oldHandler != null)) |
| | | { |
| | | if (oldHandler.getServerAddressURL().equals( |
| | |
| | | // have been sent at about the same time and 2 connections |
| | | // have been established. |
| | | // Silently drop this connection. |
| | | } else |
| | | return false; |
| | | } |
| | | else |
| | | { |
| | | // looks like two replication servers have the same serverId |
| | | // log an error message and drop this connection. |
| | |
| | | replicationServer.getMonitorInstanceName(), oldHandler. |
| | | getServerAddressURL(), handler.getServerAddressURL(), |
| | | handler.getServerId()); |
| | | logError(message); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | | return false; |
| | | } |
| | | return true; |
| | | } |
| | |
| | | { |
| | | LinkedHashSet<String> mySet = new LinkedHashSet<String>(); |
| | | |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | mySet.add(handler.getServerAddressURL()); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return a set containing the servers that produced updates and are known |
| | | * by this replicationServer across the whole topology, |
| | | * whether directly connected or connected to another RS. |
| | | * @return a set containing the servers known by this replicationServer. |
| | | */ |
| | | public Set<Short> getServers() |
| | |
| | | { |
| | | List<String> mySet = new ArrayList<String>(0); |
| | | |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | mySet.add(String.valueOf(handler.getServerId())); |
| | | } |
| | |
| | | { |
| | | // Send to all replication servers with at least one remote |
| | | // server connected |
| | | for (ReplicationServerHandler rsh : replicationServers.values()) |
| | | { |
| | | if (rsh.hasRemoteLDAPServers()) |
| | | { |
| | |
| | | } |
| | | |
| | | // Sends to all connected LDAP servers |
| | | for (DataServerHandler destinationHandler : directoryServers.values()) |
| | | { |
| | | // Don't loop on the sender |
| | | if (destinationHandler == senderHandler) |
| | |
| | | } else |
| | | { |
| | | // Destination is one server |
| | | DataServerHandler destinationHandler = |
| | | directoryServers.get(msg.getDestination()); |
| | | if (destinationHandler != null) |
| | | { |
| | |
| | | // the targeted server is NOT connected |
| | | // Let's search for THE changelog server that MAY |
| | | // have the targeted server connected. |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | for (ReplicationServerHandler h : replicationServers.values()) |
| | | { |
| | | // Send to all replication servers with at least one remote |
| | | // server connected |
| | |
| | | // build the full list of all servers in the topology |
| | | // and send back a MonitorMsg with the full list of all the servers |
| | | // in the topology. |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | MonitorMsg returnMsg = |
| | | new MonitorMsg(msg.getDestination(), msg.getsenderID()); |
| | |
| | | // from the states stored in the serverHandler. |
| | | // - the server state |
| | | // - the older missing change |
| | | for (DataServerHandler lsh : this.directoryServers.values()) |
| | | { |
| | | monitorMsg.setServerState( |
| | | lsh.getServerId(), |
| | |
| | | } |
| | | |
| | | // Same for the connected RS |
| | | for (ReplicationServerHandler rsh : this.replicationServers.values()) |
| | | { |
| | | monitorMsg.setServerState( |
| | | rsh.getServerId(), |
| | |
| | | */ |
| | | public void checkAllSaturation() throws IOException |
| | | { |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | handler.checkWindow(); |
| | | } |
| | | |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | handler.checkWindow(); |
| | | } |
| | |
| | | * @return true if the server can restart sending changes. |
| | | * false if the server can't restart sending changes. |
| | | */ |
| | | public boolean restartAfterSaturation(MessageHandler sourceHandler) |
| | | { |
| | | for (MessageHandler handler : replicationServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | return false; |
| | | } |
| | | |
| | | for (MessageHandler handler : directoryServers.values()) |
| | | { |
| | | if (!handler.restartAfterSaturation(sourceHandler)) |
| | | return false; |
| | |
| | | * @param notThisOne If not null, the topology message will not be sent to |
| | | * this passed server. |
| | | */ |
| | | public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne) |
| | | { |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | if ((notThisOne == null) || // All DSs requested |
| | | (handler != notThisOne)) |
| | |
| | | * Send a TopologyMsg to all the connected replication servers |
| | | * in order to let them know our connected LDAP servers. |
| | | */ |
| | | public void buildAndSendTopoInfoToRSs() |
| | | { |
| | | TopologyMsg topoMsg = createTopologyMsgForRS(); |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | try |
| | | { |
| | |
| | | List<DSInfo> dsInfos = new ArrayList<DSInfo>(); |
| | | |
| | | // Go through every DS |
| | | for (DataServerHandler serverHandler : directoryServers.values()) |
| | | { |
| | | dsInfos.add(serverHandler.toDSInfo()); |
| | | } |
| | |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(); |
| | | |
| | | // Go through every DS (except the recipient of the msg) |
| | | for (DataServerHandler serverHandler : directoryServers.values()) |
| | | { |
| | | if (serverHandler.getServerId() == destDsId) |
| | | continue; |
| | |
| | | |
| | | // Go through every peer RS (and get their connected DSs), also add info |
| | | // for RSs |
| | | for (ReplicationServerHandler serverHandler : replicationServers.values()) |
| | | { |
| | | // Put RS info |
| | | rsInfos.add(serverHandler.toRSInfo()); |
| | |
| | | public synchronized long setGenerationId(long generationId, |
| | | boolean savedStatus) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDN=" + baseDn + |
| | | " RCache.set GenerationId=" + generationId); |
| | | |
| | | long oldGenerationId = this.generationId; |
| | | |
| | | if (this.generationId != generationId) |
| | |
| | | // After we'll have sent the message , the remote RS will adopt |
| | | // the new genId |
| | | rsHandler.setGenerationId(newGenId); |
| | | if (senderHandler.isDataServer()) |
| | | { |
| | | rsHandler.forwardGenerationIdToRS(genIdMsg); |
| | | } |
| | |
| | | |
| | | // Change status of the connected DSs according to the requested new |
| | | // reference generation id |
| | | for (DataServerHandler dsHandler : directoryServers.values()) |
| | | { |
| | | try |
| | | { |
| | |
| | | // (consecutive to reset gen id message), we prefer advertising once for |
| | | // all after changes (less packet sent), here at the end of the reset msg |
| | | // treatment. |
| | | buildAndSendTopoInfoToDSs(null); |
| | | buildAndSendTopoInfoToRSs(); |
| | | |
| | | Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(), |
| | | Long.toString(newGenId)); |
| | |
| | | * that changed his status. |
| | | * @param csMsg The message containing the new status |
| | | */ |
| | | public void processNewStatus(DataServerHandler senderHandler, |
| | | ChangeStatusMsg csMsg) |
| | | { |
| | | if (debugEnabled()) |
| | |
| | | } |
| | | |
| | | // Update every peers (RS/DS) with topology changes |
| | | buildAndSendTopoInfoToDSs(senderHandler); |
| | | buildAndSendTopoInfoToRSs(); |
| | | |
| | | Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get( |
| | | Short.toString(senderHandler.getServerId()), |
| | |
| | | * @param event The event to be used for new status computation |
| | | * @return True if we have been interrupted (must stop), false otherwise |
| | | */ |
| | | public boolean changeStatusFromStatusAnalyzer(DataServerHandler serverHandler, |
| | | StatusMachineEvent event) |
| | | { |
| | | try |
| | |
| | | } |
| | | |
| | | // Update every peers (RS/DS) with topology changes |
| | | buildAndSendTopoInfoToDSs(serverHandler); |
| | | buildAndSendTopoInfoToRSs(); |
| | | |
| | | release(); |
| | | return false; |
| | |
| | | * @param allowResetGenId True for allowing to reset the generation id ( |
| | | * when called after initial handshake) |
| | | * @throws IOException If an error occurred. |
| | | * @throws DirectoryException If an error occurred. |
| | | */ |
| | | public void receiveTopoInfoFromRS(TopologyMsg topoMsg, |
| | | ReplicationServerHandler handler, |
| | | boolean allowResetGenId) |
| | | throws IOException, DirectoryException |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | { |
| | | // Acquire lock on domain (see more details in comment of start() method |
| | | // of ServerHandler) |
| | | if (!hasLock()) |
| | | lock(); |
| | | } catch (InterruptedException ex) |
| | | { |
| | | // Try doing job anyway... |
| | |
| | | * Sends the currently known topology information to every connected |
| | | * DS we have. |
| | | */ |
| | | buildAndSendTopoInfoToDSs(null); |
| | | |
| | | release(); |
| | | } |
| | |
| | | { |
| | | // this is the latency of the remote RSi regarding another RSj |
| | | // let's update the latency of the LSes connected to RSj |
| | | ReplicationServerHandler rsjHdr = replicationServers.get(rsid); |
| | | if (rsjHdr != null) |
| | | { |
| | | for (short remotelsid : rsjHdr.getConnectedDirectoryServerIds()) |
| | |
| | | * Get the map of connected DSs. |
| | | * @return The map of connected DSs |
| | | */ |
| | | public Map<Short, DataServerHandler> getConnectedDSs() |
| | | { |
| | | return directoryServers; |
| | | } |
| | |
| | | * Get the map of connected RSs. |
| | | * @return The map of connected RSs |
| | | */ |
| | | public Map<Short, ReplicationServerHandler> getConnectedRSs() |
| | | { |
| | | return replicationServers; |
| | | } |
| | |
| | | |
| | | return attributes; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Register in the domain a handler that subscribes to changes. |
| | | * @param handler the provided subscribing handler. |
| | | */ |
| | | public void registerHandler(MessageHandler handler) |
| | | { |
| | | this.otherHandlers.add(handler); |
| | | } |
| | | |
| | | /** |
| | | * Unregister a handler from the domain. |
| | | * @param handler the provided unsubscribing handler. |
| | | * @return Whether this handler has been unregistered with success. |
| | | */ |
| | | public boolean unRegisterHandler(MessageHandler handler) |
| | | { |
| | | return this.otherHandlers.remove(handler); |
| | | } |
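| | | /* |
| | | * Illustrative usage sketch for subscribing handlers, assuming "domain" |
| | | * denotes the enclosing domain instance (name chosen here for the example): |
| | | * |
| | | * domain.registerHandler(handler); |
| | | * // from now on every update received by the domain is also pushed to |
| | | * // the handler through handler.add(update, sourceHandler) |
| | | * domain.unRegisterHandler(handler); |
| | | */ |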
| | | |
| | | /** |
| | | * Return the state that contains, for each server, the time of eligibility. |
| | | * @return the state. |
| | | */ |
| | | public ServerState getHeartbeatState() |
| | | { |
| | | // TODO:ECL Eligibility must be supported |
| | | return this.getDbServerState(); |
| | | } |
| | | /** |
| | | * Computes the change number eligible for the ECL. |
| | | * @return The eligible change number, or null if the domain plays no role in eligibility. |
| | | */ |
| | | public ChangeNumber computeEligibleCN() |
| | | { |
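| | | // The eligible CN is the oldest of the most recent change numbers seen from |
| | | // each server that is still considered alive (see the 2000 ms staleness |
| | | // check below). For instance, with maxCN(DS1) at 12:00:05 and maxCN(DS2) at |
| | | // 12:00:03, both fresh enough, the eligible CN is DS2's, the older of the two. |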
| | | ChangeNumber elligibleCN = null; |
| | | ServerState heartbeatState = getHeartbeatState(); |
| | | |
| | | if (heartbeatState==null) |
| | | return null; |
| | | |
| | | // compute the eligible CN |
| | | ServerState hbState = heartbeatState.duplicate(); |
| | | |
| | | Iterator<Short> it = hbState.iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | short sid = it.next(); |
| | | ChangeNumber storedCN = hbState.getMaxChangeNumber(sid); |
| | | |
| | | // If the most recent UpdateMsg or CLHeartbeatMsg received is very old |
| | | // then the server is considered down and not considered for eligibility |
| | | if (TimeThread.getTime()-storedCN.getTime()>2000) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "For RSD." + this.baseDn + " Server " + sid |
| | | + " is not considered for eligibility ... potentially down"); |
| | | continue; |
| | | } |
| | | |
| | | if ((elligibleCN == null) || (storedCN.older(elligibleCN))) |
| | | { |
| | | elligibleCN = storedCN; |
| | | } |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "For RSD." + this.baseDn + " ElligibleCN()=" + elligibleCN); |
| | | return elligibleCN; |
| | | } |
| | | |
| | | /** |
| | | * Computes the eligible server state by minimizing the dbServerState and the |
| | | * eligible CN. |
| | | * @return The computed eligible server state. |
| | | */ |
| | | public ServerState getCLElligibleState() |
| | | { |
| | | // ChangeNumber elligibleCN = computeEligibleCN(); |
| | | ServerState res = new ServerState(); |
| | | ServerState dbState = this.getDbServerState(); |
| | | res = dbState; |
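| | | // Until eligibility is implemented (see the TODO block below), the |
| | | // eligible state is simply the current DB server state. |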
| | | |
| | | /* TODO:ECL Eligibility is not yet implemented |
| | | Iterator<Short> it = dbState.iterator(); |
| | | while (it.hasNext()) |
| | | { |
| | | Short sid = it.next(); |
| | | DbHandler h = sourceDbHandlers.get(sid); |
| | | ChangeNumber dbCN = dbState.getMaxChangeNumber(sid); |
| | | try |
| | | { |
| | | if ((elligibleCN!=null)&&(elligibleCN.older(dbCN))) |
| | | { |
| | | // some CN exist in the db newer than elligible CN |
| | | ReplicationIterator ri = h.generateIterator(elligibleCN); |
| | | ChangeNumber newCN = ri.getCurrentCN(); |
| | | res.update(newCN); |
| | | ri.releaseCursor(); |
| | | } |
| | | else |
| | | { |
| | | // no CN exist in the db newer than elligible CN |
| | | res.update(dbCN); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | */ |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + this.getName() |
| | | + " getCLElligibleState returns:" + res); |
| | | return res; |
| | | } |
| | | |
| | | /** |
| | | * Returns the start state of the domain, made of the first (oldest) |
| | | * change stored for each serverId. |
| | | * @return the start state of the domain. |
| | | */ |
| | | public ServerState getStartState() |
| | | { |
| | | ServerState domainStartState = new ServerState(); |
| | | for (DbHandler dbHandler : sourceDbHandlers.values()) |
| | | { |
| | | domainStartState.update(dbHandler.getFirstChange()); |
| | | } |
| | | return domainStartState; |
| | | } |
| | | } |