From 8170a25a086fb89f90a2b2b8613e7b7d82a47208 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 08 Aug 2013 13:37:11 +0000
Subject: [PATCH] Big refactoring of ReplicationDB to make it readable. Changed count() return type from int to long. Removed a useless parameter from ReplicationIterator ctor. Various code cleanups in other classes.
---
/dev/null | 59 ---
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java | 15
opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java | 106 ++---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java | 300 +++++----------
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 625 +++++++++++++++++---------------
opends/src/server/org/opends/server/replication/server/DbHandler.java | 4
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 15
7 files changed, 512 insertions(+), 612 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 8231012..219204c 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -282,7 +282,7 @@
{
flush();
}
- return new ReplicationIterator(serverId, db, changeNumber, this);
+ return new ReplicationIterator(db, changeNumber, this);
}
/**
@@ -657,7 +657,7 @@
* @param to The upper (newer) change number.
* @return The computed number of changes.
*/
- public int getCount(ChangeNumber from, ChangeNumber to)
+ public long getCount(ChangeNumber from, ChangeNumber to)
{
// Now that we always keep the last ChangeNumber in the DB to avoid
// expiring cookies too quickly, we need to check if the "to"
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index fb0155b..5ea35d9 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -32,6 +32,7 @@
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
+import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -50,6 +51,9 @@
*/
public class ReplicationDB
{
+ private static final int START = 0;
+ private static final int STOP = 1;
+
private Database db = null;
private ReplicationDbEnv dbenv = null;
private ReplicationServer replicationServer;
@@ -60,7 +64,8 @@
* The lock used to provide exclusive access to the thread that close the db
* (shutdown or clear).
*/
- private ReentrantReadWriteLock dbCloseLock;
+ private final ReentrantReadWriteLock dbCloseLock =
+ new ReentrantReadWriteLock(true);
// Change counter management
// The Db itself does not allow to count records between a start and an end
@@ -76,9 +81,9 @@
// - a counter value : count of changes since previous counter record.
//
// A counter record has to follow the order of the db, so it needs to have
- // a changenumber key that follow the order.
+ // a changenumber key that follows the order.
// A counter record must have its own changenumber key since the Db does not
- // support duplicate key (it is a compatibility breaker character of the DB).
+ // support duplicate keys (it is a compatibility breaker character of the DB).
//
// We define 2 conditions to store a counter record :
// 1/- at least 'counterWindowSize' changes have been stored in the Db
@@ -88,7 +93,7 @@
/** Current value of the counter. */
- private int counterCurrValue = 1;
+ private int counterCurrValue = 1;
/**
* When not null, the next change with a ts different from
@@ -100,7 +105,7 @@
* The counter record will never be written to the db more often than each
* counterWindowSize changes.
*/
- private int counterWindowSize = 1000;
+ private int counterWindowSize = 1000;
/**
* Creates a new database or open existing database that will be used
@@ -122,40 +127,37 @@
this.replicationServer = replicationServer;
// Get or create the associated ReplicationServerDomain and Db.
- db = dbenv.getOrAddDb(serverId, baseDn,
- replicationServer.getReplicationServerDomain(baseDn,
- true).getGenerationId());
+ final ReplicationServerDomain domain =
+ replicationServer.getReplicationServerDomain(baseDn, true);
+ db = dbenv.getOrAddDb(serverId, baseDn, domain.getGenerationId());
- dbCloseLock = new ReentrantReadWriteLock(true);
- Cursor cursor;
- Transaction txn = null;
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- OperationStatus status;
- int distBackToCounterRecord = 0;
+ intializeCounters();
+ }
- // Initialize counter
+ private void intializeCounters()
+ {
this.counterCurrValue = 1;
- cursor = db.openCursor(txn, null);
+
+ Cursor cursor = db.openCursor(null, null);
try
{
- status = cursor.getLast(key, data, LockMode.DEFAULT);
+ int distBackToCounterRecord = 0;
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
while (status == OperationStatus.SUCCESS)
{
- ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData()));
- if (!ReplicationDB.isaCounter(cn))
+ ChangeNumber cn = toChangeNumber(key.getData());
+ if (isACounterRecord(cn))
{
- status = cursor.getPrev(key, data, LockMode.DEFAULT);
- distBackToCounterRecord++;
- }
- else
- {
- // counter record
counterCurrValue = decodeCounterValue(data.getData()) + 1;
counterTsLimit = cn.getTime();
break;
}
+
+ status = cursor.getPrev(key, data, LockMode.DEFAULT);
+ distBackToCounterRecord++;
}
counterCurrValue += distBackToCounterRecord;
}
@@ -165,6 +167,10 @@
}
}
+ private static ChangeNumber toChangeNumber(byte[] data)
+ {
+ return new ChangeNumber(decodeUTF8(data));
+ }
/**
@@ -188,29 +194,11 @@
for (UpdateMsg change : changes)
{
- DatabaseEntry key = new ReplicationKey(
- change.getChangeNumber());
- DatabaseEntry data = new ReplicationData(change);
+ final DatabaseEntry key =
+ createReplicationKey(change.getChangeNumber());
+ final DatabaseEntry data = new ReplicationData(change);
- if ((counterCurrValue != 0)
- && (counterCurrValue % counterWindowSize == 0))
- {
- // enough changes to generate a counter record - wait for the next
- // change of time
- counterTsLimit = change.getChangeNumber().getTime();
- }
- if ((counterTsLimit != 0)
- && (change.getChangeNumber().getTime() != counterTsLimit))
- {
- // Write the counter record
- DatabaseEntry counterKey = new ReplicationKey(
- new ChangeNumber(change.getChangeNumber().getTime(),
- 0, 0));
- DatabaseEntry counterValue =
- encodeCounterValue(counterCurrValue - 1);
- db.put(null, counterKey, counterValue);
- counterTsLimit = 0;
- }
+ insertCounterRecordIfNeeded(change.getChangeNumber());
db.put(null, key, data);
counterCurrValue++;
}
@@ -226,6 +214,39 @@
}
}
+ private void insertCounterRecordIfNeeded(ChangeNumber changeNumber)
+ {
+ if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0))
+ {
+ // enough changes to generate a counter record
+ // wait for the next change of time
+ counterTsLimit = changeNumber.getTime();
+ }
+ if (counterTsLimit != 0 && changeNumber.getTime() != counterTsLimit)
+ {
+ // Write the counter record
+ final ChangeNumber counterRecord = newCounterRecord(changeNumber);
+ DatabaseEntry counterKey = createReplicationKey(counterRecord);
+ DatabaseEntry counterValue = encodeCounterValue(counterCurrValue - 1);
+ db.put(null, counterKey, counterValue);
+ counterTsLimit = 0;
+ }
+ }
+
+ private DatabaseEntry createReplicationKey(ChangeNumber changeNumber)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ try
+ {
+ key.setData(changeNumber.toString().getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // Should never happens, UTF-8 is always supported
+ // TODO : add better logging
+ }
+ return key;
+ }
/**
* Shutdown the database.
@@ -288,8 +309,7 @@
- private void closeLockedCursor(Cursor cursor)
- throws DatabaseException
+ private void closeAndReleaseReadLock(Cursor cursor) throws DatabaseException
{
try
{
@@ -308,8 +328,6 @@
public ChangeNumber readFirstChange()
{
Cursor cursor = null;
- ChangeNumber cn = null;
-
try
{
dbCloseLock.readLock().lock();
@@ -321,49 +339,43 @@
return null;
}
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
-
cursor = db.openCursor(null, null);
- OperationStatus status = cursor.getFirst(key, data,
- LockMode.DEFAULT);
-
- if (status != OperationStatus.SUCCESS)
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode defaultMode = LockMode.DEFAULT;
+ if (cursor.getFirst(key, data, defaultMode) != OperationStatus.SUCCESS)
{
- /* database is empty */
+ // database is empty
return null;
}
- String str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
+ final ChangeNumber cn = toChangeNumber(key.getData());
+ if (!isACounterRecord(cn))
{
- // First record is a counter record .. go next
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- if (status != OperationStatus.SUCCESS)
- {
- // DB contains only a counter record
- return null;
- }
- else
- {
- cn = new ChangeNumber(decodeUTF8(key.getData()));
- }
+ return cn;
}
+
+ // First record is a counter record .. go next
+ if (cursor.getNext(key, data, defaultMode) != OperationStatus.SUCCESS)
+ {
+ // DB contains only a counter record
+ return null;
+ }
+ // There cannot be 2 counter record next to each other,
+ // it is safe to return this record
+ return toChangeNumber(key.getData());
}
finally
{
- closeLockedCursor(cursor);
+ closeAndReleaseReadLock(cursor);
}
}
catch (DatabaseException e)
{
- /* database is faulty */
replicationServer.handleUnexpectedDatabaseException(e);
- cn = null;
+ return null;
}
- return cn;
}
@@ -376,8 +388,6 @@
public ChangeNumber readLastChange()
{
Cursor cursor = null;
- ChangeNumber cn = null;
-
try
{
dbCloseLock.readLock().lock();
@@ -389,53 +399,45 @@
return null;
}
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
-
cursor = db.openCursor(null, null);
- OperationStatus status = cursor.getLast(key, data,
- LockMode.DEFAULT);
-
- if (status != OperationStatus.SUCCESS)
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ LockMode defaultMode = LockMode.DEFAULT;
+ if (cursor.getLast(key, data, defaultMode) != OperationStatus.SUCCESS)
{
- /* database is empty */
+ // database is empty
return null;
}
- String str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
+ final ChangeNumber cn = toChangeNumber(key.getData());
+ if (!isACounterRecord(cn))
{
- if (cursor.getPrev(key, data, LockMode.DEFAULT)
- != OperationStatus.SUCCESS)
- {
- /*
- * database only contain a counter record - don't know how much it
- * can be possible but ...
- */
- cn = null;
- }
- else
- {
- str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- // There can't be 2 counter record next to each other
- }
+ return cn;
}
+
+ if (cursor.getPrev(key, data, defaultMode) != OperationStatus.SUCCESS)
+ {
+ /*
+ * database only contain a counter record - don't know how much it can
+ * be possible but ...
+ */
+ return null;
+ }
+ // There cannot be 2 counter record next to each other,
+ // it is safe to return this record
+ return toChangeNumber(key.getData());
}
finally
{
- closeLockedCursor(cursor);
+ closeAndReleaseReadLock(cursor);
}
}
catch (DatabaseException e)
{
replicationServer.handleUnexpectedDatabaseException(e);
- cn = null;
+ return null;
}
-
- return cn;
}
/**
@@ -455,11 +457,6 @@
}
Cursor cursor = null;
- ChangeNumber cn = null;
-
- DatabaseEntry key = new ReplicationKey(changeNumber);
- DatabaseEntry data = new DatabaseEntry();
-
try
{
dbCloseLock.readLock().lock();
@@ -471,6 +468,8 @@
return null;
}
+ DatabaseEntry key = createReplicationKey(changeNumber);
+ DatabaseEntry data = new DatabaseEntry();
cursor = db.openCursor(null, null);
if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
== OperationStatus.SUCCESS)
@@ -480,23 +479,7 @@
if (cursor.getPrev(key, data, LockMode.DEFAULT)
== OperationStatus.SUCCESS)
{
- String str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
- {
- if (cursor.getPrev(key, data, LockMode.DEFAULT)
- != OperationStatus.SUCCESS)
- {
- // database starts with a counter record.
- cn = null;
- }
- else
- {
- str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- // There can't be 2 counter record next to each other
- }
- }
+ return getRegularRecord(cursor, key, data);
}
// else, there was no change previous to our changeNumber.
}
@@ -507,40 +490,41 @@
if (cursor.getLast(key, data, LockMode.DEFAULT)
== OperationStatus.SUCCESS)
{
- String str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
- {
- if (cursor.getPrev(key, data, LockMode.DEFAULT)
- != OperationStatus.SUCCESS)
- {
- /*
- * database only contain a counter record, should not be
- * possible, but Ok, let's just say no change Number
- */
- cn = null;
- }
- else
- {
- str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- // There can't be 2 counter record next to each other
- }
- }
+ return getRegularRecord(cursor, key, data);
}
}
}
finally
{
- closeLockedCursor(cursor);
+ closeAndReleaseReadLock(cursor);
}
}
catch (DatabaseException e)
{
replicationServer.handleUnexpectedDatabaseException(e);
- cn = null;
}
- return cn;
+ return null;
+ }
+
+ private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key,
+ DatabaseEntry data)
+ {
+ final ChangeNumber cn = toChangeNumber(key.getData());
+ if (!isACounterRecord(cn))
+ {
+ return cn;
+ }
+
+ // There cannot be 2 counter record next to each other,
+ // it is safe to return previous record which must exist
+ if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ return toChangeNumber(key.getData());
+ }
+
+ // database only contain a counter record, which should not be possible
+ // let's just say no changeNumber
+ return null;
}
@@ -587,7 +571,7 @@
{
if (startingChangeNumber != null)
{
- key = new ReplicationKey(startingChangeNumber);
+ key = createReplicationKey(startingChangeNumber);
}
else
{
@@ -646,7 +630,7 @@
catch (Exception e)
{
// Unlocking is required before throwing any exception
- closeLockedCursor(localCursor);
+ closeAndReleaseReadLock(localCursor);
throw e;
}
}
@@ -682,7 +666,7 @@
}
catch (Exception e)
{
- closeLockedCursor(localCursor);
+ closeAndReleaseReadLock(localCursor);
if (localTxn != null)
{
@@ -714,7 +698,8 @@
isClosed = true;
}
- closeLockedCursor(cursor);
+ closeAndReleaseReadLock(cursor);
+
if (txn != null)
{
try
@@ -747,7 +732,7 @@
isClosed = true;
}
- closeLockedCursor(cursor);
+ closeAndReleaseReadLock(cursor);
if (txn != null)
{
@@ -776,13 +761,11 @@
}
OperationStatus status = cursor.getNext(key, data, LockMode.DEFAULT);
-
if (status != OperationStatus.SUCCESS)
{
return null;
}
- String csnString = decodeUTF8(key.getData());
- return new ChangeNumber(csnString);
+ return toChangeNumber(key.getData());
}
/**
@@ -815,15 +798,12 @@
ChangeNumber cn = null;
try
{
- cn = new ChangeNumber(
- decodeUTF8(key.getData()));
- if (ReplicationDB.isaCounter(cn))
+ cn = toChangeNumber(key.getData());
+ if (isACounterRecord(cn))
{
- // counter record
continue;
}
- currentChange = ReplicationData.generateChange(data
- .getData());
+ currentChange = ReplicationData.generateChange(data.getData());
}
catch (Exception e)
{
@@ -893,7 +873,7 @@
dbenv.clearDb(dbName);
// RE-create the db
- db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
+ db = dbenv.getOrAddDb(serverId, baseDn, -1);
}
catch(Exception e)
{
@@ -916,15 +896,8 @@
* @return The number of changes between provided start and stop changeNumber.
* Returns 0 when an error occurs.
*/
- public int count(ChangeNumber start, ChangeNumber stop)
+ public long count(ChangeNumber start, ChangeNumber stop)
{
- int counterRecord1 = 0;
- int counterRecord2 = 0;
- int distToCounterRecord1 = 0;
- int distBackToCounterRecord2 = 0;
- int count=0;
- OperationStatus status;
-
try
{
dbCloseLock.readLock().lock();
@@ -937,144 +910,226 @@
{
return 0;
}
+ if (start == null && stop == null)
+ {
+ return db.count();
+ }
- ChangeNumber cn ;
-
- if ((start==null)&&(stop==null))
- return (int)db.count();
+ int[] counterValues = new int[2];
+ int[] distanceToCounterRecords = new int[2];
// Step 1 : from the start point, traverse db to the next counter record
// or to the stop point.
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
cursor = db.openCursor(null, null);
- if (start != null)
- {
- key = new ReplicationKey(start);
- status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
- if (status == OperationStatus.NOTFOUND)
- status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
- }
- else
- {
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- }
-
- while (status == OperationStatus.SUCCESS)
- {
- // test whether the record is a regular change or a counter
- String csnString = decodeUTF8(key.getData());
- cn = new ChangeNumber(csnString);
- if (cn.getServerId() != 0)
- {
- // reached a regular change record
- // test whether we reached the 'stop' target
- if (!cn.newer(stop))
- {
- // let's loop
- distToCounterRecord1++;
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- }
- else
- {
- // reached the end
- break;
- }
- }
- else
- {
- // counter record
- counterRecord1 = decodeCounterValue(data.getData());
- break;
- }
- }
+ findFirstCounterRecordAfterStartPoint(start, stop, cursor,
+ counterValues, distanceToCounterRecords);
cursor.close();
// cases
- //
- if (counterRecord1==0)
- return distToCounterRecord1;
+ if (counterValues[START] == 0)
+ return distanceToCounterRecords[START];
// Step 2 : from the stop point, traverse db to the next counter record
// or to the start point.
- data = new DatabaseEntry();
- key = new ReplicationKey(stop);
cursor = db.openCursor(null, null);
- status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
+ if (!findFirstCounterRecordBeforeStopPoint(start, stop, cursor,
+ counterValues, distanceToCounterRecords))
{
- cn = new ChangeNumber(decodeUTF8(key.getData()));
- }
- else
- {
- key = new DatabaseEntry();
- data = new DatabaseEntry();
- status = cursor.getLast(key, data, LockMode.DEFAULT);
- if (status != OperationStatus.SUCCESS)
- {
- /* database is empty */
- return 0;
- }
- }
- while (status == OperationStatus.SUCCESS)
- {
- cn = new ChangeNumber(decodeUTF8(key.getData()));
- if (!ReplicationDB.isaCounter(cn))
- {
- // regular change record
- if (!cn.older(start))
- {
- distBackToCounterRecord2++;
- status = cursor.getPrev(key, data, LockMode.DEFAULT);
- }
- else
- break;
- }
- else
- {
- // counter record
- counterRecord2 = decodeCounterValue(data.getData());
- break;
- }
+ // database is empty
+ return 0;
}
cursor.close();
// Step 3 : Now consolidates the result
- if (counterRecord1!=0)
- {
- if (counterRecord1 == counterRecord2)
- {
- // only one cp between from and to - no need to use it
- count = distToCounterRecord1 + distBackToCounterRecord2;
- }
- else
- {
- // 2 cp between from and to
- count = distToCounterRecord1 + (counterRecord2-counterRecord1)
- + distBackToCounterRecord2;
- }
- }
+ return computeDistance(counterValues, distanceToCounterRecords);
}
finally
{
- closeLockedCursor(cursor);
+ closeAndReleaseReadLock(cursor);
}
}
catch (DatabaseException e)
{
replicationServer.handleUnexpectedDatabaseException(e);
}
- return count;
+ return 0;
+ }
+
+
+ private void findFirstCounterRecordAfterStartPoint(ChangeNumber start,
+ ChangeNumber stop, Cursor cursor, int[] counterValues,
+ int[] distanceToCounterRecords)
+ {
+ OperationStatus status;
+ DatabaseEntry key;
+ DatabaseEntry data = new DatabaseEntry();
+ if (start != null)
+ {
+ key = createReplicationKey(start);
+ status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.NOTFOUND)
+ status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
+ }
+ else
+ {
+ key = new DatabaseEntry();
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ }
+
+ while (status == OperationStatus.SUCCESS)
+ {
+ // test whether the record is a regular change or a counter
+ final ChangeNumber cn = toChangeNumber(key.getData());
+ if (isACounterRecord(cn))
+ {
+ // we have found the counter record
+ counterValues[START] = decodeCounterValue(data.getData());
+ break;
+ }
+
+ // reached a regular change record
+ // test whether we reached the 'stop' target
+ if (!cn.newer(stop))
+ {
+ // let's loop
+ distanceToCounterRecords[START]++;
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ }
+ else
+ {
+ // reached the end
+ break;
+ }
+ }
+ }
+
+ private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start,
+ ChangeNumber stop, Cursor cursor, int[] counterValues,
+ int[] distanceToCounterRecords)
+ {
+ DatabaseEntry key = createReplicationKey(stop);
+ DatabaseEntry data = new DatabaseEntry();
+ OperationStatus status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+ if (status != OperationStatus.SUCCESS)
+ {
+ key = new DatabaseEntry();
+ data = new DatabaseEntry();
+ status = cursor.getLast(key, data, LockMode.DEFAULT);
+ if (status != OperationStatus.SUCCESS)
+ {
+ return false;
+ }
+ }
+
+ while (status == OperationStatus.SUCCESS)
+ {
+ final ChangeNumber cn = toChangeNumber(key.getData());
+ if (isACounterRecord(cn))
+ {
+ // we have found the counter record
+ counterValues[STOP] = decodeCounterValue(data.getData());
+ break;
+ }
+
+ // it is a regular change record
+ if (!cn.older(start))
+ {
+ distanceToCounterRecords[STOP]++;
+ status = cursor.getPrev(key, data, LockMode.DEFAULT);
+ }
+ else
+ break;
+ }
+ return true;
}
/**
- * Test if a provided changeNumber represents a counter record.
- * @param cn The provided changeNumber.
- * @return True if the provided changenumber is a counter.
+ * The diagram below shows a visual description of how the distance between
+ * two change numbers in the database is computed.
+ *
+ * <pre>
+ * +--------+ +--------+
+ * | CASE 1 | | CASE 2 |
+ * +--------+ +--------+
+ *
+ * CSN CSN
+ * ----- -----
+ * START => ----- START => -----
+ * ^ ----- ^ -----
+ * | ----- | -----
+ * dist 1 ----- dist 1 -----
+ * | ----- | -----
+ * v ----- v -----
+ * CR 1&2 => [1000] CR 1 => [1000]
+ * ^ ----- -----
+ * | ----- -----
+ * dist 2 ----- -----
+ * | ----- -----
+ * v ----- -----
+ * STOP => ----- -----
+ * ----- -----
+ * CR => [2000] CR 2 => [2000]
+ * ----- ^ -----
+ * | -----
+ * dist 2 -----
+ * | -----
+ * v -----
+ * STOP => -----
+ * </pre>
+ *
+ * Explanation of the terms used:
+ * <dl>
+ * <dt>START</dt>
+ * <dd>Start change number for the count</dd>
+ * <dt>STOP</dt>
+ * <dd>Stop change number for the count</dd>
+ * <dt>dist</dt>
+ * <dd>Distance from START (or STOP) to the counter record</dd>
+ * <dt>CSN</dt>
+ * <dd>Stands for "Change Sequence Number". Below it, the database is
+ * symbolized, where each record is represented by using dashes "-----". The
+ * database is ordered.</dd>
+ * <dt>CR</dt>
+ * <dd>Stands for "Counter Record". Counter Records are inserted in the
+ * database along with real change numbers, but they are not real changes.
+ * They are only used to speed up calculating the distance between 2 change
+ * numbers without the need to scan the whole database in between.</dd>
+ * </dl>
*/
- static private boolean isaCounter(ChangeNumber cn)
+ private long computeDistance(int[] counterValues,
+ int[] distanceToCounterRecords)
{
- return ((cn.getServerId()== 0) && (cn.getSeqnum()==0));
+ if (counterValues[START] != 0)
+ {
+ if (counterValues[START] == counterValues[STOP])
+ {
+ // only one counter record between from and to - no need to use it
+ return distanceToCounterRecords[START] + distanceToCounterRecords[STOP];
+ }
+ // at least 2 counter records between from and to
+ return distanceToCounterRecords[START]
+ + (counterValues[STOP] - counterValues[START])
+ + distanceToCounterRecords[STOP];
+ }
+ return 0;
+ }
+
+ /**
+ * Whether a provided changeNumber represents a counter record. A counter
+ * record is used to store TODO.
+ *
+ * @param cn
+ * The changeNumber to test
+ * @return true if the provided changenumber is a counter, false otherwise
+ */
+ private static boolean isACounterRecord(ChangeNumber cn)
+ {
+ return cn.getServerId() == 0 && cn.getSeqnum() == 0;
+ }
+
+ private static ChangeNumber newCounterRecord(ChangeNumber changeNumber)
+ {
+ return new ChangeNumber(changeNumber.getTime(), 0, 0);
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index dc33cc8..3758a13 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -315,25 +315,26 @@
serverId + " " + baseDn + " " + generationId);
try
{
- String key = serverId + FIELD_SEPARATOR + baseDn;
+ final String serverIdKey = serverId + FIELD_SEPARATOR + baseDn;
// Opens the database for the changes received from this server
// on this domain. Create it if it does not already exist.
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(true);
- Database db = dbEnvironment.openDatabase(null, key, dbConfig);
+ Database db = dbEnvironment.openDatabase(null, serverIdKey, dbConfig);
// Creates the record serverId/domain base Dn in the stateDb
// if it does not already exist.
- putInStateDBIfNotExist(key, key);
+ putInStateDBIfNotExist(serverIdKey, serverIdKey);
// Creates the record domain base Dn/ generationId in the stateDb
// if it does not already exist.
- key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
- String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
- + FIELD_SEPARATOR + baseDn;
- putInStateDBIfNotExist(key, data);
+ final String genIdKey = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
+ final String genIdData = GENERATION_ID_TAG
+ + FIELD_SEPARATOR + generationId
+ + FIELD_SEPARATOR + baseDn;
+ putInStateDBIfNotExist(genIdKey, genIdData);
return db;
}
catch (UnsupportedEncodingException e)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
index 29c086b..4e7a898 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -43,27 +43,25 @@
private ReplServerDBCursor cursor = null;
private DbHandler dbh;
private ReplicationDB db;
- ChangeNumber lastNonNullCurrentCN;
+ private ChangeNumber lastNonNullCurrentCN;
/**
* Creates a new ReplicationIterator.
* All created iterator must be released by the caller using the
* releaseCursor() method.
*
- * @param id the Identifier of the server on which the iterator applies.
* @param db The db where the iterator must be created.
* @param changeNumber The ChangeNumber after which the iterator must start.
- * @param dbh The associated DbHandler.
+ * @param dbHandler The associated DbHandler.
* @throws Exception If there is no other change to push after change
* with changeNumber number.
* @throws DatabaseException if a database problem happened.
*/
- public ReplicationIterator(
- int id, ReplicationDB db, ChangeNumber changeNumber, DbHandler dbh)
- throws Exception, DatabaseException
+ public ReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
+ DbHandler dbHandler) throws Exception, DatabaseException
{
this.db = db;
- this.dbh = dbh;
+ this.dbh = dbHandler;
this.lastNonNullCurrentCN = changeNumber;
try
@@ -79,7 +77,7 @@
if (cursor == null)
{
// flush the queue into the db
- dbh.flush();
+ dbHandler.flush();
// look again in the db
cursor = db.openReadCursor(changeNumber);
@@ -173,6 +171,7 @@
* Release the cursor in case the iterator was badly used and releaseCursor
* was never called.
*/
+ @Override
protected void finalize()
{
releaseCursor();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationKey.java b/opends/src/server/org/opends/server/replication/server/ReplicationKey.java
deleted file mode 100644
index 2fdbdcb..0000000
--- a/opends/src/server/org/opends/server/replication/server/ReplicationKey.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- * Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2010 ForgeRock AS.
- */
-package org.opends.server.replication.server;
-
-import java.io.UnsupportedEncodingException;
-
-import com.sleepycat.je.DatabaseEntry;
-
-import org.opends.server.replication.common.ChangeNumber;
-
-/**
- * Superclass of DatabaseEntry.
- * Useful to create ReplicationServer keys from ChangeNumbers.
- */
-public class ReplicationKey extends DatabaseEntry
-{
- private static final long serialVersionUID = 1L;
-
- /**
- * Creates a new ReplicationKey from the given ChangeNumber.
- * @param changeNumber The changeNumber to use.
- */
- public ReplicationKey(ChangeNumber changeNumber)
- {
- try
- {
- this.setData(changeNumber.toString().getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e)
- {
- // Should never happens, UTF-8 is always supported
- // TODO : add better logging
- }
- }
-}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index 6d3561d..a554143 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -32,34 +32,9 @@
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.io.*;
+import java.net.*;
+import java.util.*;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.LogManager;
@@ -74,22 +49,13 @@
import org.opends.server.api.Backend;
import org.opends.server.api.WorkQueue;
import org.opends.server.backends.MemoryBackend;
-import org.opends.server.backends.jeb.BackendImpl;
-import org.opends.server.backends.jeb.DatabaseContainer;
-import org.opends.server.backends.jeb.EntryContainer;
-import org.opends.server.backends.jeb.Index;
-import org.opends.server.backends.jeb.RootContainer;
+import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.extensions.ConfigFileHandler;
-import org.opends.server.loggers.AccessLogger;
-import org.opends.server.loggers.ErrorLogger;
-import org.opends.server.loggers.HTTPAccessLogger;
-import org.opends.server.loggers.TextAccessLogPublisher;
-import org.opends.server.loggers.TextErrorLogPublisher;
-import org.opends.server.loggers.TextHTTPAccessLogPublisher;
+import org.opends.server.loggers.*;
import org.opends.server.loggers.debug.DebugLogger;
import org.opends.server.loggers.debug.TextDebugLogPublisher;
import org.opends.server.plugins.InvocationCounterPlugin;
@@ -103,19 +69,8 @@
import org.opends.server.protocols.ldap.LDAPReader;
import org.opends.server.tools.LDAPModify;
import org.opends.server.tools.dsconfig.DSConfig;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeTypeConstants;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryEnvironmentConfig;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
+import org.opends.server.types.*;
import org.opends.server.types.FilePermission;
-import org.opends.server.types.InitializationException;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.OperatingSystem;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.Schema;
import org.opends.server.util.BuildVersion;
import org.opends.server.util.EmbeddedUtils;
import org.opends.server.util.LDIFReader;
@@ -790,8 +745,7 @@
private static ServerSocket bindPort(int port)
throws IOException
{
- ServerSocket serverLdapSocket;
- serverLdapSocket = new ServerSocket();
+ ServerSocket serverLdapSocket = new ServerSocket();
serverLdapSocket.setReuseAddress(true);
serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", port));
return serverLdapSocket;
@@ -806,10 +760,48 @@
*/
public static ServerSocket bindFreePort() throws IOException
{
- ServerSocket serverLdapSocket = new ServerSocket();
- serverLdapSocket.setReuseAddress(true);
- serverLdapSocket.bind(new InetSocketAddress("127.0.0.1", 0));
- return serverLdapSocket;
+ return bindPort(0);
+ }
+
+ /**
+ * Find a free port on the local host.
+ *
+ * @throws IOException
+ * in case of underlying exception.
+ * @return the free port number found
+ */
+ public static int findFreePort() throws IOException
+ {
+ return findFreePorts(1)[0];
+ }
+
+ /**
+ * Find nb free ports on the local host.
+ *
+ * @param nb
+ * the number of free ports to find
+ * @throws IOException
+ * in case of underlying exception.
+ * @return an array with the free port numbers found
+ */
+ public static int[] findFreePorts(int nb) throws IOException
+ {
+ final ServerSocket[] sockets = new ServerSocket[nb];
+ try
+ {
+ final int[] ports = new int[nb];
+ for (int i = 0; i < nb; i++)
+ {
+ final ServerSocket socket = bindFreePort();
+ sockets[i] = socket;
+ ports[i] = socket.getLocalPort();
+ }
+ return ports;
+ }
+ finally
+ {
+ close(sockets);
+ }
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index 39e17c1..5da4f99 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -27,10 +27,16 @@
*/
package org.opends.server.replication.server;
+import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.testng.Assert.*;
+
import java.io.File;
-import java.net.ServerSocket;
+import java.io.IOException;
import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
@@ -38,18 +44,15 @@
import org.opends.server.replication.protocol.DeleteMsg;
import org.testng.annotations.Test;
-import static org.opends.server.TestCaseUtils.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.testng.Assert.*;
-
/**
* Test the dbHandler class
*/
@SuppressWarnings("javadoc")
public class DbHandlerTest extends ReplicationTestCase
{
- // The tracer object for the debug logger
+ /** The tracer object for the debug logger */
private static final DebugTracer TRACER = getTracer();
+
/**
* Utility - log debug message - highlight it is from the test and not
* from the server code. Makes easier to observe the test steps.
@@ -69,33 +72,15 @@
ReplicationServer replicationServer = null;
ReplicationDbEnv dbEnv = null;
DbHandler handler = null;
- ReplicationIterator it = null;
try
{
TestCaseUtils.startServer();
- // find a free port for the replicationServer
- ServerSocket socket = TestCaseUtils.bindFreePort();
- int changelogPort = socket.getLocalPort();
- socket.close();
-
- // configure a ReplicationServer.
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPort, null, 0,
- 2, 0, 100, null);
- replicationServer = new ReplicationServer(conf);
+ replicationServer = configureReplicationServer(100);
// create or clean a directory for the dbHandler
- String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
- String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
- buildRoot + File.separator + "build");
- path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
- testRoot = new File(path);
- if (testRoot.exists())
- {
- TestCaseUtils.deleteDirectory(testRoot);
- }
- testRoot.mkdirs();
+ String path = getReplicationDbPath();
+ testRoot = createDirectory(path);
dbEnv = new ReplicationDbEnv(path, replicationServer);
@@ -109,18 +94,10 @@
ChangeNumber changeNumber4 = gen.newChangeNumber();
ChangeNumber changeNumber5 = gen.newChangeNumber();
- DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
- "uid");
- DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
- "uid");
- DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
- "uid");
- DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4,
- "uid");
-
- handler.add(update1);
- handler.add(update2);
- handler.add(update3);
+ handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"));
+ handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"));
+ handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"));
+ DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber4, "uid");
//--
// Iterator tests with memory queue only populated
@@ -128,31 +105,8 @@
// verify that memory queue is populated
assertEquals(handler.getQueueSize(),3);
- // Iterator from existing CN
- it = handler.generateIterator(changeNumber1);
- assertTrue(it.next());
- assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
- " Actual change number=" + it.getChange().getChangeNumber() +
- " Expect change number=" + changeNumber2);
- assertTrue(it.next());
- assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
- " Actual change number=" + it.getChange().getChangeNumber());
- assertFalse(it.next());
- it.releaseCursor();
- it=null;
-
- // Iterator from NON existing CN
- Exception ec = null;
- try
- {
- it = handler.generateIterator(changeNumber5);
- }
- catch(Exception e)
- {
- ec = e;
- }
- assertNotNull(ec);
- assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+ assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3);
+ assertNotFound(handler, changeNumber5);
//--
// Iterator tests with db only populated
@@ -161,30 +115,8 @@
// verify that memory queue is empty (all changes flushed in the db)
assertEquals(handler.getQueueSize(),0);
- // Test iterator from existing CN
- it = handler.generateIterator(changeNumber1);
- assertTrue(it.next());
- assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
- " Actual change number=" + it.getChange().getChangeNumber());
- assertTrue(it.next());
- assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
- " Actual change number=" + it.getChange().getChangeNumber());
- assertFalse(it.next());
- it.releaseCursor();
- it=null;
-
- // Iterator from NON existing CN
- ec = null;
- try
- {
- it = handler.generateIterator(changeNumber5);
- }
- catch(Exception e)
- {
- ec = e;
- }
- assertNotNull(ec);
- assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+ assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3);
+ assertNotFound(handler, changeNumber5);
// Test first and last
assertEquals(changeNumber1, handler.getFirstChange());
@@ -198,52 +130,11 @@
// verify memory queue contains this one
assertEquals(handler.getQueueSize(),1);
- // Test iterator from existing CN
- it = handler.generateIterator(changeNumber1);
- assertTrue(it.next());
- assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber2)==0,
- " Actual change number=" + it.getChange().getChangeNumber());
- assertTrue(it.next());
- assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber3)==0,
- " Actual change number=" + it.getChange().getChangeNumber());
- assertTrue(it.next());
- assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
- " Actual change number=" + it.getChange().getChangeNumber());
- assertFalse(it.next());
- assertTrue(it.getChange()==null);
- it.releaseCursor();
- it=null;
-
+ assertFoundInOrder(handler, changeNumber1, changeNumber2, changeNumber3, changeNumber4);
// Test iterator from existing CN at the limit between queue and db
- it = handler.generateIterator(changeNumber3);
- assertTrue(it.next());
- assertTrue(it.getChange().getChangeNumber().compareTo(changeNumber4)==0,
- " Actual change number=" + it.getChange().getChangeNumber());
- assertFalse(it.next());
- assertTrue(it.getChange()==null);
- it.releaseCursor();
- it=null;
-
- // Test iterator from existing CN at the limit between queue and db
- it = handler.generateIterator(changeNumber4);
- assertFalse(it.next());
- assertTrue(it.getChange()==null,
- " Actual change number=" + it.getChange());
- it.releaseCursor();
- it=null;
-
- // Test iterator from NON existing CN
- ec = null;
- try
- {
- it = handler.generateIterator(changeNumber5);
- }
- catch(Exception e)
- {
- ec = e;
- }
- assertNotNull(ec);
- assert(ec.getLocalizedMessage().equals("ChangeNumber not available"));
+ assertFoundInOrder(handler, changeNumber3, changeNumber4);
+ assertFoundInOrder(handler, changeNumber4);
+ assertNotFound(handler, changeNumber5);
handler.setPurgeDelay(1);
@@ -262,13 +153,9 @@
purged = true;
}
}
+ // FIXME should add an assert here
} finally
{
- if (it != null)
- {
- it.releaseCursor();
- it=null;
- }
if (handler != null)
handler.shutdown();
if (dbEnv != null)
@@ -280,6 +167,74 @@
}
}
+ private ReplicationServer configureReplicationServer(int windowSize)
+ throws IOException, ConfigException
+ {
+ final int changelogPort = findFreePort();
+ final ReplicationServerCfg conf =
+ new ReplServerFakeConfiguration(changelogPort, null, 0, 2, 0, windowSize, null);
+ return new ReplicationServer(conf);
+ }
+
+ private String getReplicationDbPath()
+ {
+ String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+ String path =
+ System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
+ + File.separator + "build");
+ return path + File.separator + "unit-tests" + File.separator + "dbHandler";
+ }
+
+ private File createDirectory(String path) throws IOException
+ {
+ File testRoot = new File(path);
+ if (testRoot.exists())
+ {
+ TestCaseUtils.deleteDirectory(testRoot);
+ }
+ testRoot.mkdirs();
+ return testRoot;
+ }
+
+ private ReplicationIterator assertFoundInOrder(DbHandler handler,
+ ChangeNumber... changeNumbers) throws Exception
+ {
+ if (changeNumbers.length == 0)
+ {
+ return null;
+ }
+
+ ReplicationIterator it = handler.generateIterator(changeNumbers[0]);
+ for (int i = 1; i < changeNumbers.length; i++)
+ {
+ assertTrue(it.next());
+ final ChangeNumber cn = it.getChange().getChangeNumber();
+ final boolean equals = cn.compareTo(changeNumbers[i]) == 0;
+ assertTrue(equals, "Actual change number=" + cn
+ + ", Expected change number=" + changeNumbers[i]);
+ }
+ assertFalse(it.next());
+ assertNull(it.getChange(), "Actual change number=" + it.getChange()
+ + ", Expected null");
+
+ it.releaseCursor();
+ return it;
+ }
+
+ private void assertNotFound(DbHandler handler, ChangeNumber changeNumber)
+ {
+ try
+ {
+ ReplicationIterator iter = handler.generateIterator(changeNumber);
+ iter.releaseCursor();
+ fail("Expected exception");
+ }
+ catch (Exception e)
+ {
+ assertEquals(e.getLocalizedMessage(), "ChangeNumber not available");
+ }
+ }
+
/**
* Test the feature of clearing a dbHandler used by a replication server.
* The clear feature is used when a replication server receives a request
@@ -296,28 +251,11 @@
{
TestCaseUtils.startServer();
- // find a free port for the replicationServer
- ServerSocket socket = TestCaseUtils.bindFreePort();
- int changelogPort = socket.getLocalPort();
- socket.close();
-
- // configure a ReplicationServer.
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPort, null, 0,
- 2, 0, 100, null);
- replicationServer = new ReplicationServer(conf);
+ replicationServer = configureReplicationServer(100);
// create or clean a directory for the dbHandler
- String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
- String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
- buildRoot + File.separator + "build");
- path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
- testRoot = new File(path);
- if (testRoot.exists())
- {
- TestCaseUtils.deleteDirectory(testRoot);
- }
- testRoot.mkdirs();
+ String path = getReplicationDbPath();
+ testRoot = createDirectory(path);
dbEnv = new ReplicationDbEnv(path, replicationServer);
@@ -331,17 +269,10 @@
ChangeNumber changeNumber2 = gen.newChangeNumber();
ChangeNumber changeNumber3 = gen.newChangeNumber();
- DeleteMsg update1 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1,
- "uid");
- DeleteMsg update2 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2,
- "uid");
- DeleteMsg update3 = new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3,
- "uid");
-
// Add the changes
- handler.add(update1);
- handler.add(update2);
- handler.add(update3);
+ handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber1, "uid"));
+ handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber2, "uid"));
+ handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, changeNumber3, "uid"));
// Check they are here
assertEquals(changeNumber1, handler.getFirstChange());
@@ -366,6 +297,7 @@
TestCaseUtils.deleteDirectory(testRoot);
}
}
+
/**
* Test the logic that manages counter records in the DbHandler in order to
* optimize the counting of record in the replication changelog db.
@@ -416,35 +348,17 @@
ReplicationServer replicationServer = null;
ReplicationDbEnv dbEnv = null;
DbHandler handler = null;
- ReplicationIterator ri = null;
- int actualCnt = 0;
+ long actualCnt = 0;
String testcase;
try
{
TestCaseUtils.startServer();
- // find a free port for the replicationServer
- ServerSocket socket = TestCaseUtils.bindFreePort();
- int changelogPort = socket.getLocalPort();
- socket.close();
-
- // configure a ReplicationServer.
- ReplServerFakeConfiguration conf =
- new ReplServerFakeConfiguration(changelogPort, null, 0,
- 2, 0, 100000, null);
- replicationServer = new ReplicationServer(conf);
+ replicationServer = configureReplicationServer(100000);
// create or clean a directory for the dbHandler
- String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
- String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR,
- buildRoot + File.separator + "build");
- path = path + File.separator + "unit-tests" + File.separator + "dbHandler";
- testRoot = new File(path);
- if (testRoot.exists())
- {
- TestCaseUtils.deleteDirectory(testRoot);
- }
- testRoot.mkdirs();
+ String path = getReplicationDbPath();
+ testRoot = createDirectory(path);
dbEnv = new ReplicationDbEnv(path, replicationServer);
@@ -589,7 +503,7 @@
handler.setPurgeDelay(100);
sleep(4000);
- int totalCount = handler.getCount(null, null);
+ long totalCount = handler.getCount(null, null);
debugInfo(tn,testcase + " After purge, total count=" + totalCount);
testcase="AFTER PURGE (first, last)=";
@@ -619,12 +533,9 @@
assertEquals(null, handler.getFirstChange());
assertEquals(null, handler.getLastChange());
debugInfo(tn,"Success");
-
}
finally
{
- if (ri!=null)
- ri.releaseCursor();
if (handler != null)
handler.shutdown();
if (dbEnv != null)
@@ -635,4 +546,5 @@
TestCaseUtils.deleteDirectory(testRoot);
}
}
+
}
--
Gitblit v1.10.0