opends/src/messages/messages/replication.properties
@@ -77,13 +77,13 @@ %s : %s SEVERE_ERR_EXCEPTION_DECODING_OPERATION_25=Error trying to replay %s, \ operation could not be decoded : FATAL_ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR_26=Error Trying to use the \ underlying database. The Replication Server is going to shut down FATAL_ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR_26=Error trying to use the \ underlying database. The Replication Server is going to shut down: %s SEVERE_ERR_ERROR_CLOSING_CHANGELOG_ENV_28=Error closing the Replication Server \ database : %s SEVERE_ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH_29=Error during the Replication \ Server database trimming or flush process. The Changelog service is going to \ shutdown shutdown: %s SEVERE_ERR_WRITER_UNEXPECTED_EXCEPTION_32=An unexpected error happened \ handling connection with %s. This connection is going to be closed SEVERE_ERR_RS_ERROR_SENDING_ACK_33=In replication server %s: an unexpected error \ opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -429,11 +429,8 @@ * job properly anymore and needs to close all its connections and * shutdown itself. */ MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); logError(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR .get(stackTraceToSingleLineString(e))); localReplicationServer.shutdown(); return false; } opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -67,7 +67,7 @@ * Creates a new database or open existing database that will be used * to store and retrieve changes from an LDAP server. * @param dbenv The Db environment to use to create the db. * @throws ChangelogException If a database problem happened. * @throws ChangelogException If a database problem happened */ public DraftCNDB(ReplicationDbEnv dbenv) throws ChangelogException { @@ -82,8 +82,11 @@ * * @param record * the provided {@link ChangeNumberIndexRecord} to be stored. * @throws ChangelogException * If a database problem happened */ public void addRecord(ChangeNumberIndexRecord record) throws ChangelogException { try { @@ -110,40 +113,13 @@ } finally { abort(txn); JEUtils.abort(txn); dbCloseLock.readLock().unlock(); } } catch (DatabaseException e) { dbenv.shutdownOnException(e); } catch (ChangelogException e) { dbenv.shutdownOnException(e); } } /** * Aborts the current transaction. It has no effect if the transaction has * committed. * * @param txn * the transaction to abort */ private static void abort(Transaction txn) { if (txn != null) { try { txn.abort(); } catch (DatabaseException ignored) { // Ignore. TRACER.debugCaught(DebugLogLevel.ERROR, ignored); } throw new ChangelogException(e); } } @@ -173,9 +149,9 @@ * Create a cursor that can be used to search or iterate on this DB. * * @param changeNumber The change number from which the cursor must start. * @return The ReplServerDBCursor * @throws ChangelogException If a database error prevented the cursor * creation. * @return The ReplServerDBCursor. */ public DraftCNDBCursor openReadCursor(long changeNumber) throws ChangelogException @@ -187,9 +163,9 @@ * Create a cursor that can be used to delete some record from this * ReplicationServer database. * * @return The ReplServerDBCursor * @throws ChangelogException If a database error prevented the cursor * creation. * @return The ReplServerDBCursor. 
*/ public DraftCNDBCursor openDeleteCursor() throws ChangelogException { @@ -246,8 +222,7 @@ } catch (DatabaseException e) { dbenv.shutdownOnException(e); return null; throw new ChangelogException(e); } } @@ -323,8 +298,7 @@ } catch (DatabaseException e) { dbenv.shutdownOnException(e); return null; throw new ChangelogException(e); } } @@ -360,7 +334,7 @@ * @param startChangeNumber * the change number from which the cursor must start. * @throws ChangelogException * when the startChangeNumber does not exist. * If a database problem happened */ private DraftCNDBCursor(long startChangeNumber) throws ChangelogException { @@ -469,13 +443,13 @@ catch (DatabaseException e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); DraftCNDB.abort(localTxn); JEUtils.abort(localTxn); throw new ChangelogException(e); } catch (ChangelogException e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); DraftCNDB.abort(localTxn); JEUtils.abort(localTxn); throw e; } finally @@ -521,9 +495,9 @@ } /** * Abort the Cursor after a Deadlock Exception. * This method catch and ignore the DeadlockException because * this must be done when aborting a cursor after a DeadlockException * Abort the Cursor after a DatabaseException. * This method catch and ignore the DatabaseException because * this must be done when aborting a cursor after a DatabaseException * (per the Cursor documentation). * This should not be used in any other case. */ @@ -539,18 +513,7 @@ } closeLockedCursor(cursor); if (txn != null) { try { txn.abort(); } catch (DatabaseException e) { dbenv.shutdownOnException(e); } } JEUtils.abort(txn); } /** @@ -570,8 +533,10 @@ /** * Go to the next record on the cursor. * * @return the next record on this cursor. * @throws ChangelogException a. * @throws ChangelogException * If a database problem happened */ public boolean next() throws ChangelogException { @@ -600,7 +565,8 @@ /** * Delete the record at the current cursor position. * * @throws ChangelogException In case of database problem. 
* @throws ChangelogException * If a database problem happened */ public void delete() throws ChangelogException { @@ -630,7 +596,8 @@ /** * Clears this change DB from the changes it contains. * * @throws ChangelogException Throws a DatabaseException when it occurs. * @throws ChangelogException * If a database problem happened */ public void clear() throws ChangelogException { opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.opends.messages.MessageBuilder; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.DirectoryThread; import org.opends.server.api.MonitorProvider; @@ -283,15 +282,15 @@ Thread.currentThread().interrupt(); } } } catch (Exception end) } catch (Exception end) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(end)); logError(mb.toMessage()); logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH .get(stackTraceToSingleLineString(end))); if (replicationServer != null) { replicationServer.shutdown(); } break; } try { @@ -309,13 +308,12 @@ } } catch (Exception end) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(end)); logError(mb.toMessage()); logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH .get(stackTraceToSingleLineString(end))); if (replicationServer != null) { replicationServer.shutdown(); } break; } } @@ -454,18 +452,12 @@ } catch (ChangelogException e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() cursor.abort(); shutdown.set(true); throw e; } catch (Exception e) { // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() cursor.abort(); shutdown.set(true); throw new ChangelogException(e); } } opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -31,7 +31,6 @@ import java.util.LinkedList; import java.util.List; import org.opends.messages.MessageBuilder; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.DirectoryThread; import org.opends.server.api.MonitorProvider; @@ -270,7 +269,7 @@ * @return a new {@link DBCursor} that allows to browse the db managed by this * ReplicaDB and starting at the position defined by a given CSN. * @throws ChangelogException * if a database problem happened. * if a database problem happened */ public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException @@ -325,7 +324,16 @@ while (msgQueue.size() != 0) { flush(); try { flush(); } catch (ChangelogException e) { // We are already shutting down logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH .get(stackTraceToSingleLineString(e))); } } db.shutdown(); @@ -372,25 +380,21 @@ } catch (Exception end) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(end)); logError(mb.toMessage()); thread.initiateShutdown(); if (replicationServer != null) { replicationServer.shutdown(); } stop(end); break; } } // call flush a last time before exiting to make sure that // no change was forgotten in the msgQueue flush(); try { // call flush a last time before exiting to make sure that // no change was forgotten in the msgQueue flush(); } catch (ChangelogException e) { stop(e); } } finally { @@ -403,6 +407,19 @@ } } private void stop(Exception e) { logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH .get(stackTraceToSingleLineString(e))); thread.initiateShutdown(); if (replicationServer != null) { replicationServer.shutdown(); } } /** * Retrieves the latest trim date. * @return the latest trim date. @@ -489,10 +506,14 @@ /** * Flush a number of updates from the memory list to the stable storage. * Flush is done by chunk sized to 500 messages, starting from the * beginning of the list. 
* <p> * Flush is done by chunk sized to 500 messages, starting from the beginning * of the list. * * @throws ChangelogException * If a database problem happened */ public void flush() public void flush() throws ChangelogException { int size; int chunksize = Math.min(queueMaxSize, 500); opends/src/server/org/opends/server/replication/server/changelog/je/JEUtils.java
New file @@ -0,0 +1,74 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2013 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.types.DebugLogLevel; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Transaction; import static org.opends.server.loggers.debug.DebugLogger.*; /** * Utility class for JE. */ public final class JEUtils { /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); private JEUtils() { // Utility class } /** * Aborts the current transaction. It has no effect if the transaction has * committed. Nothing is done when {@code txn} is {@code null}. * <p> * This method should only be used after an exception was caught and is about * to be rethrown. A {@link DatabaseException} raised by the abort itself is * only reported to the debug tracer and is not propagated to the caller. * * @param txn * the transaction to abort, may be {@code null} */ public static void abort(Transaction txn) { if (txn != null) { try { txn.abort(); } catch (DatabaseException ignored) { // Ignored because code is already throwing an exception TRACER.debugCaught(DebugLogLevel.ERROR, ignored); } } } } opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -119,7 +119,7 @@ * @param baseDN The baseDN of the replication domain. * @param replicationServer The ReplicationServer that needs to be shutdown. * @param dbenv The Db environment to use to create the db. * @throws ChangelogException If a database problem happened. * @throws ChangelogException If a database problem happened */ public ReplicationDB(int serverId, DN baseDN, ReplicationServer replicationServer, ReplicationDbEnv dbenv) @@ -184,9 +184,12 @@ /** * add a list of changes to the underlying db. * * @param changes The list of changes to add to the underlying db. * @param changes * The list of changes to add to the underlying db. * @throws ChangelogException * If a database problem happened */ public void addEntries(List<UpdateMsg> changes) public void addEntries(List<UpdateMsg> changes) throws ChangelogException { dbCloseLock.readLock().lock(); try @@ -209,7 +212,7 @@ } catch (DatabaseException e) { dbenv.shutdownOnException(e); throw new ChangelogException(e); } finally { @@ -280,9 +283,9 @@ * @param startCSN * The CSN from which the cursor must start.If null, start from the * oldest CSN * @throws ChangelogException * When a problem occurs or the startCSN does not exist. * @return The ReplServerDBCursor. * @throws ChangelogException * If a database problem happened */ public ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException @@ -320,8 +323,10 @@ * Read the oldest CSN present in the database. * * @return the oldest CSN in the DB, null if the DB is empty or closed * @throws ChangelogException * If a database problem happened */ public CSN readOldestCSN() public CSN readOldestCSN() throws ChangelogException { dbCloseLock.readLock().lock(); @@ -362,8 +367,7 @@ } catch (DatabaseException e) { dbenv.shutdownOnException(e); return null; throw new ChangelogException(e); } finally { @@ -375,8 +379,10 @@ * Read the newest CSN present in the database. 
* * @return the newest CSN in the DB, null if the DB is empty or closed * @throws ChangelogException * If a database problem happened */ public CSN readNewestCSN() public CSN readNewestCSN() throws ChangelogException { dbCloseLock.readLock().lock(); @@ -419,8 +425,7 @@ } catch (DatabaseException e) { dbenv.shutdownOnException(e); return null; throw new ChangelogException(e); } finally { @@ -435,8 +440,10 @@ * The CSN from which we start searching. * @return the CSN right before the one passed as a parameter. Can return null * if there is none. * @throws ChangelogException * If a database problem happened */ public CSN getPreviousCSN(CSN csn) public CSN getPreviousCSN(CSN csn) throws ChangelogException { if (csn == null) { @@ -479,7 +486,7 @@ } catch (DatabaseException e) { dbenv.shutdownOnException(e); throw new ChangelogException(e); } finally { @@ -652,12 +659,12 @@ } catch (ChangelogException e) { abort(localTxn); JEUtils.abort(localTxn); throw e; } catch (Exception e) { abort(localTxn); JEUtils.abort(localTxn); throw new ChangelogException(e); } finally @@ -669,21 +676,6 @@ } } private void abort(Transaction localTxn) { if (localTxn != null) { try { localTxn.abort(); } catch (DatabaseException ignore) { // Ignore. } } } /** * Close the ReplicationServer Cursor. */ @@ -716,9 +708,9 @@ } /** * Abort the Cursor after a Deadlock Exception. * This method catch and ignore the DeadlockException because * this must be done when aborting a cursor after a DeadlockException * Abort the cursor after an Exception. * This method catch and ignore the DatabaseException because * this must be done when aborting a cursor after a DatabaseException * (per the Cursor documentation). * This should not be used in any other case. 
*/ @@ -734,18 +726,7 @@ } closeAndReleaseReadLock(cursor); if (txn != null) { try { txn.abort(); } catch (DatabaseException e) { dbenv.shutdownOnException(e); } } JEUtils.abort(txn); } /** opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -614,30 +614,8 @@ */ void shutdownOnException(DatabaseException e) { innerShutdownOnException(e); } /** * Shuts down replication when an unexpected changelog exception occurs. Note * that we do not expect lock timeouts or txn timeouts because the replication * databases are deadlock free, thus all operations should complete * eventually. * * @param e * The unexpected changelog exception. */ void shutdownOnException(ChangelogException e) { innerShutdownOnException(e); } private void innerShutdownOnException(Exception e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get()); mb.append(". "); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); logError(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR .get(stackTraceToSingleLineString(e))); replicationServer.shutdown(); }