opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,7 +27,8 @@ */ package org.opends.server.replication.server; import java.util.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Message; @@ -294,22 +295,19 @@ * restart as usual * load this change on the delayList */ NavigableSet<ReplicaDBCursor> sortedCursors = null; ReplicaDBCursor cursor = null; try { sortedCursors = collectAllCursorsWithChanges(); // fill the lateQueue while (!sortedCursors.isEmpty() && lateQueue.count() < 100 && lateQueue.bytesCount() < 50000) cursor = replicationServerDomain.getCursorFrom(serverState); while (cursor.next() && isLateQueueBelowThreshold()) { lateQueue.add(nextOldestUpdateMsg(sortedCursors)); lateQueue.add(cursor.getChange()); } } finally { close(sortedCursors); close(cursor); } /* @@ -403,34 +401,9 @@ return null; } private UpdateMsg nextOldestUpdateMsg( NavigableSet<ReplicaDBCursor> sortedCursors) private boolean isLateQueueBelowThreshold() { /* * The cursors are sorted based on the currentChange of each cursor to * consider the next change across all servers. * To keep consistent the order of the cursors in the SortedSet, * it is necessary to remove and eventually add again a cursor (after moving * it forward). 
*/ final ReplicaDBCursor cursor = sortedCursors.pollFirst(); final UpdateMsg result = cursor.getChange(); cursor.next(); addCursorIfNotEmpty(sortedCursors, cursor); return result; } private void addCursorIfNotEmpty(Collection<ReplicaDBCursor> cursors, ReplicaDBCursor cursor) { if (cursor.getChange() != null) { cursors.add(cursor); } else { close(cursor); } return lateQueue.count() < 100 && lateQueue.bytesCount() < 50000; } /** @@ -476,12 +449,12 @@ private CSN findOldestCSNFromReplicaDBs() { SortedSet<ReplicaDBCursor> sortedCursors = null; ReplicaDBCursor cursor = null; try { sortedCursors = collectAllCursorsWithChanges(); UpdateMsg msg = sortedCursors.first().getChange(); return msg.getCSN(); cursor = replicationServerDomain.getCursorFrom(serverState); cursor.next(); return cursor.getChange().getCSN(); } catch (Exception e) { @@ -489,32 +462,11 @@ } finally { close(sortedCursors); close(cursor); } } /** * Collects all the {@link ReplicaDBCursor}s that have changes and sort them * with the oldest {@link CSN} first. * * @return a List of cursors with changes sorted by their {@link CSN} * (oldest first) */ private NavigableSet<ReplicaDBCursor> collectAllCursorsWithChanges() { final NavigableSet<ReplicaDBCursor> results = new TreeSet<ReplicaDBCursor>(); for (int serverId : replicationServerDomain.getServerIds()) { // get the last already sent CSN from that server to get a cursor final CSN lastCsn = serverState.getCSN(serverId); addCursorIfNotEmpty(results, replicationServerDomain.getCursorFrom(serverId, lastCsn)); } return results; } /** * Get the count of updates sent to this server. * @return The count of update sent to this server. */ opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -79,8 +79,7 @@ * <p> * Currently are only implemented the create and restore backup features. */ public class ReplicationBackend extends Backend public class ReplicationBackend extends Backend { private static final String CHANGE_NUMBER = "replicationChangeNumber"; @@ -620,23 +619,22 @@ * Exports or returns all the changes from a ReplicationServerDomain coming * after the CSN specified in the searchOperation. */ private void writeChangesAfterCSN(ReplicationServerDomain rsd, private void writeChangesAfterCSN(ReplicationServerDomain rsDomain, final LDIFExportConfig exportConfig, LDIFWriter ldifWriter, SearchOperation searchOperation, final CSN previousCSN) { for (int serverId : rsd.getServerIds()) { if (exportConfig != null && exportConfig.isCancelled()) { // Abort if cancelled return; } ReplicaDBCursor cursor = rsd.getCursorFrom(serverId, previousCSN); ReplicaDBCursor cursor = rsDomain.getCursorFrom(previousCSN); try { int lookthroughCount = 0; // Walk through the changes cursor.next(); // first try to advance the cursor while (cursor.getChange() != null) { if (exportConfig != null && exportConfig.isCancelled()) @@ -649,7 +647,7 @@ } lookthroughCount++; writeChange(cursor.getChange(), ldifWriter, searchOperation, rsd.getBaseDN(), exportConfig != null); rsDomain.getBaseDN(), exportConfig != null); cursor.next(); } } @@ -658,7 +656,6 @@ close(cursor); } } } private boolean canContinue(SearchOperation searchOperation, int lookthroughCount) opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1269,19 +1269,7 @@ } /** * Returns a set containing the serverIds that produced updates and known by * this replicationServer from all over the topology, whether directly * connected or connected to another RS. * * @return a set containing the serverIds known by this replicationServer. */ public Set<Integer> getServerIds() { return domainDB.getDomainServerIds(baseDN); } /** * Creates and returns a cursor. * Creates and returns a cursor across this replication domain. * <p> * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor * to the next available record. @@ -1290,16 +1278,35 @@ * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * * @param serverId * Identifier of the server for which the cursor is created * @param startAfterCSN * Starting point for the cursor. If null, start from the oldest CSN * @return a non null {@link ReplicaDBCursor} * @see ReplicationDomainDB#getCursorFrom(DN, int, CSN) * @see ReplicationDomainDB#getCursorFrom(DN, CSN) */ public ReplicaDBCursor getCursorFrom(int serverId, CSN startAfterCSN) public ReplicaDBCursor getCursorFrom(CSN startAfterCSN) { return domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); return domainDB.getCursorFrom(baseDN, startAfterCSN); } /** * Creates and returns a cursor across this replication domain. * <p> * Client code must call {@link ReplicaDBCursor#next()} to advance the cursor * to the next available record. * <p> * When the cursor is not used anymore, client code MUST call the * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * * @param startAfterServerState * Starting point for the replicaDB cursors. 
If null, start from the * oldest CSN * @return a non null {@link ReplicaDBCursor} going from oldest to newest CSN * @see ReplicationDomainDB#getCursorFrom(DN, ServerState) */ public ReplicaDBCursor getCursorFrom(ServerState startAfterServerState) { return domainDB.getCursorFrom(baseDN, startAfterServerState); } /** opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -26,8 +26,6 @@ */ package org.opends.server.replication.server.changelog.api; import java.util.Set; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; @@ -41,16 +39,6 @@ { /** * Returns the serverIds for the servers that are or have been part of the * provided replication domain. * * @param baseDN * the replication domain baseDN * @return an unmodifiable set of integers holding the serverIds */ Set<Integer> getDomainServerIds(DN baseDN); /** * Get the number of changes for the specified replication domain. * * @param baseDN @@ -171,8 +159,9 @@ long getCount(DN baseDN, int serverId, CSN from, CSN to); /** * Generates a {@link ReplicaDBCursor} for the specified serverId and * replication domain starting after the provided CSN. * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the * specified replication domain, with all cursors starting after the provided * CSN. * <p> * The cursor is already advanced to the record after startAfterCSN. * <p> @@ -182,13 +171,35 @@ * * @param baseDN * the replication domain baseDN * @param serverId * Identifier of the server for which the cursor is created * @param startAfterCSN * Starting point for the cursor. If null, start from the oldest CSN * Starting point for each ReplicaDB cursor. If null, start from the * oldest CSN for each ReplicaDB cursor. * @return a non null {@link ReplicaDBCursor} * @see #getCursorFrom(DN, ServerState) */ ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN); ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN); /** * Generates a {@link ReplicaDBCursor} across all the replicaDBs for the * specified replication domain starting after the provided * {@link ServerState} for each replicaDBs. * <p> * The cursor is already advanced to the records after the serverState. 
* <p> * When the cursor is not used anymore, client code MUST call the * {@link ReplicaDBCursor#close()} method to free the resources and locks used * by the cursor. * * @param baseDN * the replication domain baseDN * @param startAfterServerState * Starting point for each ReplicaDB cursor. If any CSN for a * replicaDB is null, then start from the oldest CSN for this * replicaDB * @return a non null {@link ReplicaDBCursor} * @see #getCursorFrom(DN, CSN) */ ReplicaDBCursor getCursorFrom(DN baseDN, ServerState startAfterServerState); /** * for the specified serverId and replication domain. opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,10 +27,7 @@ package org.opends.server.replication.server.changelog.je; import java.io.File; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import org.opends.messages.Message; @@ -57,6 +54,114 @@ { /** * ReplicaDBCursor implementation that iterates across all the ReplicaDBs of a * replication domain, advancing from the oldest to the newest change across * all replicaDBs. */ private final class CrossReplicaDBCursor implements ReplicaDBCursor { /** * Change returned by {@link #getChange()}. It is only assigned by * {@link #next()}: null until next() has returned true, and null again once * next() has returned false. */ private UpdateMsg currentChange; /** * The cursors are sorted based on the current change of each cursor to * consider the next change across all replicaDBs. */ private final NavigableSet<ReplicaDBCursor> cursors = new TreeSet<ReplicaDBCursor>(); private final DN baseDN; /** * Opens one cursor per serverId known for the domain, starting from the CSN * that startAfterServerState holds for that serverId, and keeps only the * cursors that currently expose a change (the others are closed). */ public CrossReplicaDBCursor(DN baseDN, ServerState startAfterServerState) { this.baseDN = baseDN; for (int serverId : getDomainMap(baseDN).keySet()) { // get the last already sent CSN from that server to get a cursor final CSN lastCSN = startAfterServerState.getCSN(serverId); addCursorIfNotEmpty(getCursorFrom(baseDN, serverId, lastCSN)); } } private ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) { JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); if (replicaDB != null) { try { ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN); cursor.next(); return cursor; } catch (ChangelogException e) { // ignored: fall through and return EMPTY_CURSOR } } return EMPTY_CURSOR; } @Override public boolean next() { if (cursors.isEmpty()) { currentChange = null; return false; } // To keep the cursors' order in the SortedSet consistent, it is necessary // to remove a cursor and, after moving it forward, add it again only if it // still has a change.
final ReplicaDBCursor cursor = cursors.pollFirst(); currentChange = cursor.getChange(); cursor.next(); addCursorIfNotEmpty(cursor); return true; } void addCursorIfNotEmpty(ReplicaDBCursor cursor) { if (cursor.getChange() != null) { cursors.add(cursor); } else { StaticUtils.close(cursor); } } @Override public UpdateMsg getChange() { return currentChange; } @Override public void close() { StaticUtils.close(cursors); } /** NOTE(review): dereferences getChange() on both cursors. For this class getChange() is null until next() has returned true and again once next() has returned false, so comparing in either state throws NullPointerException - confirm callers never compare in those states. */ @Override public int compareTo(ReplicaDBCursor o) { final CSN csn1 = getChange().getCSN(); final CSN csn2 = o.getChange().getCSN(); return CSN.compare(csn1, csn2); } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " baseDN=" + baseDN + " currentChange=" + currentChange + " open cursors=" + cursors; } } /** * This map contains the List of updates received from each LDAP server. */ private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs = @@ -109,6 +214,12 @@ { // empty } @Override public String toString() { return "EmptyReplicaDBCursor"; } }; /** @@ -368,13 +479,6 @@ /** {@inheritDoc} */ @Override public Set<Integer> getDomainServerIds(DN baseDN) { return Collections.unmodifiableSet(getDomainMap(baseDN).keySet()); } /** {@inheritDoc} */ @Override public long getCount(DN baseDN, int serverId, CSN from, CSN to) { JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); @@ -565,24 +669,45 @@ /** {@inheritDoc} */ @Override public ReplicaDBCursor getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) public ReplicaDBCursor getCursorFrom(DN baseDN, CSN startAfterCSN) { JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); if (replicaDB != null) { try { ReplicaDBCursor cursor = replicaDB.generateCursorFrom(startAfterCSN); cursor.next(); return cursor; // Builds a new serverState for all the serverIds in the replication domain // to ensure we get cursors starting after the provided CSN.
return getCursorFrom(baseDN, buildServerState(baseDN, startAfterCSN)); } catch (ChangelogException e) /** {@inheritDoc} */ @Override public ReplicaDBCursor getCursorFrom(DN baseDN, ServerState startAfterServerState) { // ignored return new CrossReplicaDBCursor(baseDN, startAfterServerState); } private ServerState buildServerState(DN baseDN, CSN startAfterCSN) { final ServerState result = new ServerState(); if (startAfterCSN == null) { return result; } for (int serverId : getDomainMap(baseDN).keySet()) { if (serverId == startAfterCSN.getServerId()) { // reuse the provided CSN one as it is the most accurate result.update(startAfterCSN); } else { // build a new CSN, ignoring the seqNum since it is irrelevant for // a different serverId final CSN csn = startAfterCSN; // only used for increased readability result.update(new CSN(csn.getTime(), 0, serverId)); } } return EMPTY_CURSOR; return result; } /** {@inheritDoc} */ opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -584,7 +584,8 @@ @Override public String toString() { return baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN; return getClass().getSimpleName() + " " + baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN; } /** opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -166,4 +166,12 @@
    return CSN.compare(csn1, csn2);
  }

  /**
   * Returns a debugging representation of this cursor: the simple class name,
   * the current change, and the {@code replicaDB} field, separated by spaces.
   *
   * @return a human readable description of this cursor
   */
  @Override
  public String toString()
  {
    // A single space separates the change from the replicaDB description;
    // concatenating "" here glued the two values together in the output.
    return getClass().getSimpleName()
        + " currentChange=" + currentChange
        + " " + replicaDB;
  }
}