opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -27,35 +27,21 @@ */ package org.opends.server.replication.server; import java.io.IOException; import java.util.*; import java.util.zip.DataFormatException; import org.opends.messages.Message; import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.common.StatusMachine.*; import static org.opends.server.replication.protocol.ProtocolVersion.*; import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.zip.DataFormatException; import org.opends.messages.Message; import org.opends.server.replication.common.AssuredMode; import org.opends.server.replication.common.DSInfo; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachine; import org.opends.server.replication.common.StatusMachineEvent; import org.opends.server.replication.protocol.*; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeBuilder; import org.opends.server.types.Attributes; import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; /** * This class defines a server handler, which handles all interaction with a * peer server (RS or DS). @@ -104,11 +90,11 @@ * @param newGenId The new generation id to take into account * @throws IOException If IO error occurred. */ public void changeStatusForResetGenId(long newGenId) throws IOException public void changeStatusForResetGenId(long newGenId) throws IOException { StatusMachineEvent event; final int localRsServerId = replicationServer.getServerId(); StatusMachineEvent event; if (newGenId == -1) { // The generation id is being made invalid, let's put the DS @@ -127,9 +113,8 @@ if (debugEnabled()) { TRACER.debugInfo( "In RS " + replicationServerDomain.getReplicationServer().getServerId() + ". Closing connection to DS " + getServerId() + "In RS " + localRsServerId + ", closing connection to DS " + getServerId() + " for baseDn " + getBaseDN() + " to force reconnection as new local" + " generationId and remote one match and DS is in bad gen id: " + @@ -140,20 +125,19 @@ // would rewait the RSD lock that we already must have entering this // method. This would lead to a reentrant lock which we do not want. // So simply close the session, this will make the hang up appear // after the reader thread that took the RSD lock realeases it. if (session != null) // after the reader thread that took the RSD lock releases it. if (session != null // V4 protocol introduced a StopMsg to properly close the // connection between servers && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) { // V4 protocol introduces a StopMsg to properly close the // connection between servers if (getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4) try { try { session.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } session.publish(new StopMsg()); } catch (IOException ioe) { // Anyway, going to close session, so nothing to do } } @@ -165,12 +149,10 @@ { if (debugEnabled()) { TRACER.debugInfo( "In RS " + replicationServerDomain.getReplicationServer().getServerId() + ". DS " + getServerId() + " for baseDn " + getBaseDN() + " has already generation id " + newGenId + " so no ChangeStatusMsg sent to him."); TRACER.debugInfo("In RS " + localRsServerId + ". DS " + getServerId() + " for baseDn " + getBaseDN() + " has already generation id " + newGenId + " so no ChangeStatusMsg sent to him."); } return; } @@ -182,14 +164,13 @@ } } if ((event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT) && (status == ServerStatus.FULL_UPDATE_STATUS)) if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT && status == ServerStatus.FULL_UPDATE_STATUS) { // Prevent useless error message (full update status cannot lead to bad // gen status) Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get( Integer.toString(replicationServerDomain. getReplicationServer().getServerId()), Integer.toString(localRsServerId), getBaseDN(), Integer.toString(serverId), Long.toString(generationId), @@ -214,11 +195,9 @@ if (debugEnabled()) { TRACER.debugInfo( "In RS " + replicationServerDomain.getReplicationServer().getServerId() + " Sending change status for reset gen id to " + getServerId() + " for baseDn " + getBaseDN() + ":\n" + csMsg); TRACER.debugInfo("In RS " + localRsServerId + " Sending change status for reset gen id to " + getServerId() + " for baseDn " + getBaseDN() + ":\n" + csMsg); } session.publish(csMsg); @@ -257,11 +236,9 @@ if (debugEnabled()) { TRACER.debugInfo( "In RS " + replicationServerDomain.getReplicationServer().getServerId() + " Sending change status from status analyzer to " + getServerId() + " for baseDn " + getBaseDN() + ":\n" + csMsg); TRACER.debugInfo("In RS " + replicationServer.getServerId() + " Sending change status from status analyzer to " + getServerId() + " for baseDn " + getBaseDN() + ":\n" + csMsg); } session.publish(csMsg); @@ -296,14 +273,13 @@ // Add the specific DS ones attributes.add(Attributes.create("replica", serverURL)); attributes.add(Attributes.create("connected-to", this.replicationServerDomain.getReplicationServer() .getMonitorInstanceName())); this.replicationServer.getMonitorInstanceName())); MonitorData md = replicationServerDomain.getDomainMonitorData(); // Oldest missing update Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0)) long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId); if (approxFirstMissingDate > 0) { Date date = new Date(approxFirstMissingDate); attributes.add(Attributes.create( @@ -314,14 +290,12 @@ } // Missing changes long missingChanges = md.getMissingChanges(serverId); attributes.add(Attributes.create("missing-changes", String .valueOf(missingChanges))); attributes.add(Attributes.create("missing-changes", String.valueOf(md.getMissingChanges(serverId)))); // Replication delay long delay = md.getApproxDelay(serverId); attributes.add(Attributes.create("approximate-delay", String .valueOf(delay))); attributes.add(Attributes.create("approximate-delay", String.valueOf(md.getApproxDelay(serverId)))); /* get the Server State */ AttributeBuilder builder = new AttributeBuilder("server-state"); @@ -541,18 +515,9 @@ { Message errMessage = ERR_DS_DISCONNECTED_DURING_HANDSHAKE.get( Integer.toString(inServerStartMsg.getServerId()), Integer.toString(replicationServerDomain.getReplicationServer(). getServerId())); Integer.toString(replicationServer.getServerId())); throw new DirectoryException(ResultCode.OTHER, errMessage); } catch (NotSupportedOldVersionPDUException e) { // We do not need to support DS V1 connection, we just accept RS V1 // connection: // We just trash the message, log the event for debug purpose and close // the connection throw new DirectoryException(ResultCode.OTHER, null, null); } catch (Exception e) { // We do not need to support DS V1 connection, we just accept RS V1 @@ -588,7 +553,7 @@ } finally { if ((replicationServerDomain != null) && if (replicationServerDomain != null && replicationServerDomain.hasLock()) replicationServerDomain.release(); } @@ -610,21 +575,20 @@ { // Peer DS uses protocol < V4 : send it a ReplServerStartMsg startMsg = new ReplServerStartMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServerDomain.getReplicationServer() .getDegradedStatusThreshold()); replicationServer.getDegradedStatusThreshold()); } else { // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg startMsg = new ReplServerStartDSMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServerDomain.getReplicationServer() .getDegradedStatusThreshold(), replicationServer.getWeight(), replicationServer.getDegradedStatusThreshold(), replicationServer.getWeight(), replicationServerDomain.getConnectedLDAPservers().size()); } @@ -651,17 +615,10 @@ { if (serverId != 0) { StringBuilder builder = new StringBuilder("Replica DS("); builder.append(serverId); builder.append(") for domain \""); builder.append(replicationServerDomain.getBaseDn()); builder.append("\""); return builder.toString(); return "Replica DS(" + serverId + ") for domain \"" + replicationServerDomain.getBaseDn() + "\""; } else { return "Unknown server"; } return "Unknown server"; } /** @@ -740,7 +697,7 @@ else { // We are an empty ReplicationServer if ((generationId > 0) && (!getServerState().isEmpty())) if (generationId > 0 && !getServerState().isEmpty()) { // If the LDAP server has already sent changes // it is not expected to connect to an empty RS opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -365,20 +365,19 @@ if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4) { // Peer DS uses protocol < V4 : send it a ReplServerStartMsg startMsg = new ReplServerStartMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServerDomain.getReplicationServer() .getDegradedStatusThreshold()); startMsg = new ReplServerStartMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServer.getDegradedStatusThreshold()); } else { // Peer DS uses protocol V4 : send it a ReplServerStartDSMsg startMsg = new ReplServerStartDSMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, new ServerState(), localGenerationId, sslEncryption, getLocalGroupId(), 0, replicationServer.getWeight(), 0); startMsg = new ReplServerStartDSMsg(getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, new ServerState(), localGenerationId, sslEncryption, getLocalGroupId(), 0, replicationServer.getWeight(), 0); } send(startMsg); @@ -556,15 +555,13 @@ catch(DirectoryException de) { TRACER.debugCaught(DebugLogLevel.ERROR, de); if (draftCNDbIter != null) draftCNDbIter.releaseCursor(); releaseIterator(); throw de; } catch(Exception e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); if (draftCNDbIter != null) draftCNDbIter.releaseCursor(); releaseIterator(); throw new DirectoryException( ResultCode.OPERATIONS_ERROR, Message.raw(Category.SYNC, @@ -917,11 +914,7 @@ { if (debugEnabled()) TRACER.debugInfo(this + " shutdown()" + draftCNDbIter); if (this.draftCNDbIter != null) { draftCNDbIter.releaseCursor(); draftCNDbIter = null; } releaseIterator(); for (DomainContext domainCtxt : domainCtxts) { if (!domainCtxt.unRegisterHandler()) { logError(Message.raw(Category.SYNC, Severity.NOTICE, @@ -934,6 +927,15 @@ domainCtxts = null; } private void releaseIterator() { if (this.draftCNDbIter != null) { this.draftCNDbIter.releaseCursor(); this.draftCNDbIter = null; } } /** * Request to shutdown the associated writer. */ @@ -1112,7 +1114,7 @@ { session.publish( new ErrorMsg( replicationServerDomain.getReplicationServer().getServerId(), replicationServer.getServerId(), serverId, Message.raw(Category.SYNC, Severity.INFORMATION, "Exception raised: " + e.getMessage()))); @@ -1130,11 +1132,9 @@ registerIntoDomain(); if (debugEnabled()) TRACER.debugInfo( this.getClass().getCanonicalName()+ " " + operationId + " initialized: " + " " + dumpState() + " " + " " + clDomCtxtsToString("")); TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId + " initialized: " + " " + dumpState() + " " + " " + clDomCtxtsToString("")); } private void initializeChangelogSearch(StartECLSessionMsg msg) @@ -1522,12 +1522,8 @@ searchPhase = UNDEFINED_PHASE; } if (draftCNDbIter!=null) { // End of INIT_PHASE => always release the iterator draftCNDbIter.releaseCursor(); draftCNDbIter = null; } // End of INIT_PHASE => always release the iterator releaseIterator(); } /** opends/src/server/org/opends/server/replication/server/LightweightServerHandler.java
@@ -27,14 +27,7 @@ */ package org.opends.server.replication.server; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.*; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.MonitorProvider; @@ -50,6 +43,8 @@ import org.opends.server.types.Attributes; import org.opends.server.types.InitializationException; import static org.opends.server.loggers.debug.DebugLogger.*; /** * This class defines a server handler dedicated to the remote LDAP servers * connected to a remote Replication Server. @@ -142,13 +137,15 @@ this.protocolVersion = protocolVersion; if (debugEnabled()) TRACER.debugInfo( "In " + replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName()+ " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " ()"); } TRACER.debugInfo("In " + getLocalRSMonitorInstanceName() + " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " ()"); } private String getLocalRSMonitorInstanceName() { return rsDomain.getReplicationServer().getMonitorInstanceName(); } /** * Creates a DSInfo structure representing this remote DS. @@ -176,15 +173,11 @@ public void startHandler() { if (debugEnabled()) TRACER.debugInfo( "In " + replServerHandler.getDomain().getReplicationServer().getMonitorInstanceName() + " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " start"); TRACER.debugInfo("In " + getLocalRSMonitorInstanceName() + " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " start"); DirectoryServer.deregisterMonitorProvider(this); DirectoryServer.registerMonitorProvider(this); } /** @@ -193,10 +186,8 @@ public void stopHandler() { if (debugEnabled()) TRACER.debugInfo("In " + replServerHandler.getDomain().getReplicationServer() .getMonitorInstanceName() + " LWSH for remote server " + this.serverId + " connected to:" TRACER.debugInfo("In " + getLocalRSMonitorInstanceName() + " LWSH for remote server " + this.serverId + " connected to:" + this.replServerHandler.getMonitorInstanceName() + " stop"); DirectoryServer.deregisterMonitorProvider(this); } opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -280,32 +280,13 @@ * unlock memory tree * restart as usual * load this change on the delayList * */ SortedSet<ReplicationIterator> iteratorSortedSet = new TreeSet<ReplicationIterator>( new ReplicationIteratorComparator()); SortedSet<ReplicationIterator> iteratorSortedSet = null; try { /* fill the lateQueue */ for (int serverId : replicationServerDomain.getServers()) { ChangeNumber lastCsn = serverState.getChangeNumber(serverId); ReplicationIterator iterator = replicationServerDomain .getChangelogIterator(serverId, lastCsn); if (iterator != null) { if (iterator.getChange() != null) { iteratorSortedSet.add(iterator); } else { iterator.releaseCursor(); } } } iteratorSortedSet = collectAllIteratorsWithChanges(); /* fill the lateQueue */ // The loop below relies on the fact that it is sorted based // on the currentChange of each iterator to consider the next // change across all servers. @@ -320,22 +301,12 @@ ReplicationIterator iterator = iteratorSortedSet.first(); iteratorSortedSet.remove(iterator); lateQueue.add(iterator.getChange()); if (iterator.next()) { iteratorSortedSet.add(iterator); } else { iterator.releaseCursor(); } addIteratorIfNotEmpty(iteratorSortedSet, iterator); } } finally { for (ReplicationIterator iterator : iteratorSortedSet) { iterator.releaseCursor(); } releaseAllIterators(iteratorSortedSet); } /* @@ -343,7 +314,6 @@ * messages in the replication log so the remote serevr is not * late anymore. */ if (lateQueue.isEmpty()) { synchronized (msgQueue) @@ -430,6 +400,19 @@ return null; } private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators, ReplicationIterator iter) { if (iter.next()) { iterators.add(iter); } else { iter.releaseCursor(); } } /** * Get the older Change Number for that server. * Returns null when the queue is empty. @@ -450,7 +433,12 @@ } else { if (lateQueue.isEmpty()) if (!lateQueue.isEmpty()) { UpdateMsg msg = lateQueue.first(); result = msg.getChangeNumber(); } else { /* following is false AND lateQueue is empty @@ -460,36 +448,10 @@ there. So let's take the last change not sent directly from the db. */ SortedSet<ReplicationIterator> iteratorSortedSet = new TreeSet<ReplicationIterator>( new ReplicationIteratorComparator()); SortedSet<ReplicationIterator> iteratorSortedSet = null; try { // Build a list of candidates iterator (i.e. db i.e. server) for (int serverId : replicationServerDomain.getServers()) { // get the last already sent CN from that server ChangeNumber lastCsn = serverState.getChangeNumber(serverId); // get an iterator in this server db from that last change ReplicationIterator iterator = replicationServerDomain.getChangelogIterator(serverId, lastCsn); /* if that iterator has changes, then it is a candidate it is added in the sorted list at a position given by its current change (see ReplicationIteratorComparator). */ if (iterator != null) { if (iterator.getChange() != null) { iteratorSortedSet.add(iterator); } else { iterator.releaseCursor(); } } } iteratorSortedSet = collectAllIteratorsWithChanges(); UpdateMsg msg = iteratorSortedSet.first().getChange(); result = msg.getChangeNumber(); } catch (Exception e) @@ -497,21 +459,58 @@ result = null; } finally { for (ReplicationIterator iterator : iteratorSortedSet) { iterator.releaseCursor(); } releaseAllIterators(iteratorSortedSet); } } else { UpdateMsg msg = lateQueue.first(); result = msg.getChangeNumber(); } } } return result; } private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges() { SortedSet<ReplicationIterator> results = new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator()); // Build a list of candidates iterator (i.e. db i.e. server) for (int serverId : replicationServerDomain.getServers()) { // get the last already sent CN from that server ChangeNumber lastCsn = serverState.getChangeNumber(serverId); // get an iterator in this server db from that last change ReplicationIterator iter = replicationServerDomain.getChangelogIterator(serverId, lastCsn); /* if that iterator has changes, then it is a candidate it is added in the sorted list at a position given by its current change (see ReplicationIteratorComparator). */ if (iter != null) { if (iter.getChange() != null) { results.add(iter); } else { iter.releaseCursor(); } } } return results; } private void releaseAllIterators(SortedSet<ReplicationIterator> iterators) { if (iterators != null) { for (ReplicationIterator iter : iterators) { iter.releaseCursor(); } } } /** * Get the count of updates sent to this server. * @return The count of update sent to this server. opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -118,7 +118,8 @@ */ private final Map<Integer, DbHandler> sourceDbHandlers = new ConcurrentHashMap<Integer, DbHandler>(); private ReplicationServer replicationServer; /** The ReplicationServer that created the current instance. */ private ReplicationServer localReplicationServer; /** GenerationId management. */ private volatile long generationId = -1; @@ -217,16 +218,16 @@ * Creates a new ReplicationServerDomain associated to the DN baseDn. * * @param baseDn The baseDn associated to the ReplicationServerDomain. * @param replicationServer the ReplicationServer that created this * @param localReplicationServer the ReplicationServer that created this * replicationServer cache. */ public ReplicationServerDomain( String baseDn, ReplicationServer replicationServer) public ReplicationServerDomain(String baseDn, ReplicationServer localReplicationServer) { this.baseDn = baseDn; this.replicationServer = replicationServer; this.localReplicationServer = localReplicationServer; this.assuredTimeoutTimer = new Timer("Replication server RS(" + replicationServer.getServerId() + localReplicationServer.getServerId() + ") assured timer for domain \"" + baseDn + "\"", true); DirectoryServer.registerMonitorProvider(this); @@ -245,9 +246,9 @@ public void put(UpdateMsg update, ServerHandler sourceHandler) throws IOException { ChangeNumber cn = update.getChangeNumber(); int id = cn.getServerId(); int serverId = cn.getServerId(); sourceHandler.updateServerState(update); sourceHandler.incrementInCount(); @@ -297,7 +298,7 @@ { // Unknown assured mode: should never happen Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get( Integer.toString(replicationServer.getServerId()), Integer.toString(localReplicationServer.getServerId()), assuredMode.toString(), baseDn, update.toString()); logError(errorMsg); assuredMessage = false; @@ -308,40 +309,11 @@ } } // look for the dbHandler that is responsible for the LDAP server which // generated the change. DbHandler dbHandler; synchronized (sourceDbHandlers) if (!publishMessage(update, serverId)) { dbHandler = sourceDbHandlers.get(id); if (dbHandler == null) { try { dbHandler = replicationServer.newDbHandler(id, baseDn); generationIdSavedStatus = true; } catch (ChangelogException e) { /* * Because of database problem we can't save any more changes * from at least one LDAP server. * This replicationServer therefore can't do it's job properly anymore * and needs to close all its connections and shutdown itself. */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); return; } sourceDbHandlers.put(id, dbHandler); } return; } // Publish the messages to the source handler dbHandler.add(update); List<Integer> expectedServers = null; if (assuredMessage) { @@ -363,7 +335,7 @@ // times out) AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn); assuredTimeoutTimer.schedule(assuredTimeoutTask, replicationServer.getAssuredTimeout()); localReplicationServer.getAssuredTimeout()); // Purge timer every 100 treated messages assuredTimeoutTimerPurgeCounter++; if ((assuredTimeoutTimerPurgeCounter % 100) == 0) @@ -408,8 +380,9 @@ if (debugEnabled()) { TRACER.debugInfo("In Replication Server " + replicationServer.getReplicationPort() + " " + baseDn + " " + replicationServer.getServerId() + " for dn " + baseDn + localReplicationServer.getReplicationPort() + " " + baseDn + " " + localReplicationServer.getServerId() + " for dn " + baseDn + ", update " + update.getChangeNumber() + " will not be sent to replication server " + handler.getServerId() + " with generation id " @@ -464,7 +437,7 @@ } if (dsStatus == ServerStatus.FULL_UPDATE_STATUS) { TRACER.debugInfo("In RS " + replicationServer.getServerId() TRACER.debugInfo("In RS " + localReplicationServer.getServerId() + " for dn " + baseDn + ", update " + update.getChangeNumber() + " will not be sent to directory server " + handler.getServerId() + " as it is in full update"); @@ -484,6 +457,44 @@ } } private boolean publishMessage(UpdateMsg update, int serverId) { // look for the dbHandler that is responsible for the LDAP server which // generated the change. DbHandler dbHandler; synchronized (sourceDbHandlers) { dbHandler = sourceDbHandlers.get(serverId); if (dbHandler == null) { try { dbHandler = localReplicationServer.newDbHandler(serverId, baseDn); generationIdSavedStatus = true; } catch (ChangelogException e) { /* * Because of database problem we can't save any more changes * from at least one LDAP server. * This replicationServer therefore can't do it's job properly anymore * and needs to close all its connections and shutdown itself. */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); localReplicationServer.shutdown(); return false; } sourceDbHandlers.put(serverId, dbHandler); } } // Publish the messages to the source handler dbHandler.add(update); return true; } private NotAssuredUpdateMsg addUpdate(ServerHandler handler, UpdateMsg update, NotAssuredUpdateMsg notAssuredUpdate, boolean assuredMessage, List<Integer> expectedServers) @@ -557,7 +568,7 @@ UpdateMsg update, ServerHandler sourceHandler) throws IOException { ChangeNumber cn = update.getChangeNumber(); byte groupId = replicationServer.getGroupId(); byte groupId = localReplicationServer.getGroupId(); byte sourceGroupId = sourceHandler.getGroupId(); List<Integer> expectedServers = new ArrayList<Integer>(); List<Integer> wrongStatusServers = new ArrayList<Integer>(); @@ -642,13 +653,13 @@ ChangeNumber cn = update.getChangeNumber(); boolean interestedInAcks = false; byte safeDataLevel = update.getSafeDataLevel(); byte groupId = replicationServer.getGroupId(); byte groupId = localReplicationServer.getGroupId(); byte sourceGroupId = sourceHandler.getGroupId(); if (safeDataLevel < (byte) 1) { // Should never happen Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get( Integer.toString(replicationServer.getServerId()), Integer.toString(localReplicationServer.getServerId()), Byte.toString(safeDataLevel), baseDn, update.toString()); logError(errorMsg); } else if (sourceGroupId == groupId @@ -799,7 +810,7 @@ */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_RS_ERROR_SENDING_ACK.get( Integer.toString(replicationServer.getServerId()), Integer.toString(localReplicationServer.getServerId()), Integer.toString(origServer.getServerId()), cn.toString(), baseDn)); mb.append(stackTraceToSingleLineString(e)); @@ -862,7 +873,7 @@ ServerHandler origServer = expectedAcksInfo.getRequesterServer(); if (debugEnabled()) { TRACER.debugInfo("In RS " + replicationServer.getServerId() TRACER.debugInfo("In RS " + localReplicationServer.getServerId() + " for "+ baseDn + ", sending timeout for assured update with change " + " number " + cn + " to server id " @@ -879,7 +890,7 @@ */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_RS_ERROR_SENDING_ACK.get( Integer.toString(replicationServer.getServerId()), Integer.toString(localReplicationServer.getServerId()), Integer.toString(origServer.getServerId()), cn.toString(), baseDn)); mb.append(stackTraceToSingleLineString(e)); @@ -987,7 +998,7 @@ { // looks like two connected LDAP servers have the same serverId Message message = ERR_DUPLICATE_SERVER_ID.get( replicationServer.getMonitorInstanceName(), localReplicationServer.getMonitorInstanceName(), directoryServers.get(handler.getServerId()).toString(), handler.toString(), handler.getServerId()); logError(message); @@ -1007,7 +1018,8 @@ { if (debugEnabled()) { TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() TRACER.debugInfo("In " + this.localReplicationServer.getMonitorInstanceName() + " domain=" + this + " stopServer() on the server handler " + handler.getMonitorInstanceName()); } @@ -1045,7 +1057,8 @@ { if (debugEnabled()) { TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() TRACER.debugInfo("In " + localReplicationServer.getMonitorInstanceName() + " remote server " + handler.getMonitorInstanceName() + " is the last RS/DS to be stopped:" + " stopping monitoring publisher"); @@ -1078,7 +1091,7 @@ if (debugEnabled()) { TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + localReplicationServer.getMonitorInstanceName() + " remote server " + handler.getMonitorInstanceName() + " is the last DS to be stopped: stopping status analyzer"); } @@ -1128,7 +1141,8 @@ { if (debugEnabled()) { TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() TRACER.debugInfo("In " + this.localReplicationServer.getMonitorInstanceName() + " domain=" + this + " stopServer() on the message handler " + handler.getMonitorInstanceName()); } @@ -1207,8 +1221,8 @@ if (debugEnabled()) { TRACER.debugInfo("In RS " + this.replicationServer.getMonitorInstanceName() + " for " + baseDn + " " + " mayResetGenerationId generationIdSavedStatus=" + this.localReplicationServer.getMonitorInstanceName() + " for " + baseDn + " mayResetGenerationId generationIdSavedStatus=" + generationIdSavedStatus); } @@ -1225,7 +1239,7 @@ if (debugEnabled()) { TRACER.debugInfo("In RS " + this.replicationServer.getMonitorInstanceName() + " for " + this.localReplicationServer.getMonitorInstanceName() + " for " + baseDn + " " + " mayResetGenerationId skip RS" + rsh.getMonitorInstanceName() + " that has different genId"); } @@ -1236,7 +1250,7 @@ if (debugEnabled()) { TRACER.debugInfo("In RS " + this.replicationServer.getMonitorInstanceName() + this.localReplicationServer.getMonitorInstanceName() + " for "+ baseDn + " mayResetGenerationId RS" + rsh.getMonitorInstanceName() + " has servers connected to it" @@ -1252,7 +1266,7 @@ if (debugEnabled()) { TRACER.debugInfo("In RS " + this.replicationServer.getMonitorInstanceName() + " for " + this.localReplicationServer.getMonitorInstanceName() + " for " + baseDn + " " + " has servers connected to it - will not reset generationId"); } @@ -1292,7 +1306,7 @@ // looks like two replication servers have the same serverId // log an error message and drop this connection. Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get( replicationServer.getMonitorInstanceName(), oldHandler. localReplicationServer.getMonitorInstanceName(), oldHandler. getServerAddressURL(), handler.getServerAddressURL(), handler.getServerId()); throw new DirectoryException(ResultCode.OTHER, message); @@ -1372,12 +1386,12 @@ * and locks used by the ReplicationIterator. * * @param serverId Identifier of the server for which the iterator is created. * @param changeNumber Starting point for the iterator. * @param startAfterCN Starting point for the iterator. * @return the created ReplicationIterator. Null when no DB is available * for the provided server Id. */ public ReplicationIterator getChangelogIterator(int serverId, ChangeNumber changeNumber) ChangeNumber startAfterCN) { DbHandler handler = sourceDbHandlers.get(serverId); if (handler == null) @@ -1388,7 +1402,7 @@ ReplicationIterator it; try { it = handler.generateIterator(changeNumber); it = handler.generateIterator(startAfterCN); } catch (Exception e) { @@ -1535,14 +1549,15 @@ } /** * Processes a message coming from one server in the topology * and potentially forwards it to one or all other servers. * Processes a message coming from one server in the topology and potentially * forwards it to one or all other servers. * * @param msg The message received and to be processed. * @param senderHandler The server handler of the server that emitted * the message. * @param msg * The message received and to be processed. * @param msgEmitter * The server handler of the server that emitted the message. */ public void process(RoutableMsg msg, ServerHandler senderHandler) public void process(RoutableMsg msg, ServerHandler msgEmitter) { // Test the message for which a ReplicationServer is expected // to be the destination @@ -1551,158 +1566,176 @@ !(msg instanceof InitializeRcvAckMsg) && !(msg instanceof EntryMsg) && !(msg instanceof DoneMsg) && (msg.getDestination() == this.replicationServer.getServerId())) (msg.getDestination() == this.localReplicationServer.getServerId())) { if (msg instanceof ErrorMsg) { ErrorMsg errorMsg = (ErrorMsg) msg; logError(ERR_ERROR_MSG_RECEIVED.get( errorMsg.getDetails())); logError(ERR_ERROR_MSG_RECEIVED.get(errorMsg.getDetails())); } else if (msg instanceof MonitorRequestMsg) { // If the request comes from a Directory Server we need to // build the full list of all servers in the topology // and send back a MonitorMsg with the full list of all the servers // in the topology. if (senderHandler.isDataServer()) { // Monitoring information requested by a DS MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg( msg.getDestination(), msg.getSenderID(), monitorData); if (monitorMsg != null) { try { senderHandler.send(monitorMsg); } catch (IOException e) { // the connection was closed. } } return; } else { // Monitoring information requested by a RS MonitorMsg monitorMsg = createLocalTopologyMonitorMsg(msg.getDestination(), msg.getSenderID()); if (monitorMsg != null) { try { senderHandler.send(monitorMsg); } catch (Exception e) { // We log the error. The requestor will detect a timeout or // any other failure on the connection. logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get( Integer.toString(msg.getDestination()))); } } } replyWithMonitorMsg(msg, msgEmitter); } else if (msg instanceof MonitorMsg) { MonitorMsg monitorMsg = (MonitorMsg) msg; receivesMonitorDataResponse(monitorMsg, senderHandler.getServerId()); receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId()); } else { logError(NOTE_ERR_ROUTING_TO_SERVER.get( msg.getClass().getCanonicalName())); MessageBuilder mb1 = new MessageBuilder(); mb1.append( NOTE_ERR_ROUTING_TO_SERVER.get(msg.getClass().getCanonicalName())); mb1.append("serverID:").append(msg.getDestination()); ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb1.toMessage()); try { senderHandler.send(errMsg); } catch (IOException ioe1) { // an error happened on the sender session trying to recover // from an error on the receiver session. // Not much more we can do at this point. } replyWithUnroutableMsgType(msgEmitter, msg); } return; } List<ServerHandler> servers = getDestinationServers(msg, senderHandler); if (servers.isEmpty()) List<ServerHandler> servers = getDestinationServers(msg, msgEmitter); if (!servers.isEmpty()) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( this.baseDn, Integer.toString(msg.getDestination()))); mb.append(" In Replication Server=").append( this.replicationServer.getMonitorInstanceName()); mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); mb.append(" Details:routing table is empty"); ErrorMsg errMsg = new ErrorMsg( this.replicationServer.getServerId(), msg.getSenderID(), mb.toMessage()); logError(mb.toMessage()); forwardMsgToAllServers(msg, servers, msgEmitter); } else { replyWithUnreachablePeerMsg(msgEmitter, msg); } } private void replyWithMonitorMsg(RoutableMsg msg, ServerHandler msgEmitter) { /* * If the request comes from a Directory Server we need to build the full * list of all servers in the topology and send back a MonitorMsg with the * full list of all the servers in the topology. */ if (msgEmitter.isDataServer()) { // Monitoring information requested by a DS MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg( msg.getDestination(), msg.getSenderID(), monitorData); try { senderHandler.send(errMsg); } catch (IOException ioe) { // TODO Handle error properly (sender timeout in addition) /* * An error happened trying to send an error msg to this server. * Log an error and close the connection to this server. */ MessageBuilder mb2 = new MessageBuilder(); mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); mb2.append(stackTraceToSingleLineString(ioe)); logError(mb2.toMessage()); stopServer(senderHandler, false); msgEmitter.send(monitorMsg); } } else catch (IOException e) { // the connection was closed. } } else { for (ServerHandler targetHandler : servers) // Monitoring information requested by a RS MonitorMsg monitorMsg = createLocalTopologyMonitorMsg( msg.getDestination(), msg.getSenderID()); if (monitorMsg != null) { try { targetHandler.send(msg); } catch (IOException ioe) msgEmitter.send(monitorMsg); } catch (IOException e) { /* * An error happened trying the send a routable message * to its destination server. * Send back an error to the originator of the message. */ MessageBuilder mb1 = new MessageBuilder(); mb1.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( this.baseDn, Integer.toString(msg.getDestination()))); mb1.append(" unroutable message =" + msg.getClass().getSimpleName()); mb1.append(" Details: " + ioe.getLocalizedMessage()); ErrorMsg errMsg = new ErrorMsg( msg.getSenderID(), mb1.toMessage()); logError(mb1.toMessage()); try { senderHandler.send(errMsg); } catch (IOException ioe1) { // an error happened on the sender session trying to recover // from an error on the receiver session. // We don't have much solution left beside closing the sessions. stopServer(senderHandler, false); stopServer(targetHandler, false); } // TODO Handle error properly (sender timeout in addition) // We log the error. The requestor will detect a timeout or // any other failure on the connection. logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(Integer.toString(msg .getDestination()))); } } } } private void replyWithUnroutableMsgType(ServerHandler msgEmitter, RoutableMsg msg) { String msgClassname = msg.getClass().getCanonicalName(); logError(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname)); MessageBuilder mb = new MessageBuilder(); mb.append(NOTE_ERR_ROUTING_TO_SERVER.get(msgClassname)); mb.append("serverID:").append(msg.getDestination()); ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), mb.toMessage()); try { msgEmitter.send(errMsg); } catch (IOException ignored) { // an error happened on the sender session trying to recover // from an error on the receiver session. // Not much more we can do at this point. } } private void replyWithUnreachablePeerMsg(ServerHandler msgEmitter, RoutableMsg msg) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( this.baseDn, Integer.toString(msg.getDestination()))); mb.append(" In Replication Server=").append( this.localReplicationServer.getMonitorInstanceName()); mb.append(" unroutable message =").append(msg.getClass().getSimpleName()); mb.append(" Details:routing table is empty"); final Message message = mb.toMessage(); logError(message); ErrorMsg errMsg = new ErrorMsg(this.localReplicationServer.getServerId(), msg.getSenderID(), message); try { msgEmitter.send(errMsg); } catch (IOException ignored) { // TODO Handle error properly (sender timeout in addition) /* * An error happened trying to send an error msg to this server. * Log an error and close the connection to this server. */ MessageBuilder mb2 = new MessageBuilder(); mb2.append(ERR_CHANGELOG_ERROR_SENDING_ERROR.get(this.toString())); mb2.append(stackTraceToSingleLineString(ignored)); logError(mb2.toMessage()); stopServer(msgEmitter, false); } } private void forwardMsgToAllServers(RoutableMsg msg, List<ServerHandler> servers, ServerHandler msgEmitter) { for (ServerHandler targetHandler : servers) { try { targetHandler.send(msg); } catch (IOException ioe) { /* * An error happened trying to send a routable message to its * destination server. * Send back an error to the originator of the message. */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_NO_REACHABLE_PEER_IN_THE_DOMAIN.get( this.baseDn, Integer.toString(msg.getDestination()))); mb.append(" unroutable message =" + msg.getClass().getSimpleName()); mb.append(" Details: " + ioe.getLocalizedMessage()); final Message message = mb.toMessage(); logError(message); ErrorMsg errMsg = new ErrorMsg(msg.getSenderID(), message); try { msgEmitter.send(errMsg); } catch (IOException ioe1) { // an error happened on the sender session trying to recover // from an error on the receiver session. // We don't have much solution left beside closing the sessions. stopServer(msgEmitter, false); stopServer(targetHandler, false); } // TODO Handle error properly (sender timeout in addition) } } } /** * Creates a new monitor message including monitoring information for the @@ -1720,13 +1753,11 @@ public MonitorMsg createGlobalTopologyMonitorMsg( int sender, int destination, MonitorData monitorData) { MonitorMsg returnMsg = new MonitorMsg(sender, destination); final MonitorMsg returnMsg = new MonitorMsg(sender, destination); returnMsg.setReplServerDbState(getDbServerState()); // Add the informations about the Replicas currently in // the topology. // Add the informations about the Replicas currently in the topology. Iterator<Integer> it = monitorData.ldapIterator(); while (it.hasNext()) { @@ -1736,8 +1767,7 @@ monitorData.getApproxFirstMissingDate(replicaId), true); } // Add the information about the Replication Servers // currently in the topology. // Add the information about the RSs currently in the topology. it = monitorData.rsIterator(); while (it.hasNext()) { @@ -1787,16 +1817,14 @@ for (DataServerHandler lsh : this.directoryServers.values()) { monitorMsg.setServerState(lsh.getServerId(), lsh.getServerState(), lsh.getApproxFirstMissingDate(), true); lsh.getServerState(), lsh.getApproxFirstMissingDate(), true); } // Same for the connected RS for (ReplicationServerHandler rsh : this.replicationServers.values()) { monitorMsg.setServerState(rsh.getServerId(), rsh.getServerState(), rsh.getApproxFirstMissingDate(), false); rsh.getServerState(), rsh.getApproxFirstMissingDate(), false); } // Populate the RS state in the msg from the DbState @@ -1821,15 +1849,12 @@ stopAllServers(true); stopDbHandlers(); shutdownDbHandlers(); } /** * Stop the dbHandlers . */ private void stopDbHandlers() /** Shutdown all the dbHandlers. */ private void shutdownDbHandlers() { // Shutdown the dbHandlers synchronized (sourceDbHandlers) { for (DbHandler dbHandler : sourceDbHandlers.values()) @@ -1964,9 +1989,7 @@ // Create info for the local RS List<RSInfo> rsInfos = new ArrayList<RSInfo>(); RSInfo localRSInfo = toRSInfo(replicationServer, generationId); rsInfos.add(localRSInfo); rsInfos.add(toRSInfo(localReplicationServer, generationId)); return new TopologyMsg(dsInfos, rsInfos); } @@ -1982,10 +2005,8 @@ */ public TopologyMsg createTopologyMsgForDS(int destDsId) { List<DSInfo> dsInfos = new ArrayList<DSInfo>(); List<RSInfo> rsInfos = new ArrayList<RSInfo>(); // Go through every DSs (except recipient of msg) List<DSInfo> dsInfos = new ArrayList<DSInfo>(); for (DataServerHandler serverHandler : directoryServers.values()) { if (serverHandler.getServerId() == destDsId) @@ -1995,15 +2016,15 @@ dsInfos.add(serverHandler.toDSInfo()); } List<RSInfo> rsInfos = new ArrayList<RSInfo>(); // Add our own info (local RS) RSInfo localRSInfo = toRSInfo(replicationServer, generationId); rsInfos.add(localRSInfo); rsInfos.add(toRSInfo(localReplicationServer, generationId)); // Go through every peer RSs (and get their connected DSs), also add info // for RSs for (ReplicationServerHandler serverHandler : replicationServers.values()) { // Put RS info rsInfos.add(serverHandler.toRSInfo()); serverHandler.addDSInfos(dsInfos); @@ -2354,11 +2375,11 @@ logError(mb.toMessage()); } } stopDbHandlers(); shutdownDbHandlers(); } try { replicationServer.clearGenerationId(baseDn); localReplicationServer.clearGenerationId(baseDn); } catch (Exception e) { // TODO: i18n @@ -2381,7 +2402,8 @@ { if (debugEnabled()) { TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() TRACER.debugInfo("In " + this.localReplicationServer.getMonitorInstanceName() + " baseDN=" + baseDn + " isDegraded serverId=" + serverId + " given local generation Id=" + this.generationId); } @@ -2398,7 +2420,8 @@ if (debugEnabled()) { TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() TRACER.debugInfo("In " + this.localReplicationServer.getMonitorInstanceName() + " baseDN=" + baseDn + " Compute degradation of serverId=" + serverId + " LS server generation Id=" + handler.getGenerationId()); } @@ -2411,7 +2434,7 @@ */ public ReplicationServer getReplicationServer() { return replicationServer; return localReplicationServer; } /** @@ -2557,7 +2580,7 @@ int serverId = rs.getServerId(); MonitorRequestMsg msg = new MonitorRequestMsg( this.replicationServer.getServerId(), serverId); this.localReplicationServer.getServerId(), serverId); try { rs.send(msg); @@ -2684,7 +2707,7 @@ // - from our own local db state // - whatever they are directly or indirectly connected ServerState dbServerState = getDbServerState(); pendingMonitorData.setRSState(replicationServer.getServerId(), pendingMonitorData.setRSState(localReplicationServer.getServerId(), dbServerState); for (int serverId : dbServerState) { ChangeNumber storedCN = dbServerState.getChangeNumber(serverId); @@ -2744,7 +2767,7 @@ while (rsidIterator.hasNext()) { int rsid = rsidIterator.next(); if (rsid == replicationServer.getServerId()) if (rsid == localReplicationServer.getServerId()) { // this is the latency of the remote RSi regarding the current RS // let's update the fmd of my connected LS @@ -2895,7 +2918,7 @@ if (statusAnalyzer == null) { int degradedStatusThreshold = replicationServer.getDegradedStatusThreshold(); localReplicationServer.getDegradedStatusThreshold(); if (degradedStatusThreshold > 0) // 0 means no status analyzer { statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold); @@ -2946,7 +2969,7 @@ if (monitoringPublisher == null) { long period = replicationServer.getMonitoringPublisherPeriod(); localReplicationServer.getMonitoringPublisherPeriod(); if (period > 0) // 0 means no monitoring publisher { monitoringPublisher = new MonitoringPublisher(this, period); @@ -3004,8 +3027,8 @@ @Override public String getMonitorInstanceName() { return "Replication server RS(" + replicationServer.getServerId() + ") " + replicationServer.getServerURL() + ",cn=" return "Replication server RS(" + localReplicationServer.getServerId() + ") " + localReplicationServer.getServerURL() + ",cn=" + baseDn.replace(',', '_').replace('=', '_') + ",cn=Replication"; } @@ -3018,9 +3041,9 @@ // publish the server id and the port number. List<Attribute> attributes = new ArrayList<Attribute>(); attributes.add(Attributes.create("replication-server-id", String.valueOf(replicationServer.getServerId()))); String.valueOf(localReplicationServer.getServerId()))); attributes.add(Attributes.create("replication-server-port", String.valueOf(replicationServer.getReplicationPort()))); String.valueOf(localReplicationServer.getReplicationPort()))); // Add all the base DNs that are known by this replication server. attributes.add(Attributes.create("domain-name", baseDn)); @@ -3032,7 +3055,7 @@ MonitorData md = getDomainMonitorData(); // Missing changes long missingChanges = md.getMissingChangesRS(replicationServer long missingChanges = md.getMissingChangesRS(localReplicationServer .getServerId()); attributes.add(Attributes.create("missing-changes", String.valueOf(missingChanges))); @@ -3201,30 +3224,13 @@ } */ boolean serverIdConnected = false; if (directoryServers.containsKey(serverId)) { serverIdConnected = true; } else { // not directly connected for (ReplicationServerHandler rsh : replicationServers.values()) { if (rsh.isRemoteLDAPServer(serverId)) { serverIdConnected = true; break; } } } if (!serverIdConnected) if (!isServerConnected(serverId)) { if (debugEnabled()) { TRACER.debugInfo("In " + "Replication Server " + replicationServer.getReplicationPort() + " " + baseDn + " " + replicationServer.getServerId() + " Server " + serverId + localReplicationServer.getReplicationPort() + " " + baseDn + " " + localReplicationServer.getServerId() + " Server " + serverId + " is not considered for eligibility ... potentially down"); } continue; @@ -3246,13 +3252,31 @@ if (debugEnabled()) { TRACER.debugInfo("In Replication Server " + replicationServer.getReplicationPort() + " " + baseDn + " " + replicationServer.getServerId() + localReplicationServer.getReplicationPort() + " " + baseDn + " " + localReplicationServer.getServerId() + " getEligibleCN() returns result =" + eligibleCN); } return eligibleCN; } private boolean isServerConnected(int serverId) { if (directoryServers.containsKey(serverId)) { return true; } // not directly connected for (ReplicationServerHandler rsHandler : replicationServers.values()) { if (rsHandler.isRemoteLDAPServer(serverId)) { return true; } } return false; } /** * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp) @@ -3299,8 +3323,8 @@ TRACER.debugCaught(DebugLogLevel.ERROR, e); logError(ERR_CHANGELOG_ERROR_SENDING_MSG .get("Replication Server " + replicationServer.getReplicationPort() + " " + baseDn + " " + replicationServer.getServerId())); + localReplicationServer.getReplicationPort() + " " + baseDn + " " + localReplicationServer.getServerId())); stopServer(rsHandler, false); } } opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -27,11 +27,6 @@ */ package org.opends.server.replication.server; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.protocol.ProtocolVersion.*; import java.io.IOException; import java.util.List; import java.util.Map; @@ -46,6 +41,11 @@ import org.opends.server.replication.protocol.*; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.protocol.ProtocolVersion.*; /** * This class defines a server handler, which handles all interaction with a * peer replication server. @@ -82,10 +82,8 @@ generationId = inReplServerStartMsg.getGenerationId(); serverId = inReplServerStartMsg.getServerId(); serverURL = inReplServerStartMsg.getServerURL(); int separator = serverURL.lastIndexOf(':'); serverAddressURL = session.getRemoteAddress() + ":" + serverURL.substring(separator + 1); final String port = serverURL.substring(serverURL.lastIndexOf(':') + 1); serverAddressURL = session.getRemoteAddress() + ":" + port; setBaseDNAndDomain(inReplServerStartMsg.getBaseDn(), false); setInitialServerState(inReplServerStartMsg.getServerState()); setSendWindowSize(inReplServerStartMsg.getWindowSize()); @@ -119,8 +117,7 @@ getReplicationServerId(), getReplicationServerURL(), getBaseDN(), maxRcvWindow, replicationServerDomain.getDbServerState(), localGenerationId, sslEncryption, getLocalGroupId(), replicationServerDomain.getReplicationServer() .getDegradedStatusThreshold()); getLocalGroupId(), replicationServer.getDegradedStatusThreshold()); send(outReplServerStartMsg); return outReplServerStartMsg; } @@ -296,7 +293,7 @@ finally { // Release domain if ((replicationServerDomain != null) && if (replicationServerDomain != null && replicationServerDomain.hasLock()) replicationServerDomain.release(); } @@ -374,11 +371,9 @@ { if (debugEnabled()) { TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + this + " RS V1 with serverID=" + serverId + " is connected with the right generation ID"); TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + " " + this + " RS V1 with serverID=" + serverId + " is connected with the right generation ID"); } } else { @@ -420,10 +415,9 @@ { TRACER.debugCaught(DebugLogLevel.ERROR, e); } Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get(Integer .toString(inReplServerStartMsg.getServerId()), Integer .toString(replicationServerDomain.getReplicationServer() .getServerId())); Message errMessage = ERR_RS_DISCONNECTED_DURING_HANDSHAKE.get( Integer.toString(inReplServerStartMsg.getServerId()), Integer.toString(replicationServer.getServerId())); abortStart(errMessage); } catch (DirectoryException e) @@ -444,7 +438,7 @@ } finally { if ((replicationServerDomain != null) && if (replicationServerDomain != null && replicationServerDomain.hasLock()) replicationServerDomain.release(); } @@ -489,12 +483,10 @@ // connection attempt. return null; } else { Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg .getClass().getCanonicalName(), "TopologyMsg"); throw new DirectoryException(ResultCode.OTHER, message); } Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get( msg.getClass().getCanonicalName(), "TopologyMsg"); throw new DirectoryException(ResultCode.OTHER, message); } // Remote RS sent his topo msg @@ -518,10 +510,9 @@ { if (debugEnabled()) { TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " RS with serverID=" + serverId + " is connected with the right generation ID, same as local =" TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + " RS with serverID=" + serverId + " is connected with the right generation ID, same as local =" + generationId); } } @@ -541,42 +532,40 @@ { if (localGenerationId > 0) { // the local RS is initialized if (generationId > 0) { // the remote RS is initialized. // If not, there's nothing to do anyway. if (generationId != localGenerationId) { /* Either: * * 1) The 2 RS have different generationID * replicationServerDomain.getGenerationIdSavedStatus() == true * * if the present RS has received changes regarding its * gen ID and so won't change without a reset * then we are just degrading the peer. * * 2) This RS has never received any changes for the current * generation ID. * * Example case: * - we are in RS1 * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) * - RS1 has genId1 from LS1 /genId1 comes from data in suffix * - we are in RS1 and we receive a START msg from RS2 * - Each RS keeps its genID / is degraded and when LS2 * will be populated from LS1 everything will become ok. * * Issue: * FIXME : Would it be a good idea in some cases to just set the * gen ID received from the peer RS specially if the peer has a * non null state and we have a null state ? * replicationServerDomain.setGenerationId(generationId, false); */ Message message = WARN_BAD_GENERATION_ID_FROM_RS.get( serverId, session.getReadableRemoteAddress(), generationId, getBaseDN(), getReplicationServerId(), localGenerationId); logError(message); } if (generationId > 0 // the remote RS is initialized. If not, there's nothing to do anyway. && generationId != localGenerationId) { /* Either: * * 1) The 2 RS have different generationID * replicationServerDomain.getGenerationIdSavedStatus() == true * * if the present RS has received changes regarding its * gen ID and so won't change without a reset * then we are just degrading the peer. * * 2) This RS has never received any changes for the current * generation ID. * * Example case: * - we are in RS1 * - RS2 has genId2 from LS2 (genId2 <=> no data in LS2) * - RS1 has genId1 from LS1 /genId1 comes from data in suffix * - we are in RS1 and we receive a START msg from RS2 * - Each RS keeps its genID / is degraded and when LS2 * will be populated from LS1 everything will become ok. * * Issue: * FIXME : Would it be a good idea in some cases to just set the * gen ID received from the peer RS specially if the peer has a * non null state and we have a null state ? * replicationServerDomain.setGenerationId(generationId, false); */ Message message = WARN_BAD_GENERATION_ID_FROM_RS.get( serverId, session.getReadableRemoteAddress(), generationId, getBaseDN(), getReplicationServerId(), localGenerationId); logError(message); } } else @@ -655,9 +644,7 @@ groupId = rsInfo.getGroupId(); weight = rsInfo.getWeight(); /** * Store info for DSs connected to the peer RS */ // Store info for DSs connected to the peer RS List<DSInfo> dsInfos = topoMsg.getDsList(); synchronized (remoteDirectoryServers) @@ -688,18 +675,18 @@ * When this handler is connected to a replication server, specifies if * a wanted server is connected to this replication server. * * @param wantedServer The server we want to know if it is connected * @param serverId The server we want to know if it is connected * to the replication server represented by this handler. * @return boolean True is the wanted server is connected to the server * represented by this handler. */ public boolean isRemoteLDAPServer(int wantedServer) public boolean isRemoteLDAPServer(int serverId) { synchronized (remoteDirectoryServers) { for (LightweightServerHandler server : remoteDirectoryServers.values()) { if (wantedServer == server.getServerId()) if (serverId == server.getServerId()) { return true; } @@ -765,9 +752,8 @@ MonitorData md = replicationServerDomain.getDomainMonitorData(); // Missing changes long missingChanges = md.getMissingChangesRS(serverId); attributes.add(Attributes.create("missing-changes", String.valueOf(missingChanges))); String.valueOf(md.getMissingChangesRS(serverId)))); /* get the Server State */ AttributeBuilder builder = new AttributeBuilder("server-state"); @@ -791,17 +777,10 @@ { if (serverId != 0) { StringBuilder builder = new StringBuilder("Replication server RS("); builder.append(serverId); builder.append(") for domain \""); builder.append(replicationServerDomain.getBaseDn()); builder.append("\""); return builder.toString(); return "Replication server RS(" + serverId + ") for domain \"" + replicationServerDomain.getBaseDn() + "\""; } else { return "Unknown server"; } return "Unknown server"; } /** opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,8 +44,7 @@ import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.replication.server.changelog.je.ReplicationDB .ReplServerDBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.InitializationException; @@ -265,22 +264,20 @@ * managed by this dbHandler and starting at the position defined * by a given changeNumber. * * @param changeNumber The position where the iterator must start. * * @param startAfterCN The position where the iterator must start. * @return a new ReplicationIterator that allows to browse the db * managed by this dbHandler and starting at the position defined * by a given changeNumber. * * @throws ChangelogException if a database problem happened. */ public ReplicationIterator generateIterator(ChangeNumber changeNumber) public ReplicationIterator generateIterator(ChangeNumber startAfterCN) throws ChangelogException { if (changeNumber == null) if (startAfterCN == null) { flush(); } return new JEReplicationIterator(db, changeNumber, this); return new JEReplicationIterator(db, startAfterCN, this); } /** opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java
@@ -32,8 +32,7 @@ import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.replication.server.changelog.je.ReplicationDB .ReplServerDBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; /** * Berkeley DB JE implementation of IReplicationIterator. @@ -52,20 +51,20 @@ * releaseCursor() method. * * @param db The db where the iterator must be created. * @param changeNumber The ChangeNumber after which the iterator must start. * @param startAfterCN The ChangeNumber after which the iterator must start. * @param dbHandler The associated DbHandler. * @throws ChangelogException if a database problem happened. */ public JEReplicationIterator(ReplicationDB db, ChangeNumber changeNumber, public JEReplicationIterator(ReplicationDB db, ChangeNumber startAfterCN, DbHandler dbHandler) throws ChangelogException { this.db = db; this.dbHandler = dbHandler; this.lastNonNullCurrentCN = changeNumber; this.lastNonNullCurrentCN = startAfterCN; try { cursor = db.openReadCursor(changeNumber); cursor = db.openReadCursor(startAfterCN); } catch(Exception e) { @@ -79,7 +78,7 @@ dbHandler.flush(); // look again in the db cursor = db.openReadCursor(changeNumber); cursor = db.openReadCursor(startAfterCN); if (cursor == null) { throw new ChangelogException(Message.raw("no new change"));