opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,7 +51,6 @@ import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.LockConflictException; /** * This class is used for managing the replicationServer database for each @@ -111,9 +110,6 @@ private final Object flushLock = new Object(); private ReplicationServer replicationServer; // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; private long latestTrimDate = 0; /** @@ -122,7 +118,7 @@ * are older than this age are removed. * */ private long trimage; private long trimAge; /** * Creates a new dbHandler associated to a given LDAP server. @@ -143,7 +139,7 @@ this.replicationServer = replicationServer; serverId = id; this.baseDn = baseDn; trimage = replicationServer.getTrimage(); trimAge = replicationServer.getTrimAge(); queueMaxSize = queueSize; queueLowmark = queueSize * 1 / 5; queueHimark = queueSize * 4 / 5; @@ -291,43 +287,6 @@ } /** * Return the number of changes between 2 provided change numbers. * @param from The lower (older) change number. * @param to The upper (newer) change number. * @return The computed number of changes. */ public int traverseAndCount(ChangeNumber from, ChangeNumber to) { int count = 0; flush(); ReplServerDBCursor cursor = null; try { try { cursor = db.openReadCursor(from); } catch(Exception e) { return 0; } ChangeNumber curr = null; while ((curr = cursor.nextChangeNumber())!=null) { if (curr.newerOrEquals(to)) break; count++; } } finally { if (cursor != null) cursor.abort(); } return count; } /** * Removes the provided number of messages from the beginning of the msgQueue. * * @param number the number of changes to be removed. @@ -456,80 +415,67 @@ */ private void trim() throws DatabaseException, Exception { if (trimage == 0) if (trimAge == 0) { return; int size = 0; boolean finished = false; boolean done = false; } latestTrimDate = TimeThread.getTime() - trimage; latestTrimDate = TimeThread.getTime() - trimAge; ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0); // Find the last changeNumber before the trimDate, in the Database. ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate); ChangeNumber lastBeforeTrimDate = db .getPreviousChangeNumber(trimDate); if (lastBeforeTrimDate != null) { // If we found it, we want to stop trimming when reaching it. trimDate = lastBeforeTrimDate; } // In case of deadlock detection by the Database, this thread can // by aborted by a DeadlockException. This is a transient error and // the transaction should be attempted again. // We will try DEADLOCK_RETRIES times before failing. int tries = 0; while ((tries++ < DEADLOCK_RETRIES) && (!done)) for (int i = 0; i < 100; i++) { synchronized (flushLock) { /* the trim is done by group in order to save some CPU and IO bandwidth /* * the trim is done by group in order to save some CPU and IO bandwidth * start the transaction then do a bunch of remove then commit */ ReplServerDBCursor cursor = db.openDeleteCursor(); final ReplServerDBCursor cursor = db.openDeleteCursor(); try { while ((size < 5000 ) && (!finished)) for (int j = 0; j < 50; j++) { ChangeNumber changeNumber = cursor.nextChangeNumber(); if (changeNumber != null) if (changeNumber == null) { cursor.close(); done = true; return; } if ((!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) { size++; cursor.delete(); } else { firstChange = changeNumber; finished = true; } } else finished = true; } cursor.close(); done = true; return; } catch (LockConflictException e) { cursor.abort(); if (tries == DEADLOCK_RETRIES) { // could not handle the Deadlock after DEADLOCK_RETRIES tries. // shutdown the ReplicationServer. shutdown = true; throw (e); } cursor.close(); } catch (Exception e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() shutdown = true; cursor.abort(); throw (e); shutdown = true; throw e; } } } @@ -644,7 +590,7 @@ */ public void setPurgeDelay(long delay) { trimage = delay; trimAge = delay; } /** opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -32,7 +32,6 @@ import static org.opends.server.util.StaticUtils.decodeUTF8; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.UnsupportedEncodingException; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.opends.messages.MessageBuilder; @@ -40,14 +39,7 @@ import org.opends.server.replication.common.ChangeNumber; import org.opends.server.types.DebugLogLevel; import com.sleepycat.je.Cursor; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.Transaction; import com.sleepycat.je.*; /** * This class implements the interface between the underlying database @@ -61,9 +53,6 @@ private ReplicationDbEnv dbenv = null; private ReplicationServer replicationServer; // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; // The lock used to provide exclusive access to the thread that // close the db (shutdown or clear). private ReentrantReadWriteLock dbCloseLock; @@ -103,71 +92,41 @@ public void addEntry(int draftCN, String value, String domainBaseDN, ChangeNumber changeNumber) { Transaction txn = null; try { int tries = 0; boolean done = false; DatabaseEntry key = new ReplicationDraftCNKey(draftCN); DatabaseEntry data = new DraftCNData(draftCN, value, domainBaseDN, changeNumber); // The database can return a Deadlock Exception if several threads are // accessing the database at the same time. This Exception is a // transient state, when it happens the transaction is aborted and // the operation is attempted again up to DEADLOCK_RETRIES times. while ((tries++ < DEADLOCK_RETRIES) && (!done)) { // Use a transaction so that we can override durability. Transaction txn = null; dbCloseLock.readLock().lock(); try { txn = dbenv.beginTransaction(); DatabaseEntry key = new ReplicationDraftCNKey(draftCN); DatabaseEntry data = new DraftCNData(draftCN, value, domainBaseDN, changeNumber); db.put(txn, key, data); txn.commitWriteNoSync(); txn = null; done = true; } catch (LockConflictException e) { // Try again. txn.commit(Durability.COMMIT_WRITE_NO_SYNC); } finally { if (txn != null) { // No effect if txn has committed. try { txn.abort(); txn = null; } catch (Exception e) { // Ignored. } } dbCloseLock.readLock().unlock(); } } if (!done) { // Could not write to the DB after DEADLOCK_RETRIES tries. // This ReplicationServer is not reliable and will be shutdown. MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); logError(mb.toMessage()); replicationServer.shutdown(); } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } catch (UnsupportedEncodingException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); replicationServer.handleUnexpectedDatabaseException(e); } } @@ -229,13 +188,21 @@ } private void closeLockedCursor(Cursor cursor) throws DatabaseException { try { if (cursor != null) { try { cursor.close(); } catch (DatabaseException e) { // Ignore. } } } finally { dbCloseLock.readLock().unlock(); @@ -248,12 +215,10 @@ */ public int readFirstDraftCN() { Cursor cursor = null; String str = null; try { dbCloseLock.readLock().lock(); Cursor cursor = null; try { cursor = db.openCursor(null, null); @@ -265,9 +230,8 @@ /* database is empty */ return 0; } str = decodeUTF8(key.getData()); int sn = new Integer(str); return sn; return new Integer(decodeUTF8(key.getData())); } finally { @@ -277,20 +241,7 @@ catch (DatabaseException e) { /* database is faulty */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); return 0; } catch (Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); replicationServer.handleUnexpectedDatabaseException(e); return 0; } } @@ -318,12 +269,10 @@ */ public int readLastDraftCN() { Cursor cursor = null; String str = null; try { dbCloseLock.readLock().lock(); Cursor cursor = null; try { cursor = db.openCursor(null, null); @@ -335,9 +284,8 @@ /* database is empty */ return 0; } str = decodeUTF8(key.getData()); int sn = new Integer(str); return sn; return new Integer(decodeUTF8(key.getData())); } finally { @@ -346,16 +294,7 @@ } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); return 0; } catch (Exception e) { replicationServer.shutdown(); replicationServer.handleUnexpectedDatabaseException(e); return 0; } } @@ -519,20 +458,7 @@ isClosed = true; } boolean closeHasFailed = false; try { closeLockedCursor(cursor); } catch (Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); closeHasFailed = true; } if (txn != null) { @@ -540,20 +466,11 @@ { txn.commit(); } catch (Exception e) catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); closeHasFailed = true; replicationServer.handleUnexpectedDatabaseException(e); } } if (closeHasFailed) { replicationServer.shutdown(); } } /** @@ -574,26 +491,7 @@ isClosed = true; } boolean closeHasFailed = false; try { closeLockedCursor(cursor); } catch (LockConflictException e) { // The DB documentation states that a DeadlockException // on the close method of a cursor that is aborting should // be ignored. } catch (Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); closeHasFailed = true; } if (txn != null) { @@ -601,20 +499,11 @@ { txn.abort(); } catch (Exception e) catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); closeHasFailed = true; replicationServer.handleUnexpectedDatabaseException(e); } } if (closeHasFailed) { replicationServer.shutdown(); } } /** opends/src/server/org/opends/server/replication/server/DraftCNData.java
@@ -23,10 +23,12 @@ * * * Copyright 2009 Sun Microsystems, Inc. * Portions Copyright 2010 ForgeRock AS. * Portions Copyright 2010-2011 ForgeRock AS. */ package org.opends.server.replication.server; import static org.opends.server.util.StaticUtils.getBytes; import java.io.UnsupportedEncodingException; import org.opends.messages.Message; @@ -53,29 +55,14 @@ * @param value The value (cookie). * @param serviceID The serviceID (domain DN). * @param changeNumber The replication change number. * * @throws UnsupportedEncodingException When the encoding of the message * failed because the UTF-8 encoding is not supported. */ public DraftCNData(int draftCN, String value, String serviceID, ChangeNumber changeNumber) throws UnsupportedEncodingException { String record = value + FIELD_SEPARATOR + serviceID + FIELD_SEPARATOR + changeNumber; byte[] byteValue; try { byteValue = record.getBytes("UTF-8"); this.setData(byteValue); } catch (UnsupportedEncodingException e) { // can't happen return; } setData(getBytes(record)); } /** opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -50,7 +50,6 @@ import org.opends.server.types.InitializationException; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.LockConflictException; /** * This class is used for managing the replicationServer database for each @@ -84,16 +83,13 @@ private DirectoryThread thread = null; private ReplicationServer replicationServer; // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; /** * * The trim age in milliseconds. Changes record in the change DB that * are older than this age are removed. * */ private long trimage; private long trimAge; /** * Creates a new dbHandler associated to a given LDAP server. @@ -108,7 +104,7 @@ throws DatabaseException { this.replicationServer = replicationServer; this.trimage = replicationServer.getTrimage(); this.trimAge = replicationServer.getTrimAge(); // DB initialization db = new DraftCNDB(replicationServer, dbenv); @@ -312,7 +308,7 @@ */ public void trim() throws DatabaseException, Exception { if (trimage == 0) if (trimAge == 0) return; clear(null); @@ -333,103 +329,83 @@ throws DatabaseException, Exception { if (this.count()==0) { return; } int size = 0; int tries = 0; boolean finished = false; boolean done = false; ChangeNumber crossDomainEligibleCN = replicationServer .getEligibleCN(); ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN(); // In case of deadlock detection by the Database, this thread can // by aborted by a DeadlockException. This is a transient error and // the transaction should be attempted again. // We will try DEADLOCK_RETRIES times before failing. while ((tries++ < DEADLOCK_RETRIES) && (!done)) for (int i = 0; i < 100; i++) { DraftCNDBCursor cursor = db.openDeleteCursor(); try { while ((size < 5000 ) && (!finished)) for (int j = 0; j < 50; j++) { // let's traverse the DraftCNDb if (!cursor.next()) { finished=true; cursor.close(); return; } else { ChangeNumber cn = cursor.currentChangeNumber(); // From the draftCNDb change record, get the domain and changeNumber String serviceID = cursor.currentServiceID(); if ((serviceIDToClear!=null) && (serviceIDToClear.equalsIgnoreCase(serviceID))) if ((serviceIDToClear != null) && (serviceIDToClear.equalsIgnoreCase(serviceID))) { size++; cursor.delete(); continue; } ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(serviceID, false); ReplicationServerDomain domain = replicationServer .getReplicationServerDomain(serviceID, false); if (domain==null) { // the domain has been removed since the record was written in the // draftCNDb, thus it makes no sense to keep the record in the // draftCNDb. size++; cursor.delete(); continue; } else { ServerState startState = domain.getStartState(); // We don't use the returned endState but it's updating CN as // reading domain.getEligibleState(crossDomainEligibleCN); ChangeNumber fcn = startState.getMaxChangeNumber( cn.getServerId()); ChangeNumber fcn = startState.getMaxChangeNumber(cn .getServerId()); int currentKey = cursor.currentKey(); // Do not delete the lastKey. This should allow us to // preserve last change number over time. if ((currentKey != lastkey) && (cn.older(fcn))) { size++; cursor.delete(); continue; } else { firstkey = currentKey; finished = true; } } } } cursor.close(); done = true; return; } catch (LockConflictException e) { cursor.abort(); if (tries == DEADLOCK_RETRIES) { // could not handle the Deadlock after DEADLOCK_RETRIES tries. // shutdown the ReplicationServer. shutdown = true; throw e; } cursor.close(); } catch (Exception e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() shutdown = true; cursor.abort(); shutdown = true; throw e; } } @@ -492,7 +468,7 @@ */ public void setPurgeDelay(long delay) { trimage = delay; trimAge = delay; } /** opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -35,21 +35,13 @@ import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.util.List; import java.io.UnsupportedEncodingException; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMsg; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.sleepycat.je.Cursor; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Database; import com.sleepycat.je.LockConflictException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.Transaction; import com.sleepycat.je.*; /** * This class implements the interface between the underlying database @@ -64,9 +56,6 @@ private int serverId; private String baseDn; // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; // The lock used to provide exclusive access to the thread that // close the db (shutdown or clear). private ReentrantReadWriteLock dbCloseLock; @@ -180,34 +169,22 @@ */ public void addEntries(List<UpdateMsg> changes) { Transaction txn = null; try { int tries = 0; boolean done = false; // The database can return a Deadlock Exception if several threads are // accessing the database at the same time. This Exception is a // transient state, when it happens the transaction is aborted and // the operation is attempted again up to DEADLOCK_RETRIES times. while ((tries++ < DEADLOCK_RETRIES) && (!done)) { dbCloseLock.readLock().lock(); try { txn = dbenv.beginTransaction(); for (UpdateMsg change : changes) { DatabaseEntry key = new ReplicationKey(change.getChangeNumber()); DatabaseEntry key = new ReplicationKey( change.getChangeNumber()); DatabaseEntry data = new ReplicationData(change); if ((counterCurrValue!=0) && (counterCurrValue%counterWindowSize == 0)) if ((counterCurrValue != 0) && (counterCurrValue % counterWindowSize == 0)) { // enough changes to generate a counter record - wait for the next // change fo time // change of time counterTsLimit = change.getChangeNumber().getTime(); } if ((counterTsLimit!=0) @@ -215,63 +192,25 @@ { // Write the counter record DatabaseEntry counterKey = new ReplicationKey( new ChangeNumber( change.getChangeNumber().getTime(), new ChangeNumber(change.getChangeNumber().getTime(), 0, 0)); DatabaseEntry counterValue = encodeCounterValue(counterCurrValue-1); db.put(txn, counterKey, counterValue); db.put(null, counterKey, counterValue); counterTsLimit=0; } db.put(txn, key, data); db.put(null, key, data); counterCurrValue++; } txn.commitWriteNoSync(); txn = null; done = true; } catch (LockConflictException e) { // Try again. } finally { if (txn != null) { // No effect if txn has committed. txn.abort(); txn = null; } dbCloseLock.readLock().unlock(); } } if (!done) { // Could not write to the DB after DEADLOCK_RETRIES tries. // This ReplicationServer is not reliable and will be shutdown. MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); logError(mb.toMessage()); replicationServer.shutdown(); } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } catch (UnsupportedEncodingException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); replicationServer.handleUnexpectedDatabaseException(e); } } @@ -334,14 +273,25 @@ return new ReplServerDBCursor(); } private void closeLockedCursor(Cursor cursor) throws DatabaseException { try { if (cursor != null) { try { cursor.close(); } catch (DatabaseException e) { // Ignore. } } } finally { dbCloseLock.readLock().unlock(); @@ -358,6 +308,8 @@ String str = null; ChangeNumber cn = null; try { dbCloseLock.readLock().lock(); try { @@ -366,7 +318,8 @@ cursor = db.openCursor(null, null); OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT); if (status != OperationStatus.SUCCESS) { @@ -391,20 +344,17 @@ } } } catch (DatabaseException e) { /* database is faulty */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); cn = null; } finally { closeLockedCursor(cursor); } } catch (DatabaseException e) { /* database is faulty */ replicationServer.handleUnexpectedDatabaseException(e); cn = null; } return cn; } @@ -420,6 +370,8 @@ Cursor cursor = null; ChangeNumber cn = null; try { dbCloseLock.readLock().lock(); try { @@ -441,12 +393,12 @@ cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { /* * database only contain a counter record - don't know how much it can * be possible but ... * database only contain a counter record - don't know how much it * can be possible but ... */ cn = null; } @@ -458,19 +410,16 @@ } } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); cn = null; } finally { closeLockedCursor(cursor); } } catch (DatabaseException e) { replicationServer.handleUnexpectedDatabaseException(e); cn = null; } return cn; } @@ -496,26 +445,26 @@ DatabaseEntry key = new ReplicationKey(changeNumber); DatabaseEntry data = new DatabaseEntry(); Transaction txn = null; try { dbCloseLock.readLock().lock(); try { cursor = db.openCursor(txn, null); if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) cursor = db.openCursor(null, null); if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { // We can move close to the changeNumber. // Let's move to the previous change. if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) if (cursor.getPrev(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { // database starts with a counter record. cn = null; @@ -534,15 +483,15 @@ { // We could not move the cursor past to the changeNumber // Check if the last change is older than changeNumber if (cursor.getLast(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) if (cursor.getLast(key, data, LockMode.DEFAULT) == OperationStatus.SUCCESS) { String str = decodeUTF8(key.getData()); cn = new ChangeNumber(str); if (ReplicationDB.isaCounter(cn)) { if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { /* * database only contain a counter record, should not be @@ -560,21 +509,16 @@ } } } catch (DatabaseException e) { /* database is faulty */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); // TODO: Verify if shutting down replication is the right thing to do replicationServer.shutdown(); cn = null; } finally { closeLockedCursor(cursor); } } catch (DatabaseException e) { replicationServer.handleUnexpectedDatabaseException(e); cn = null; } return cn; } @@ -669,14 +613,7 @@ catch (Exception e) { // Unlocking is required before throwing any exception try { closeLockedCursor(localCursor); } catch (Exception ignore) { // Ignore. } throw e; } } @@ -703,14 +640,7 @@ } catch (Exception e) { try { closeLockedCursor(localCursor); } catch (Exception ignore) { // Ignore. } if (localTxn != null) { @@ -741,41 +671,19 @@ isClosed = true; } boolean closeHasFailed = false; try { closeLockedCursor(cursor); } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); closeHasFailed = true; } if (txn != null) { try { txn.commit(); // No need for durability when purging. txn.commit(Durability.COMMIT_NO_SYNC); } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); closeHasFailed = true; replicationServer.handleUnexpectedDatabaseException(e); } } if (closeHasFailed) { replicationServer.shutdown(); } } /** @@ -796,26 +704,7 @@ isClosed = true; } boolean closeHasFailed = false; try { closeLockedCursor(cursor); } catch (LockConflictException e) { // The DB documentation states that a DeadlockException // on the close method of a cursor that is aborting should // be ignored. } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); closeHasFailed = true; } if (txn != null) { @@ -825,18 +714,9 @@ } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); closeHasFailed = true; replicationServer.handleUnexpectedDatabaseException(e); } } if (closeHasFailed) { replicationServer.shutdown(); } } /** @@ -972,11 +852,12 @@ int distToCounterRecord1 = 0; int distBackToCounterRecord2 = 0; int count=0; Cursor cursor = null; Transaction txn = null; OperationStatus status; try { Cursor cursor = null; try { ChangeNumber cn ; if ((start==null)&&(stop==null)) @@ -986,7 +867,7 @@ // or to the stop point. DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); cursor = db.openCursor(txn, null); cursor = db.openCursor(null, null); if (start != null) { key = new ReplicationKey(start); @@ -1036,10 +917,9 @@ // Step 2 : from the stop point, traverse db to the next counter record // or to the start point. txn = null; data = new DatabaseEntry(); key = new ReplicationKey(stop); cursor = db.openCursor(txn, null); cursor = db.openCursor(null, null); status = cursor.getSearchKey(key, data, LockMode.DEFAULT); if (status == OperationStatus.SUCCESS) { @@ -1098,18 +978,15 @@ finally { if (cursor != null) { cursor.close(); if (txn != null) { try { txn.abort(); } catch (DatabaseException e1) { // can't do much more. The ReplicationServer is shutting down. } } } catch (DatabaseException e) { replicationServer.handleUnexpectedDatabaseException(e); } return count; } opends/src/server/org/opends/server/replication/server/ReplicationData.java
@@ -23,7 +23,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2010 ForgeRock AS. * Portions Copyright 2010-2011 ForgeRock AS. */ package org.opends.server.replication.server; @@ -47,17 +47,21 @@ * Creates a new ReplicationData object from an UpdateMsg. * * @param change the UpdateMsg used to create the ReplicationData. * * @throws UnsupportedEncodingException When the encoding of the message * failed because the UTF-8 encoding is not supported. */ public ReplicationData(UpdateMsg change) throws UnsupportedEncodingException { // Always keep messages in the replication DB with the current protocol // version try { this.setData(change.getBytes()); } catch (UnsupportedEncodingException e) { // This should not happen - UTF-8 is always available. throw new RuntimeException(e); } } /** * Generate an UpdateMsg from its byte[] form. opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
@@ -40,16 +40,8 @@ import java.io.File; import java.io.UnsupportedEncodingException; import com.sleepycat.je.Cursor; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.Transaction; import com.sleepycat.je.*; import java.util.concurrent.TimeUnit; /** @@ -92,10 +84,24 @@ */ envConfig.setAllowCreate(true); envConfig.setTransactional(true); envConfig.setConfigParam("je.cleaner.expunge", "true"); envConfig.setConfigParam("je.cleaner.threads", "2"); envConfig.setConfigParam("je.checkpointer.highPriority", "true"); // If the JVM is reasonably large then we can safely default to // bigger read buffers. This will result in more scalable checkpointer // and cleaner performance. if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024) { envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", String .valueOf(2 * 1024 * 1024)); envConfig.setConfigParam("je.log.iteratorReadSize", String .valueOf(2 * 1024 * 1024)); envConfig.setConfigParam("je.log.faultReadSize", String .valueOf(4 * 1024)); } // Tests have shown that since the parsing of the Replication log is always // done sequentially, it is not necessary to use a large DB cache. // Use 5M so that the replication can be used with 64M total for the JVM. @@ -103,9 +109,14 @@ // Since records are always added at the end of the Replication log and // deleted at the beginning of the Replication log, this should never // cause any deadlock. It is therefore safe to increase the TXN timeout // to 10 seconds. envConfig.setTxnTimeout(10, TimeUnit.SECONDS); // cause any deadlock. envConfig.setTxnTimeout(0, TimeUnit.SECONDS); envConfig.setLockTimeout(0, TimeUnit.SECONDS); // Since replication provides durability, we can reduce the DB durability // level so that we are immune to application / JVM crashes. envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC); dbEnvironment = new Environment(new File(path), envConfig); /* @@ -120,7 +131,6 @@ stateDb = dbEnvironment.openDatabase(null, "changelogstate", dbConfig); start(); } /** @@ -316,7 +326,7 @@ TRACER.debugInfo("getOrAddDb() Created in the state Db record " + " serverId/Domain=<"+stringId+">"); stateDb.put(txn, key, data); txn.commitWriteNoSync(); txn.commit(Durability.COMMIT_WRITE_NO_SYNC); } catch (DatabaseException dbe) { // Abort the txn and propagate the Exception to the caller @@ -347,7 +357,7 @@ "Created in the state Db record Tag/Domain/GenId key=" + stringId + " value=" + dataStringId); stateDb.put(txn, key, data); txn.commitWriteNoSync(); txn.commit(Durability.COMMIT_WRITE_NO_SYNC); } catch (DatabaseException dbe) { // Abort the txn and propagate the Exception to the caller @@ -432,7 +442,7 @@ try { stateDb.delete(txn, key); txn.commitWriteNoSync(); txn.commit(Durability.COMMIT_WRITE_NO_SYNC); if (debugEnabled()) TRACER.debugInfo( "In " + this.replicationServer.getMonitorInstanceName() + @@ -495,7 +505,7 @@ try { data.setData(byteId); stateDb.delete(txn, key); txn.commitWriteNoSync(); txn.commit(Durability.COMMIT_WRITE_NO_SYNC); if (debugEnabled()) TRACER.debugInfo( " In " + this.replicationServer.getMonitorInstanceName() + @@ -532,7 +542,7 @@ { txn = dbEnvironment.beginTransaction(null, null); dbEnvironment.truncateDatabase(txn, databaseName, false); txn.commitWriteNoSync(); txn.commit(Durability.COMMIT_WRITE_NO_SYNC); txn = null; } catch (DatabaseException e) opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -32,6 +32,7 @@ import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.ServerConstants.EOL; import static org.opends.server.util.StaticUtils.getFileForPath; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.File; import java.io.IOException; @@ -992,7 +993,7 @@ * @return The time after which changes must be deleted from the * persistent storage (in milliseconds). */ long getTrimage() long getTrimAge() { return purgeDelay * 1000; } @@ -2002,4 +2003,24 @@ } } /** * Shuts down replication when an unexpected database exception occurs. Note * that we do not expect lock timeouts or txn timeouts because the replication * databases are deadlock free, thus all operations should complete * eventually. * * @param e * The unexpected database exception. */ void handleUnexpectedDatabaseException(DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); shutdown(); } }