| | |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.IOException; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | |
| | | public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | private final String baseDn; |
| | | // The Status analyzer that periodically verifies if the connected DSs are |
| | | // late or not |
| | | /** |
| | | * The Status analyzer that periodically verifies whether the connected DSs |
| | | * are late. |
| | | */ |
| | | private StatusAnalyzer statusAnalyzer = null; |
| | | |
| | | // The monitoring publisher that periodically sends monitoring messages to the |
| | | // topology |
| | | /** |
| | | * The monitoring publisher that periodically sends monitoring messages to the |
| | | * topology. |
| | | */ |
| | | private MonitoringPublisher monitoringPublisher = null; |
| | | |
| | | /* |
| | | * The following map contains one balanced tree for each replica ID |
| | | * to which we are currently publishing |
| | | * the first update in the balanced tree is the next change that we |
| | | * must push to this particular server |
| | | * |
| | | * We add new TreeSet in the HashMap when a new server register |
| | | * to this replication server. |
| | | * |
| | | /** |
| | | * The following map contains one balanced tree for each replica ID to which |
| | | * we are currently publishing updates. The first update in the balanced tree |
| | | * is the next change that we must push to this particular server. |
| | | * <p> |
| | | * We add a new TreeSet to the HashMap when a new server registers to this |
| | | * replication server. |
| | | */ |
| | | private final Map<Integer, DataServerHandler> directoryServers = |
| | | new ConcurrentHashMap<Integer, DataServerHandler>(); |
| | | |
| | | /* |
| | | * This map contains one ServerHandler for each replication servers |
| | | * with which we are connected (so normally all the replication servers) |
| | | * the first update in the balanced tree is the next change that we |
| | | * must push to this particular server |
| | | * |
| | | * We add new TreeSet in the HashMap when a new replication server register |
| | | * to this replication server. |
| | | /** |
| | | * This map contains one ServerHandler for each replication server with which |
| | | * we are connected (so normally all the replication servers). The first |
| | | * update in the balanced tree is the next change that we must push to this |
| | | * particular server. |
| | | * <p> |
| | | * We add a new TreeSet to the HashMap when a new replication server registers |
| | | * to this replication server. |
| | | */ |
| | | private final Map<Integer, ReplicationServerHandler> replicationServers = |
| | | new ConcurrentHashMap<Integer, ReplicationServerHandler>(); |
| | | |
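| | | /** The other (non-DS, non-RS) message handlers subscribed to updates for this domain. */ |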
| | | private final ConcurrentLinkedQueue<MessageHandler> otherHandlers = |
| | | private final Queue<MessageHandler> otherHandlers = |
| | | new ConcurrentLinkedQueue<MessageHandler>(); |
| | | |
| | | /* |
| | | * This map contains the List of updates received from each |
| | | * LDAP server |
| | | /** |
| | | * This map contains the List of updates received from each LDAP server. |
| | | */ |
| | | private final Map<Integer, DbHandler> sourceDbHandlers = |
| | | new ConcurrentHashMap<Integer, DbHandler>(); |
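| | | /** The local replication server to which this domain belongs. */ |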
| | | private ReplicationServer replicationServer; |
| | | |
| | | // GenerationId management |
| | | /** GenerationId management. */ |
| | | private volatile long generationId = -1; |
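| | | /** Whether the generation id has already been saved (see mayResetGenerationId()). */ |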
| | | private boolean generationIdSavedStatus = false; |
| | | |
| | | // The tracer object for the debug logger. |
| | | /** The tracer object for the debug logger. */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | // Monitor data management |
| | |
| | | */ |
| | | private volatile MonitorData monitorData = new MonitorData(); |
| | | |
| | | // This lock guards against multiple concurrent monitor data recalculation. |
| | | /** |
| | | * This lock guards against multiple concurrent monitor data recalculation. |
| | | */ |
| | | private final Object pendingMonitorLock = new Object(); |
| | | |
| | | // Guarded by pendingMonitorLock. |
| | | /** Guarded by pendingMonitorLock. */ |
| | | private long monitorDataLastBuildDate = 0; |
| | | |
| | | // The set of replication servers which are already known to be slow to send |
| | | // monitor data. |
| | | // |
| | | // Guarded by pendingMonitorLock. |
| | | /** |
| | | * The set of replication servers which are already known to be slow to send |
| | | * monitor data. |
| | | * <p> |
| | | * Guarded by pendingMonitorLock. |
| | | */ |
| | | private final Set<Integer> monitorDataLateServers = new HashSet<Integer>(); |
| | | |
| | | // This lock serializes updates to the pending monitor data. |
| | | /** This lock serializes updates to the pending monitor data. */ |
| | | private final Object pendingMonitorDataLock = new Object(); |
| | | |
| | | // Monitor data which is currently being calculated. |
| | | // |
| | | // Guarded by pendingMonitorDataLock. |
| | | /** |
| | | * Monitor data which is currently being calculated. Guarded by |
| | | * pendingMonitorDataLock. |
| | | */ |
| | | private MonitorData pendingMonitorData; |
| | | |
| | | // A set containing the IDs of servers from which we are currently expecting |
| | | // monitor responses. When a response is received from a server we remove the |
| | | // ID from this table, and count down the latch if the ID was in the table. |
| | | // |
| | | // Guarded by pendingMonitorDataLock. |
| | | /** |
| | | * A set containing the IDs of servers from which we are currently expecting |
| | | * monitor responses. When a response is received from a server we remove the |
| | | * ID from this table, and count down the latch if the ID was in the table. |
| | | * <p> |
| | | * Guarded by pendingMonitorDataLock. |
| | | */ |
| | | private final Set<Integer> pendingMonitorDataServerIDs = |
| | | new HashSet<Integer>(); |
| | | |
| | | // This latch is non-null and is used in order to count incoming responses as |
| | | // they arrive. Since incoming response may arrive at any time, even when |
| | | // there is no pending monitor request, access to the latch must be guarded. |
| | | // |
| | | // Guarded by pendingMonitorDataLock. |
| | | /** |
| | | * This latch is non-null and is used in order to count incoming responses as |
| | | * they arrive. Since incoming responses may arrive at any time, even when |
| | | * there is no pending monitor request, access to the latch must be guarded. |
| | | * <p> |
| | | * Guarded by pendingMonitorDataLock. |
| | | */ |
| | | private CountDownLatch pendingMonitorDataLatch = null; |
| | | |
| | | // TODO: Remote monitor data cache lifetime is 500ms/should be configurable |
| | | /** |
| | | * TODO: Remote monitor data cache lifetime is 500ms/should be configurable. |
| | | */ |
| | | private final long monitorDataLifeTime = 500; |
| | | |
| | | |
| | |
| | | /** |
| | | * The info needed for each received assured update message we are waiting |
| | | * acks for. |
| | | * <p> |
| | | * Key: a change number matching a received update message which requested |
| | | * assured mode usage (either safe read or safe data mode) |
| | | * <p> |
| | | * Value: The object holding all the info needed about the already received |
| | | * acks as well as the acks to be received. |
| | | * For more details, see ExpectedAcksInfo and its sub classes javadoc. |
| | | * |
| | | * @see ExpectedAcksInfo For more details, see ExpectedAcksInfo and its |
| | | *      subclasses javadoc. |
| | | */ |
| | | private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks = |
| | | new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>(); |
| | | |
| | | // The timer used to run the timeout code (timer tasks) for the assured update |
| | | // messages we are waiting acks for. |
| | | /** |
| | | * The timer used to run the timeout code (timer tasks) for the assured update |
| | | * messages we are waiting acks for. |
| | | */ |
| | | private Timer assuredTimeoutTimer = null; |
| | | // Counter used to purge the timer tasks references in assuredTimeoutTimer, |
| | | // every n number of treated assured messages |
| | | /** |
| | | * Counter used to purge the timer task references in assuredTimeoutTimer |
| | | * every N treated assured messages. |
| | | */ |
| | | private int assuredTimeoutTimerPurgeCounter = 0; |
| | | |
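| | | /** The server state tracking the change time heartbeats received for this domain. */ |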
| | | private ServerState ctHeartbeatState = null; |
| | |
| | | // Purge timer every 100 treated messages |
| | | assuredTimeoutTimerPurgeCounter++; |
| | | if ((assuredTimeoutTimerPurgeCounter % 100) == 0) |
| | | { |
| | | assuredTimeoutTimer.purge(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (expectedServers == null) |
| | | { |
| | | expectedServers = Collections.emptyList(); |
| | | } |
| | | |
| | | /** |
| | | * The update message equivalent to the originally received update message, |
| | | * but with assured flag disabled. This message is the one that should be |
| | |
| | | * Ignore updates to RS with bad gen id |
| | | * (no system managed status for a RS) |
| | | */ |
| | | if ( (generationId>0) && (generationId != handler.getGenerationId()) ) |
| | | if (isDifferentGenerationId(handler.getGenerationId())) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + "Replication Server " + |
| | | replicationServer.getReplicationPort() + " " + |
| | | baseDn + " " + replicationServer.getServerId() + |
| | | " for dn " + baseDn + ", update " + update.getChangeNumber() + |
| | | " will not be sent to replication server " + |
| | | handler.getServerId() + " with generation id " + |
| | | handler.getGenerationId() + " different from local " + |
| | | "generation id " + generationId); |
| | | { |
| | | TRACER.debugInfo("In Replication Server " |
| | | + replicationServer.getReplicationPort() + " " + baseDn + " " |
| | | + replicationServer.getServerId() + " for dn " + baseDn |
| | | + ", update " + update.getChangeNumber() |
| | | + " will not be sent to replication server " |
| | | + handler.getServerId() + " with generation id " |
| | | + handler.getGenerationId() + " different from local " |
| | | + "generation id " + generationId); |
| | | } |
| | | |
| | | continue; |
| | | } |
| | | |
| | | if (assuredMessage) |
| | | { |
| | | // Assured mode: post an assured or not assured matching update |
| | | // message according to what has been computed for the destination |
| | | // server |
| | | if ((expectedServers != null) && expectedServers.contains(handler. |
| | | getServerId())) |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } else |
| | | { |
| | | if (notAssuredUpdate == null) |
| | | { |
| | | notAssuredUpdate = new NotAssuredUpdateMsg(update); |
| | | } |
| | | handler.add(notAssuredUpdate, sourceHandler); |
| | | } |
| | | } else |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } |
| | | notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate, |
| | | assuredMessage, expectedServers); |
| | | } |
| | | } |
| | | |
| | |
| | | * allows better performance in normal mode (most of the time). |
| | | */ |
| | | ServerStatus dsStatus = handler.getStatus(); |
| | | if ( (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) || |
| | | (dsStatus == ServerStatus.FULL_UPDATE_STATUS) ) |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS |
| | | || dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS) |
| | | TRACER.debugInfo("In " + this + |
| | | " for dn " + baseDn + ", update " + update.getChangeNumber() + |
| | | " will not be sent to directory server " + |
| | | handler.getServerId() + " with generation id " + |
| | | handler.getGenerationId() + " different from local " + |
| | | "generation id " + generationId); |
| | | { |
| | | TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update " |
| | | + update.getChangeNumber() |
| | | + " will not be sent to directory server " |
| | | + handler.getServerId() + " with generation id " |
| | | + handler.getGenerationId() + " different from local " |
| | | + "generation id " + generationId); |
| | | } |
| | | if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) |
| | | TRACER.debugInfo("In RS " + |
| | | replicationServer.getServerId() + |
| | | " for dn " + baseDn + ", update " + update.getChangeNumber() + |
| | | " will not be sent to directory server " + handler.getServerId() + |
| | | " as it is in full update"); |
| | | { |
| | | TRACER.debugInfo("In RS " + replicationServer.getServerId() |
| | | + " for dn " + baseDn + ", update " + update.getChangeNumber() |
| | | + " will not be sent to directory server " |
| | | + handler.getServerId() + " as it is in full update"); |
| | | } |
| | | } |
| | | |
| | | continue; |
| | | } |
| | | |
| | | if (assuredMessage) |
| | | { |
| | | // Assured mode: post an assured or not assured matching update |
| | | // message according to what has been computed for the destination |
| | | // server |
| | | if ((expectedServers != null) && expectedServers.contains(handler. |
| | | getServerId())) |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } else |
| | | { |
| | | if (notAssuredUpdate == null) |
| | | { |
| | | notAssuredUpdate = new NotAssuredUpdateMsg(update); |
| | | } |
| | | handler.add(notAssuredUpdate, sourceHandler); |
| | | } |
| | | } else |
| | | { |
| | | handler.add(update, sourceHandler); |
| | | } |
| | | notAssuredUpdate = addUpdate(handler, update, notAssuredUpdate, |
| | | assuredMessage, expectedServers); |
| | | } |
| | | |
| | | // Push the message to the other subscribing handlers |
| | | for (MessageHandler handler : otherHandlers) { |
| | | handler.add(update, sourceHandler); |
| | | handler.add(update); |
| | | } |
| | | } |
| | | |
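| | | /** |
| | | * Posts the provided update to the given handler. In assured mode, a handler |
| | | * whose server is not in the expected servers list receives a not assured |
| | | * version of the update instead. The lazily created NotAssuredUpdateMsg is |
| | | * returned so that it can be reused for the next handlers. |
| | | */ |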
| | | private NotAssuredUpdateMsg addUpdate(ServerHandler handler, |
| | | UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate, |
| | | boolean assuredMessage, List<Integer> expectedServers) |
| | | throws UnsupportedEncodingException |
| | | { |
| | | if (assuredMessage) |
| | | { |
| | | // Assured mode: post an assured or not assured matching update |
| | | // message according to what has been computed for the destination server |
| | | if (expectedServers.contains(handler.getServerId())) |
| | | { |
| | | handler.add(update); |
| | | } |
| | | else |
| | | { |
| | | if (notAssuredUpdate == null) |
| | | { |
| | | notAssuredUpdate = new NotAssuredUpdateMsg(update); |
| | | } |
| | | handler.add(notAssuredUpdate); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | handler.add(update); |
| | | } |
| | | return notAssuredUpdate; |
| | | } |
| | | |
| | | /** |
| | | * Helper class to be the return type of a method that processes a just |
| | | * received assured update message: |
| | |
| | | * should not be null. |
| | | * Servers that are not in this list are servers not eligible for an ack |
| | | * request. |
| | | * |
| | | */ |
| | | public List<Integer> expectedServers = null; |
| | | |
| | |
| | | { |
| | | if (sourceHandler.isDataServer()) |
| | | { |
| | | // Look for RS eligible for assured |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId) |
| | | // No ack expected from a RS with different group id |
| | | { |
| | | if ((generationId > 0) && |
| | | (generationId == handler.getGenerationId())) |
| | | // No ack expected from a RS with bad gen id |
| | | { |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | } |
| | | collectRSsEligibleForAssuredReplication(groupId, expectedServers); |
| | | } |
| | | |
| | | // Look for DS eligible for assured |
| | |
| | | if (serverStatus == ServerStatus.NORMAL_STATUS) |
| | | { |
| | | expectedServers.add(handler.getServerId()); |
| | | } else |
| | | } else if (serverStatus == ServerStatus.DEGRADED_STATUS) { |
| | | // No ack expected from a DS with wrong status |
| | | { |
| | | if (serverStatus == ServerStatus.DEGRADED_STATUS) |
| | | { |
| | | wrongStatusServers.add(handler.getServerId()); |
| | | } |
| | | /** |
| | | * else |
| | | * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS: |
| | | * We do not want this to be reported as an error to the update |
| | | * maker -> no pollution or potential misunderstanding when |
| | | * reading logs or monitoring and it was just administration (for |
| | | * instance new server is being configured in topo: it goes in bad |
| | | * gen then then full full update). |
| | | */ |
| | | wrongStatusServers.add(handler.getServerId()); |
| | | } |
| | | /* |
| | | * else |
| | | * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS: |
| | | * We do not want this to be reported as an error to the update |
| | | * maker -> no pollution or potential misunderstanding when |
| | | * reading logs or monitoring and it was just administration (for |
| | | * instance new server is being configured in topo: it goes in bad |
| | | * gen then full update). |
| | | */ |
| | | } |
| | | } |
| | | } |
| | |
| | | Integer.toString(replicationServer.getServerId()), |
| | | Byte.toString(safeDataLevel), baseDn, update.toString()); |
| | | logError(errorMsg); |
| | | } else if (sourceGroupId != groupId) |
| | | } else if (sourceGroupId == groupId |
| | | // Assured feature does not cross different group IDS |
| | | && isSameGenerationId(sourceHandler.getGenerationId())) |
| | | // Ignore assured updates from wrong generationId servers |
| | | { |
| | | // Assured feature does not cross different group IDS |
| | | } else |
| | | { |
| | | if ((generationId > 0) && |
| | | (generationId == sourceHandler.getGenerationId())) |
| | | // Ignore assured updates from wrong generationId servers |
| | | { |
| | | if (sourceHandler.isDataServer()) |
| | | { |
| | | if (safeDataLevel == (byte) 1) |
| | |
| | | sourceHandler.send(new AckMsg(cn)); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | List<Integer> expectedServers = new ArrayList<Integer>(); |
| | | if (interestedInAcks && sourceHandler.isDataServer()) |
| | | { |
| | | // Look for RS eligible for assured |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId |
| | | // No ack expected from a RS with different group id |
| | | && generationId > 0 && (generationId == handler.getGenerationId())) |
| | | // No ack expected from a RS with bad gen id |
| | | { |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | collectRSsEligibleForAssuredReplication(groupId, expectedServers); |
| | | } |
| | | |
| | | // Return computed structures |
| | |
| | | // servers: the level is a best effort thing, we do not want to timeout |
| | | // at every assured SD update, for instance if a RS has had its gen id |
| | | // reset |
| | | byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ? |
| | | byte finalSdl = (nExpectedServers >= neededAdditionalServers) ? |
| | | (byte)sdl : // Keep level as it was |
| | | (byte)(nExpectedServers+1)); // Change level to match what's available |
| | | (byte)(nExpectedServers+1); // Change level to match what's available |
| | | preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn, |
| | | sourceHandler, finalSdl, expectedServers); |
| | | preparedAssuredInfo.expectedServers = expectedServers; |
| | |
| | | return preparedAssuredInfo; |
| | | } |
| | | |
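| | | /** |
| | | * Adds to the provided list the server ids of the connected replication |
| | | * servers that have the same group id and the same generation id, i.e. those |
| | | * from which an ack can be expected. |
| | | */ |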
| | | private void collectRSsEligibleForAssuredReplication(byte groupId, |
| | | List<Integer> expectedServers) |
| | | { |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (handler.getGroupId() == groupId |
| | | // No ack expected from a RS with different group id |
| | | && isSameGenerationId(handler.getGenerationId()) |
| | | // No ack expected from a RS with bad gen id |
| | | ) |
| | | { |
| | | expectedServers.add(handler.getServerId()); |
| | | } |
| | | } |
| | | } |
| | | |
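| | | /** Returns whether the local generation id is valid and equal to the provided one. */ |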
| | | private boolean isSameGenerationId(long generationId) |
| | | { |
| | | return this.generationId > 0 && this.generationId == generationId; |
| | | } |
| | | |
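| | | /** Returns whether the local generation id is valid and differs from the provided one. */ |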
| | | private boolean isDifferentGenerationId(long generationId) |
| | | { |
| | | return this.generationId > 0 && this.generationId != generationId; |
| | | } |
| | | |
| | | /** |
| | | * Process an ack received from a given server. |
| | | * |
| | |
| | | |
| | | /** |
| | | * Constructor for the timer task. |
| | | * @param cn The changenumber of the assured update we are waiting acks for |
| | | * @param cn The changeNumber of the assured update we are waiting acks for |
| | | */ |
| | | public AssuredTimeoutTask(ChangeNumber cn) |
| | | { |
| | |
| | | AckMsg finalAck = expectedAcksInfo.createAck(true); |
| | | ServerHandler origServer = expectedAcksInfo.getRequesterServer(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + replicationServer.getServerId() + " for " + baseDn + |
| | | ", sending timeout for assured update with change " + " number " + |
| | | cn + " to server id " + origServer.getServerId()); |
| | | { |
| | | TRACER.debugInfo("In RS " + replicationServer.getServerId() |
| | | + " for "+ baseDn |
| | | + ", sending timeout for assured update with change " |
| | | + " number " + cn + " to server id " |
| | | + origServer.getServerId()); |
| | | } |
| | | try |
| | | { |
| | | origServer.send(finalAck); |
| | |
| | | List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers(); |
| | | for (Integer serverId : serversInTimeout) |
| | | { |
| | | ServerHandler expectedServerInTimeout = |
| | | directoryServers.get(serverId); |
| | | if (expectedServerInTimeout != null) |
| | | ServerHandler expectedDSInTimeout = directoryServers.get(serverId); |
| | | ServerHandler expectedRSInTimeout = |
| | | replicationServers.get(serverId); |
| | | if (expectedDSInTimeout != null) |
| | | { |
| | | // Was a DS |
| | | if (safeRead) |
| | | { |
| | | expectedServerInTimeout.incrementAssuredSrSentUpdatesTimeout(); |
| | | expectedDSInTimeout.incrementAssuredSrSentUpdatesTimeout(); |
| | | } else |
| | | { |
| | | // No SD update sent to a DS (meaningless) |
| | | } |
| | | } else |
| | | } else if (expectedRSInTimeout != null) |
| | | { |
| | | expectedServerInTimeout = |
| | | replicationServers.get(serverId); |
| | | if (expectedServerInTimeout != null) |
| | | if (safeRead) |
| | | { |
| | | // Was a RS |
| | | if (safeRead) |
| | | { |
| | | expectedServerInTimeout. |
| | | incrementAssuredSrSentUpdatesTimeout(); |
| | | } else |
| | | { |
| | | expectedServerInTimeout. |
| | | incrementAssuredSdSentUpdatesTimeout(); |
| | | } |
| | | expectedRSInTimeout.incrementAssuredSrSentUpdatesTimeout(); |
| | | } |
| | | /* else server disappeared ? Let's forget about it. */ |
| | | else |
| | | { |
| | | expectedRSInTimeout.incrementAssuredSdSentUpdatesTimeout(); |
| | | } |
| | | } |
| | | // else server disappeared ? Let's forget about it. |
| | | } |
| | | // Mark the ack info object as completed to prevent a potential |
| | | // parallel run of the processAck() code |
| | |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | if (replServers.contains(handler.getServerAddressURL())) |
| | | { |
| | | stopServer(handler, false); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | public void stopServer(ServerHandler handler, boolean shutdown) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " domain=" + this + " stopServer() on the server handler " + |
| | | handler.getMonitorInstanceName()); |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + " domain=" + this + " stopServer() on the server handler " |
| | | + handler.getMonitorInstanceName()); |
| | | } |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | | * instance this code is called from dying ServerReader but also dying |
| | |
| | | if ( (directoryServers.size() + replicationServers.size() )== 1) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + |
| | | replicationServer.getMonitorInstanceName() + |
| | | " remote server " + handler.getMonitorInstanceName() + " is " + |
| | | "the last RS/DS to be stopped: stopping monitoring publisher"); |
| | | { |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() |
| | | + " remote server " + handler.getMonitorInstanceName() |
| | | + " is the last RS/DS to be stopped:" |
| | | + " stopping monitoring publisher"); |
| | | } |
| | | stopMonitoringPublisher(); |
| | | } |
| | | |
| | |
| | | if (directoryServers.size() == 1) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + |
| | | replicationServer.getMonitorInstanceName() + |
| | | " remote server " + handler.getMonitorInstanceName() + |
| | | " is the last DS to be stopped: stopping status analyzer"); |
| | | { |
| | | TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() |
| | | + " remote server " + handler.getMonitorInstanceName() |
| | | + " is the last DS to be stopped: stopping status analyzer"); |
| | | } |
| | | stopStatusAnalyzer(); |
| | | } |
| | | |
| | |
| | | public void stopServer(MessageHandler handler) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + " domain=" + this + " stopServer() on the message handler " |
| | | + handler.getMonitorInstanceName()); |
| | | } |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | | * instance this code is called from dying ServerReader but also dying |
| | |
| | | * server currently connected in the whole topology on this domain and |
| | | * if the generationId has never been saved. |
| | | * |
| | | * - test emtpyness of directoryServers list |
| | | * - test emptiness of directoryServers list |
| | | * - traverse the replicationServers list and test for each whether DSs are |
| | | *   connected |
| | | * So it strongly relies on the directoryServers list |
| | | */ |
| | | private void mayResetGenerationId() |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " for " + baseDn + " " + |
| | | " mayResetGenerationId generationIdSavedStatus=" + |
| | | generationIdSavedStatus); |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.replicationServer.getMonitorInstanceName() + " for " + baseDn |
| | | + " " + " mayResetGenerationId generationIdSavedStatus=" |
| | | + generationIdSavedStatus); |
| | | } |
| | | |
| | | // If there is no longer any LDAP server connected to this domain in the |
| | | // topology and the generationId has never been saved, then we can reset |
| | |
| | | if (generationId != rsh.getGenerationId()) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " for " + baseDn + " " + |
| | | " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() + |
| | | " that has different genId"); |
| | | } else |
| | | { |
| | | if (rsh.hasRemoteLDAPServers()) |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.replicationServer.getMonitorInstanceName() + " for " |
| | | + baseDn + " " + " mayResetGenerationId skip RS" |
| | | + rsh.getMonitorInstanceName() + " that has different genId"); |
| | | } |
| | | } else if (rsh.hasRemoteLDAPServers()) |
| | | { |
| | | lDAPServersConnectedInTheTopology = true; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " for " + baseDn + " " + |
| | | " mayResetGenerationId RS" + rsh.getMonitorInstanceName() + |
| | | " has servers connected to it - will not reset generationId"); |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.replicationServer.getMonitorInstanceName() |
| | | + " for "+ baseDn + " mayResetGenerationId RS" |
| | | + rsh.getMonitorInstanceName() |
| | | + " has servers connected to it" |
| | | + " - will not reset generationId"); |
| | | } |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | } else |
| | | { |
| | | lDAPServersConnectedInTheTopology = true; |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In RS " + this.replicationServer.getMonitorInstanceName() + |
| | | " for " + baseDn + " " + |
| | | " has servers connected to it - will not reset generationId"); |
| | | { |
| | | TRACER.debugInfo("In RS " |
| | | + this.replicationServer.getMonitorInstanceName() + " for " |
| | | + baseDn + " " |
| | | + " has servers connected to it - will not reset generationId"); |
| | | } |
| | | } |
| | | |
| | | if (!lDAPServersConnectedInTheTopology && !this.generationIdSavedStatus |
| | |
| | | * The next change to send is always the first one in the tree |
| | | * So this method simply needs to check that dependencies are OK |
| | | * and update this replicaId RUV |
| | | * |
| | | */ |
| | | return handler.take(); |
| | | } |
| | |
| | | */ |
| | | public Set<String> getChangelogs() |
| | | { |
| | | LinkedHashSet<String> mySet = new LinkedHashSet<String>(); |
| | | |
| | | Set<String> results = new LinkedHashSet<String>(); |
| | | for (ReplicationServerHandler handler : replicationServers.values()) |
| | | { |
| | | mySet.add(handler.getServerAddressURL()); |
| | | results.add(handler.getServerAddressURL()); |
| | | } |
| | | |
| | | return mySet; |
| | | return results; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public List<String> getConnectedLDAPservers() |
| | | { |
| | | List<String> mySet = new ArrayList<String>(0); |
| | | |
| | | List<String> results = new ArrayList<String>(0); |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | mySet.add(String.valueOf(handler.getServerId())); |
| | | results.add(String.valueOf(handler.getServerId())); |
| | | } |
| | | return mySet; |
| | | return results; |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | DbHandler handler = sourceDbHandlers.get(serverId); |
| | | if (handler == null) |
| | | { |
| | | return null; |
| | | } |
| | | |
| | | ReplicationIterator it; |
| | | try |
| | |
| | | * @param from lower limit changeNumber. |
| | | * @param to upper limit changeNumber. |
| | | * @return the number of changes. |
| | | * |
| | | */ |
| | | public int getCount(int serverId, |
| | | ChangeNumber from, ChangeNumber to) |
| | | public long getCount(int serverId, ChangeNumber from, ChangeNumber to) |
| | | { |
| | | DbHandler handler = sourceDbHandlers.get(serverId); |
| | | if (handler == null) |
| | | return 0; |
| | | |
| | | return handler.getCount(from, to); |
| | | if (handler != null) |
| | | { |
| | | return handler.getCount(from, to); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | |
| | | private List<ServerHandler> getDestinationServers(RoutableMsg msg, |
| | | ServerHandler senderHandler) |
| | | { |
| | | List<ServerHandler> servers = |
| | | new ArrayList<ServerHandler>(); |
| | | List<ServerHandler> servers = new ArrayList<ServerHandler>(); |
| | | |
| | | if (msg.getDestination() == RoutableMsg.THE_CLOSEST_SERVER) |
| | | { |
| | |
| | | { |
| | | // Don't loop on the sender |
| | | if (destinationHandler == senderHandler) |
| | | { |
| | | continue; |
| | | } |
| | | servers.add(destinationHandler); |
| | | } |
| | | } else |
| | |
| | | } |
| | | |
| | | // Populate the RS state in the msg from the DbState |
| | | monitorMsg.setReplServerDbState(this.getDbServerState()); |
| | | monitorMsg.setReplServerDbState(getDbServerState()); |
| | | return monitorMsg; |
| | | } |
| | | finally |
| | |
| | | { |
| | | for (DataServerHandler handler : directoryServers.values()) |
| | | { |
| | | if ((notThisOne == null) || (handler != notThisOne)) |
| | | if (notThisOne == null || handler != notThisOne) |
| | | // All except passed one |
| | | { |
| | | for (int i=1; i<=2; i++) |
| | |
| | | |
| | | // Create info for the local RS |
| | | List<RSInfo> rsInfos = new ArrayList<RSInfo>(); |
| | | RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(), |
| | | replicationServer.getServerURL(), generationId, |
| | | replicationServer.getGroupId(), replicationServer.getWeight()); |
| | | RSInfo localRSInfo = toRSInfo(replicationServer, generationId); |
| | | rsInfos.add(localRSInfo); |
| | | |
| | | return new TopologyMsg(dsInfos, rsInfos); |
| | |
| | | for (DataServerHandler serverHandler : directoryServers.values()) |
| | | { |
| | | if (serverHandler.getServerId() == destDsId) |
| | | { |
| | | continue; |
| | | } |
| | | dsInfos.add(serverHandler.toDSInfo()); |
| | | } |
| | | |
| | | // Add our own info (local RS) |
| | | RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(), |
| | | replicationServer.getServerURL(), generationId, |
| | | replicationServer.getGroupId(), replicationServer.getWeight()); |
| | | RSInfo localRSInfo = toRSInfo(replicationServer, generationId); |
| | | rsInfos.add(localRSInfo); |
| | | |
| | | // Go through every peer RSs (and get their connected DSs), also add info |
| | |
| | | return new TopologyMsg(dsInfos, rsInfos); |
| | | } |
| | | |
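| | | /** Builds the RSInfo describing the provided replication server with the given generation id. */ |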
| | | private RSInfo toRSInfo(ReplicationServer rs, long generationId) |
| | | { |
| | | return new RSInfo(rs.getServerId(), rs.getServerURL(), generationId, |
| | | rs.getGroupId(), rs.getWeight()); |
| | | } |
| | | |
| | | /** |
| | | * Get the generationId associated to this domain. |
| | | * |
| | |
| | | ResetGenerationIdMsg genIdMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this + |
| | | " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId()+ |
| | | " for baseDn " + baseDn + ":\n" + genIdMsg); |
| | | { |
| | | TRACER.debugInfo("In " + this + " Receiving ResetGenerationIdMsg from " |
| | | + senderHandler.getServerId() + " for baseDn " + baseDn + ":\n" |
| | | + genIdMsg); |
| | | } |
| | | |
| | | try |
| | | { |
| | |
| | | { |
| | | // Order to take a gen id we already have, just ignore |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this |
| | | { |
| | | TRACER.debugInfo("In " + this |
| | | + " Reset generation id requested for baseDn " + baseDn |
| | | + " but generation id was already " + this.generationId |
| | | + ":\n" + genIdMsg); |
| | | + " but generation id was already " + this.generationId + ":\n" |
| | | + genIdMsg); |
| | | } |
| | | } |
| | | |
| | | // If we are the first replication server warned, |
| | |
| | | // status of a DS has to be changed. See more comments in run method of |
| | | // StatusAnalyzer. |
| | | if (debugEnabled()) |
| | | TRACER |
| | | .debugInfo("Status analyzer for domain " |
| | | + baseDn |
| | | + " has been interrupted when" |
| | | + " trying to acquire domain lock for changing the status" |
| | | + " of DS " |
| | | + serverHandler.getServerId()); |
| | | { |
| | | TRACER.debugInfo("Status analyzer for domain " + baseDn |
| | | + " has been interrupted when" |
| | | + " trying to acquire domain lock for changing the status of DS " |
| | | + serverHandler.getServerId()); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | |
| | | e.getMessage())); |
| | | } |
| | | |
| | | if ((newStatus == ServerStatus.INVALID_STATUS) |
| | | || (newStatus == oldStatus)) |
| | | if (newStatus == ServerStatus.INVALID_STATUS || newStatus == oldStatus) |
| | | { |
| | | // Change was impossible or already occurred (see StatusAnalyzer |
| | | // comments) |
| | |
| | | public boolean isDegradedDueToGenerationId(int serverId) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDN=" + baseDn + |
| | | " isDegraded serverId=" + serverId + |
| | | " given local generation Id=" + this.generationId); |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + " baseDN=" + baseDn + " isDegraded serverId=" + serverId |
| | | + " given local generation Id=" + this.generationId); |
| | | } |
| | | |
| | | ServerHandler handler = replicationServers.get(serverId); |
| | | if (handler == null) |
| | |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + this.replicationServer.getMonitorInstanceName() + |
| | | " baseDN=" + baseDn + |
| | | " Compute degradation of serverId=" + serverId + |
| | | " LS server generation Id=" + handler.getGenerationId()); |
| | | return (handler.getGenerationId() != this.generationId); |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + " baseDN=" + baseDn + " Compute degradation of serverId=" |
| | | + serverId + " LS server generation Id=" + handler.getGenerationId()); |
| | | } |
| | | return handler.getGenerationId() != this.generationId; |
| | | } |
| | | |
| | | /** |
| | |
| | | // Check if generation id has to be reset |
| | | mayResetGenerationId(); |
| | | if (generationId < 0) |
| | | { |
| | | generationId = handler.getGenerationId(); |
| | | } |
| | | } |
| | | |
| | | if (generationId > 0 && (generationId != handler.getGenerationId())) |
| | | if (isDifferentGenerationId(handler.getGenerationId())) |
| | | { |
| | | Message message = WARN_BAD_GENERATION_ID_FROM_RS.get(handler |
| | | .getServerId(), handler.session |
| | |
| | | ServerState dbServerState = getDbServerState(); |
| | | pendingMonitorData.setRSState(replicationServer.getServerId(), |
| | | dbServerState); |
| | | for (int sid : dbServerState) { |
| | | ChangeNumber storedCN = dbServerState.getChangeNumber(sid); |
| | | pendingMonitorData.setMaxCN(sid, storedCN); |
| | | for (int serverId : dbServerState) { |
| | | ChangeNumber storedCN = dbServerState.getChangeNumber(serverId); |
| | | pendingMonitorData.setMaxCN(serverId, storedCN); |
| | | } |
| | | } |
| | | |
| | |
| | | * @param serverId |
| | | * server handler that is receiving the message. |
| | | */ |
| | | private void receivesMonitorDataResponse(MonitorMsg msg, |
| | | int serverId) |
| | | private void receivesMonitorDataResponse(MonitorMsg msg, int serverId) |
| | | { |
| | | synchronized (pendingMonitorDataLock) |
| | | { |
| | |
| | | pendingMonitorData.setMaxCNs(replServerState); |
| | | |
| | | // store the remote RS states. |
| | | pendingMonitorData.setRSState(msg.getSenderID(), |
| | | replServerState); |
| | | pendingMonitorData.setRSState(msg.getSenderID(), replServerState); |
| | | |
| | | // Store the remote LDAP servers states |
| | | Iterator<Integer> lsidIterator = msg.ldapIterator(); |
| | |
| | | { |
| | | int connectedlsid = connectedlsh.getServerId(); |
| | | Long newfmd = msg.getRSApproxFirstMissingDate(rsid); |
| | | pendingMonitorData.setFirstMissingDate(connectedlsid, |
| | | newfmd); |
| | | pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // this is the latency of the remote RSi regarding another RSj |
| | | // let's update the latency of the LSes connected to RSj |
| | | ReplicationServerHandler rsjHdr = replicationServers |
| | | .get(rsid); |
| | | ReplicationServerHandler rsjHdr = replicationServers.get(rsid); |
| | | if (rsjHdr != null) |
| | | { |
| | | for (int remotelsid : rsjHdr |
| | | .getConnectedDirectoryServerIds()) |
| | | for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds()) |
| | | { |
| | | Long newfmd = msg.getRSApproxFirstMissingDate(rsid); |
| | | pendingMonitorData.setFirstMissingDate(remotelsid, |
| | | newfmd); |
| | | pendingMonitorData.setFirstMissingDate(remotelsid, newfmd); |
| | | } |
| | | } |
| | | } |
| | |
| | | private final ReentrantLock lock = new ReentrantLock(); |
| | | |
| | | /** |
| | | * This lock is used to protect the generationid variable. |
| | | * This lock is used to protect the generationId variable. |
| | | */ |
| | | private final Object generationIDLock = new Object(); |
| | | |
| | |
| | | * {@inheritDoc} |
| | | */ |
| | | @Override |
| | | public ArrayList<Attribute> getMonitorData() |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | /* |
| | | * publish the server id and the port number. |
| | | */ |
| | | ArrayList<Attribute> attributes = new ArrayList<Attribute>(); |
| | | // publish the server id and the port number. |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | attributes.add(Attributes.create("replication-server-id", |
| | | String.valueOf(replicationServer.getServerId()))); |
| | | attributes.add(Attributes.create("replication-server-port", |
| | | String.valueOf(replicationServer.getReplicationPort()))); |
| | | |
| | | /* |
| | | * Add all the base DNs that are known by this replication server. |
| | | */ |
| | | AttributeBuilder builder = new AttributeBuilder("domain-name"); |
| | | builder.add(baseDn); |
| | | attributes.add(builder.toAttribute()); |
| | | // Add all the base DNs that are known by this replication server. |
| | | attributes.add(Attributes.create("domain-name", baseDn)); |
| | | |
| | | // Publish to monitor the generation ID by replicationServerDomain |
| | | builder = new AttributeBuilder("generation-id"); |
| | | builder.add(baseDn + " " + generationId); |
| | | attributes.add(builder.toAttribute()); |
| | | attributes.add(Attributes.create("generation-id", |
| | | baseDn + " " + generationId)); |
| | | |
| | | MonitorData md = getDomainMonitorData(); |
| | | |
| | |
| | | { |
| | | if (ctHeartbeatState == null) |
| | | { |
| | | ctHeartbeatState = this.getDbServerState().duplicate(); |
| | | ctHeartbeatState = getDbServerState().duplicate(); |
| | | } |
| | | return ctHeartbeatState; |
| | | } |
| | |
| | | /** |
| | | * Computes the eligible server state for the domain. |
| | | * |
| | | * <pre> |
| | | * s1 s2 s3 |
| | | * -- -- -- |
| | | * cn31 |
| | |
| | | * cn14 |
| | | * cn26 |
| | | * cn13 |
| | | * </pre> |
| | | * |
| | | * The eligibleState is: s1;cn14 / s2;cn26 / s3;cn31 |
| | | * |
| | |
| | | */ |
| | | public ServerState getEligibleState(ChangeNumber eligibleCN) |
| | | { |
| | | ServerState dbState = this.getDbServerState(); |
| | | ServerState dbState = getDbServerState(); |
| | | |
| | | // The result is initialized from the dbState. |
| | | // From it, we don't want to keep the changes newer than eligibleCN. |
| | |
| | | |
| | | if (eligibleCN != null) |
| | | { |
| | | for (int sid : dbState) { |
| | | DbHandler h = sourceDbHandlers.get(sid); |
| | | ChangeNumber mostRecentDbCN = dbState.getChangeNumber(sid); |
| | | for (int serverId : dbState) |
| | | { |
| | | DbHandler h = sourceDbHandlers.get(serverId); |
| | | ChangeNumber mostRecentDbCN = dbState.getChangeNumber(serverId); |
| | | try { |
| | | // Is the most recent change in the Db newer than the eligible CN? |
| | | // If yes (like cn15 in the example above), then we have to go back |
| | |
| | | ReplicationIterator ri = null; |
| | | try { |
| | | ri = h.generateIterator(eligibleCN); |
| | | if ((ri != null) && (ri.getChange() != null)) { |
| | | if (ri != null && ri.getChange() != null) { |
| | | ChangeNumber newCN = ri.getChange().getChangeNumber(); |
| | | result.update(newCN); |
| | | } |
| | | } catch (Exception e) { |
| | | // there's no change older than eligibleCN (case of s3/cn31) |
| | | result.update(new ChangeNumber(0, 0, sid)); |
| | | result.update(new ChangeNumber(0, 0, serverId)); |
| | | } finally { |
| | | if (ri != null) { |
| | | ri.releaseCursor(); |
| | |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + this |
| | | + " getEligibleState() result is " + result); |
| | | { |
| | | TRACER |
| | | .debugInfo("In " + this + " getEligibleState() result is " + result); |
| | | } |
| | | return result; |
| | | } |
| | | |
| | |
| | | for (DbHandler db : sourceDbHandlers.values()) |
| | | { |
| | | // Consider this producer (DS/db). |
| | | int sid = db.getServerId(); |
| | | int serverId = db.getServerId(); |
| | | |
| | | // Should it be considered for eligibility ? |
| | | ChangeNumber heartbeatLastCN = |
| | | getChangeTimeHeartbeatState().getChangeNumber(sid); |
| | | getChangeTimeHeartbeatState().getChangeNumber(serverId); |
| | | |
| | | // If the most recent UpdateMsg or CLHeartbeatMsg received is very old |
| | | // then the domain is considered down and not considered for eligibility |
| | |
| | | } |
| | | */ |
| | | |
| | | boolean sidConnected = false; |
| | | if (directoryServers.containsKey(sid)) |
| | | boolean serverIdConnected = false; |
| | | if (directoryServers.containsKey(serverId)) |
| | | { |
| | | sidConnected = true; |
| | | serverIdConnected = true; |
| | | } |
| | | else |
| | | { |
| | | // not directly connected |
| | | for (ReplicationServerHandler rsh : replicationServers.values()) |
| | | { |
| | | if (rsh.isRemoteLDAPServer(sid)) |
| | | if (rsh.isRemoteLDAPServer(serverId)) |
| | | { |
| | | sidConnected = true; |
| | | serverIdConnected = true; |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | if (!sidConnected) |
| | | if (!serverIdConnected) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("In " + "Replication Server " + |
| | | replicationServer.getReplicationPort() + " " + |
| | | baseDn + " " + replicationServer.getServerId() + |
| | | " Server " + sid |
| | | + " is not considered for eligibility ... potentially down"); |
| | | { |
| | | TRACER.debugInfo("In " + "Replication Server " |
| | | + replicationServer.getReplicationPort() + " " + baseDn + " " |
| | | + replicationServer.getServerId() + " Server " + serverId |
| | | + " is not considered for eligibility ... potentially down"); |
| | | } |
| | | continue; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "In " + "Replication Server " + replicationServer.getReplicationPort() + |
| | | " " + baseDn + " " + replicationServer.getServerId() + |
| | | " getEligibleCN() returns result =" + eligibleCN); |
| | | { |
| | | TRACER.debugInfo("In Replication Server " |
| | | + replicationServer.getReplicationPort() + " " + baseDn + " " |
| | | + replicationServer.getServerId() |
| | | + " getEligibleCN() returns result =" + eligibleCN); |
| | | } |
| | | return eligibleCN; |
| | | } |
| | | |
| | |
| | | { |
| | | // If we are the first replication server warned, |
| | | // then forward the message to the remote replication servers |
| | | for (ReplicationServerHandler rsHandler : replicationServers |
| | | .values()) |
| | | for (ReplicationServerHandler rsHandler : replicationServers.values()) |
| | | { |
| | | try |
| | | { |
| | |
| | | // TODO: Maybe we can spare processing by only storing the CN (timestamp) |
| | | // instead of a server state. |
| | | getChangeTimeHeartbeatState().update(cn); |
| | | |
| | | /* |
| | | if (debugEnabled()) |
| | | { |
| | | Set<String> ss = ctHeartbeatState.toStringSet(); |
| | | String dss = ""; |
| | | for (String s : ss) |
| | | { |
| | | dss = dss + " \\ " + s; |
| | | } |
| | | TRACER.debugInfo("In " + this.getName() + " " + dss); |
| | | } |
| | | */ |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | long res = 0; |
| | | |
| | | // Parses the dbState of the domain, server by server |
| | | ServerState dbState = this.getDbServerState(); |
| | | for (int sid : dbState) { |
| | | // process one sid |
| | | for (int serverId : getDbServerState()) |
| | | { |
| | | ChangeNumber startCN = null; |
| | | if (startState.getChangeNumber(sid) != null) |
| | | startCN = startState.getChangeNumber(sid); |
| | | long sidRes = getCount(sid, startCN, endCN); |
| | | if (startState.getChangeNumber(serverId) != null) |
| | | { |
| | | startCN = startState.getChangeNumber(serverId); |
| | | } |
| | | long serverIdRes = getCount(serverId, startCN, endCN); |
| | | |
| | | // The startPoint is excluded when counting the ECL eligible changes |
| | | if ((startCN != null) && (sidRes > 0)) |
| | | sidRes--; |
| | | if (startCN != null && serverIdRes > 0) |
| | | { |
| | | serverIdRes--; |
| | | } |
| | | |
| | | res += sidRes; |
| | | res += serverIdRes; |
| | | } |
| | | return res; |
| | | } |
| | |
| | | public long getEligibleCount(ChangeNumber startCN, ChangeNumber endCN) |
| | | { |
| | | long res = 0; |
| | | |
| | | // Parses the dbState of the domain, server by server |
| | | ServerState dbState = this.getDbServerState(); |
| | | for (int sid : dbState) { |
| | | // process one sid |
| | | for (int serverId : getDbServerState()) { |
| | | ChangeNumber lStartCN = |
| | | new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), sid); |
| | | res += getCount(sid, lStartCN, endCN); |
| | | new ChangeNumber(startCN.getTime(), startCN.getSeqnum(), serverId); |
| | | res += getCount(serverId, lStartCN, endCN); |
| | | } |
| | | return res; |
| | | } |
| | |
| | | long latest = 0; |
| | | for (DbHandler db : sourceDbHandlers.values()) |
| | | { |
| | | if ((latest==0) || (latest<db.getLatestTrimDate())) |
| | | if (latest == 0 || latest < db.getLatestTrimDate()) |
| | | { |
| | | latest = db.getLatestTrimDate(); |
| | | } |