opends/src/messages/messages/replication.properties
@@ -500,7 +500,7 @@ NOTICE_RS_NOT_LOCALLY_CONFIGURED_219=RS(%d) was not configured locally on DS(%d), \ but at least one other RS was NOTICE_RS_HAS_NO_GENERATION_ID_220=RS(%d) has no generation Id, but at least one \ other RS has the same generation Id %d as DS(%d) other RS has the same generation Id %d as DS(%d) NOTICE_RS_HAS_DIFFERENT_GENERATION_ID_THAN_DS_221=RS(%d) generation Id %d does not \ match DS(%d) generation Id %d, but at least another RS does NOTICE_RS_HAS_DIFFERENT_GROUP_ID_THAN_DS_222=RS(%d) groupId %d does not match \ @@ -525,5 +525,5 @@ biggest weight among all the replication servers NOTICE_AVOID_YOYO_EFFECT_232=DS(%d) stayed connected to RS(%d) to avoid the yoyo effect NOTICE_BEST_RS_233=RS(%d) has been evaluated to be the best replication server \ for DS(%d) to connect to because it was the only one standing after all tests for DS(%d) to connect to because it was the only one standing after all tests NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d) opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -167,16 +167,11 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("\nId: "); sb.append(id); sb.append(" ; Server URL: "); sb.append(serverUrl); sb.append(" ; Generation id: "); sb.append(generationId); sb.append(" ; Group id: "); sb.append(groupId); sb.append(" ; Weight: "); sb.append(weight); sb.append("Id: ").append(id); sb.append(" ; Server URL: ").append(serverUrl); sb.append(" ; Generation id: ").append(generationId); sb.append(" ; Group id: ").append(groupId); sb.append(" ; Weight: ").append(weight); return sb.toString(); } } opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -58,13 +58,33 @@ public final class ECLServerHandler extends ServerHandler { /** * Marks the end of the search on the External Change Log. */ private static int UNDEFINED_PHASE = 0; /** * Constant used to indicate the handler is in the ECL initialization phase. * The External Change Log initialization phase currently reads the changes * from all the ReplicaDBs from oldest to newest and then: * <ul> * <li>Matches each ReplicaDBs change with the corresponding record in * {@link ChangeNumberIndexDB}, then assign its changeNumber in memory before * returning the change to the client</li> * <li>Once it reaches the end of the {@link ChangeNumberIndexDB}, it inserts * each ReplicaDBs change in the {@link ChangeNumberIndexDB} and gets back and * assign the changeNumber in memory to the ReplicaDBs change.</li> * </ul> * Once this phase is over the current search moves to the * {@link #UNDEFINED_PHASE} or the {@link #PERSISTENT_PHASE} depending on the * search type. * * @see #getSearchPhase() */ public static int INIT_PHASE = 1; /** * The persistent phase is only used for persistent searches on the External * ChangeLog. It comes after the {@link #INIT_PHASE} and sends back to the * client newly added changes to the {@link ChangeNumberIndexDB}. */ private static int PERSISTENT_PHASE = 2; /** @@ -111,8 +131,8 @@ private Set<String> excludedBaseDNs = new HashSet<String>(); /** * Eligible CSN - only changes older or equal to eligibleCSN * are published * in the ECL. * Eligible CSN - only changes older or equal to eligibleCSN are published in * the ECL. */ private CSN eligibleCSN; @@ -1203,6 +1223,9 @@ if (draftCompat) { continueLooping = !assignChangeNumber(change); // if we encounter a change that has been trimmed from the replicaDBs, // we will skip it and loop to the next oldest change from the // replicaDBs } // here we have the right oldest change @@ -1288,34 +1311,34 @@ * Either retrieves a change number from the DB, or assign a new change number * and store in the DB. * * @param oldestChange * the oldestChange where to assign the change number * @param replicaDBChange * the replica DB change to find in the {@link ChangeNumberIndexDB} * where to assign the change number * @return <code>true</code> if a change number has been assigned to the * provided oldestChange, <code>false</code> otherwise * provided replicaDBChange, <code>false</code> otherwise * @throws DirectoryException * if any problem occur * @throws ChangelogException * if a database problem occurs. */ private boolean assignChangeNumber(final ECLUpdateMsg oldestChange) private boolean assignChangeNumber(final ECLUpdateMsg replicaDBChange) throws ChangelogException { // We also need to check if the CNIndexDB is consistent with the // changelogDB. If not, 2 potential reasons: // a/ changelog has been purged (trim) let's traverse the CNIndexDB // We also need to check if the CNIndexDB is consistent with the replicaDBs. // If not, 2 potential reasons: // a/ replicaDBs have been purged (trim) let's traverse the CNIndexDB // b/ changelog is late ... let's traverse the changelogDb // The following loop allows to loop until being on the same cn in the 2 dbs // replogCSN : the oldest change from the changelog db CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN(); DN dnFromChangelogDb = oldestChange.getBaseDN(); CSN csnFromReplicaDB = replicaDBChange.getUpdateMsg().getCSN(); DN dnFromReplicaDB = replicaDBChange.getBaseDN(); while (true) { if (isEndOfCNIndexDBReached) { // we are at the end of the CNIndexDB in the append mode assignNewChangeNumberAndStore(oldestChange); assignNewChangeNumberAndStore(replicaDBChange); return true; } @@ -1324,43 +1347,44 @@ final DN dnFromCNIndexDB = currentRecord.getBaseDN(); if (debugEnabled()) TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :" + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:" + asDate(csnFromChangelogDb) + " ?older" TRACER.debugInfo("assignChangeNumber() comparing the replicaDB's and" + " CNIndexDB's DNs :" + dnFromReplicaDB + "?=" + dnFromCNIndexDB + " timestamps:" + asDate(csnFromReplicaDB) + " ?older" + asDate(csnFromCNIndexDB)); if (areSameChange(csnFromChangelogDb, dnFromChangelogDb, csnFromCNIndexDB, dnFromCNIndexDB)) if (areSameChange(csnFromReplicaDB, dnFromReplicaDB, csnFromCNIndexDB, dnFromCNIndexDB)) { // We matched the ReplicaDB change with a record in the CNIndexDB // => set the changeNumber in memory and return the change to the client if (debugEnabled()) TRACER.debugInfo("assignChangeNumber() assigning changeNumber=" + currentRecord.getChangeNumber() + " to change=" + oldestChange); + currentRecord.getChangeNumber() + " to change=" + replicaDBChange); oldestChange.setChangeNumber(currentRecord.getChangeNumber()); replicaDBChange.setChangeNumber(currentRecord.getChangeNumber()); return true; } if (!csnFromCNIndexDB.isOlderThan(csnFromChangelogDb)) if (!csnFromCNIndexDB.isOlderThan(csnFromReplicaDB)) { // the change from the changelogDb is older // the change from the replicaDB is older // it should have been stored lately // let's continue to traverse the changelogDB // let's continue to traverse the replicaDBs if (debugEnabled()) TRACER.debugInfo("assignChangeNumber() will skip " + csnFromChangelogDb TRACER.debugInfo("assignChangeNumber() will skip " + csnFromReplicaDB + " and read next change from the regular changelog."); return false; // TO BE CHECKED } // the change from the CNIndexDB is older // that means that the change has been purged from the // changelogDb (and CNIndexDB not yet been trimmed) // The change from the CNIndexDB is older. // It means that the CNIndexDB change has been purged from the replicaDB // and CNIndexDB has not been trimmed yet. try { // let's traverse the CNIndexDB searching for the change // found in the changelogDB // keep traversing the CNIndexDB searching for the replicaDB change if (debugEnabled()) TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB + " and read next change from the CNIndexDB."); @@ -1380,7 +1404,7 @@ TRACER.debugCaught(DebugLogLevel.ERROR, e); } // FIXME There is an opportunity for an infinite loop here if the DB // continuously throws DatabaseExceptions // continuously throws ChangelogExceptions } } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2595,10 +2595,10 @@ CSN eligibleCSN = null; final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN); for (final CSN changelogNewestCSN : newestCSNs) for (final CSN replicaNewestCSN : newestCSNs) { // Should it be considered for eligibility ? int serverId = changelogNewestCSN.getServerId(); int serverId = replicaNewestCSN.getServerId(); CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId); // If the most recent UpdateMsg or CLHeartbeatMsg received is very old @@ -2625,11 +2625,11 @@ continue; } if (changelogNewestCSN != null if (replicaNewestCSN != null && (eligibleCSN == null || changelogNewestCSN.isNewerThan(eligibleCSN))) replicaNewestCSN.isNewerThan(eligibleCSN))) { eligibleCSN = changelogNewestCSN; eligibleCSN = replicaNewestCSN; } if (heartbeatLastCSN != null && (eligibleCSN == null || heartbeatLastCSN.isNewerThan(eligibleCSN))) opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -299,20 +299,19 @@ /** * Sets the locally configured flag for the passed ReplicationServerInfo * object, analyzing the local configuration. * @param replicationServerInfo the Replication server to check and update * @param rsInfo the Replication server to check and update */ private void updateRSInfoLocallyConfiguredStatus( ReplicationServerInfo replicationServerInfo) private void updateRSInfoLocallyConfiguredStatus(ReplicationServerInfo rsInfo) { // Determine if the passed ReplicationServerInfo has a URL that is present // in the locally configured replication servers String rsUrl = replicationServerInfo.getServerURL(); String rsUrl = rsInfo.getServerURL(); if (rsUrl == null) { // The ReplicationServerInfo has been generated from a server with // no URL in TopologyMsg (i.e: with replication protocol version < 4): // ignore this server as we do not know how to connect to it replicationServerInfo.setLocallyConfigured(false); rsInfo.setLocallyConfigured(false); return; } for (String serverUrl : getReplicationServerUrls()) @@ -320,12 +319,12 @@ if (isSameReplicationServerUrl(serverUrl, rsUrl)) { // This RS is locally configured, mark this replicationServerInfo.setLocallyConfigured(true); replicationServerInfo.serverURL = serverUrl; rsInfo.setLocallyConfigured(true); rsInfo.serverURL = serverUrl; return; } } replicationServerInfo.setLocallyConfigured(false); rsInfo.setLocallyConfigured(false); } /** @@ -704,14 +703,12 @@ for (String serverUrl : getReplicationServerUrls()) { // Connect to server and get info about it ReplicationServerInfo replicationServerInfo = performPhaseOneHandshake(serverUrl, false, false); // Store server info in list if (replicationServerInfo != null) // Connect to server + get and store info about it ReplicationServerInfo rsInfo = performPhaseOneHandshake(serverUrl, false, false); if (rsInfo != null) { rsInfos.put(replicationServerInfo.getServerId(), replicationServerInfo); rsInfos.put(rsInfo.getServerId(), rsInfo); } } @@ -1943,10 +1940,10 @@ int sumOfWeights = 0; // Sum of the connected DSs int sumOfConnectedDSs = 0; for (ReplicationServerInfo replicationServerInfo : bestServers.values()) for (ReplicationServerInfo rsInfo : bestServers.values()) { sumOfWeights += replicationServerInfo.getWeight(); sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber(); sumOfWeights += rsInfo.getWeight(); sumOfConnectedDSs += rsInfo.getConnectedDSNumber(); } // Distance (difference) of the current loads to the load goals of each RS: @@ -3036,27 +3033,25 @@ // Update replication server info list with the received topology // information List<Integer> rsToKeepList = new ArrayList<Integer>(); final Set<Integer> rssToKeep = new HashSet<Integer>(); for (RSInfo rsInfo : topoMsg.getRsList()) { int rsId = rsInfo.getId(); rsToKeepList.add(rsId); // Mark this server as still existing rssToKeep.add(rsId); // Mark this server as still existing List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList); ReplicationServerInfo replicationServerInfo = replicationServerInfos.get(rsId); if (replicationServerInfo == null) ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId); if (rsInfo2 == null) { // New replication server, create info for it add it to the list replicationServerInfo = new ReplicationServerInfo(rsInfo, connectedDSs); rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs); // Set the locally configured flag for this new RS only if it is // configured updateRSInfoLocallyConfiguredStatus(replicationServerInfo); replicationServerInfos.put(rsId, replicationServerInfo); updateRSInfoLocallyConfiguredStatus(rsInfo2); replicationServerInfos.put(rsId, rsInfo2); } else { // Update the existing info for the replication server replicationServerInfo.update(rsInfo, connectedDSs); rsInfo2.update(rsInfo, connectedDSs); } } @@ -3064,12 +3059,11 @@ * Now remove any replication server that may have disappeared from the * topology. */ Iterator<Entry<Integer, ReplicationServerInfo>> rsInfoIt = replicationServerInfos.entrySet().iterator(); Iterator<Integer> rsInfoIt = replicationServerInfos.keySet().iterator(); while (rsInfoIt.hasNext()) { Entry<Integer, ReplicationServerInfo> rsInfoEntry = rsInfoIt.next(); if (!rsToKeepList.contains(rsInfoEntry.getKey())) final Integer rsId = rsInfoIt.next(); if (!rssToKeep.contains(rsId)) { // This replication server has quit the topology, remove it from the // list