opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -252,7 +252,6 @@ { while (activeConsumer) { UpdateMsg msg; if (!following) { /* this server is late with regard to some other masters @@ -282,17 +281,17 @@ * restart as usual * load this change on the delayList */ NavigableSet<ReplicationDBCursor> sortedCursors = null; NavigableSet<ReplicationDBCursor> sortedCursors = null; try { sortedCursors = collectAllCursorsWithChanges(); // fill the lateQueue // fill the lateQueue while (!sortedCursors.isEmpty() && lateQueue.count() < 100 && lateQueue.bytesCount() < 50000) { lateQueue.add(nextOldestUpdateMsg(sortedCursors)); lateQueue.add(nextOldestUpdateMsg(sortedCursors)); } } finally @@ -315,15 +314,15 @@ following = true; } } } else } else { /* * if the first change in the lateQueue is also on the regular * queue, we can resume the processing from the regular queue * -> set following to true and empty the lateQueue. */ msg = lateQueue.first(); UpdateMsg msg = lateQueue.first(); synchronized (msgQueue) { if (msgQueue.contains(msg)) @@ -341,10 +340,11 @@ } } } } else } else { // get the next change from the lateQueue // get the next change from the lateQueue UpdateMsg msg; synchronized (msgQueue) { msg = lateQueue.removeFirst(); @@ -353,6 +353,8 @@ return msg; } } synchronized (msgQueue) { if (following) @@ -371,8 +373,7 @@ { return null; } msg = msgQueue.removeFirst(); UpdateMsg msg = msgQueue.removeFirst(); if (updateServerState(msg)) { /* @@ -393,38 +394,38 @@ return null; } private UpdateMsg nextOldestUpdateMsg( NavigableSet<ReplicationDBCursor> sortedCursors) { /* * The cursors are sorted based on the currentChange of each cursor to * consider the next change across all servers. * To keep consistent the order of the cursors in the SortedSet, * it is necessary to remove and eventually add again a cursor (after moving * it forward). 
*/ final ReplicationDBCursor cursor = sortedCursors.pollFirst(); final UpdateMsg result = cursor.getChange(); cursor.next(); addCursorIfNotEmpty(sortedCursors, cursor); return result; } private UpdateMsg nextOldestUpdateMsg( NavigableSet<ReplicationDBCursor> sortedCursors) { /* * The cursors are sorted based on the currentChange of each cursor to * consider the next change across all servers. * To keep consistent the order of the cursors in the SortedSet, * it is necessary to remove and eventually add again a cursor (after moving * it forward). */ final ReplicationDBCursor cursor = sortedCursors.pollFirst(); final UpdateMsg result = cursor.getChange(); cursor.next(); addCursorIfNotEmpty(sortedCursors, cursor); return result; } private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors, ReplicationDBCursor cursor) { if (cursor != null) { if (cursor.getChange() != null) { cursors.add(cursor); } else { close(cursor); } } } private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors, ReplicationDBCursor cursor) { if (cursor != null) { if (cursor.getChange() != null) { cursors.add(cursor); } else { close(cursor); } } } /** * Get the older Change Number for that server. @@ -460,49 +461,49 @@ the lateQueue when it will send the next update but we are not yet there. So let's take the last change not sent directly from the db. */ result = findOldestChangeNumberFromReplicationDBs(); result = findOldestChangeNumberFromReplicationDBs(); } } } return result; } private ChangeNumber findOldestChangeNumberFromReplicationDBs() { SortedSet<ReplicationDBCursor> sortedCursors = null; try { sortedCursors = collectAllCursorsWithChanges(); UpdateMsg msg = sortedCursors.first().getChange(); return msg.getChangeNumber(); } catch (Exception e) { return null; } finally { close(sortedCursors); } } /** * Collects all the replication DB cursors that have changes and sort them * with the oldest {@link ChangeNumber} first. 
* * @return a List of cursors with changes sorted by their {@link ChangeNumber} * (oldest first) */ private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges() private ChangeNumber findOldestChangeNumberFromReplicationDBs() { final NavigableSet<ReplicationDBCursor> results = new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator()); SortedSet<ReplicationDBCursor> sortedCursors = null; try { sortedCursors = collectAllCursorsWithChanges(); UpdateMsg msg = sortedCursors.first().getChange(); return msg.getChangeNumber(); } catch (Exception e) { return null; } finally { close(sortedCursors); } } /** * Collects all the replication DB cursors that have changes and sort them * with the oldest {@link ChangeNumber} first. * * @return a List of cursors with changes sorted by their {@link ChangeNumber} * (oldest first) */ private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges() { final NavigableSet<ReplicationDBCursor> results = new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator()); for (int serverId : replicationServerDomain.getServerIds()) { // get the last already sent CN from that server to get a cursor final ChangeNumber lastCsn = serverState.getChangeNumber(serverId); addCursorIfNotEmpty(results, replicationServerDomain.getCursorFrom(serverId, lastCsn)); final ChangeNumber lastCsn = serverState.getChangeNumber(serverId); addCursorIfNotEmpty(results, replicationServerDomain.getCursorFrom(serverId, lastCsn)); } return results; } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -49,7 +49,6 @@ import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationDBCursor; import org.opends.server.replication.server.changelog.je.DbHandler; import org.opends.server.replication.server.changelog.je.ReplicationDB; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; @@ -1263,17 +1262,17 @@ } /** * Creates and returns a cursor. When the cursor is not used anymore, the * caller MUST call the {@link ReplicationDBCursor#close()} method to free the * resources and locks used by the cursor. * * @param serverId * Identifier of the server for which the cursor is created. * @param startAfterCN * Starting point for the cursor. * @return the created {@link ReplicationDB}. Null when no DB is available or * the DB is empty for the provided serverId. */ * Creates and returns a cursor. When the cursor is not used anymore, the * caller MUST call the {@link ReplicationDBCursor#close()} method to free the * resources and locks used by the cursor. * * @param serverId * Identifier of the server for which the cursor is created. * @param startAfterCN * Starting point for the cursor. * @return the created {@link ReplicationDBCursor}. Null when no DB is * available or the DB is empty for the provided serverId. */ public ReplicationDBCursor getCursorFrom(int serverId, ChangeNumber startAfterCN) { opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -38,24 +38,24 @@ { /** * Get the UpdateMsg where the cursor is currently set. * * @return The UpdateMsg where the cursor is currently set. */ * Get the UpdateMsg where the cursor is currently set. * * @return The UpdateMsg where the cursor is currently set. */ UpdateMsg getChange(); /** * Go to the next change in the ReplicationDB or in the server Queue. * * @return false if the cursor is already on the last change before this call. */ * Go to the next change in the ReplicationDB or in the server Queue. * * @return false if the cursor is already on the last change before this call. */ boolean next(); /** * Release the resources and locks used by this cursor. This method must be * called when the cursor is no longer used. Failure to do it could cause DB * deadlock. */ * Release the resources and locks used by this cursor. This method must be * called when the cursor is no longer used. Failure to do it could cause DB * deadlock. */ @Override void close(); opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
@@ -30,7 +30,6 @@ import java.util.Comparator; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMsg; /** * This class defines a {@link Comparator} that allows to know which @@ -41,14 +40,14 @@ implements Comparator<ReplicationDBCursor> { /** * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}. * * @param o1 * first cursor. * @param o2 * second cursor. * @return result of the comparison. */ * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}. * * @param o1 * first cursor. * @param o2 * second cursor. * @return result of the comparison. */ @Override public int compare(ReplicationDBCursor o1, ReplicationDBCursor o2) { opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -44,7 +44,7 @@ import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationDBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.InitializationException; @@ -260,18 +260,18 @@ } /** * Generate a new {@link ReplicationDBCursor} that allows to browse the db * managed by this dbHandler and starting at the position defined by a given * changeNumber. * * @param startAfterCN * The position where the cursor must start. * @return a new {@link ReplicationDBCursor} that allows to browse the db * managed by this dbHandler and starting at the position defined by a * given changeNumber. * @throws ChangelogException * if a database problem happened. */ * Generate a new {@link ReplicationDBCursor} that allows to browse the db * managed by this dbHandler and starting at the position defined by a given * changeNumber. * * @param startAfterCN * The position where the cursor must start. * @return a new {@link ReplicationDBCursor} that allows to browse the db * managed by this dbHandler and starting at the position defined by a * given changeNumber. * @throws ChangelogException * if a database problem happened. */ public ReplicationDBCursor generateCursorFrom(ChangeNumber startAfterCN) throws ChangelogException { opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
@@ -32,7 +32,7 @@ import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationDBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; /** * Berkeley DB JE implementation of {@link ReplicationDBCursor}. @@ -46,18 +46,18 @@ private ChangeNumber lastNonNullCurrentCN; /** * Creates a new JEReplicationDBCursor. All created cursors must be released by * the caller using the {@link #close()} method. * * @param db * The db where the cursor must be created. * @param startAfterCN * The ChangeNumber after which the cursor must start. * @param dbHandler * The associated DbHandler. * @throws ChangelogException * if a database problem happened. */ * Creates a new JEReplicationDBCursor. All created cursors must be released by * the caller using the {@link #close()} method. * * @param db * The db where the cursor must be created. * @param startAfterCN * The ChangeNumber after which the cursor must start. * @param dbHandler * The associated DbHandler. * @throws ChangelogException * if a database problem happened. */ public JEReplicationDBCursor(ReplicationDB db, ChangeNumber startAfterCN, DbHandler dbHandler) throws ChangelogException { @@ -151,10 +151,10 @@ } /** * Called by the GC when the object is garbage collected. Release the internal * cursor in case the cursor was badly used and {@link #close()} was never * called. */ * Called by the GC when the object is garbage collected. Release the internal * cursor in case the cursor was badly used and {@link #close()} was never * called. */ @Override protected void finalize() {