From 7a34cefa2a5bbdf339f1a50b856e3d7441006b8d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 08 Jun 2011 14:33:10 +0000
Subject: [PATCH] Fix OPENDJ-184: Transient errors when accessing cn=changelog DraftCN DB result in complete shutdown of the replication service
---
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 23 +
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java | 146 ++---
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 675 ++++++++++++------------------
opends/src/server/org/opends/server/replication/server/DbHandler.java | 120 +---
opends/src/server/org/opends/server/replication/server/DraftCNDB.java | 211 ++-------
opends/src/server/org/opends/server/replication/server/DraftCNData.java | 21
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java | 50 +
opends/src/server/org/opends/server/replication/server/ReplicationData.java | 16
8 files changed, 486 insertions(+), 776 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index cfb83e4..b147246 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,7 +51,6 @@
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.LockConflictException;
/**
* This class is used for managing the replicationServer database for each
@@ -111,9 +110,6 @@
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
- // The maximum number of retries in case of DatabaseDeadlock Exception.
- private static final int DEADLOCK_RETRIES = 10;
-
private long latestTrimDate = 0;
/**
@@ -122,7 +118,7 @@
* are older than this age are removed.
*
*/
- private long trimage;
+ private long trimAge;
/**
* Creates a new dbHandler associated to a given LDAP server.
@@ -143,7 +139,7 @@
this.replicationServer = replicationServer;
serverId = id;
this.baseDn = baseDn;
- trimage = replicationServer.getTrimage();
+ trimAge = replicationServer.getTrimAge();
queueMaxSize = queueSize;
queueLowmark = queueSize * 1 / 5;
queueHimark = queueSize * 4 / 5;
@@ -291,43 +287,6 @@
}
/**
- * Return the number of changes between 2 provided change numbers.
- * @param from The lower (older) change number.
- * @param to The upper (newer) change number.
- * @return The computed number of changes.
- */
- public int traverseAndCount(ChangeNumber from, ChangeNumber to)
- {
- int count = 0;
- flush();
- ReplServerDBCursor cursor = null;
- try
- {
- try
- {
- cursor = db.openReadCursor(from);
- }
- catch(Exception e)
- {
- return 0;
- }
- ChangeNumber curr = null;
- while ((curr = cursor.nextChangeNumber())!=null)
- {
- if (curr.newerOrEquals(to))
- break;
- count++;
- }
- }
- finally
- {
- if (cursor != null)
- cursor.abort();
- }
- return count;
- }
-
- /**
* Removes the provided number of messages from the beginning of the msgQueue.
*
* @param number the number of changes to be removed.
@@ -456,80 +415,67 @@
*/
private void trim() throws DatabaseException, Exception
{
- if (trimage == 0)
+ if (trimAge == 0)
+ {
return;
- int size = 0;
- boolean finished = false;
- boolean done = false;
+ }
- latestTrimDate = TimeThread.getTime() - trimage;
+ latestTrimDate = TimeThread.getTime() - trimAge;
ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0);
// Find the last changeNumber before the trimDate, in the Database.
- ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate);
+ ChangeNumber lastBeforeTrimDate = db
+ .getPreviousChangeNumber(trimDate);
if (lastBeforeTrimDate != null)
{
// If we found it, we want to stop trimming when reaching it.
trimDate = lastBeforeTrimDate;
}
- // In case of deadlock detection by the Database, this thread can
- // by aborted by a DeadlockException. This is a transient error and
- // the transaction should be attempted again.
- // We will try DEADLOCK_RETRIES times before failing.
- int tries = 0;
- while ((tries++ < DEADLOCK_RETRIES) && (!done))
+
+ for (int i = 0; i < 100; i++)
{
synchronized (flushLock)
{
- /* the trim is done by group in order to save some CPU and IO bandwidth
+ /*
+ * the trim is done by group in order to save some CPU and IO bandwidth
* start the transaction then do a bunch of remove then commit
*/
- ReplServerDBCursor cursor = db.openDeleteCursor();
-
+ final ReplServerDBCursor cursor = db.openDeleteCursor();
try
{
- while ((size < 5000 ) && (!finished))
+ for (int j = 0; j < 50; j++)
{
ChangeNumber changeNumber = cursor.nextChangeNumber();
- if (changeNumber != null)
+ if (changeNumber == null)
{
- if ((!changeNumber.equals(lastChange))
- && (changeNumber.older(trimDate)))
- {
- size++;
- cursor.delete();
- }
- else
- {
- firstChange = changeNumber;
- finished = true;
- }
+ cursor.close();
+ done = true;
+ return;
+ }
+
+ if ((!changeNumber.equals(lastChange))
+ && (changeNumber.older(trimDate)))
+ {
+ cursor.delete();
}
else
- finished = true;
+ {
+ firstChange = changeNumber;
+ cursor.close();
+ done = true;
+ return;
+ }
}
cursor.close();
- done = true;
- }
- catch (LockConflictException e)
- {
- cursor.abort();
- if (tries == DEADLOCK_RETRIES)
- {
- // could not handle the Deadlock after DEADLOCK_RETRIES tries.
- // shutdown the ReplicationServer.
- shutdown = true;
- throw (e);
- }
}
catch (Exception e)
{
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
- shutdown = true;
cursor.abort();
- throw (e);
+ shutdown = true;
+ throw e;
}
}
}
@@ -644,7 +590,7 @@
*/
public void setPurgeDelay(long delay)
{
- trimage = delay;
+ trimAge = delay;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index 85af56e..d060a5b 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -32,7 +32,6 @@
import static org.opends.server.util.StaticUtils.decodeUTF8;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.messages.MessageBuilder;
@@ -40,14 +39,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.DebugLogLevel;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
/**
* This class implements the interface between the underlying database
@@ -61,9 +53,6 @@
private ReplicationDbEnv dbenv = null;
private ReplicationServer replicationServer;
- // The maximum number of retries in case of DatabaseDeadlock Exception.
- private static final int DEADLOCK_RETRIES = 10;
-
// The lock used to provide exclusive access to the thread that
// close the db (shutdown or clear).
private ReentrantReadWriteLock dbCloseLock;
@@ -103,71 +92,41 @@
public void addEntry(int draftCN, String value, String domainBaseDN,
ChangeNumber changeNumber)
{
- Transaction txn = null;
try
{
- int tries = 0;
- boolean done = false;
+ DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
+ DatabaseEntry data = new DraftCNData(draftCN,
+ value, domainBaseDN, changeNumber);
- // The database can return a Deadlock Exception if several threads are
- // accessing the database at the same time. This Exception is a
- // transient state, when it happens the transaction is aborted and
- // the operation is attempted again up to DEADLOCK_RETRIES times.
- while ((tries++ < DEADLOCK_RETRIES) && (!done))
+ // Use a transaction so that we can override durability.
+ Transaction txn = null;
+ dbCloseLock.readLock().lock();
+ try
{
- dbCloseLock.readLock().lock();
- try
- {
- txn = dbenv.beginTransaction();
-
- DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
- DatabaseEntry data = new DraftCNData(draftCN,
- value, domainBaseDN, changeNumber);
- db.put(txn, key, data);
- txn.commitWriteNoSync();
- txn = null;
- done = true;
- }
- catch (LockConflictException e)
- {
- // Try again.
- }
- finally
- {
- if (txn != null)
- {
- // No effect if txn has committed.
- txn.abort();
- txn = null;
- }
- dbCloseLock.readLock().unlock();
- }
+ txn = dbenv.beginTransaction();
+ db.put(txn, key, data);
+ txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
}
- if (!done)
+ finally
{
- // Could not write to the DB after DEADLOCK_RETRIES tries.
- // This ReplicationServer is not reliable and will be shutdown.
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- logError(mb.toMessage());
- replicationServer.shutdown();
+ if (txn != null)
+ {
+ // No effect if txn has committed.
+ try
+ {
+ txn.abort();
+ }
+ catch (Exception e)
+ {
+ // Ignored.
+ }
+ }
+ dbCloseLock.readLock().unlock();
}
}
catch (DatabaseException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- }
- catch (UnsupportedEncodingException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
+ replicationServer.handleUnexpectedDatabaseException(e);
}
}
@@ -229,12 +188,20 @@
}
private void closeLockedCursor(Cursor cursor)
- throws DatabaseException
{
try
{
if (cursor != null)
- cursor.close();
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ // Ignore.
+ }
+ }
}
finally
{
@@ -248,12 +215,10 @@
*/
public int readFirstDraftCN()
{
- Cursor cursor = null;
- String str = null;
-
try
{
dbCloseLock.readLock().lock();
+ Cursor cursor = null;
try
{
cursor = db.openCursor(null, null);
@@ -265,9 +230,8 @@
/* database is empty */
return 0;
}
- str = decodeUTF8(key.getData());
- int sn = new Integer(str);
- return sn;
+
+ return new Integer(decodeUTF8(key.getData()));
}
finally
{
@@ -277,20 +241,7 @@
catch (DatabaseException e)
{
/* database is faulty */
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- return 0;
- }
- catch (Exception e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
+ replicationServer.handleUnexpectedDatabaseException(e);
return 0;
}
}
@@ -318,12 +269,10 @@
*/
public int readLastDraftCN()
{
- Cursor cursor = null;
- String str = null;
-
try
{
dbCloseLock.readLock().lock();
+ Cursor cursor = null;
try
{
cursor = db.openCursor(null, null);
@@ -335,9 +284,8 @@
/* database is empty */
return 0;
}
- str = decodeUTF8(key.getData());
- int sn = new Integer(str);
- return sn;
+
+ return new Integer(decodeUTF8(key.getData()));
}
finally
{
@@ -346,16 +294,7 @@
}
catch (DatabaseException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- return 0;
- }
- catch (Exception e)
- {
- replicationServer.shutdown();
+ replicationServer.handleUnexpectedDatabaseException(e);
return 0;
}
}
@@ -519,20 +458,7 @@
isClosed = true;
}
- boolean closeHasFailed = false;
-
- try
- {
- closeLockedCursor(cursor);
- }
- catch (Exception e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- closeHasFailed = true;
- }
+ closeLockedCursor(cursor);
if (txn != null)
{
@@ -540,20 +466,11 @@
{
txn.commit();
}
- catch (Exception e)
+ catch (DatabaseException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- closeHasFailed = true;
+ replicationServer.handleUnexpectedDatabaseException(e);
}
}
-
- if (closeHasFailed)
- {
- replicationServer.shutdown();
- }
}
/**
@@ -574,26 +491,7 @@
isClosed = true;
}
- boolean closeHasFailed = false;
-
- try
- {
- closeLockedCursor(cursor);
- }
- catch (LockConflictException e)
- {
- // The DB documentation states that a DeadlockException
- // on the close method of a cursor that is aborting should
- // be ignored.
- }
- catch (Exception e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- closeHasFailed = true;
- }
+ closeLockedCursor(cursor);
if (txn != null)
{
@@ -601,20 +499,11 @@
{
txn.abort();
}
- catch (Exception e)
+ catch (DatabaseException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- closeHasFailed = true;
+ replicationServer.handleUnexpectedDatabaseException(e);
}
}
-
- if (closeHasFailed)
- {
- replicationServer.shutdown();
- }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNData.java b/opends/src/server/org/opends/server/replication/server/DraftCNData.java
index fc4c171..47ce17e 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNData.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNData.java
@@ -23,10 +23,12 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
- * Portions Copyright 2010 ForgeRock AS.
+ * Portions Copyright 2010-2011 ForgeRock AS.
*/
package org.opends.server.replication.server;
+import static org.opends.server.util.StaticUtils.getBytes;
+
import java.io.UnsupportedEncodingException;
import org.opends.messages.Message;
@@ -53,29 +55,14 @@
* @param value The value (cookie).
* @param serviceID The serviceID (domain DN).
* @param changeNumber The replication change number.
- *
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported.
*/
public DraftCNData(int draftCN, String value,
String serviceID, ChangeNumber changeNumber)
- throws UnsupportedEncodingException
{
String record = value
+ FIELD_SEPARATOR + serviceID
+ FIELD_SEPARATOR + changeNumber;
-
- byte[] byteValue;
- try
- {
- byteValue = record.getBytes("UTF-8");
- this.setData(byteValue);
- }
- catch (UnsupportedEncodingException e)
- {
- // can't happen
- return;
- }
+ setData(getBytes(record));
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index ef4df7c..9230497 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -50,7 +50,6 @@
import org.opends.server.types.InitializationException;
import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.LockConflictException;
/**
* This class is used for managing the replicationServer database for each
@@ -84,16 +83,13 @@
private DirectoryThread thread = null;
private ReplicationServer replicationServer;
- // The maximum number of retries in case of DatabaseDeadlock Exception.
- private static final int DEADLOCK_RETRIES = 10;
-
/**
*
* The trim age in milliseconds. Changes record in the change DB that
* are older than this age are removed.
*
*/
- private long trimage;
+ private long trimAge;
/**
* Creates a new dbHandler associated to a given LDAP server.
@@ -108,7 +104,7 @@
throws DatabaseException
{
this.replicationServer = replicationServer;
- this.trimage = replicationServer.getTrimage();
+ this.trimAge = replicationServer.getTrimAge();
// DB initialization
db = new DraftCNDB(replicationServer, dbenv);
@@ -312,7 +308,7 @@
*/
public void trim() throws DatabaseException, Exception
{
- if (trimage == 0)
+ if (trimAge == 0)
return;
clear(null);
@@ -330,106 +326,86 @@
*
*/
public void clear(String serviceIDToClear)
- throws DatabaseException, Exception
+ throws DatabaseException, Exception
{
- if (this.count()==0)
+ if (this.count() == 0)
+ {
return;
+ }
- int size = 0;
- int tries = 0;
- boolean finished = false;
- boolean done = false;
+ ChangeNumber crossDomainEligibleCN = replicationServer
+ .getEligibleCN();
- ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
- // In case of deadlock detection by the Database, this thread can
- // by aborted by a DeadlockException. This is a transient error and
- // the transaction should be attempted again.
- // We will try DEADLOCK_RETRIES times before failing.
- while ((tries++ < DEADLOCK_RETRIES) && (!done))
+ for (int i = 0; i < 100; i++)
{
DraftCNDBCursor cursor = db.openDeleteCursor();
try
{
- while ((size < 5000 ) && (!finished))
+ for (int j = 0; j < 50; j++)
{
// let's traverse the DraftCNDb
if (!cursor.next())
{
- finished=true;
+ cursor.close();
+ return;
}
- else
+
+ ChangeNumber cn = cursor.currentChangeNumber();
+
+ // From the draftCNDb change record, get the domain and changeNumber
+ String serviceID = cursor.currentServiceID();
+
+ if ((serviceIDToClear != null)
+ && (serviceIDToClear.equalsIgnoreCase(serviceID)))
{
- ChangeNumber cn = cursor.currentChangeNumber();
-
- // From the draftCNDb change record, get the domain and changeNumber
- String serviceID = cursor.currentServiceID();
-
- if ((serviceIDToClear!=null) &&
- (serviceIDToClear.equalsIgnoreCase(serviceID)))
- {
- size++;
- cursor.delete();
- continue;
- }
-
- ReplicationServerDomain domain =
- replicationServer.getReplicationServerDomain(serviceID, false);
-
- if (domain==null)
- {
- // the domain has been removed since the record was written in the
- // draftCNDb, thus it makes no sense to keep the record in the
- // draftCNDb.
- size++;
- cursor.delete();
- }
- else
- {
- ServerState startState = domain.getStartState();
-
- // We don't use the returned endState but it's updating CN as
- // reading
- domain.getEligibleState(crossDomainEligibleCN);
-
- ChangeNumber fcn = startState.getMaxChangeNumber(
- cn.getServerId());
-
- int currentKey = cursor.currentKey();
- // Do not delete the lastKey. This should allow us to
- // preserve last change number over time.
- if ((currentKey != lastkey) && (cn.older(fcn)))
- {
- size++;
- cursor.delete();
- }
- else
- {
- firstkey = currentKey;
- finished = true;
- }
- }
+ cursor.delete();
+ continue;
}
+
+ ReplicationServerDomain domain = replicationServer
+ .getReplicationServerDomain(serviceID, false);
+
+ if (domain == null)
+ {
+ // the domain has been removed since the record was written in the
+ // draftCNDb, thus it makes no sense to keep the record in the
+ // draftCNDb.
+ cursor.delete();
+ continue;
+ }
+
+ ServerState startState = domain.getStartState();
+
+ // We don't use the returned endState but it's updating CN as
+ // reading
+ domain.getEligibleState(crossDomainEligibleCN);
+
+ ChangeNumber fcn = startState.getMaxChangeNumber(cn
+ .getServerId());
+
+ int currentKey = cursor.currentKey();
+
+ // Do not delete the lastKey. This should allow us to
+ // preserve last change number over time.
+ if ((currentKey != lastkey) && (cn.older(fcn)))
+ {
+ cursor.delete();
+ continue;
+ }
+
+ firstkey = currentKey;
+ cursor.close();
+ return;
}
+
cursor.close();
- done = true;
- }
- catch (LockConflictException e)
- {
- cursor.abort();
- if (tries == DEADLOCK_RETRIES)
- {
- // could not handle the Deadlock after DEADLOCK_RETRIES tries.
- // shutdown the ReplicationServer.
- shutdown = true;
- throw e;
- }
}
catch (Exception e)
{
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
- shutdown = true;
cursor.abort();
+ shutdown = true;
throw e;
}
}
@@ -492,7 +468,7 @@
*/
public void setPurgeDelay(long delay)
{
- trimage = delay;
+ trimAge = delay;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 2a6c676..411c776 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -35,21 +35,13 @@
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.util.List;
-import java.io.UnsupportedEncodingException;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
/**
* This class implements the interface between the underlying database
@@ -64,9 +56,6 @@
private int serverId;
private String baseDn;
- // The maximum number of retries in case of DatabaseDeadlock Exception.
- private static final int DEADLOCK_RETRIES = 10;
-
// The lock used to provide exclusive access to the thread that
// close the db (shutdown or clear).
private ReentrantReadWriteLock dbCloseLock;
@@ -180,98 +169,48 @@
*/
public void addEntries(List<UpdateMsg> changes)
{
- Transaction txn = null;
-
try
{
- int tries = 0;
- boolean done = false;
-
- // The database can return a Deadlock Exception if several threads are
- // accessing the database at the same time. This Exception is a
- // transient state, when it happens the transaction is aborted and
- // the operation is attempted again up to DEADLOCK_RETRIES times.
- while ((tries++ < DEADLOCK_RETRIES) && (!done))
+ dbCloseLock.readLock().lock();
+ try
{
- dbCloseLock.readLock().lock();
- try
+ for (UpdateMsg change : changes)
{
- txn = dbenv.beginTransaction();
+ DatabaseEntry key = new ReplicationKey(
+ change.getChangeNumber());
+ DatabaseEntry data = new ReplicationData(change);
- for (UpdateMsg change : changes)
+ if ((counterCurrValue != 0)
+ && (counterCurrValue % counterWindowSize == 0))
{
- DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
- DatabaseEntry data = new ReplicationData(change);
-
- if ((counterCurrValue!=0) &&
- (counterCurrValue%counterWindowSize == 0))
- {
- // enough changes to generate a counter record - wait for the next
- // change fo time
- counterTsLimit = change.getChangeNumber().getTime();
- }
- if ((counterTsLimit!=0)
- && (change.getChangeNumber().getTime() != counterTsLimit))
- {
- // Write the counter record
- DatabaseEntry counterKey = new ReplicationKey(
- new ChangeNumber(
- change.getChangeNumber().getTime(),
- 0, 0));
- DatabaseEntry counterValue =
- encodeCounterValue(counterCurrValue-1);
- db.put(txn, counterKey, counterValue);
- counterTsLimit=0;
- }
- db.put(txn, key, data);
- counterCurrValue++;
-
+ // enough changes to generate a counter record - wait for the next
+ // change of time
+ counterTsLimit = change.getChangeNumber().getTime();
}
- txn.commitWriteNoSync();
- txn = null;
- done = true;
- }
- catch (LockConflictException e)
- {
- // Try again.
- }
- finally
- {
- if (txn != null)
+ if ((counterTsLimit != 0)
+ && (change.getChangeNumber().getTime() != counterTsLimit))
{
- // No effect if txn has committed.
- txn.abort();
- txn = null;
+ // Write the counter record
+ DatabaseEntry counterKey = new ReplicationKey(
+ new ChangeNumber(change.getChangeNumber().getTime(),
+ 0, 0));
+ DatabaseEntry counterValue =
+ encodeCounterValue(counterCurrValue - 1);
+ db.put(null, counterKey, counterValue);
+ counterTsLimit = 0;
}
-
- dbCloseLock.readLock().unlock();
+ db.put(null, key, data);
+ counterCurrValue++;
}
}
- if (!done)
+ finally
{
- // Could not write to the DB after DEADLOCK_RETRIES tries.
- // This ReplicationServer is not reliable and will be shutdown.
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- logError(mb.toMessage());
- replicationServer.shutdown();
+ dbCloseLock.readLock().unlock();
}
}
catch (DatabaseException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
- }
- catch (UnsupportedEncodingException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
+ replicationServer.handleUnexpectedDatabaseException(e);
}
}
@@ -334,13 +273,24 @@
return new ReplServerDBCursor();
}
+
+
private void closeLockedCursor(Cursor cursor)
- throws DatabaseException
+ throws DatabaseException
{
try
{
if (cursor != null)
- cursor.close();
+ {
+ try
+ {
+ cursor.close();
+ }
+ catch (DatabaseException e)
+ {
+ // Ignore.
+ }
+ }
}
finally
{
@@ -358,53 +308,53 @@
String str = null;
ChangeNumber cn = null;
- dbCloseLock.readLock().lock();
try
{
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
-
- cursor = db.openCursor(null, null);
-
- OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
-
- if (status != OperationStatus.SUCCESS)
+ dbCloseLock.readLock().lock();
+ try
{
- /* database is empty */
- return null;
- }
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
- str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
- {
- // First record is a counter record .. go next
- status = cursor.getNext(key, data, LockMode.DEFAULT);
+ cursor = db.openCursor(null, null);
+
+ OperationStatus status = cursor.getFirst(key, data,
+ LockMode.DEFAULT);
+
if (status != OperationStatus.SUCCESS)
{
- // DB contains only a counter record
+ /* database is empty */
return null;
}
- else
+
+ str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
{
- cn = new ChangeNumber(decodeUTF8(key.getData()));
+ // First record is a counter record .. go next
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ if (status != OperationStatus.SUCCESS)
+ {
+ // DB contains only a counter record
+ return null;
+ }
+ else
+ {
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
+ }
}
}
+ finally
+ {
+ closeLockedCursor(cursor);
+ }
}
catch (DatabaseException e)
{
/* database is faulty */
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
+ replicationServer.handleUnexpectedDatabaseException(e);
cn = null;
}
- finally
- {
- closeLockedCursor(cursor);
- }
return cn;
}
@@ -420,57 +370,56 @@
Cursor cursor = null;
ChangeNumber cn = null;
- dbCloseLock.readLock().lock();
try
{
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
-
- cursor = db.openCursor(null, null);
-
- OperationStatus status = cursor.getLast(key, data,
- LockMode.DEFAULT);
-
- if (status != OperationStatus.SUCCESS)
+ dbCloseLock.readLock().lock();
+ try
{
- /* database is empty */
- return null;
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+
+ cursor = db.openCursor(null, null);
+
+ OperationStatus status = cursor.getLast(key, data,
+ LockMode.DEFAULT);
+
+ if (status != OperationStatus.SUCCESS)
+ {
+ /* database is empty */
+ return null;
+ }
+
+ String str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
+ {
+ if (cursor.getPrev(key, data, LockMode.DEFAULT)
+ != OperationStatus.SUCCESS)
+ {
+ /*
+ * database only contain a counter record - don't know how much it
+ * can be possible but ...
+ */
+ cn = null;
+ }
+ else
+ {
+ str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ // There can't be 2 counter record next to each other
+ }
+ }
}
-
- String str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
+ finally
{
- if (cursor.getPrev(key, data,
- LockMode.DEFAULT) != OperationStatus.SUCCESS)
- {
- /*
- * database only contain a counter record - don't know how much it can
- * be possible but ...
- */
- cn = null;
- }
- else
- {
- str = decodeUTF8(key.getData());
- cn= new ChangeNumber(str);
- // There can't be 2 counter record next to each other
- }
+ closeLockedCursor(cursor);
}
}
catch (DatabaseException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- replicationServer.shutdown();
+ replicationServer.handleUnexpectedDatabaseException(e);
cn = null;
}
- finally
- {
- closeLockedCursor(cursor);
- }
return cn;
}
@@ -496,85 +445,80 @@
DatabaseEntry key = new ReplicationKey(changeNumber);
DatabaseEntry data = new DatabaseEntry();
- Transaction txn = null;
-
- dbCloseLock.readLock().lock();
try
{
- cursor = db.openCursor(txn, null);
- if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
- OperationStatus.SUCCESS)
+ dbCloseLock.readLock().lock();
+ try
{
- // We can move close to the changeNumber.
- // Let's move to the previous change.
- if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
- OperationStatus.SUCCESS)
+ cursor = db.openCursor(null, null);
+ if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
+ == OperationStatus.SUCCESS)
{
- String str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
+ // We can move close to the changeNumber.
+ // Let's move to the previous change.
+ if (cursor.getPrev(key, data, LockMode.DEFAULT)
+ == OperationStatus.SUCCESS)
{
- if (cursor.getPrev(key, data,
- LockMode.DEFAULT) != OperationStatus.SUCCESS)
+ String str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
{
- // database starts with a counter record.
- cn = null;
+ if (cursor.getPrev(key, data, LockMode.DEFAULT)
+ != OperationStatus.SUCCESS)
+ {
+ // database starts with a counter record.
+ cn = null;
+ }
+ else
+ {
+ str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ // There can't be 2 counter record next to each other
+ }
}
- else
+ }
+ // else, there was no change previous to our changeNumber.
+ }
+ else
+ {
+ // We could not move the cursor past to the changeNumber
+ // Check if the last change is older than changeNumber
+ if (cursor.getLast(key, data, LockMode.DEFAULT)
+ == OperationStatus.SUCCESS)
+ {
+ String str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ if (ReplicationDB.isaCounter(cn))
{
- str = decodeUTF8(key.getData());
- cn= new ChangeNumber(str);
- // There can't be 2 counter record next to each other
+ if (cursor.getPrev(key, data, LockMode.DEFAULT)
+ != OperationStatus.SUCCESS)
+ {
+ /*
+ * database only contain a counter record, should not be
+ * possible, but Ok, let's just say no change Number
+ */
+ cn = null;
+ }
+ else
+ {
+ str = decodeUTF8(key.getData());
+ cn = new ChangeNumber(str);
+ // There can't be 2 counter record next to each other
+ }
}
}
}
- // else, there was no change previous to our changeNumber.
}
- else
+ finally
{
- // We could not move the cursor past to the changeNumber
- // Check if the last change is older than changeNumber
- if (cursor.getLast(key, data, LockMode.DEFAULT) ==
- OperationStatus.SUCCESS)
- {
- String str = decodeUTF8(key.getData());
- cn = new ChangeNumber(str);
- if (ReplicationDB.isaCounter(cn))
- {
- if (cursor.getPrev(key, data,
- LockMode.DEFAULT) != OperationStatus.SUCCESS)
- {
- /*
- * database only contain a counter record, should not be
- * possible, but Ok, let's just say no change Number
- */
- cn = null;
- }
- else
- {
- str = decodeUTF8(key.getData());
- cn= new ChangeNumber(str);
- // There can't be 2 counter record next to each other
- }
- }
- }
+ closeLockedCursor(cursor);
}
}
catch (DatabaseException e)
{
- /* database is faulty */
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- // TODO: Verify if shutting down replication is the right thing to do
- replicationServer.shutdown();
+ replicationServer.handleUnexpectedDatabaseException(e);
cn = null;
}
- finally
- {
- closeLockedCursor(cursor);
- }
return cn;
}
@@ -669,14 +613,7 @@
catch (Exception e)
{
// Unlocking is required before throwing any exception
- try
- {
- closeLockedCursor(localCursor);
- }
- catch (Exception ignore)
- {
- // Ignore.
- }
+ closeLockedCursor(localCursor);
throw e;
}
}
@@ -703,14 +640,7 @@
}
catch (Exception e)
{
- try
- {
- closeLockedCursor(localCursor);
- }
- catch (Exception ignore)
- {
- // Ignore.
- }
+ closeLockedCursor(localCursor);
if (localTxn != null)
{
@@ -741,41 +671,19 @@
isClosed = true;
}
- boolean closeHasFailed = false;
-
- try
- {
- closeLockedCursor(cursor);
- }
- catch (DatabaseException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- closeHasFailed = true;
- }
-
+ closeLockedCursor(cursor);
if (txn != null)
{
try
{
- txn.commit();
+ // No need for durability when purging.
+ txn.commit(Durability.COMMIT_NO_SYNC);
}
catch (DatabaseException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- closeHasFailed = true;
+ replicationServer.handleUnexpectedDatabaseException(e);
}
}
-
- if (closeHasFailed)
- {
- replicationServer.shutdown();
- }
}
/**
@@ -796,26 +704,7 @@
isClosed = true;
}
- boolean closeHasFailed = false;
-
- try
- {
- closeLockedCursor(cursor);
- }
- catch (LockConflictException e)
- {
- // The DB documentation states that a DeadlockException
- // on the close method of a cursor that is aborting should
- // be ignored.
- }
- catch (DatabaseException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- closeHasFailed = true;
- }
+ closeLockedCursor(cursor);
if (txn != null)
{
@@ -825,18 +714,9 @@
}
catch (DatabaseException e)
{
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- closeHasFailed = true;
+ replicationServer.handleUnexpectedDatabaseException(e);
}
}
-
- if (closeHasFailed)
- {
- replicationServer.shutdown();
- }
}
/**
@@ -972,143 +852,140 @@
int distToCounterRecord1 = 0;
int distBackToCounterRecord2 = 0;
int count=0;
- Cursor cursor = null;
- Transaction txn = null;
OperationStatus status;
try
{
- ChangeNumber cn ;
-
- if ((start==null)&&(stop==null))
- return (int)db.count();
-
- // Step 1 : from the start point, traverse db to the next counter record
- // or to the stop point.
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry data = new DatabaseEntry();
- cursor = db.openCursor(txn, null);
- if (start != null)
+ Cursor cursor = null;
+ try
{
- key = new ReplicationKey(start);
- status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
- if (status == OperationStatus.NOTFOUND)
- status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
- }
- else
- {
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- }
+ ChangeNumber cn ;
- while (status == OperationStatus.SUCCESS)
- {
- // test whether the record is a regular change or a counter
- String csnString = decodeUTF8(key.getData());
- cn = new ChangeNumber(csnString);
- if (cn.getServerId() != 0)
+ if ((start==null)&&(stop==null))
+ return (int)db.count();
+
+ // Step 1 : from the start point, traverse db to the next counter record
+ // or to the stop point.
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry data = new DatabaseEntry();
+ cursor = db.openCursor(null, null);
+ if (start != null)
{
- // reached a regular change record
- // test whether we reached the 'stop' target
- if (!cn.newer(stop))
- {
- // let's loop
- distToCounterRecord1++;
- status = cursor.getNext(key, data, LockMode.DEFAULT);
- }
- else
- {
- // reached the end
- break;
- }
+ key = new ReplicationKey(start);
+ status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.NOTFOUND)
+ status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
}
else
{
- // counter record
- counterRecord1 = decodeCounterValue(data.getData());
- break;
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
}
- }
- cursor.close();
- // cases
- //
- if (counterRecord1==0)
- return distToCounterRecord1;
+ while (status == OperationStatus.SUCCESS)
+ {
+ // test whether the record is a regular change or a counter
+ String csnString = decodeUTF8(key.getData());
+ cn = new ChangeNumber(csnString);
+ if (cn.getServerId() != 0)
+ {
+ // reached a regular change record
+ // test whether we reached the 'stop' target
+ if (!cn.newer(stop))
+ {
+ // let's loop
+ distToCounterRecord1++;
+ status = cursor.getNext(key, data, LockMode.DEFAULT);
+ }
+ else
+ {
+ // reached the end
+ break;
+ }
+ }
+ else
+ {
+ // counter record
+ counterRecord1 = decodeCounterValue(data.getData());
+ break;
+ }
+ }
+ cursor.close();
- // Step 2 : from the stop point, traverse db to the next counter record
- // or to the start point.
- txn = null;
- data = new DatabaseEntry();
- key = new ReplicationKey(stop);
- cursor = db.openCursor(txn, null);
- status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
- if (status == OperationStatus.SUCCESS)
- {
- cn = new ChangeNumber(decodeUTF8(key.getData()));
- }
- else
- {
- key = new DatabaseEntry();
+ // cases
+ //
+ if (counterRecord1==0)
+ return distToCounterRecord1;
+
+ // Step 2 : from the stop point, traverse db to the next counter record
+ // or to the start point.
data = new DatabaseEntry();
- status = cursor.getLast(key, data, LockMode.DEFAULT);
- if (status != OperationStatus.SUCCESS)
+ key = new ReplicationKey(stop);
+ cursor = db.openCursor(null, null);
+ status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+ if (status == OperationStatus.SUCCESS)
{
- /* database is empty */
- return 0;
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
}
- }
- while (status == OperationStatus.SUCCESS)
- {
- cn = new ChangeNumber(decodeUTF8(key.getData()));
- if (!ReplicationDB.isaCounter(cn))
+ else
{
- // regular change record
- if (!cn.older(start))
+ key = new DatabaseEntry();
+ data = new DatabaseEntry();
+ status = cursor.getLast(key, data, LockMode.DEFAULT);
+ if (status != OperationStatus.SUCCESS)
{
- distBackToCounterRecord2++;
- status = cursor.getPrev(key, data, LockMode.DEFAULT);
+ /* database is empty */
+ return 0;
+ }
+ }
+ while (status == OperationStatus.SUCCESS)
+ {
+ cn = new ChangeNumber(decodeUTF8(key.getData()));
+ if (!ReplicationDB.isaCounter(cn))
+ {
+ // regular change record
+ if (!cn.older(start))
+ {
+ distBackToCounterRecord2++;
+ status = cursor.getPrev(key, data, LockMode.DEFAULT);
+ }
+ else
+ break;
}
else
+ {
+ // counter record
+ counterRecord2 = decodeCounterValue(data.getData());
break;
+ }
}
- else
+ cursor.close();
+
+ // Step 3 : Now consolidates the result
+ if (counterRecord1!=0)
{
- // counter record
- counterRecord2 = decodeCounterValue(data.getData());
- break;
+ if (counterRecord1 == counterRecord2)
+ {
+ // only one cp between from and to - no need to use it
+ count = distToCounterRecord1 + distBackToCounterRecord2;
+ }
+ else
+ {
+ // 2 cp between from and to
+ count = distToCounterRecord1 + (counterRecord2-counterRecord1)
+ + distBackToCounterRecord2;
+ }
}
}
- cursor.close();
-
- // Step 3 : Now consolidates the result
- if (counterRecord1!=0)
+ finally
{
- if (counterRecord1 == counterRecord2)
+ if (cursor != null)
{
- // only one cp between from and to - no need to use it
- count = distToCounterRecord1 + distBackToCounterRecord2;
- }
- else
- {
- // 2 cp between from and to
- count = distToCounterRecord1 + (counterRecord2-counterRecord1)
- + distBackToCounterRecord2;
+ cursor.close();
}
}
}
- finally
+ catch (DatabaseException e)
{
- if (cursor != null)
- cursor.close();
- if (txn != null)
- {
- try
- {
- txn.abort();
- } catch (DatabaseException e1)
- {
- // can't do much more. The ReplicationServer is shutting down.
- }
- }
+ replicationServer.handleUnexpectedDatabaseException(e);
}
return count;
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationData.java b/opends/src/server/org/opends/server/replication/server/ReplicationData.java
index 52022f9..867f052 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationData.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2010 ForgeRock AS.
+ * Portions Copyright 2010-2011 ForgeRock AS.
*/
package org.opends.server.replication.server;
@@ -47,16 +47,20 @@
* Creates a new ReplicationData object from an UpdateMsg.
*
* @param change the UpdateMsg used to create the ReplicationData.
- *
- * @throws UnsupportedEncodingException When the encoding of the message
- * failed because the UTF-8 encoding is not supported.
*/
public ReplicationData(UpdateMsg change)
- throws UnsupportedEncodingException
{
// Always keep messages in the replication DB with the current protocol
// version
- this.setData(change.getBytes());
+ try
+ {
+ this.setData(change.getBytes());
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // This should not happen - UTF-8 is always available.
+ throw new RuntimeException(e);
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
index cb34f92..7c28c16 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -40,16 +40,8 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
+
import java.util.concurrent.TimeUnit;
/**
@@ -92,10 +84,24 @@
*/
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
- envConfig.setConfigParam("je.cleaner.expunge", "true");
envConfig.setConfigParam("je.cleaner.threads", "2");
envConfig.setConfigParam("je.checkpointer.highPriority", "true");
+ // If the JVM is reasonably large then we can safely default to
+ // bigger read buffers. This will result in more scalable checkpointer
+ // and cleaner performance.
+ if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
+ {
+ envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", String
+ .valueOf(2 * 1024 * 1024));
+
+ envConfig.setConfigParam("je.log.iteratorReadSize", String
+ .valueOf(2 * 1024 * 1024));
+
+ envConfig.setConfigParam("je.log.faultReadSize", String
+ .valueOf(4 * 1024));
+ }
+
// Tests have shown that since the parsing of the Replication log is always
// done sequentially, it is not necessary to use a large DB cache.
// Use 5M so that the replication can be used with 64M total for the JVM.
@@ -103,9 +109,14 @@
// Since records are always added at the end of the Replication log and
// deleted at the beginning of the Replication log, this should never
- // cause any deadlock. It is therefore safe to increase the TXN timeout
- // to 10 seconds.
- envConfig.setTxnTimeout(10, TimeUnit.SECONDS);
+ // cause any deadlock.
+ envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
+ envConfig.setLockTimeout(0, TimeUnit.SECONDS);
+
+ // Since replication provides durability, we can reduce the DB durability
+ // level so that we are immune to application / JVM crashes.
+ envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
+
dbEnvironment = new Environment(new File(path), envConfig);
/*
@@ -120,7 +131,6 @@
stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig);
start();
-
}
/**
@@ -316,7 +326,7 @@
TRACER.debugInfo("getOrAddDb() Created in the state Db record " +
" serverId/Domain=<"+stringId+">");
stateDb.put(txn, key, data);
- txn.commitWriteNoSync();
+ txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
} catch (DatabaseException dbe)
{
// Abort the txn and propagate the Exception to the caller
@@ -347,7 +357,7 @@
"Created in the state Db record Tag/Domain/GenId key=" +
stringId + " value=" + dataStringId);
stateDb.put(txn, key, data);
- txn.commitWriteNoSync();
+ txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
} catch (DatabaseException dbe)
{
// Abort the txn and propagate the Exception to the caller
@@ -432,7 +442,7 @@
try
{
stateDb.delete(txn, key);
- txn.commitWriteNoSync();
+ txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
if (debugEnabled())
TRACER.debugInfo(
"In " + this.replicationServer.getMonitorInstanceName() +
@@ -495,7 +505,7 @@
try {
data.setData(byteId);
stateDb.delete(txn, key);
- txn.commitWriteNoSync();
+ txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
if (debugEnabled())
TRACER.debugInfo(
" In " + this.replicationServer.getMonitorInstanceName() +
@@ -532,7 +542,7 @@
{
txn = dbEnvironment.beginTransaction(null, null);
dbEnvironment.truncateDatabase(txn, databaseName, false);
- txn.commitWriteNoSync();
+ txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
txn = null;
}
catch (DatabaseException e)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index a21b51b..5c96d23 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -32,6 +32,7 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.ServerConstants.EOL;
import static org.opends.server.util.StaticUtils.getFileForPath;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.File;
import java.io.IOException;
@@ -992,7 +993,7 @@
* @return The time after which changes must be deleted from the
* persistent storage (in milliseconds).
*/
- long getTrimage()
+ long getTrimAge()
{
return purgeDelay * 1000;
}
@@ -2002,4 +2003,24 @@
}
}
+
+
+ /**
+ * Shuts down replication when an unexpected database exception occurs. Note
+ * that we do not expect lock timeouts or txn timeouts because the replication
+ * databases are deadlock free, thus all operations should complete
+ * eventually.
+ *
+ * @param e
+ * The unexpected database exception.
+ */
+ void handleUnexpectedDatabaseException(DatabaseException e)
+ {
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+ mb.append(stackTraceToSingleLineString(e));
+ logError(mb.toMessage());
+ shutdown();
+ }
+
}
--
Gitblit v1.10.0