opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -344,8 +344,8 @@ /** * Run method for this class. * Periodically Flushes the ReplicationCache from memory to the stable storage * and trims the old updates. * Periodically Flushes the ReplicationServerDomain cache from memory to the * stable storage and trims the old updates. */ public void run() { opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java
@@ -35,7 +35,7 @@ public class ReplServerAckMessageList extends AckMessageList { private short replicationServerId; private ReplicationCache replicationCache; private ReplicationServerDomain replicationServerDomain; /** * Creates a new AckMessageList for a given ChangeNumber. @@ -45,17 +45,17 @@ * original change. * @param replicationServerId The Identifier of the replication server * from which the change was received. * @param replicationCache The ReplicationCache from which he change * was received. * @param replicationServerDomain The ReplicationServerDomain from which he * change was received. */ public ReplServerAckMessageList(ChangeNumber changeNumber, int numExpectedAcks, short replicationServerId, ReplicationCache replicationCache) ReplicationServerDomain replicationServerDomain) { super(changeNumber, numExpectedAcks); this.replicationServerId = replicationServerId; this.replicationCache = replicationCache; this.replicationServerDomain = replicationServerDomain; } /** @@ -70,14 +70,14 @@ } /** * Get the replicationCache of the replication server from which we received * the change. * @return Returns the replicationCache of the replication server from which * we received the change . * Get the replicationServerDomain of the replication server from which we * received the change. * @return Returns the replicationServerDomain of the replication server from * which we received the change . */ public ReplicationCache getChangelogCache() public ReplicationServerDomain getChangelogCache() { return replicationCache; return replicationServerDomain; } opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -347,13 +347,13 @@ //This method only returns the number of actual change entries, the //domain and any baseDN entries are not counted. long retNum=0; Iterator<ReplicationCache> rcachei = server.getCacheIterator(); Iterator<ReplicationServerDomain> rcachei = server.getCacheIterator(); if (rcachei != null) { while (rcachei.hasNext()) { ReplicationCache rc = rcachei.next(); retNum += rc.getChangesCount(); ReplicationServerDomain rsd = rcachei.next(); retNum += rsd.getChangesCount(); } } return retNum; @@ -531,18 +531,18 @@ { List<DN> includeBranches = exportConfig.getIncludeBranches(); DN baseDN; ArrayList<ReplicationCache> exportContainers = new ArrayList<ReplicationCache>(); ArrayList<ReplicationServerDomain> exportContainers = new ArrayList<ReplicationServerDomain>(); if(server == null) { Message message = ERR_REPLICATONBACKEND_EXPORT_LDIF_FAILED.get(); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,message); } Iterator<ReplicationCache> rcachei = server.getCacheIterator(); if (rcachei != null) Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator(); if (rsdi != null) { while (rcachei.hasNext()) while (rsdi.hasNext()) { ReplicationCache rc = rcachei.next(); ReplicationServerDomain rc = rsdi.next(); // Skip containers that are not covered by the include branches. baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN); @@ -598,7 +598,7 @@ // Iterate through the containers. try { for (ReplicationCache exportContainer : exportContainers) for (ReplicationServerDomain exportContainer : exportContainers) { if (exportConfig.isCancelled()) { @@ -642,7 +642,7 @@ /* * Exports the root changes of the export, and one entry by domain. */ private void exportRootChanges(List<ReplicationCache> exportContainers, private void exportRootChanges(List<ReplicationServerDomain> exportContainers, LDIFExportConfig exportConfig, LDIFWriter ldifWriter) { Map<AttributeType,List<Attribute>> attributes = @@ -668,7 +668,7 @@ } catch (Exception e) {} for (ReplicationCache exportContainer : exportContainers) for (ReplicationServerDomain exportContainer : exportContainers) { if (exportConfig != null && exportConfig.isCancelled()) { @@ -725,21 +725,21 @@ } /** * Processes the changes for a given ReplicationCache. * Processes the changes for a given ReplicationServerDomain. */ private void processContainer(ReplicationCache rc, private void processContainer(ReplicationServerDomain rsd, LDIFExportConfig exportConfig, LDIFWriter ldifWriter, SearchOperation searchOperation) { // Walk through the servers for (Short serverId : rc.getServers()) for (Short serverId : rsd.getServers()) { if (exportConfig != null && exportConfig.isCancelled()) { break; } ReplicationIterator ri = rc.getChangelogIterator(serverId, ReplicationIterator ri = rsd.getChangelogIterator(serverId, null); if (ri != null) @@ -1139,8 +1139,8 @@ // Get the base DN, scope, and filter for the search. DN searchBaseDN = searchOperation.getBaseDN(); DN baseDN; ArrayList<ReplicationCache> searchContainers = new ArrayList<ReplicationCache>(); ArrayList<ReplicationServerDomain> searchContainers = new ArrayList<ReplicationServerDomain>(); //This check is for GroupManager initialization. It currently doesn't //come into play because the replication server variable is null in @@ -1202,25 +1202,25 @@ } // Walk through all entries and send the ones that match. Iterator<ReplicationCache> rcachei = server.getCacheIterator(); if (rcachei != null) Iterator<ReplicationServerDomain> rsdi = server.getCacheIterator(); if (rsdi != null) { while (rcachei.hasNext()) while (rsdi.hasNext()) { ReplicationCache rc = rcachei.next(); ReplicationServerDomain rsd = rsdi.next(); // Skip containers that are not covered by the include branches. baseDN = DN.decode(rc.getBaseDn().toString() + "," + EXPORT_BASE_DN); baseDN = DN.decode(rsd.getBaseDn().toString() + "," + EXPORT_BASE_DN); if (searchBaseDN.isDescendantOf(baseDN) || searchBaseDN.isAncestorOf(baseDN)) { searchContainers.add(rc); searchContainers.add(rsd); } } } for (ReplicationCache exportContainer : searchContainers) for (ReplicationServerDomain exportContainer : searchContainers) { processContainer(exportContainer, null, null, searchOperation); } opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -78,9 +78,10 @@ this.dbenv = dbenv; this.replicationServer = replicationServer; // Get or create the associated Replicationcache and Db. // Get or create the associated ReplicationServerDomain and Db. db = dbenv.getOrAddDb(serverId, baseDn, replicationServer.getReplicationCache(baseDn, true).getGenerationId()); replicationServer.getReplicationServerDomain(baseDn, true).getGenerationId()); } /** opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -184,7 +184,7 @@ " Has read baseDn=" + baseDn + " generationId=" + generationId); replicationServer.getReplicationCache(baseDn, true). replicationServer.getReplicationServerDomain(baseDn, true). setGenerationId(generationId, true); } } @@ -259,7 +259,7 @@ DbHandler dbHandler = new DbHandler(serverId, baseDn, replicationServer, this, 1); replicationServer.getReplicationCache(baseDn, true). replicationServer.getReplicationServerDomain(baseDn, true). setDbHandler(serverId, dbHandler); } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -88,7 +88,8 @@ * and publisher objects for * connection with LDAP servers and with replication servers * * It is responsible for creating the replication server cache and managing it * It is responsible for creating the replication server replicationServerDomain * and managing it */ public class ReplicationServer extends MonitorProvider<MonitorProviderCfg> implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>, @@ -108,8 +109,8 @@ /* This table is used to store the list of dn for which we are currently * handling servers. */ private ConcurrentHashMap<DN, ReplicationCache> baseDNs = new ConcurrentHashMap<DN, ReplicationCache>(); private ConcurrentHashMap<DN, ReplicationServerDomain> baseDNs = new ConcurrentHashMap<DN, ReplicationServerDomain>(); private String localURL = "null"; private boolean shutdown = false; @@ -279,9 +280,10 @@ * periodically check that we are connected to all other * replication servers and if not establish the connection */ for (ReplicationCache replicationCache: baseDNs.values()) for (ReplicationServerDomain replicationServerDomain: baseDNs.values()) { Set<String> connectedReplServers = replicationCache.getChangelogs(); Set<String> connectedReplServers = replicationServerDomain.getChangelogs(); /* * check that all replication server in the config are in the connected * Set. If not create the connection @@ -301,7 +303,7 @@ && (serverAddress.compareTo(this.localURL) != 0) && (!connectedReplServers.contains(serverAddress))) { this.connect(serverURL, replicationCache.getBaseDn()); this.connect(serverURL, replicationServerDomain.getBaseDn()); } } catch (IOException e) @@ -396,7 +398,7 @@ this); /* * create replicationServer cache * create replicationServer replicationServerDomain */ serverId = changelogId; @@ -461,28 +463,32 @@ } /** * Get the ReplicationCache associated to the base DN given in parameter. * Get the ReplicationServerDomain associated to the base DN given in * parameter. * * @param baseDn The base Dn for which the ReplicationCache must be returned. * @param create Specifies whether to create the ReplicationCache if it does * not already exist. * @return The ReplicationCache associated to the base DN given in parameter. * @param baseDn The base Dn for which the ReplicationServerDomain must be * returned. * @param create Specifies whether to create the ReplicationServerDomain if * it does not already exist. * @return The ReplicationServerDomain associated to the base DN given in * parameter. */ public ReplicationCache getReplicationCache(DN baseDn, boolean create) public ReplicationServerDomain getReplicationServerDomain(DN baseDn, boolean create) { ReplicationCache replicationCache; ReplicationServerDomain replicationServerDomain; synchronized (baseDNs) { replicationCache = baseDNs.get(baseDn); if ((replicationCache == null) && (create)) replicationServerDomain = baseDNs.get(baseDn); if ((replicationServerDomain == null) && (create)) { replicationCache = new ReplicationCache(baseDn, this); baseDNs.put(baseDn, replicationCache); replicationServerDomain = new ReplicationServerDomain(baseDn, this); baseDNs.put(baseDn, replicationServerDomain); } } return replicationCache; return replicationServerDomain; } /** @@ -520,9 +526,9 @@ } // shutdown all the ChangelogCaches for (ReplicationCache replicationCache : baseDNs.values()) for (ReplicationServerDomain replicationServerDomain : baseDNs.values()) { replicationCache.shutdown(); replicationServerDomain.shutdown(); } if (dbEnv != null) @@ -539,7 +545,8 @@ * * @param id The serverId for which the dbHandler must be created. * @param baseDn The DN for which the dbHandler muste be created. * @param generationId The generationId for this server and this domain. * @param generationId The generationId for this server and this * replicationServerDomain. * @return The new DB handler for this ReplicationServer and the serverId and * DN given in parameter. * @throws DatabaseException in case of underlying database problem. @@ -551,7 +558,8 @@ } /** * Clears the generationId for the domain related to the provided baseDn. * Clears the generationId for the replicationServerDomain related to the * provided baseDn. * @param baseDn The baseDn for which to delete the generationId. * @throws DatabaseException When it occurs. */ @@ -755,7 +763,7 @@ Attribute bases = new Attribute(baseType, "base-dn", baseValues); attributes.add(bases); // Publish to monitor the generation ID by domain // Publish to monitor the generation ID by replicationServerDomain AttributeType generationIdType= DirectoryServer.getAttributeType("base-dn-generation-id", true); LinkedHashSet<AttributeValue> generationIdValues = @@ -763,9 +771,10 @@ for (DN base : baseDNs.keySet()) { long generationId=-1; ReplicationCache cache = getReplicationCache(base, false); if (cache != null) generationId = cache.getGenerationId(); ReplicationServerDomain replicationServerDomain = getReplicationServerDomain(base, false); if (replicationServerDomain != null) generationId = replicationServerDomain.getGenerationId(); generationIdValues.add(new AttributeValue(generationIdType, base.toString() + " " + generationId)); } @@ -777,17 +786,18 @@ } /** * Get the value of generationId for the replication domain * Get the value of generationId for the replication replicationServerDomain * associated with the provided baseDN. * * @param baseDN The baseDN of the domain. * @param baseDN The baseDN of the replicationServerDomain. * @return The value of the generationID. */ public long getGenerationId(DN baseDN) { ReplicationCache rc = this.getReplicationCache(baseDN, false); if (rc!=null) return rc.getGenerationId(); ReplicationServerDomain rsd = this.getReplicationServerDomain(baseDN, false); if (rsd!=null) return rsd.getGenerationId(); return -1; } @@ -962,7 +972,7 @@ " Export starts"); if (backend.getBackendID().equals(backendId)) { // Retrieves the backend related to this domain // Retrieves the backend related to this replicationServerDomain // backend = ReplicationBackend b = (ReplicationBackend)DirectoryServer.getBackend(backendId); @@ -980,11 +990,11 @@ } /** * Returns an iterator on the list of replicationCache. * Returns an iterator on the list of replicationServerDomain. * Returns null if none. * @return the iterator. */ public Iterator<ReplicationCache> getCacheIterator() public Iterator<ReplicationServerDomain> getCacheIterator() { if (!baseDNs.isEmpty()) return baseDNs.values().iterator(); @@ -997,13 +1007,13 @@ */ public void clearDb() { Iterator<ReplicationCache> rcachei = getCacheIterator(); Iterator<ReplicationServerDomain> rcachei = getCacheIterator(); if (rcachei != null) { while (rcachei.hasNext()) { ReplicationCache rc = rcachei.next(); rc.clearDbs(); ReplicationServerDomain rsd = rcachei.next(); rsd.clearDbs(); } } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
File was renamed from opends/src/server/org/opends/server/replication/server/ReplicationCache.java @@ -70,7 +70,7 @@ * received to the disk and for trimming them * Decision to trim can be based on disk space or age of the message */ public class ReplicationCache public class ReplicationServerDomain { private Object flowControlLock = new Object(); private DN baseDn = null; @@ -119,13 +119,13 @@ private static final DebugTracer TRACER = getTracer(); /** * Creates a new ReplicationCache associated to the DN baseDn. * Creates a new ReplicationServerDomain associated to the DN baseDn. * * @param baseDn The baseDn associated to the ReplicationCache. * @param baseDn The baseDn associated to the ReplicationServerDomain. * @param replicationServer the ReplicationServer that created this * replicationServer cache. */ public ReplicationCache(DN baseDn, ReplicationServer replicationServer) public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer) { this.baseDn = baseDn; this.replicationServer = replicationServer; @@ -531,7 +531,7 @@ } /** * Returns the change count for that ReplicationCache. * Returns the change count for that ReplicationServerDomain. * * @return the change count. */ @@ -842,7 +842,7 @@ } /** * Shutdown this ReplicationCache. * Shutdown this ReplicationServerDomain. */ public void shutdown() { @@ -890,7 +890,7 @@ @Override public String toString() { return "ReplicationCache " + baseDn; return "ReplicationServerDomain " + baseDn; } /** opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -111,7 +111,7 @@ private MsgQueue lateQueue = new MsgQueue(); private final Map<ChangeNumber, AckMessageList> waitingAcks = new HashMap<ChangeNumber, AckMessageList>(); private ReplicationCache replicationCache = null; private ReplicationServerDomain replicationServerDomain = null; private String serverURL; private int outCount = 0; // number of update sent to the server private int inCount = 0; // number of updates received from the server @@ -227,11 +227,13 @@ // This is an outgoing connection. Publish our start message. this.baseDn = baseDn; // Get or create the ReplicationCache replicationCache = replicationServer.getReplicationCache(baseDn, true); localGenerationId = replicationCache.getGenerationId(); // Get or create the ReplicationServerDomain replicationServerDomain = replicationServer.getReplicationServerDomain(baseDn, true); localGenerationId = replicationServerDomain.getGenerationId(); ServerState localServerState = replicationCache.getDbServerState(); ServerState localServerState = replicationServerDomain.getDbServerState(); ReplServerStartMessage msg = new ReplServerStartMessage(replicationServerId, replicationServerURL, baseDn, windowSize, localServerState, @@ -298,12 +300,13 @@ serverIsLDAPserver = true; // Get or Create the ReplicationCache replicationCache = replicationServer.getReplicationCache(this.baseDn, true); localGenerationId = replicationCache.getGenerationId(); // Get or Create the ReplicationServerDomain replicationServerDomain = replicationServer.getReplicationServerDomain(this.baseDn, true); localGenerationId = replicationServerDomain.getGenerationId(); ServerState localServerState = replicationCache.getDbServerState(); ServerState localServerState = replicationServerDomain.getDbServerState(); // This an incoming connection. Publish our start message ReplServerStartMessage myStartMsg = new ReplServerStartMessage(replicationServerId, replicationServerURL, @@ -322,9 +325,10 @@ if (debugEnabled()) { Set<String> ss = this.serverState.toStringSet(); Set<String> lss = replicationCache.getDbServerState().toStringSet(); TRACER.debugInfo("In " + replicationCache.getReplicationServer(). getMonitorInstanceName() + Set<String> lss = replicationServerDomain.getDbServerState().toStringSet(); TRACER.debugInfo("In " + replicationServerDomain. getReplicationServer().getMonitorInstanceName() + ", SH received START from LS serverId=" + serverId + " baseDN=" + this.baseDn + " generationId=" + generationId + @@ -376,7 +380,7 @@ } else { replicationCache.setGenerationId(generationId, false); replicationServerDomain.setGenerationId(generationId, false); } } } @@ -396,11 +400,11 @@ this.baseDn = receivedMsg.getBaseDn(); if (baseDn == null) { // Get or create the ReplicationCache replicationCache = replicationServer.getReplicationCache(this.baseDn, true); localGenerationId = replicationCache.getGenerationId(); ServerState serverState = replicationCache.getDbServerState(); // Get or create the ReplicationServerDomain replicationServerDomain = replicationServer. getReplicationServerDomain(this.baseDn, true); localGenerationId = replicationServerDomain.getGenerationId(); ServerState serverState = replicationServerDomain.getDbServerState(); // The session initiator decides whether to use SSL. sslEncryption = receivedMsg.getSSLEncryption(); @@ -431,9 +435,10 @@ if (debugEnabled()) { Set<String> ss = this.serverState.toStringSet(); Set<String> lss = replicationCache.getDbServerState().toStringSet(); TRACER.debugInfo("In " + replicationCache.getReplicationServer(). getMonitorInstanceName() + Set<String> lss = replicationServerDomain.getDbServerState().toStringSet(); TRACER.debugInfo("In " + replicationServerDomain. getReplicationServer().getMonitorInstanceName() + ", SH received START from RS serverId=" + serverId + " baseDN=" + this.baseDn + " generationId=" + generationId + @@ -448,7 +453,8 @@ { if (debugEnabled()) { TRACER.debugInfo("In " + replicationCache.getReplicationServer(). TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " RS with serverID=" + serverId + " is connected with the right generation ID"); } @@ -464,7 +470,7 @@ if (generationId != localGenerationId) { // if the 2 RS have different generationID if (replicationCache.getGenerationIdSavedStatus()) if (replicationServerDomain.getGenerationIdSavedStatus()) { // it the present RS has received changes regarding its // gen ID and so won't change without a reset @@ -497,7 +503,8 @@ // set the gen ID received from the peer RS // specially if the peer has a non nul state and // we have a nul state ? // replicationCache.setGenerationId(generationId, false); // replicationServerDomain. // setGenerationId(generationId, false); Message message = NOTE_BAD_GENERATION_ID.get( this.baseDn.toNormalizedString(), Short.toString(receivedMsg.getServerId()), @@ -519,7 +526,7 @@ else { // The local RS is not initialized - take the one received replicationCache.setGenerationId(generationId, false); replicationServerDomain.setGenerationId(generationId, false); } } } @@ -529,18 +536,18 @@ return; // we did not recognize the message, ignore it } // Get or create the ReplicationCache replicationCache = replicationServer.getReplicationCache(this.baseDn, true); // Get or create the ReplicationServerDomain replicationServerDomain = replicationServer. getReplicationServerDomain(this.baseDn,true); boolean started; if (serverIsLDAPserver) { started = replicationCache.startServer(this); started = replicationServerDomain.startServer(this); } else { started = replicationCache.startReplicationServer(this); started = replicationServerDomain.startReplicationServer(this); } if (started) @@ -548,8 +555,10 @@ // sendWindow MUST be created before starting the writer sendWindow = new Semaphore(sendWindowSize); writer = new ServerWriter(session, serverId, this, replicationCache); reader = new ServerReader(session, serverId, this, replicationCache); writer = new ServerWriter(session, serverId, this, replicationServerDomain); reader = new ServerReader(session, serverId, this, replicationServerDomain); reader.start(); writer.start(); @@ -575,7 +584,8 @@ { if (debugEnabled()) { TRACER.debugInfo("In " + replicationCache.getReplicationServer(). TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " RS failed to start locally " + " the connection from serverID="+serverId); } @@ -812,7 +822,7 @@ * the sum of the number of missing changes for every dbHandler. */ int totalCount = 0; ServerState dbState = replicationCache.getDbServerState(); ServerState dbState = replicationServerDomain.getDbServerState(); for (short id : dbState) { int max = dbState.getMaxChangeNumber(id).getSeqnum(); @@ -926,7 +936,7 @@ * Ignore updates from a server that is degraded due to * its inconsistent generationId */ long referenceGenerationId = replicationCache.getGenerationId(); long referenceGenerationId = replicationServerDomain.getGenerationId(); if ((referenceGenerationId>0) && (referenceGenerationId != generationId)) { @@ -993,7 +1003,7 @@ saturationCount = 0; try { replicationCache.checkAllSaturation(); replicationServerDomain.checkAllSaturation(); } catch (IOException e) { @@ -1059,11 +1069,11 @@ SortedSet<ReplicationIterator> iteratorSortedSet = new TreeSet<ReplicationIterator>(comparator); /* fill the lateQueue */ for (short serverId : replicationCache.getServers()) for (short serverId : replicationServerDomain.getServers()) { ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); ReplicationIterator iterator = replicationCache.getChangelogIterator(serverId, lastCsn); replicationServerDomain.getChangelogIterator(serverId, lastCsn); if ((iterator != null) && (iterator.getChange() != null)) { iteratorSortedSet.add(iterator); @@ -1244,7 +1254,7 @@ } if (completedFlag) { replicationCache.sendAck(changeNumber, true); replicationServerDomain.sendAck(changeNumber, true); } } @@ -1274,8 +1284,9 @@ } if (completedFlag) { ReplicationCache replicationCache = ackList.getChangelogCache(); replicationCache.sendAck(changeNumber, false, ReplicationServerDomain replicationServerDomain = ackList.getChangelogCache(); replicationServerDomain.sendAck(changeNumber, false, ackList.getReplicationServerId()); } } @@ -1304,20 +1315,22 @@ * @param update The update that must be added to the list. * @param ChangelogServerId The identifier of the replicationServer that sent * the update. * @param replicationCache The ReplicationCache from which the change was * processed and to which the ack must later be sent. * @param replicationServerDomain The ReplicationServerDomain from which the * change was processed and to which the ack * must later be sent. * @param nbWaitedAck The number of ack that must be received before * the update is fully acked. */ public static void addWaitingAck( UpdateMessage update, short ChangelogServerId, ReplicationCache replicationCache, short ChangelogServerId, ReplicationServerDomain replicationServerDomain, int nbWaitedAck) { ReplServerAckMessageList ackList = new ReplServerAckMessageList(update.getChangeNumber(), nbWaitedAck, ChangelogServerId, replicationCache); ChangelogServerId, replicationServerDomain); synchronized(changelogsWaitingAcks) { changelogsWaitingAcks.put(update.getChangeNumber(), ackList); @@ -1561,7 +1574,7 @@ { if (flowControl) { if (replicationCache.restartAfterSaturation(this)) if (replicationServerDomain.restartAfterSaturation(this)) { flowControl = false; } @@ -1605,11 +1618,11 @@ public void process(RoutableMessage msg) { if (debugEnabled()) TRACER.debugInfo("In " + replicationCache.getReplicationServer(). TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " SH for remote server " + this.getMonitorInstanceName() + " processes received msg=" + msg); replicationCache.process(msg, this); replicationServerDomain.process(msg, this); } /** @@ -1623,7 +1636,7 @@ throws IOException { if (debugEnabled()) TRACER.debugInfo("In " + replicationCache.getReplicationServer(). TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " SH for remote server " + this.getMonitorInstanceName() + " sends message=" + info); @@ -1640,7 +1653,7 @@ public void receiveReplServerInfo(ReplServerInfoMessage infoMsg) { if (debugEnabled()) TRACER.debugInfo("In " + replicationCache.getReplicationServer(). TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " SH for remote server " + this.getMonitorInstanceName() + " sets replServerInfo " + "<" + infoMsg + ">"); @@ -1691,7 +1704,8 @@ public void send(RoutableMessage msg) throws IOException { if (debugEnabled()) TRACER.debugInfo("In " + replicationCache.getReplicationServer(). TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " SH for remote server " + this.getMonitorInstanceName() + " sends message=" + msg); opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -72,7 +72,7 @@ private short serverId; private ProtocolSession session; private ServerHandler handler; private ReplicationCache replicationCache; private ReplicationServerDomain replicationServerDomain; /** * Constructor for the LDAP server reader part of the replicationServer. @@ -80,16 +80,18 @@ * @param session The ProtocolSession from which to read the data. * @param serverId The server ID of the server from which we read messages. * @param handler The server handler for this server reader. * @param replicationCache The ReplicationCache for this server reader. * @param replicationServerDomain The ReplicationServerDomain for this server * reader. */ public ServerReader(ProtocolSession session, short serverId, ServerHandler handler, ReplicationCache replicationCache) ServerHandler handler, ReplicationServerDomain replicationServerDomain) { super(handler.toString() + " reader"); this.session = session; this.serverId = serverId; this.handler = handler; this.replicationCache = replicationCache; this.replicationServerDomain = replicationServerDomain; } /** @@ -100,14 +102,15 @@ if (debugEnabled()) { TRACER.debugInfo( "In RS " + replicationCache.getReplicationServer(). "In RS " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + (handler.isReplicationServer()?" RS ":" LS")+ " reader starting for serverId=" + serverId); } /* * wait on input stream * grab all incoming messages and publish them to the replicationCache * grab all incoming messages and publish them to the * replicationServerDomain */ try { @@ -118,7 +121,7 @@ if (debugEnabled()) { TRACER.debugInfo( "In RS " + replicationCache.getReplicationServer(). "In RS " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + (handler.isReplicationServer()?" From RS ":" From LS")+ " with serverId=" + serverId + " receives " + msg); @@ -127,13 +130,14 @@ { AckMessage ack = (AckMessage) msg; handler.checkWindow(); replicationCache.ack(ack, serverId); replicationServerDomain.ack(ack, serverId); } else if (msg instanceof UpdateMessage) { // Ignore update received from a replica with // a bad generation ID long referenceGenerationId = replicationCache.getGenerationId(); long referenceGenerationId = replicationServerDomain.getGenerationId(); if ((referenceGenerationId>0) && (referenceGenerationId != handler.getGenerationId())) { @@ -145,7 +149,7 @@ { UpdateMessage update = (UpdateMessage) msg; handler.decAndCheckWindow(); replicationCache.put(update, handler); replicationServerDomain.put(update, handler); } } else if (msg instanceof WindowMessage) @@ -182,7 +186,7 @@ else if (msg instanceof ResetGenerationId) { ResetGenerationId genIdMsg = (ResetGenerationId) msg; replicationCache.resetGenerationId(this.handler, genIdMsg); replicationServerDomain.resetGenerationId(this.handler, genIdMsg); } else if (msg instanceof WindowProbe) { @@ -198,19 +202,20 @@ { if (handler.isReplicationServer()) TRACER.debugInfo( "In RS " + replicationCache.getReplicationServer(). "In RS " + replicationServerDomain.getReplicationServer(). getServerId() + " Receiving replServerInfo from " + handler.getServerId() + " baseDn=" + replicationCache.getBaseDn() + " baseDn=" + replicationServerDomain.getBaseDn() + " genId=" + infoMsg.getGenerationId()); } if (replicationCache.getGenerationId()<0) if (replicationServerDomain.getGenerationId()<0) { // Here is the case where a ReplicationServer receives from // another ReplicationServer the generationId for a domain // for which the generation ID has never been set. replicationCache.setGenerationId(infoMsg.getGenerationId(), false); replicationServerDomain. setGenerationId(infoMsg.getGenerationId(),false); } else { @@ -221,19 +226,20 @@ // If we have generationId set locally and no server currently // connected for that domain in the topology then we may also // reset the generationId localy. replicationCache.mayResetGenerationId(); replicationServerDomain.mayResetGenerationId(); } if (replicationCache.getGenerationId() != infoMsg.getGenerationId()) if (replicationServerDomain.getGenerationId() != infoMsg.getGenerationId()) { Message message = NOTE_BAD_GENERATION_ID.get( replicationCache.getBaseDn().toNormalizedString(), replicationServerDomain.getBaseDn().toNormalizedString(), Short.toString(handler.getServerId()), Long.toString(infoMsg.getGenerationId()), Long.toString(replicationCache.getGenerationId())); Long.toString(replicationServerDomain.getGenerationId())); ErrorMessage errorMsg = new ErrorMessage( replicationCache.getReplicationServer().getServerId(), replicationServerDomain.getReplicationServer().getServerId(), handler.getServerId(), message); session.publish(errorMsg); @@ -260,7 +266,7 @@ */ if (debugEnabled()) TRACER.debugInfo( "In RS " + replicationCache.getReplicationServer(). "In RS " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " reader IO EXCEPTION for serverID=" + serverId + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage()); @@ -270,7 +276,7 @@ { if (debugEnabled()) TRACER.debugInfo( "In RS <" + replicationCache.getReplicationServer(). "In RS <" + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " reader CNF EXCEPTION serverID=" + serverId + stackTraceToSingleLineString(e)); @@ -284,7 +290,7 @@ { if (debugEnabled()) TRACER.debugInfo( "In RS <" + replicationCache.getReplicationServer(). "In RS <" + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " server reader EXCEPTION serverID=" + serverId + stackTraceToSingleLineString(e)); @@ -304,7 +310,7 @@ */ if (debugEnabled()) TRACER.debugInfo( "In RS " + replicationCache.getReplicationServer(). "In RS " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + " server reader for serverID=" + serverId + " is closing the session"); @@ -315,11 +321,11 @@ { // ignore } replicationCache.stopServer(handler); replicationServerDomain.stopServer(handler); } if (debugEnabled()) TRACER.debugInfo( "In RS " + replicationCache.getReplicationServer(). "In RS " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + (handler.isReplicationServer()?" RS":" LDAP") + " server reader stopped for serverID=" + serverId); opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -56,7 +56,7 @@ private ProtocolSession session; private ServerHandler handler; private ReplicationCache replicationCache; private ReplicationServerDomain replicationServerDomain; private short serverId; /** @@ -67,22 +67,24 @@ * @param session the ProtocolSession that will be used to send updates. * @param serverId the Identifier of the server. * @param handler handler for which the ServerWriter is created. * @param replicationCache The ReplicationCache of this ServerWriter. * @param replicationServerDomain The ReplicationServerDomain of this * ServerWriter. */ public ServerWriter(ProtocolSession session, short serverId, ServerHandler handler, ReplicationCache replicationCache) ServerHandler handler, ReplicationServerDomain replicationServerDomain) { super(handler.toString() + " writer"); this.serverId = serverId; this.session = session; this.handler = handler; this.replicationCache = replicationCache; this.replicationServerDomain = replicationServerDomain; } /** * Run method for the ServerWriter. * Loops waiting for changes from the ReplicationCache and forward them * Loops waiting for changes from the ReplicationServerDomain and forward them * to the other servers */ public void run() @@ -102,12 +104,12 @@ { while (true) { UpdateMessage update = replicationCache.take(this.handler); UpdateMessage update = replicationServerDomain.take(this.handler); if (update == null) return; /* this connection is closing */ // Ignore update to be sent to a replica with a bad generation ID long referenceGenerationId = replicationCache.getGenerationId(); long referenceGenerationId = replicationServerDomain.getGenerationId(); if ((referenceGenerationId != handler.getGenerationId()) || (referenceGenerationId == -1) || (handler.getGenerationId() == -1)) @@ -121,7 +123,7 @@ if (debugEnabled()) { TRACER.debugInfo( "In " + replicationCache.getReplicationServer(). "In " + replicationServerDomain.getReplicationServer(). getMonitorInstanceName() + ", writer to " + this.handler.getMonitorInstanceName() + " publishes msg=" + update.toString() + @@ -168,7 +170,7 @@ { // Can't do much more : ignore } replicationCache.stopServer(handler); replicationServerDomain.stopServer(handler); if (debugEnabled()) { opends/src/server/org/opends/server/replication/server/package-info.java
@@ -53,7 +53,7 @@ * ReplicationMessages objects. This class is used by both the * replicationServer and the replication package. * </li> * <li><A HREF="ReplicationCache.html"><B>ReplicationCache</B></A> * <li><A HREF="ReplicationServerDomain.html"><B>ReplicationServerDomain</B></A> * implements the multiplexing part of the replication * server. It contains method for forwarding all the received messages to * the ServerHandler and to the dbHandler objects.<br> opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -914,14 +914,14 @@ rgenId = replServer1.getGenerationId(baseDn); assertEquals(genId, rgenId, "DS and replServer are expected to have same genId."); assertTrue(!replServer1.getReplicationCache(baseDn, false). assertTrue(!replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server1ID), "Expecting that DS is not degraded since domain genId has been reset"); assertTrue(replServer1.getReplicationCache(baseDn, false). assertTrue(replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server2ID), "Expecting that broker2 is degraded since domain genId has been reset"); assertTrue(replServer1.getReplicationCache(baseDn, false). assertTrue(replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server3ID), "Expecting that broker3 is degraded since domain genId has been reset"); @@ -1106,7 +1106,7 @@ } debugInfo("Expecting that broker2 is not degraded since it has a correct genId"); assertTrue(!replServer1.getReplicationCache(baseDn, false). assertTrue(!replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server2ID)); debugInfo("Disconnecting DS from replServer1"); @@ -1132,7 +1132,7 @@ } debugInfo("Expecting that broker3 is degraded since it has a bad genId"); assertTrue(replServer1.getReplicationCache(baseDn, false). assertTrue(replServer1.getReplicationServerDomain(baseDn, false). isDegradedDueToGenerationId(server3ID)); int found = testEntriesInDb(); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -1090,25 +1090,25 @@ // Check that the list of connected LDAP servers is correct // in each replication servers List<String> l1 = changelog1.getReplicationCache(baseDn, false). List<String> l1 = changelog1.getReplicationServerDomain(baseDn, false). getConnectedLDAPservers(); assertEquals(l1.size(), 1); assertEquals(l1.get(0), String.valueOf(server1ID)); List<String> l2; l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers(); l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers(); assertEquals(l2.size(), 2); assertTrue(l2.contains(String.valueOf(server2ID))); assertTrue(l2.contains(String.valueOf(server3ID))); List<String> l3; l3 = changelog3.getReplicationCache(baseDn, false).getConnectedLDAPservers(); l3 = changelog3.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers(); assertEquals(l3.size(), 0); // Test updates broker3.stop(); Thread.sleep(1000); l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers(); l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers(); assertEquals(l2.size(), 1); assertEquals(l2.get(0), String.valueOf(server2ID)); @@ -1116,11 +1116,11 @@ server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges); broker2.stop(); Thread.sleep(1000); l2 = changelog2.getReplicationCache(baseDn, false).getConnectedLDAPservers(); l2 = changelog2.getReplicationServerDomain(baseDn, false).getConnectedLDAPservers(); assertEquals(l2.size(), 1); assertEquals(l2.get(0), String.valueOf(server3ID)); // TODO Test ReplicationCache.getDestinationServers method. // TODO Test ReplicationServerDomain.getDestinationServers method. broker2.stop(); broker3.stop();