opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,10 +27,7 @@ */ package org.opends.server.replication.server; import java.util.ArrayList; import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Message; @@ -253,9 +250,9 @@ */ protected UpdateMsg getNextMessage(boolean synchronous) { UpdateMsg msg; while (activeConsumer) { UpdateMsg msg; if (!following) { /* this server is late with regard to some other masters @@ -285,32 +282,22 @@ * restart as usual * load this change on the delayList */ SortedSet<ReplicationIterator> iteratorSortedSet = null; NavigableSet<ReplicationDBCursor> sortedCursors = null; try { iteratorSortedSet = collectAllIteratorsWithChanges(); sortedCursors = collectAllCursorsWithChanges(); /* fill the lateQueue */ // The loop below relies on the fact that it is sorted based // on the currentChange of each iterator to consider the next // change across all servers. // // Hence it is necessary to remove and eventual add again an // iterator when looping in order to keep consistent the order of // the iterators (see ReplicationIteratorComparator. 
while (!iteratorSortedSet.isEmpty() && (lateQueue.count() < 100) && (lateQueue.bytesCount() < 50000)) // fill the lateQueue while (!sortedCursors.isEmpty() && lateQueue.count() < 100 && lateQueue.bytesCount() < 50000) { ReplicationIterator iterator = iteratorSortedSet.first(); iteratorSortedSet.remove(iterator); lateQueue.add(iterator.getChange()); addIteratorIfNotEmpty(iteratorSortedSet, iterator); lateQueue.add(nextOldestUpdateMsg(sortedCursors)); } } finally { close(iteratorSortedSet); close(sortedCursors); } /* @@ -328,7 +315,8 @@ following = true; } } } else } else { /* * if the first change in the lateQueue is also on the regular @@ -353,9 +341,10 @@ } } } } else } else { /* get the next change from the lateQueue */ // get the next change from the lateQueue synchronized (msgQueue) { msg = lateQueue.removeFirst(); @@ -404,18 +393,38 @@ return null; } private void addIteratorIfNotEmpty(SortedSet<ReplicationIterator> iterators, ReplicationIterator iter) { if (iter.next()) { iterators.add(iter); } else { close(iter); } } private UpdateMsg nextOldestUpdateMsg( NavigableSet<ReplicationDBCursor> sortedCursors) { /* * The cursors are sorted based on the currentChange of each cursor to * consider the next change across all servers. * To keep consistent the order of the cursors in the SortedSet, * it is necessary to remove and eventually add again a cursor (after moving * it forward). */ final ReplicationDBCursor cursor = sortedCursors.pollFirst(); final UpdateMsg result = cursor.getChange(); cursor.next(); addCursorIfNotEmpty(sortedCursors, cursor); return result; } private void addCursorIfNotEmpty(Collection<ReplicationDBCursor> cursors, ReplicationDBCursor cursor) { if (cursor != null) { if (cursor.getChange() != null) { cursors.add(cursor); } else { close(cursor); } } } /** * Get the older Change Number for that server. @@ -449,57 +458,51 @@ We may be at the very moment when the writer has emptied the lateQueue when it sent the last update. 
The writer will fill again the lateQueue when it will send the next update but we are not yet there. So let's take the last change not sent directly from the db. there. So let's take the last change not sent directly from the db. */ SortedSet<ReplicationIterator> iteratorSortedSet = null; try { iteratorSortedSet = collectAllIteratorsWithChanges(); UpdateMsg msg = iteratorSortedSet.first().getChange(); result = msg.getChangeNumber(); } catch (Exception e) { result = null; } finally { close(iteratorSortedSet); } result = findOldestChangeNumberFromReplicationDBs(); } } } return result; } private SortedSet<ReplicationIterator> collectAllIteratorsWithChanges() { SortedSet<ReplicationIterator> results = new TreeSet<ReplicationIterator>(new ReplicationIteratorComparator()); private ChangeNumber findOldestChangeNumberFromReplicationDBs() { SortedSet<ReplicationDBCursor> sortedCursors = null; try { sortedCursors = collectAllCursorsWithChanges(); UpdateMsg msg = sortedCursors.first().getChange(); return msg.getChangeNumber(); } catch (Exception e) { return null; } finally { close(sortedCursors); } } // Build a list of candidates iterator (i.e. db i.e. server) /** * Collects all the replication DB cursors that have changes and sort them * with the oldest {@link ChangeNumber} first. 
* * @return a sorted set of cursors with changes sorted by their {@link ChangeNumber} * (oldest first) */ private NavigableSet<ReplicationDBCursor> collectAllCursorsWithChanges() { final NavigableSet<ReplicationDBCursor> results = new TreeSet<ReplicationDBCursor>(new ReplicationDBCursorComparator()); for (int serverId : replicationServerDomain.getServerIds()) { // get the last already sent CN from that server ChangeNumber lastCsn = serverState.getChangeNumber(serverId); // get an iterator in this server db from that last change ReplicationIterator iter = replicationServerDomain.getChangelogIterator(serverId, lastCsn); /* if that iterator has changes, then it is a candidate it is added in the sorted list at a position given by its current change (see ReplicationIteratorComparator). */ if (iter != null) { if (iter.getChange() != null) { results.add(iter); } else { close(iter); } } // get the last already sent CN from that server to get a cursor final ChangeNumber lastCsn = serverState.getChangeNumber(serverId); addCursorIfNotEmpty(results, replicationServerDomain.getCursorFrom(serverId, lastCsn)); } return results; } opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -50,7 +50,7 @@ import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.plugin.ReplicationServerListener; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.replication.server.changelog.api.ReplicationDBCursor; import org.opends.server.types.*; import org.opends.server.util.*; @@ -631,15 +631,15 @@ return; } ReplicationIterator ri = rsd.getChangelogIterator(serverId, previousCN); if (ri != null) ReplicationDBCursor cursor = rsd.getCursorFrom(serverId, previousCN); if (cursor != null) { try { int lookthroughCount = 0; // Walk through the changes while (ri.getChange() != null) while (cursor.getChange() != null) { if (exportConfig != null && exportConfig.isCancelled()) { // abort if cancelled @@ -650,14 +650,14 @@ break; } lookthroughCount++; writeChange(ri.getChange(), ldifWriter, searchOperation, writeChange(cursor.getChange(), ldifWriter, searchOperation, rsd.getBaseDn(), exportConfig != null); ri.next(); cursor.next(); } } finally { close(ri); close(cursor); } } } opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,8 +47,9 @@ import org.opends.server.replication.common.*; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.replication.server.changelog.api.ReplicationDBCursor; import org.opends.server.replication.server.changelog.je.DbHandler; import org.opends.server.replication.server.changelog.je.ReplicationDB; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; @@ -1262,17 +1263,18 @@ } /** * Creates and returns an iterator. * When the iterator is not used anymore, the caller MUST call the * ReplicationIterator.releaseCursor() method to free the resources * and locks used by the ReplicationIterator. * * @param serverId Identifier of the server for which the iterator is created. * @param startAfterCN Starting point for the iterator. * @return the created ReplicationIterator. Null when no DB is available * for the provided server Id. */ public ReplicationIterator getChangelogIterator(int serverId, * Creates and returns a cursor. When the cursor is not used anymore, the * caller MUST call the {@link ReplicationDBCursor#close()} method to free the * resources and locks used by the cursor. * * @param serverId * Identifier of the server for which the cursor is created. * @param startAfterCN * Starting point for the cursor. * @return the created {@link ReplicationDBCursor}. Null when no DB is available or * the DB is empty for the provided serverId.
*/ public ReplicationDBCursor getCursorFrom(int serverId, ChangeNumber startAfterCN) { DbHandler dbHandler = sourceDbHandlers.get(serverId); @@ -1281,29 +1283,29 @@ return null; } ReplicationIterator it; ReplicationDBCursor cursor; try { it = dbHandler.generateIterator(startAfterCN); cursor = dbHandler.generateCursorFrom(startAfterCN); } catch (Exception e) { return null; } if (!it.next()) if (!cursor.next()) { close(it); close(cursor); return null; } return it; return cursor; } /** * Count the number of changes in the replication changelog for the provided * serverID, between 2 provided changenumbers. * @param serverId Identifier of the server for which the iterator is created. * @param serverId Identifier of the server for which to compute the count. * @param from lower limit changenumber. * @param to upper limit changenumber. * @return the number of changes. @@ -2679,18 +2681,18 @@ // to the Db and look for the change older than eligible CN (cn14) if (eligibleCN.olderOrEqual(mostRecentDbCN)) { // let's try to seek the first change <= eligibleCN ReplicationIterator ri = null; ReplicationDBCursor cursor = null; try { ri = h.generateIterator(eligibleCN); if (ri != null && ri.getChange() != null) { ChangeNumber newCN = ri.getChange().getChangeNumber(); cursor = h.generateCursorFrom(eligibleCN); if (cursor != null && cursor.getChange() != null) { ChangeNumber newCN = cursor.getChange().getChangeNumber(); result.update(newCN); } } catch (ChangelogException e) { // there's no change older than eligibleCN (case of s3/cn31) result.update(new ChangeNumber(0, 0, serverId)); } finally { close(ri); close(cursor); } } else { // for this serverId, all changes in the ChangelogDb are holder opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursor.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java @@ -31,32 +31,31 @@ import org.opends.server.replication.protocol.UpdateMsg; /** * This interface allows to iterate through the changes received from a given * This cursor allows to iterate through the changes received from a given * LDAP Server Identifier. */ public interface ReplicationIterator extends Closeable public interface ReplicationDBCursor extends Closeable { /** * Get the UpdateMsg where the iterator is currently set. * * @return The UpdateMsg where the iterator is currently set. */ * Get the UpdateMsg where the cursor is currently set. * * @return The UpdateMsg where the cursor is currently set. */ UpdateMsg getChange(); /** * Go to the next change in the ReplicationDB or in the server Queue. * * @return false if the iterator is already on the last change before this * call. */ * Go to the next change in the ReplicationDB or in the server Queue. * * @return false if the cursor is already on the last change before this call. */ boolean next(); /** * Release the resources and locks used by this Iterator. This method must be * called when the iterator is no longer used. Failure to do it could cause DB * deadlock. */ * Release the resources and locks used by this cursor. This method must be * called when the cursor is no longer used. Failure to do it could cause DB * deadlock. */ @Override void close(); opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDBCursorComparator.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java @@ -30,24 +30,27 @@ import java.util.Comparator; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMsg; /** * This Class define a Comparator that allows to know which ReplicationIterator * contain the next UpdateMessage in the order defined by the ChangeNumber * of the UpdateMessage. * This class defines a {@link Comparator} that allows to know which * {@link ReplicationDBCursor} contain the next {@link UpdateMsg} in the order * defined by the {@link ChangeNumber} of the {@link UpdateMsg}. */ public class ReplicationIteratorComparator implements Comparator<ReplicationIterator> public class ReplicationDBCursorComparator implements Comparator<ReplicationDBCursor> { /** * Compare the ChangeNumber of the ReplicationIterator. * * @param o1 first ReplicationIterator. * @param o2 second ReplicationIterator. * @return result of the comparison. */ * Compare the {@link ChangeNumber} of the {@link ReplicationDBCursor}. * * @param o1 * first cursor. * @param o2 * second cursor. * @return result of the comparison. */ @Override public int compare(ReplicationIterator o1, ReplicationIterator o2) public int compare(ReplicationDBCursor o1, ReplicationDBCursor o2) { ChangeNumber csn1 = o1.getChange().getChangeNumber(); ChangeNumber csn2 = o2.getChange().getChangeNumber(); opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -43,8 +43,8 @@ import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; import org.opends.server.replication.server.changelog.api.ReplicationDBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; import org.opends.server.types.InitializationException; @@ -59,8 +59,8 @@ * server in the topology. * It is responsible for efficiently saving the updates that is received from * each master server into stable storage. * This class is also able to generate a ReplicationIterator that can be * used to read all changes from a given ChangeNumber. * This class is also able to generate a {@link ReplicationDBCursor} that can be * used to read all changes from a given {@link ChangeNumber}. * * This class publish some monitoring information below cn=monitor. */ @@ -260,24 +260,26 @@ } /** * Generate a new ReplicationIterator that allows to browse the db * managed by this dbHandler and starting at the position defined * by a given changeNumber. * * @param startAfterCN The position where the iterator must start. * @return a new ReplicationIterator that allows to browse the db * managed by this dbHandler and starting at the position defined * by a given changeNumber. * @throws ChangelogException if a database problem happened. */ public ReplicationIterator generateIterator(ChangeNumber startAfterCN) * Generate a new {@link ReplicationDBCursor} that allows to browse the db * managed by this dbHandler and starting at the position defined by a given * changeNumber. * * @param startAfterCN * The position where the cursor must start. 
* @return a new {@link ReplicationDBCursor} that allows to browse the db * managed by this dbHandler and starting at the position defined by a * given changeNumber. * @throws ChangelogException * if a database problem happened. */ public ReplicationDBCursor generateCursorFrom(ChangeNumber startAfterCN) throws ChangelogException { if (startAfterCN == null) { flush(); } return new JEReplicationIterator(db, startAfterCN, this); return new JEReplicationDBCursor(db, startAfterCN, this); } /** opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationDBCursor.java
File was renamed from opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicationIterator.java @@ -31,13 +31,13 @@ import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; import org.opends.server.replication.server.changelog.api.ReplicationDBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; /** * Berkeley DB JE implementation of IReplicationIterator. * Berkeley DB JE implementation of {@link ReplicationDBCursor}. */ public class JEReplicationIterator implements ReplicationIterator public class JEReplicationDBCursor implements ReplicationDBCursor { private UpdateMsg currentChange = null; private ReplServerDBCursor cursor = null; @@ -46,16 +46,19 @@ private ChangeNumber lastNonNullCurrentCN; /** * Creates a new ReplicationIterator. * All created iterator must be released by the caller using the * releaseCursor() method. * * @param db The db where the iterator must be created. * @param startAfterCN The ChangeNumber after which the iterator must start. * @param dbHandler The associated DbHandler. * @throws ChangelogException if a database problem happened. */ public JEReplicationIterator(ReplicationDB db, ChangeNumber startAfterCN, * Creates a new JEReplicationDBCursor. Every created cursor must be released by * the caller using the {@link #close()} method. * * @param db * The db where the cursor must be created. * @param startAfterCN * The ChangeNumber after which the cursor must start. * @param dbHandler * The associated DbHandler. * @throws ChangelogException * if a database problem happened.
*/ public JEReplicationDBCursor(ReplicationDB db, ChangeNumber startAfterCN, DbHandler dbHandler) throws ChangelogException { this.db = db; @@ -148,10 +151,10 @@ } /** * Called by the Gc when the object is garbage collected * Release the cursor in case the iterator was badly used and releaseCursor * was never called. */ * Called by the Gc when the object is garbage collected. Release the internal * cursor in case the cursor was badly used and {@link #close()} was never * called. */ @Override protected void finalize() { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
@@ -40,7 +40,7 @@ import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ReplicationIterator; import org.opends.server.replication.server.changelog.api.ReplicationDBCursor; import org.opends.server.util.StaticUtils; import org.testng.annotations.Test; @@ -124,7 +124,7 @@ assertEquals(changeNumber3, handler.getLastChange()); //-- // Iterator tests with db and memory queue populated // Cursor tests with db and memory queue populated // all changes in the db - add one in the memory queue handler.add(update4); @@ -132,7 +132,7 @@ assertEquals(handler.getQueueSize(),1); assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3, changeNumber4); // Test iterator from existing CN at the limit between queue and db // Test cursor from existing CN at the limit between queue and db assertFoundInOrder(handler, changeNumber3, changeNumber4); assertFoundInOrder(handler, changeNumber4); assertNotFound(handler, changeNumber5); @@ -196,33 +196,31 @@ return; } ReplicationIterator it = handler.generateIterator(changeNumbers[0]); ReplicationDBCursor cursor = handler.generateCursorFrom(changeNumbers[0]); try { for (int i = 1; i < changeNumbers.length; i++) { assertTrue(it.next()); final ChangeNumber cn = it.getChange().getChangeNumber(); final boolean equals = cn.compareTo(changeNumbers[i]) == 0; assertTrue(equals, "Actual change number=" + cn + ", Expected change number=" + changeNumbers[i]); assertTrue(cursor.next()); final ChangeNumber cn = cursor.getChange().getChangeNumber(); assertEquals(cn, changeNumbers[i]); } assertFalse(it.next()); assertNull(it.getChange(), "Actual change number=" + it.getChange() + ", Expected null"); assertFalse(cursor.next()); assertNull(cursor.getChange(), "Actual change number=" + cursor.getChange() + ", Expected null"); } finally { 
StaticUtils.close(it); StaticUtils.close(cursor); } } private void assertNotFound(DbHandler handler, ChangeNumber changeNumber) { ReplicationIterator iter = null; ReplicationDBCursor cursor = null; try { iter = handler.generateIterator(changeNumber); cursor = handler.generateCursorFrom(changeNumber); fail("Expected exception"); } catch (Exception e) @@ -231,7 +229,7 @@ } finally { StaticUtils.close(iter); StaticUtils.close(cursor); } }