opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -1348,7 +1348,7 @@ * if a database problem occurs. */ private boolean assignChangeNumber(final ECLUpdateMsg oldestChange) throws DirectoryException, ChangelogException throws ChangelogException { // We also need to check if the draftCNdb is consistent with // the changelogdb. @@ -1451,7 +1451,7 @@ } private void assignNewChangeNumberAndStore(ECLUpdateMsg change) throws DirectoryException, ChangelogException throws ChangelogException { ChangeNumberIndexDB cnIndexDB = replicationServer.getChangeNumberIndexDB(); opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2202,7 +2202,17 @@ */ public void clearDbs() { changelogDB.removeDomain(baseDN); try { changelogDB.removeDomain(baseDN); } catch (ChangelogException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLEARING_DB.get(baseDN.toString(), e.getMessage() + " " + stackTraceToSingleLineString(e))); logError(mb.toMessage()); } } /** opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -172,8 +172,10 @@ * * @param baseDN * the replication domain baseDN * @throws ChangelogException * If a database problem happened */ void removeDomain(DN baseDN); void removeDomain(DN baseDN) throws ChangelogException; // serverId methods opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -653,14 +653,9 @@ return; } String dbName = db.getDatabaseName(); // Closing is requested by the Berkeley DB before truncate db.close(); final Database oldDb = db; db = null; // In case there's a failure between here and recreation. // Clears the changes dbenv.clearDb(dbName); dbenv.clearDb(oldDb); // RE-create the db db = dbenv.getOrCreateDraftCNDb(); @@ -687,6 +682,6 @@ */ private boolean isDBClosed() { return db == null; return db == null || !db.getEnvironment().isValid(); } } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -33,7 +33,6 @@ import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.server.config.ConfigException; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; @@ -43,13 +42,11 @@ import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.ReplicaDBCursor; import org.opends.server.types.DN; import org.opends.server.types.DebugLogLevel; import org.opends.server.util.Pair; import org.opends.server.util.StaticUtils; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; /** @@ -58,9 +55,6 @@ public class JEChangelogDB implements ChangelogDB { /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * This map contains the List of updates received from each LDAP server. */ @@ -313,6 +307,11 @@ */ public void clearDB() throws ChangelogException { if (!dbDirectory.exists()) { return; } // Remember the first exception because : // - we want to try to remove everything we want to remove // - then throw the first encountered exception @@ -402,6 +401,7 @@ public void shutdownDomain(DN baseDN) { shutdownDbHandlers(getDomainMap(baseDN)); sourceDbHandlers.remove(baseDN); } private void shutdownDbHandlers(Map<Integer, DbHandler> domainMap) @@ -446,8 +446,13 @@ /** {@inheritDoc} */ @Override public void removeDomain(DN baseDN) public void removeDomain(DN baseDN) throws ChangelogException { // Remember the first exception because : // - we want to try to remove everything we want to remove // - then throw the first encountered exception ChangelogException firstException = null; // 1- clear the replica DBs final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN); synchronized (domainMap) @@ -458,17 +463,13 @@ { dbHandler.clear(); } catch (Exception e) catch (ChangelogException e) { // TODO: i18n MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(), e .getMessage() + " " + stackTraceToSingleLineString(e))); logError(mb.toMessage()); firstException = e; } } shutdownDbHandlers(domainMap); sourceDbHandlers.remove(baseDN); } // 2- clear the ChangeNumber index DB @@ -480,11 +481,11 @@ { cnIndexDB.clear(baseDN); } catch (Exception ignored) catch (ChangelogException e) { if (debugEnabled()) if (firstException == null) { TRACER.debugCaught(DebugLogLevel.WARNING, ignored); firstException = e; } } } @@ -495,13 +496,18 @@ { dbEnv.clearGenerationId(baseDN); } catch (Exception ignored) catch (ChangelogException e) { if (debugEnabled()) if (firstException == null) { TRACER.debugCaught(DebugLogLevel.WARNING, ignored); firstException = e; } } if (firstException != null) { throw firstException; } } /** {@inheritDoc} */ opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -874,17 +874,12 @@ return; } String dbName = db.getDatabaseName(); // Clears the reference to this serverID dbenv.clearServerId(baseDN, serverId); // Closing is requested by the Berkeley DB before truncate db.close(); final Database oldDb = db; db = null; // In case there's a failure between here and recreation. // Clears the changes dbenv.clearDb(dbName); dbenv.clearDb(oldDb); // RE-create the db db = dbenv.getOrAddDb(serverId, baseDN, -1); @@ -1190,7 +1185,7 @@ */ private boolean isDBClosed() { return db == null; return db == null || !db.getEnvironment().isValid(); } } opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -29,7 +29,10 @@ import java.io.File; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; @@ -41,7 +44,9 @@ import org.opends.server.types.DirectoryException; import com.sleepycat.je.*; import com.sleepycat.je.config.EnvironmentParams; import static com.sleepycat.je.EnvironmentConfig.*; import static com.sleepycat.je.LockMode.*; import static com.sleepycat.je.OperationStatus.*; @@ -51,19 +56,19 @@ import static org.opends.server.util.StaticUtils.*; /** * This class is used to represent a Db environment that can be used * to create ReplicationDB. * This class represents a DB environment that acts as a factory for * ReplicationDBs. */ public class ReplicationDbEnv { private Environment dbEnvironment; private Database changelogStateDb; private final List<Database> allDbs = new CopyOnWriteArrayList<Database>(); private ReplicationServer replicationServer; private final AtomicBoolean isShuttingDown = new AtomicBoolean(false); private static final String GENERATION_ID_TAG = "GENID"; private static final String FIELD_SEPARATOR = " "; /** * The tracer object for the debug logger. */ /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** @@ -92,9 +97,9 @@ */ envConfig.setAllowCreate(true); envConfig.setTransactional(true); envConfig.setConfigParam("je.cleaner.threads", "2"); envConfig.setConfigParam("je.checkpointer.highPriority", "true"); envConfig.setConfigParam(STATS_COLLECT, "false"); envConfig.setConfigParam(CLEANER_THREADS, "2"); envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true"); /* * Tests have shown that since the parsing of the Replication log is * always done sequentially, it is not necessary to use a large DB cache. @@ -106,15 +111,15 @@ * read buffers. This will result in more scalable checkpointer and * cleaner performance. */ envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2)); envConfig.setConfigParam("je.log.iteratorReadSize", mb(2)); envConfig.setConfigParam("je.log.faultReadSize", kb(4)); envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2)); envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2)); envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4)); /* * The cache size must be bigger in order to accommodate the larger * buffers - see OPENDJ-943. */ envConfig.setConfigParam("je.maxMemory", mb(16)); envConfig.setConfigParam(MAX_MEMORY, mb(16)); } else { @@ -122,7 +127,7 @@ * Use 5M so that the replication can be used with 64M total for the * JVM. */ envConfig.setConfigParam("je.maxMemory", mb(5)); envConfig.setConfigParam(MAX_MEMORY, mb(5)); } // Since records are always added at the end of the Replication log and @@ -160,12 +165,27 @@ return String.valueOf(sizeInMb * 1024 * 1024); } private Database openDatabase(String databaseName) throws RuntimeException private Database openDatabase(String databaseName) throws ChangelogException, RuntimeException { if (isShuttingDown.get()) { // TODO JNR i18n throw new ChangelogException(Message.raw("DB is closing")); } final DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(true); return dbEnvironment.openDatabase(null, databaseName, dbConfig); final Database db = dbEnvironment.openDatabase(null, databaseName, dbConfig); if (isShuttingDown.get()) { closeDB(db); // TODO JNR i18n throw new ChangelogException(Message.raw("DB is closing")); } allDbs.add(db); return db; } /** @@ -395,44 +415,69 @@ } } /** * Shutdown the Db environment. */ public void shutdown() /** * Shutdown the Db environment. */ public void shutdown() { isShuttingDown.set(true); // CopyOnWriteArrayList iterator never throw ConcurrentModificationException // This code rely on openDatabase() to close databases opened concurrently // with this code final Database[] allDbsCopy = allDbs.toArray(new Database[0]); allDbs.clear(); for (Database db : allDbsCopy) { try { changelogStateDb.close(); } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); } try { dbEnvironment.close(); } catch (DatabaseException e) { MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(e)); logError(mb.toMessage()); } closeDB(db); } try { dbEnvironment.close(); } catch (DatabaseException e) { logError(newErrorMessage(e)); } } private void closeDB(Database db) { allDbs.remove(db); try { db.close(); } catch (DatabaseException e) { logError(newErrorMessage(e)); } } private Message newErrorMessage(DatabaseException e) { if (!isShuttingDown.get()) { return NOTE_EXCEPTION_CLOSING_DATABASE .get(stackTraceToSingleLineString(e)); } MessageBuilder mb = new MessageBuilder(); mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get()); mb.append(" "); mb.append(stackTraceToSingleLineString(e)); return mb.toMessage(); } /** * Clears the provided generationId associated to the provided baseDN from the * state Db. * * @param baseDN * The baseDN for which the generationID must be cleared. * @throws ChangelogException * If a database problem happened */ public void clearGenerationId(DN baseDN) public void clearGenerationId(DN baseDN) throws ChangelogException { deleteFromChangelogStateDB(buildGenIdKey(baseDN), "clearGenerationId(baseDN=" + baseDN + ")"); @@ -446,15 +491,17 @@ * The baseDN for which the serverId must be cleared. * @param serverId * The serverId to remove from the Db. * @throws ChangelogException * If a database problem happened */ public void clearServerId(DN baseDN, int serverId) public void clearServerId(DN baseDN, int serverId) throws ChangelogException { deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId), "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")"); } private void deleteFromChangelogStateDB(String keyString, String methodInvocation) String methodInvocation) throws ChangelogException { if (debugEnabled()) debug(methodInvocation + " starting"); @@ -490,18 +537,23 @@ } catch (RuntimeException dbe) { // FIXME can actually happen (see catch above) // what should we do about it? throw new ChangelogException(dbe); } } /** * Clears the database. * * @param databaseName The name of the database to clear. * @param db * The database to clear. */ public final void clearDb(String databaseName) public final void clearDb(Database db) { String databaseName = db.getDatabaseName(); // Closing is requested by Berkeley JE before truncate db.close(); Transaction txn = null; try { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -28,10 +28,7 @@ package org.opends.server.replication; import java.net.SocketException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.*; import java.util.concurrent.locks.Lock; import org.opends.messages.Category; @@ -406,9 +403,6 @@ logError(Message.raw(Category.SYNC, Severity.NOTICE, " ##### Calling ReplicationTestCase.classCleanUp ##### ")); // Clean RS databases cleanUpReplicationServersDB(); removeReplicationServerDB(); cleanConfigEntries(); @@ -487,15 +481,18 @@ */ protected void removeReplicationServerDB() throws Exception { for (ReplicationServer rs : ReplicationServer.getAllInstances()) { clearChangelogDB(rs); rs.getChangelogDB().removeDB(); } // avoid ConcurrentModificationException remove(new ArrayList<ReplicationServer>(ReplicationServer.getAllInstances())); } protected void remove(ReplicationServer... replicationServers) throws Exception { remove(Arrays.asList(replicationServers)); } protected void remove(Collection<ReplicationServer> replicationServers) throws Exception { for (ReplicationServer rs : replicationServers) { if (rs != null) opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -29,7 +29,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -168,7 +167,6 @@ @Test(enabled=true, dependsOnMethods = { "searchBackend"}) public void replicationServerTest() throws Exception { clearChangelogDB(replicationServer); changelogBasic(); newClientLateServer1(); newClient(); @@ -192,7 +190,6 @@ @Test(enabled=false, dependsOnMethods = { "searchBackend"}) public void replicationServerTestLoop() throws Exception { clearChangelogDB(replicationServer); changelogBasic(); while (true) { @@ -286,7 +283,36 @@ "ReplicationServer basic : incorrect message type received: " + receivedMsg.getClass() + ": content: " + receivedMsg); assertEquals(receivedMsg.toString(), sentMsg.toString(), "ReplicationServer basic : incorrect message body received."); "ReplicationServer basic : incorrect message body received. CSN is same as \"" + getCSNFieldName(((DeleteMsg) receivedMsg).getCSN()) + "\" field."); } private String getCSNFieldName(CSN csn) { if (csn == null) { return ""; } if (csn.equals(firstCSNServer1)) { return "firstCSNServer1"; } else if (csn.equals(secondCSNServer1)) { return "secondCSNServer1"; } else if (csn.equals(firstCSNServer2)) { return "firstCSNServer2"; } else if (csn.equals(secondCSNServer2)) { return "secondCSNServer2"; } else if (csn.equals(unknownCSNServer1)) { return "unknownCSNServer1"; } return null; } private ServerState newServerState(CSN... csns) @@ -855,7 +881,6 @@ @Test(enabled=true, dependsOnMethods = { "searchBackend"}) public void windowProbeTest() throws Exception { debugInfo("Starting windowProbeTest"); final int WINDOW = 10; @@ -877,13 +902,13 @@ */ // open the first session to the replication server InetSocketAddress ServerAddr = new InetSocketAddress( InetAddress.getByName("localhost"), replicationServerPort); InetSocketAddress serverAddr = new HostPort("localhost", replicationServerPort).toInetSocketAddress(); Socket socket = new Socket(); socket.setReceiveBufferSize(1000000); socket.setTcpNoDelay(true); int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); socket.connect(ServerAddr, timeoutMS); socket.connect(serverAddr, timeoutMS); ReplSessionSecurity replSessionSecurity = getReplSessionSecurity(); Session session = replSessionSecurity.createClientSession(socket, timeoutMS);