opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -282,7 +282,7 @@ { flush(); } return new ReplicationIterator(serverId, db, changeNumber, this); return new ReplicationIterator(db, changeNumber, this); } /** @@ -657,7 +657,7 @@ * @param to The upper (newer) change number. * @return The computed number of changes. */ public int getCount(ChangeNumber from, ChangeNumber to) public long getCount(ChangeNumber from, ChangeNumber to) { // Now that we always keep the last ChangeNumber in the DB to avoid // expiring cookies too quickly, we need to check if the "to" opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -32,6 +32,7 @@ import static org.opends.server.util.StaticUtils.*; import java.io.Closeable; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -50,6 +51,9 @@ */ public class ReplicationDB { private static final int START = 0; private static final int STOP = 1; private Database db = null; private ReplicationDbEnv dbenv = null; private ReplicationServer replicationServer; @@ -60,7 +64,8 @@ * The lock used to provide exclusive access to the thread that close the db * (shutdown or clear). */ private ReentrantReadWriteLock dbCloseLock; private final ReentrantReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true); // Change counter management // The Db itself does not allow to count records between a start and an end @@ -76,9 +81,9 @@ // - a counter value : count of changes since previous counter record. // // A counter record has to follow the order of the db, so it needs to have // a changenumber key that follow the order. // a changenumber key that follows the order. // A counter record must have its own changenumber key since the Db does not // support duplicate key (it is a compatibility breaker character of the DB). // support duplicate keys (it is a compatibility breaker character of the DB). // // We define 2 conditions to store a counter record : // 1/- at least 'counterWindowSize' changes have been stored in the Db @@ -88,7 +93,7 @@ /** Current value of the counter. */ private int counterCurrValue = 1; private int counterCurrValue = 1; /** * When not null, the next change with a ts different from @@ -100,7 +105,7 @@ * The counter record will never be written to the db more often than each * counterWindowSize changes. */ private int counterWindowSize = 1000; private int counterWindowSize = 1000; /** * Creates a new database or open existing database that will be used @@ -122,40 +127,37 @@ this.replicationServer = replicationServer; // Get or create the associated ReplicationServerDomain and Db. db = dbenv.getOrAddDb(serverId, baseDn, replicationServer.getReplicationServerDomain(baseDn, true).getGenerationId()); final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDn, true); db = dbenv.getOrAddDb(serverId, baseDn, domain.getGenerationId()); dbCloseLock = new ReentrantReadWriteLock(true); Cursor cursor; Transaction txn = null; DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); OperationStatus status; int distBackToCounterRecord = 0; intializeCounters(); } // Initialize counter private void intializeCounters() { this.counterCurrValue = 1; cursor = db.openCursor(txn, null); Cursor cursor = db.openCursor(null, null); try { status = cursor.getLast(key, data, LockMode.DEFAULT); int distBackToCounterRecord = 0; DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); while (status == OperationStatus.SUCCESS) { ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData())); if (!ReplicationDB.isaCounter(cn)) ChangeNumber cn = toChangeNumber(key.getData()); if (isACounterRecord(cn)) { status = cursor.getPrev(key, data, LockMode.DEFAULT); distBackToCounterRecord++; } else { // counter record counterCurrValue = decodeCounterValue(data.getData()) + 1; counterTsLimit = cn.getTime(); break; } status = cursor.getPrev(key, data, LockMode.DEFAULT); distBackToCounterRecord++; } counterCurrValue += distBackToCounterRecord; } @@ -165,6 +167,10 @@ } } private static ChangeNumber toChangeNumber(byte[] data) { return new ChangeNumber(decodeUTF8(data)); } /** @@ -188,29 +194,11 @@ for (UpdateMsg change : changes) { DatabaseEntry key = new ReplicationKey( change.getChangeNumber()); DatabaseEntry data = new ReplicationData(change); final DatabaseEntry key = createReplicationKey(change.getChangeNumber()); final DatabaseEntry data = new ReplicationData(change); if ((counterCurrValue != 0) && (counterCurrValue % counterWindowSize == 0)) { // enough changes to generate a counter record - wait for the next // change of time counterTsLimit = change.getChangeNumber().getTime(); } if ((counterTsLimit != 0) && (change.getChangeNumber().getTime() != counterTsLimit)) { // Write the counter record DatabaseEntry counterKey = new ReplicationKey( new ChangeNumber(change.getChangeNumber().getTime(), 0, 0)); DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1); db.put(null, counterKey, counterValue); counterTsLimit = 0; } insertCounterRecordIfNeeded(change.getChangeNumber()); db.put(null, key, data); counterCurrValue++; } @@ -226,6 +214,39 @@ } } private void insertCounterRecordIfNeeded(ChangeNumber changeNumber) { if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0)) { // enough changes to generate a counter record // wait for the next change of time counterTsLimit = changeNumber.getTime(); } if (counterTsLimit != 0 && changeNumber.getTime() != counterTsLimit) { // Write the counter record final ChangeNumber counterRecord = newCounterRecord(changeNumber); DatabaseEntry counterKey = createReplicationKey(counterRecord); DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1); db.put(null, counterKey, counterValue); counterTsLimit = 0; } } private DatabaseEntry createReplicationKey(ChangeNumber changeNumber) { DatabaseEntry key = new DatabaseEntry(); try { key.setData(changeNumber.toString().getBytes("UTF-8")); } catch (UnsupportedEncodingException e) { // Should never happens, UTF-8 is always supported // TODO : add better logging } return key; } /** * Shutdown the database. @@ -288,8 +309,7 @@ private void closeLockedCursor(Cursor cursor) throws DatabaseException private void closeAndReleaseReadLock(Cursor cursor) throws DatabaseException { try { @@ -308,8 +328,6 @@ public ChangeNumber readFirstChange() { Cursor cursor = null; ChangeNumber cn = null; try { dbCloseLock.readLock().lock(); @@ -321,49 +339,43 @@ return null; } DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); cursor = db.openCursor(null, null); OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); LockMode defaultMode = LockMode.DEFAULT; if (cursor.getFirst(key, data, defaultMode) != OperationStatus.SUCCESS) { /* database is empty */ // database is empty return null; } String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) final ChangeNumber cn = toChangeNumber(key.getData()); if (!isACounterRecord(cn)) { // First record is a counter record .. go next status = cursor.getNext(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { // DB contains only a counter record return null; } else { cn = new ChangeNumber(decodeUTF8(key.getData())); } return cn; } // First record is a counter record .. go next if (cursor.getNext(key, data, defaultMode) != OperationStatus.SUCCESS) { // DB contains only a counter record return null; } // There cannot be 2 counter record next to each other, // it is safe to return this record return toChangeNumber(key.getData()); } finally { closeLockedCursor(cursor); closeAndReleaseReadLock(cursor); } } catch (DatabaseException e) { /* database is faulty */ replicationServer.handleUnexpectedDatabaseException(e); cn = null; return null; } return cn; } @@ -376,8 +388,6 @@ public ChangeNumber readLastChange() { Cursor cursor = null; ChangeNumber cn = null; try { dbCloseLock.readLock().lock(); @@ -389,53 +399,45 @@ return null; } DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); cursor = db.openCursor(null, null); OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); LockMode defaultMode = LockMode.DEFAULT; if (cursor.getLast(key, data, defaultMode) != OperationStatus.SUCCESS) { /* database is empty */ // database is empty return null; } String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) final ChangeNumber cn = toChangeNumber(key.getData()); if (!isACounterRecord(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { /* * database only contain a counter record - don't know how much it * can be possible but ... */ cn = null; } else { str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); // There can't be 2 counter record next to each other } return cn; } if (cursor.getPrev(key, data, defaultMode) != OperationStatus.SUCCESS) { /* * database only contain a counter record - don't know how much it can * be possible but ... */ return null; } // There cannot be 2 counter record next to each other, // it is safe to return this record return toChangeNumber(key.getData()); } finally { closeLockedCursor(cursor); closeAndReleaseReadLock(cursor); } } catch (DatabaseException e) { replicationServer.handleUnexpectedDatabaseException(e); cn = null; return null; } return cn; } /** @@ -455,11 +457,6 @@ } Cursor cursor = null; ChangeNumber cn = null; DatabaseEntry key = new ReplicationKey(changeNumber); DatabaseEntry data = new DatabaseEntry(); try { dbCloseLock.readLock().lock(); @@ -471,6 +468,8 @@ return null; } DatabaseEntry key = createReplicationKey(changeNumber); DatabaseEntry data = new DatabaseEntry(); cursor = db.openCursor(null, null); if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) @@ -480,23 +479,7 @@ if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // database starts with a counter record. cn = null; } else { str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); // There can't be 2 counter record next to each other } } return getRegularRecord(cursor, key, data); } // else, there was no change previous to our changeNumber. } @@ -507,40 +490,41 @@ if (cursor.getLast(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { /* * database only contain a counter record, should not be * possible, but Ok, let's just say no change Number */ cn = null; } else { str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); // There can't be 2 counter record next to each other } } return getRegularRecord(cursor, key, data); } } } finally { closeLockedCursor(cursor); closeAndReleaseReadLock(cursor); } } catch (DatabaseException e) { replicationServer.handleUnexpectedDatabaseException(e); cn = null; } return cn; return null; } private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key, DatabaseEntry data) { final ChangeNumber cn = toChangeNumber(key.getData()); if (!isACounterRecord(cn)) { return cn; } // There cannot be 2 counter record next to each other, // it is safe to return previous record which must exist if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { return toChangeNumber(key.getData()); } // database only contain a counter record, which should not be possible // let's just say no changeNumber return null; } @@ -587,7 +571,7 @@ { if (startingChangeNumber != null) { key = new ReplicationKey(startingChangeNumber); key = createReplicationKey(startingChangeNumber); } else { @@ -646,7 +630,7 @@ catch (Exception e) { // Unlocking is required before throwing any exception closeLockedCursor(localCursor); closeAndReleaseReadLock(localCursor); throw e; } } @@ -682,7 +666,7 @@ } catch (Exception e) { closeLockedCursor(localCursor); closeAndReleaseReadLock(localCursor); if (localTxn != null) { @@ -714,7 +698,8 @@ isClosed = true; } closeLockedCursor(cursor); closeAndReleaseReadLock(cursor); if (txn != null) { try @@ -747,7 +732,7 @@ isClosed = true; } closeLockedCursor(cursor); closeAndReleaseReadLock(cursor); if (txn != null) { @@ -776,13 +761,11 @@ } OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { return null; } String csnString = decodeUTF8(key.getData()); return new ChangeNumber(csnString); return toChangeNumber(key.getData()); } /** @@ -815,15 +798,12 @@ ChangeNumber cn = null; try { cn = new ChangeNumber( decodeUTF8(key.getData())); if (ReplicationDB.isaCounter(cn)) cn = toChangeNumber(key.getData()); if (isACounterRecord(cn)) { // counter record continue; } currentChange = ReplicationData.generateChange(data .getData()); currentChange = ReplicationData.generateChange(data.getData()); } catch (Exception e) { @@ -893,7 +873,7 @@ dbenv.clearDb(dbName); // RE-create the db db = dbenv.getOrAddDb(serverId, baseDn, (long)-1); db = dbenv.getOrAddDb(serverId, baseDn, -1); } catch(Exception e) { @@ -916,15 +896,8 @@ * @return The number of changes between provided start and stop changeNumber. * Returns 0 when an error occurs. */ public int count(ChangeNumber start, ChangeNumber stop) public long count(ChangeNumber start, ChangeNumber stop) { int counterRecord1 = 0; int counterRecord2 = 0; int distToCounterRecord1 = 0; int distBackToCounterRecord2 = 0; int count=0; OperationStatus status; try { dbCloseLock.readLock().lock(); @@ -937,144 +910,226 @@ { return 0; } if (start == null && stop == null) { return db.count(); } ChangeNumber cn ; if ((start==null)&&(stop==null)) return (int)db.count(); int[] counterValues = new int[2]; int[] distanceToCounterRecords = new int[2]; // Step 1 : from the start point, traverse db to the next counter record // or to the stop point. DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); cursor = db.openCursor(null, null); if (start != null) { key = new ReplicationKey(start); status = cursor.getSearchKey(key, data, LockMode.DEFAULT); if (status == OperationStatus.NOTFOUND) status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); } else { status = cursor.getNext(key, data, LockMode.DEFAULT); } while (status == OperationStatus.SUCCESS) { // test whether the record is a regular change or a counter String csnString = decodeUTF8(key.getData()); cn = new ChangeNumber(csnString); if (cn.getServerId() != 0) { // reached a regular change record // test whether we reached the 'stop' target if (!cn.newer(stop)) { // let's loop distToCounterRecord1++; status = cursor.getNext(key, data, LockMode.DEFAULT); } else { // reached the end break; } } else { // counter record counterRecord1 = decodeCounterValue(data.getData()); break; } } findFirstCounterRecordAfterStartPoint(start, stop, cursor, counterValues, distanceToCounterRecords); cursor.close(); // cases // if (counterRecord1==0) return distToCounterRecord1; if (counterValues[START] == 0) return distanceToCounterRecords[START]; // Step 2 : from the stop point, traverse db to the next counter record // or to the start point. data = new DatabaseEntry(); key = new ReplicationKey(stop); cursor = db.openCursor(null, null); status = cursor.getSearchKey(key, data, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) if (!findFirstCounterRecordBeforeStopPoint(start, stop, cursor, counterValues, distanceToCounterRecords)) { cn = new ChangeNumber(decodeUTF8(key.getData())); } else { key = new DatabaseEntry(); data = new DatabaseEntry(); status = cursor.getLast(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { /* database is empty */ return 0; } } while (status == OperationStatus.SUCCESS) { cn = new ChangeNumber(decodeUTF8(key.getData())); if (!ReplicationDB.isaCounter(cn)) { // regular change record if (!cn.older(start)) { distBackToCounterRecord2++; status = cursor.getPrev(key, data, LockMode.DEFAULT); } else break; } else { // counter record counterRecord2 = decodeCounterValue(data.getData()); break; } // database is empty return 0; } cursor.close(); // Step 3 : Now consolidates the result if (counterRecord1!=0) { if (counterRecord1 == counterRecord2) { // only one cp between from and to - no need to use it count = distToCounterRecord1 + distBackToCounterRecord2; } else { // 2 cp between from and to count = distToCounterRecord1 + (counterRecord2-counterRecord1) + distBackToCounterRecord2; } } return computeDistance(counterValues, distanceToCounterRecords); } finally { closeLockedCursor(cursor); closeAndReleaseReadLock(cursor); } } catch (DatabaseException e) { replicationServer.handleUnexpectedDatabaseException(e); } return count; return 0; } private void findFirstCounterRecordAfterStartPoint(ChangeNumber start, ChangeNumber stop, Cursor cursor, int[] counterValues, int[] distanceToCounterRecords) { OperationStatus status; DatabaseEntry key; DatabaseEntry data = new DatabaseEntry(); if (start != null) { key = createReplicationKey(start); status = cursor.getSearchKey(key, data, LockMode.DEFAULT); if (status == OperationStatus.NOTFOUND) status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT); } else { key = new DatabaseEntry(); status = cursor.getNext(key, data, LockMode.DEFAULT); } while (status == OperationStatus.SUCCESS) { // test whether the record is a regular change or a counter final ChangeNumber cn = toChangeNumber(key.getData()); if (isACounterRecord(cn)) { // we have found the counter record counterValues[START] = decodeCounterValue(data.getData()); break; } // reached a regular change record // test whether we reached the 'stop' target if (!cn.newer(stop)) { // let's loop distanceToCounterRecords[START]++; status = cursor.getNext(key, data, LockMode.DEFAULT); } else { // reached the end break; } } } private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start, ChangeNumber stop, Cursor cursor, int[] counterValues, int[] distanceToCounterRecords) { DatabaseEntry key = createReplicationKey(stop); DatabaseEntry data = new DatabaseEntry(); OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { key = new DatabaseEntry(); data = new DatabaseEntry(); status = cursor.getLast(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { return false; } } while (status == OperationStatus.SUCCESS) { final ChangeNumber cn = toChangeNumber(key.getData()); if (isACounterRecord(cn)) { // we have found the counter record counterValues[STOP] = decodeCounterValue(data.getData()); break; } // it is a regular change record if (!cn.older(start)) { distanceToCounterRecords[STOP]++; status = cursor.getPrev(key, data, LockMode.DEFAULT); } else break; } return true; } /** * Test if a provided changeNumber represents a counter record. * @param cn The provided changeNumber. * @return True if the provided changenumber is a counter. * The diagram below shows a visual description of how the distance between * two change numbers in the database is computed. * * <pre> * +--------+ +--------+ * | CASE 1 | | CASE 2 | * +--------+ +--------+ * * CSN CSN * ----- ----- * START => ----- START => ----- * ^ ----- ^ ----- * | ----- | ----- * dist 1 ----- dist 1 ----- * | ----- | ----- * v ----- v ----- * CR 1&2 => [1000] CR 1 => [1000] * ^ ----- ----- * | ----- ----- * dist 2 ----- ----- * | ----- ----- * v ----- ----- * STOP => ----- ----- * ----- ----- * CR => [2000] CR 2 => [2000] * ----- ^ ----- * | ----- * dist 2 ----- * | ----- * v ----- * STOP => ----- * </pre> * * Explanation of the terms used: * <dl> * <dt>START</dt> * <dd>Start change number for the count</dd> * <dt>STOP</dt> * <dd>Stop change number for the count</dd> * <dt>dist</dt> * <dd>Distance from START (or STOP) to the counter record</dd> * <dt>CSN</dt> * <dd>Stands for "Change Sequence Number". Below it, the database is * symbolized, where each record is represented by using dashes "-----". The * database is ordered.</dd> * <dt>CR</dt> * <dd>Stands for "Counter Record". Counter Records are inserted in the * database along with real change numbers, but they are not real changes. * They are only used to speed up calculating the distance between 2 change * numbers without the need to scan the whole database in between.</dd> * </dl> */ static private boolean isaCounter(ChangeNumber cn) private long computeDistance(int[] counterValues, int[] distanceToCounterRecords) { return ((cn.getServerId()== 0) && (cn.getSeqnum()==0)); if (counterValues[START] != 0) { if (counterValues[START] == counterValues[STOP]) { // only one counter record between from and to - no need to use it return distanceToCounterRecords[START] + distanceToCounterRecords[STOP]; } // at least 2 counter records between from and to return distanceToCounterRecords[START] + (counterValues[STOP] - counterValues[START]) + distanceToCounterRecords[STOP]; } return 0; } /** * Whether a provided changeNumber represents a counter record. A counter * record is used to store TODO. * * @param cn * The changeNumber to test * @return true if the provided changenumber is a counter, false otherwise */ private static boolean isACounterRecord(ChangeNumber cn) { return cn.getServerId() == 0 && cn.getSeqnum() == 0; } private static ChangeNumber newCounterRecord(ChangeNumber changeNumber) { return new ChangeNumber(changeNumber.getTime(), 0, 0); } /** opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -315,25 +315,26 @@ serverId + " " + baseDn + " " + generationId); try { String key = serverId + FIELD_SEPARATOR + baseDn; final String serverIdKey = serverId + FIELD_SEPARATOR + baseDn; // Opens the database for the changes received from this server // on this domain. Create it if it does not already exist. DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(true); Database db = dbEnvironment.openDatabase(null, key, dbConfig); Database db = dbEnvironment.openDatabase(null, serverIdKey, dbConfig); // Creates the record serverId/domain base Dn in the stateDb // if it does not already exist. putInStateDBIfNotExist(key, key); putInStateDBIfNotExist(serverIdKey, serverIdKey); // Creates the record domain base Dn/ generationId in the stateDb // if it does not already exist. key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR + baseDn; putInStateDBIfNotExist(key, data); final String genIdKey = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn; final String genIdData = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR + baseDn; putInStateDBIfNotExist(genIdKey, genIdData); return db; } catch (UnsupportedEncodingException e) opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -43,27 +43,25 @@ private ReplServerDBCursor cursor = null; private DbHandler dbh; private ReplicationDB db; ChangeNumber lastNonNullCurrentCN; private ChangeNumber lastNonNullCurrentCN; /** * Creates a new ReplicationIterator. * All created iterator must be released by the caller using the * releaseCursor() method. * * @param id the Identifier of the server on which the iterator applies. * @param db The db where the iterator must be created. * @param changeNumber The ChangeNumber after which the iterator must start. * @param dbh The associated DbHandler. * @param dbHandler The associated DbHandler. * @throws Exception If there is no other change to push after change * with changeNumber number. * @throws DatabaseException if a database problem happened. */ public ReplicationIterator( int id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh) throws Exception, DatabaseException public ReplicationIterator(ReplicationDB db, ChangeNumber changeNumber, DbHandler dbHandler) throws Exception, DatabaseException { this.db = db; this.dbh = dbh; this.dbh = dbHandler; this.lastNonNullCurrentCN = changeNumber; try @@ -79,7 +77,7 @@ if (cursor == null) { // flush the queue into the db dbh.flush(); dbHandler.flush(); // look again in the db cursor = db.openReadCursor(changeNumber); @@ -173,6 +171,7 @@ * Release the cursor in case the iterator was badly used and releaseCursor * was never called. */ @Override protected void finalize() { releaseCursor(); opends/src/server/org/opends/server/replication/server/ReplicationKey.java
File was deleted opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -32,34 +32,9 @@ import static org.opends.server.util.StaticUtils.*; import static org.testng.Assert.*; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PrintStream; import java.io.StringReader; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; import java.net.SocketException; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; import java.io.*; import java.net.*; import java.util.*; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; import java.util.logging.LogManager; @@ -74,22 +49,13 @@ import org.opends.server.api.Backend; import org.opends.server.api.WorkQueue; import org.opends.server.backends.MemoryBackend; import org.opends.server.backends.jeb.BackendImpl; import org.opends.server.backends.jeb.DatabaseContainer; import org.opends.server.backends.jeb.EntryContainer; import org.opends.server.backends.jeb.Index; import org.opends.server.backends.jeb.RootContainer; import org.opends.server.backends.jeb.*; import org.opends.server.config.ConfigException; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.extensions.ConfigFileHandler; import org.opends.server.loggers.AccessLogger; import org.opends.server.loggers.ErrorLogger; import org.opends.server.loggers.HTTPAccessLogger; import org.opends.server.loggers.TextAccessLogPublisher; import org.opends.server.loggers.TextErrorLogPublisher; import org.opends.server.loggers.TextHTTPAccessLogPublisher; import org.opends.server.loggers.*; import org.opends.server.loggers.debug.DebugLogger; import org.opends.server.loggers.debug.TextDebugLogPublisher; import org.opends.server.plugins.InvocationCounterPlugin; @@ -103,19 +69,8 @@ import org.opends.server.protocols.ldap.LDAPReader; import org.opends.server.tools.LDAPModify; import org.opends.server.tools.dsconfig.DSConfig; import org.opends.server.types.AttributeType; import org.opends.server.types.AttributeTypeConstants; import org.opends.server.types.ByteString; import org.opends.server.types.DN; import org.opends.server.types.DirectoryEnvironmentConfig; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.*; import org.opends.server.types.FilePermission; import org.opends.server.types.InitializationException; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.OperatingSystem; import org.opends.server.types.ResultCode; import org.opends.server.types.Schema; import org.opends.server.util.BuildVersion; import org.opends.server.util.EmbeddedUtils; import org.opends.server.util.LDIFReader; @@ -790,8 +745,7 @@ private static ServerSocket bindPort(int port) throws IOException { ServerSocket serverLdapSocket; serverLdapSocket = new ServerSocket(); ServerSocket serverLdapSocket = new ServerSocket(); serverLdapSocket.setReuseAddress(true); serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port)); return serverLdapSocket; @@ -806,10 +760,48 @@ */ public static ServerSocket bindFreePort() throws IOException { ServerSocket serverLdapSocket = new ServerSocket(); serverLdapSocket.setReuseAddress(true); serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0)); return serverLdapSocket; return bindPort(0); } /** * Find a free port on the local host. * * @throws IOException * in case of underlying exception. * @return the free port number found */ public static int findFreePort() throws IOException { return findFreePorts(1)[0]; } /** * Find nb free ports on the local host. * * @param nb * the number of free ports to find * @throws IOException * in case of underlying exception. * @return an array with the free port numbers found */ public static int[] findFreePorts(int nb) throws IOException { final ServerSocket[] sockets = new ServerSocket[nb]; try { final int[] ports = new int[nb]; for (int i = 0; i < nb; i++) { final ServerSocket socket = bindFreePort(); sockets[i] = socket; ports[i] = socket.getLocalPort(); } return ports; } finally { close(sockets); } } /** opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -27,10 +27,16 @@ */ package org.opends.server.replication.server; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.testng.Assert.*; import java.io.File; import java.net.ServerSocket; import java.io.IOException; import org.opends.server.TestCaseUtils; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.config.ConfigException; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.ChangeNumber; @@ -38,18 +44,15 @@ import org.opends.server.replication.protocol.DeleteMsg; import org.testng.annotations.Test; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.testng.Assert.*; /** * Test the dbHandler class */ @SuppressWarnings("javadoc") public class DbHandlerTest extends ReplicationTestCase { // The tracer object for the debug logger /** The tracer object for the debug logger */ private static final DebugTracer TRACER = getTracer(); /** * Utility - log debug message - highlight it is from the test and not * from the server code. Makes easier to observe the test steps. @@ -69,33 +72,15 @@ ReplicationServer replicationServer = null; ReplicationDbEnv dbEnv = null; DbHandler handler = null; ReplicationIterator it = null; try { TestCaseUtils.startServer(); // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); int changelogPort = socket.getLocalPort(); socket.close(); // configure a ReplicationServer. ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, 100, null); replicationServer = new ReplicationServer(conf); replicationServer = configureReplicationServer(100); // create or clean a directory for the dbHandler String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); path = path + File.separator + "unit-tests" + File.separator + "dbHandler"; testRoot = new File(path); if (testRoot.exists()) { TestCaseUtils.deleteDirectory(testRoot); } testRoot.mkdirs(); String path = getReplicationDbPath(); testRoot = createDirectory(path); dbEnv = new ReplicationDbEnv(path, replicationServer); @@ -109,18 +94,10 @@ ChangeNumber changeNumber4 = gen.newChangeNumber(); ChangeNumber changeNumber5 = gen.newChangeNumber(); DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"); DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"); DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"); DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4, "uid"); handler.add(update1); handler.add(update2); handler.add(update3); handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid")); handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid")); handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid")); DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4, "uid"); //-- // Iterator tests with memory queue only populated @@ -128,31 +105,8 @@ // verify that memory queue is populated assertEquals(handler.getQueueSize(),3); // Iterator from existing CN it = handler.generateIterator(changeNumber1); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0, " Actual change number=" + it.getChange().getChangeNumber() + " Expect change number=" + changeNumber2); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertFalse(it.next()); it.releaseCursor(); it=null; // Iterator from NON existing CN Exception ec = null; try { it = handler.generateIterator(changeNumber5); } catch(Exception e) { ec = e; } assertNotNull(ec); assert(ec.getLocalizedMessage().equals("ChangeNumber not available")); assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3); assertNotFound(handler, changeNumber5); //-- // Iterator tests with db only populated @@ -161,30 +115,8 @@ // verify that memory queue is empty (all changes flushed in the db) assertEquals(handler.getQueueSize(),0); // Test iterator from existing CN it = handler.generateIterator(changeNumber1); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertFalse(it.next()); it.releaseCursor(); it=null; // Iterator from NON existing CN ec = null; try { it = handler.generateIterator(changeNumber5); } catch(Exception e) { ec = e; } assertNotNull(ec); assert(ec.getLocalizedMessage().equals("ChangeNumber not available")); assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3); assertNotFound(handler, changeNumber5); // Test first and last assertEquals(changeNumber1, handler.getFirstChange()); @@ -198,52 +130,11 @@ // verify memory queue contains this one assertEquals(handler.getQueueSize(),1); // Test iterator from existing CN it = handler.generateIterator(changeNumber1); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertFalse(it.next()); assertTrue(it.getChange()==null); it.releaseCursor(); it=null; assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3, changeNumber4); // Test iterator from existing CN at the limit between queue and db it = handler.generateIterator(changeNumber3); assertTrue(it.next()); assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0, " Actual change number=" + it.getChange().getChangeNumber()); assertFalse(it.next()); assertTrue(it.getChange()==null); it.releaseCursor(); it=null; // Test iterator from existing CN at the limit between queue and db it = handler.generateIterator(changeNumber4); assertFalse(it.next()); assertTrue(it.getChange()==null, " Actual change number=" + it.getChange()); it.releaseCursor(); it=null; // Test iterator from NON existing CN ec = null; try { it = handler.generateIterator(changeNumber5); } catch(Exception e) { ec = e; } assertNotNull(ec); assert(ec.getLocalizedMessage().equals("ChangeNumber not available")); assertFoundInOrder(handler, changeNumber3, changeNumber4); assertFoundInOrder(handler, changeNumber4); assertNotFound(handler, changeNumber5); handler.setPurgeDelay(1); @@ -262,13 +153,9 @@ purged = true; } } // FIXME should add an assert here } finally { if (it != null) { it.releaseCursor(); it=null; } if (handler != null) handler.shutdown(); if (dbEnv != null) @@ -280,6 +167,74 @@ } } private ReplicationServer configureReplicationServer(int windowSize) throws IOException, ConfigException { final int changelogPort = findFreePort(); final ReplicationServerCfg conf = new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, windowSize, null); return new ReplicationServer(conf); } private String getReplicationDbPath() { String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); return path + File.separator + "unit-tests" + File.separator + "dbHandler"; } private File createDirectory(String path) throws IOException { File testRoot = new File(path); if (testRoot.exists()) { TestCaseUtils.deleteDirectory(testRoot); } testRoot.mkdirs(); return testRoot; } private ReplicationIterator assertFoundInOrder(DbHandler handler, ChangeNumber... changeNumbers) throws Exception { if (changeNumbers.length == 0) { return null; } ReplicationIterator it = handler.generateIterator(changeNumbers[0]); for (int i = 1; i < changeNumbers.length; i++) { assertTrue(it.next()); final ChangeNumber cn = it.getChange().getChangeNumber(); final boolean equals = cn.compareTo(changeNumbers[i]) == 0; assertTrue(equals, "Actual change number=" + cn + ", Expected change number=" + changeNumbers[i]); } assertFalse(it.next()); assertNull(it.getChange(), "Actual change number=" + it.getChange() + ", Expected null"); it.releaseCursor(); return it; } private void assertNotFound(DbHandler handler, ChangeNumber changeNumber) { try { ReplicationIterator iter = handler.generateIterator(changeNumber); iter.releaseCursor(); fail("Expected exception"); } catch (Exception e) { assertEquals(e.getLocalizedMessage(), "ChangeNumber not available"); } } /** * Test the feature of clearing a dbHandler used by a replication server. * The clear feature is used when a replication server receives a request @@ -296,28 +251,11 @@ { TestCaseUtils.startServer(); // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); int changelogPort = socket.getLocalPort(); socket.close(); // configure a ReplicationServer. ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, 100, null); replicationServer = new ReplicationServer(conf); replicationServer = configureReplicationServer(100); // create or clean a directory for the dbHandler String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); path = path + File.separator + "unit-tests" + File.separator + "dbHandler"; testRoot = new File(path); if (testRoot.exists()) { TestCaseUtils.deleteDirectory(testRoot); } testRoot.mkdirs(); String path = getReplicationDbPath(); testRoot = createDirectory(path); dbEnv = new ReplicationDbEnv(path, replicationServer); @@ -331,17 +269,10 @@ ChangeNumber changeNumber2 = gen.newChangeNumber(); ChangeNumber changeNumber3 = gen.newChangeNumber(); DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"); DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"); DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"); // Add the changes handler.add(update1); handler.add(update2); handler.add(update3); handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid")); handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid")); handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid")); // Check they are here assertEquals(changeNumber1, handler.getFirstChange()); @@ -366,6 +297,7 @@ TestCaseUtils.deleteDirectory(testRoot); } } /** * Test the logic that manages counter records in the DbHandler in order to * optimize the counting of record in the replication changelog db. @@ -416,35 +348,17 @@ ReplicationServer replicationServer = null; ReplicationDbEnv dbEnv = null; DbHandler handler = null; ReplicationIterator ri = null; int actualCnt = 0; long actualCnt = 0; String testcase; try { TestCaseUtils.startServer(); // find a free port for the replicationServer ServerSocket socket = TestCaseUtils.bindFreePort(); int changelogPort = socket.getLocalPort(); socket.close(); // configure a ReplicationServer. ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, 100000, null); replicationServer = new ReplicationServer(conf); replicationServer = configureReplicationServer(100000); // create or clean a directory for the dbHandler String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot + File.separator + "build"); path = path + File.separator + "unit-tests" + File.separator + "dbHandler"; testRoot = new File(path); if (testRoot.exists()) { TestCaseUtils.deleteDirectory(testRoot); } testRoot.mkdirs(); String path = getReplicationDbPath(); testRoot = createDirectory(path); dbEnv = new ReplicationDbEnv(path, replicationServer); @@ -589,7 +503,7 @@ handler.setPurgeDelay(100); sleep(4000); int totalCount = handler.getCount(null, null); long totalCount = handler.getCount(null, null); debugInfo(tn,testcase + " After purge, total count=" + totalCount); testcase="AFTER PURGE (first, last)="; @@ -619,12 +533,9 @@ assertEquals(null, handler.getFirstChange()); assertEquals(null, handler.getLastChange()); debugInfo(tn,"Success"); } finally { if (ri!=null) ri.releaseCursor(); if (handler != null) handler.shutdown(); if (dbEnv != null) @@ -635,4 +546,5 @@ TestCaseUtils.deleteDirectory(testRoot); } } }