opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,6 +51,7 @@ import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.DeadlockException; /** * This class is used for managing the replicationServer database for each @@ -99,6 +100,9 @@ final static int MSG_QUEUE_HIMARK = 5000; final static int MSG_QUEUE_LOWMARK = 4000; // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; /** * * The trim age in milliseconds. Changes record in the change DB that @@ -285,7 +289,10 @@ } } return new ReplicationIterator(serverId, db, changeNumber); ReplicationIterator it = new ReplicationIterator(serverId, db, changeNumber); return it; } /** @@ -397,46 +404,67 @@ return; int size = 0; boolean finished = false; boolean done = false; ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage, (short) 0, (short)0); /* the trim is done by group in order to save some CPU and IO bandwidth * start the transaction then do a bunch of remove then commit */ ReplServerDBCursor cursor; // In case of deadlock detection by the Database, this thread can // by aborted by a DeadlockException. This is a transient error and // the transaction should be attempted again. // We will try DEADLOCK_RETRIES times before failing. 
int tries = 0; while ((tries++ < DEADLOCK_RETRIES) && (!done)) { /* the trim is done by group in order to save some CPU and IO bandwidth * start the transaction then do a bunch of remove then commit */ ReplServerDBCursor cursor; cursor = db.openDeleteCursor(); cursor = db.openDeleteCursor(); try { while ((size < 5000 ) && (!finished)) try { ChangeNumber changeNumber = cursor.nextChangeNumber(); if (changeNumber != null) while ((size < 5000 ) && (!finished)) { if ((!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) ChangeNumber changeNumber = cursor.nextChangeNumber(); if (changeNumber != null) { size++; cursor.delete(); if ((!changeNumber.equals(lastChange)) && (changeNumber.older(trimDate))) { size++; cursor.delete(); } else { firstChange = changeNumber; finished = true; } } else { firstChange = changeNumber; finished = true; } } else finished = true; cursor.close(); done = true; } cursor.close(); } catch (DatabaseException e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() shutdown = true; cursor.close(); throw (e); catch (DeadlockException e) { cursor.abort(); if (tries == DEADLOCK_RETRIES) { // could not handle the Deadlock after DEADLOCK_RETRIES tries. // shutdown the ReplicationServer. shutdown = true; throw (e); } } catch (DatabaseException e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() shutdown = true; cursor.abort(); throw (e); } } } opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import org.opends.messages.MessageBuilder; @@ -42,6 +42,7 @@ import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Database; import com.sleepycat.je.DeadlockException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.Transaction; @@ -59,6 +60,9 @@ private Short serverId; private DN baseDn; // The maximum number of retries in case of DatabaseDeadlock Exception. private static final int DEADLOCK_RETRIES = 10; /** * Creates a new database or open existing database that will be used * to store and retrieve changes from an LDAP server. @@ -95,28 +99,49 @@ try { txn = dbenv.beginTransaction(); int tries = 0; boolean done = false; for (UpdateMessage change : changes) // The database can return a Deadlock Exception if several threads are // accessing the database at the same time. This Exception is a // transient state, when it happens the transaction is aborted and // the operation is attempted again up to DEADLOCK_RETRIES times. 
while ((tries++ < DEADLOCK_RETRIES) && (!done)) { DatabaseEntry key = new ReplicationKey(change.getChangeNumber()); DatabaseEntry data = new ReplicationData(change); try { db.put(txn, key, data); } catch (DatabaseException e) txn = dbenv.beginTransaction(); for (UpdateMessage change : changes) { DatabaseEntry key = new ReplicationKey(change.getChangeNumber()); DatabaseEntry data = new ReplicationData(change); db.put(txn, key, data); } txn.commitWriteNoSync(); txn = null; done = true; } catch (DeadlockException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); txn.abort(); txn = null; } } txn.commitWriteNoSync(); txn = null; if (!done) { // Could not write to the DB after DEADLOCK_RETRIES tries. // This ReplicationServer is not reliable and will be shutdown. MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); logError(mb.toMessage()); if (txn != null) { txn.abort(); } replicationServer.shutdown(); } } catch (DatabaseException e) { @@ -332,31 +357,40 @@ { cursor = db.openCursor(txn, null); if (startingChangeNumber != null) try { key = new ReplicationKey(startingChangeNumber); data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) if (startingChangeNumber != null) { if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != key = new ReplicationKey(startingChangeNumber); data = new DatabaseEntry(); if (cursor.getSearchKey(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { throw new Exception("ChangeNumber not available"); } else { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { cursor = db.openCursor(txn, null); 
} } else { DatabaseEntry key = new DatabaseEntry(); DatabaseEntry data = new DatabaseEntry(); if (cursor.getPrev(key, data, LockMode.DEFAULT) != OperationStatus.SUCCESS) { cursor.close(); cursor = db.openCursor(txn, null); } } } } } catch (Exception e) { cursor.close(); throw (e); } } private ReplServerDBCursor() throws DatabaseException @@ -370,13 +404,15 @@ */ public void close() { if (cursor == null) return; try { cursor.close(); cursor = null; } catch (DatabaseException e) if (cursor != null) { cursor.close(); cursor = null; } } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); @@ -401,6 +437,52 @@ } /** * Abort the Cursor after a Deadlock Exception. * This method catch and ignore the DeadlockException because * this must be done when aborting a cursor after a DeadlockException * (per the Cursor documentation). * This should not be used in any other case. */ public void abort() { if (cursor == null) return; try { cursor.close(); cursor = null; } catch (DeadlockException e1) { // The DB documentation states that a DeadlockException // on the close method of a cursor that is aborting should // be ignored. } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } if (txn != null) { try { txn.abort(); } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); replicationServer.shutdown(); } } } /** * Get the next ChangeNumber in the database from this Cursor. * * @return The next ChangeNumber in the database from this cursor. opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -22,16 +22,16 @@ * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. * Portions Copyright 2006-2008 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import com.sleepycat.je.DatabaseException; import org.opends.server.replication.common.ChangeNumber; import org.opends.server.replication.protocol.UpdateMessage; import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor; import com.sleepycat.je.DatabaseException; /** * This class allows to iterate through the changes received from a given * LDAP Server Identifier. @@ -43,6 +43,9 @@ /** * Creates a new ReplicationIterator. * All created iterator must be released by the caller using the * releaseCursor() method. * * @param id the Identifier of the server on which the iterator applies. * @param db The db where the iterator must be created. * @param changeNumber The ChangeNumber after which the iterator must start. @@ -56,7 +59,9 @@ { cursor = db.openReadCursor(changeNumber); if (cursor == null) { throw new Exception("no new change"); } if (this.next() == false) { cursor.close(); opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -572,6 +572,9 @@ /** * Creates and returns an iterator. * When the iterator is not used anymore, the caller MUST call the * ReplicationIterator.releaseCursor() method to free the resources * and locks used by the ReplicationIterator. * * @param serverId Identifier of the server for which the iterator is created. * @param changeNumber Starting point for the iterator. opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -857,16 +857,6 @@ if (olderUpdateCN == null) return null; ReplicationIterator ri = replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN); if (ri != null) { if (ri.next()) { ChangeNumber firstMissingChange = ri.getChange().getChangeNumber(); return firstMissingChange.getTime(); } } return olderUpdateCN.getTime(); } @@ -1081,9 +1071,16 @@ ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId); ReplicationIterator iterator = replicationServerDomain.getChangelogIterator(serverId, lastCsn); if ((iterator != null) && (iterator.getChange() != null)) if (iterator != null) { iteratorSortedSet.add(iterator); if (iterator.getChange() != null) { iteratorSortedSet.add(iterator); } else { iterator.releaseCursor(); } } } while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))