opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -22,7 +22,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions copyright 2013 ForgeRock AS. * Portions copyright 2013-2014 ForgeRock AS. */ package org.opends.server.replication.protocol; @@ -66,8 +66,6 @@ return csn; } /** * Creates a message from a provided byte array. * @@ -113,11 +111,7 @@ } } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public byte[] getBytes(short protocolVersion) { @@ -141,4 +135,10 @@ } } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + ", csn=" + csn.toStringUI(); } } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -947,17 +947,6 @@ } /** * Get the queueSize for this replication server. * * @return The maximum size of the queues for this Replication Server * */ public int getQueueSize() { return this.config.getQueueSize(); } /** * Creates the backend associated to this replication server. */ private void createBackend() throws ConfigException opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -62,8 +62,6 @@ private static int NO_KEY = 0; private DraftCNDB db; /** FIXME What is this field used for? */ private volatile long oldestChangeNumber = NO_KEY; /** * The newest changenumber stored in the DB. It is used to avoid purging the * record with the newest changenumber. The newest record in the changenumber @@ -96,14 +94,11 @@ public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException { db = new DraftCNDB(dbEnv); final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord(); final ChangeNumberIndexRecord newestRecord = db.readLastRecord(); oldestChangeNumber = getChangeNumber(oldestRecord); final long newestCN = getChangeNumber(newestRecord); newestChangeNumber = newestCN; newestChangeNumber = getChangeNumber(newestRecord); // initialization of the lastGeneratedChangeNumber from the DB content // if DB is empty => last record does not exist => default to 0 lastGeneratedChangeNumber = new AtomicLong(newestCN); lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber); // Monitoring registration DirectoryServer.deregisterMonitorProvider(dbMonitor); @@ -117,7 +112,7 @@ { return record.getChangeNumber(); } return 0; return NO_KEY; } /** {@inheritDoc} */ @@ -198,19 +193,11 @@ */ public void shutdown() { if (shutdown.get()) if (shutdown.compareAndSet(false, true)) { return; db.shutdown(); DirectoryServer.deregisterMonitorProvider(dbMonitor); } shutdown.set(true); synchronized (this) { notifyAll(); } db.shutdown(); DirectoryServer.deregisterMonitorProvider(dbMonitor); } /** @@ -236,11 +223,6 @@ while (!mustShutdown(shutdown) && cursor.next()) { final ChangeNumberIndexRecord record = cursor.currentRecord(); if (record.getChangeNumber() != oldestChangeNumber) { oldestChangeNumber = record.getChangeNumber(); } if (record.getChangeNumber() != newestChangeNumber && record.getCSN().isOlderThan(purgeCSN)) { @@ -293,14 +275,9 @@ final DraftCNDBCursor cursor = db.openDeleteCursor(); try { boolean isOldestRecord = true; while (!mustShutdown(shutdown) && cursor.next()) { final ChangeNumberIndexRecord record = cursor.currentRecord(); if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber) { oldestChangeNumber = record.getChangeNumber(); } if (record.getChangeNumber() == newestChangeNumber) { // do not purge the newest record to avoid having the last generated @@ -312,10 +289,6 @@ { cursor.delete(); } else { isOldestRecord = false; } } } catch (ChangelogException e) @@ -398,7 +371,7 @@ @Override public String toString() { return getClass().getSimpleName() + ": " + oldestChangeNumber + " " return getClass().getSimpleName() + ", newestChangeNumber=" + newestChangeNumber; } @@ -411,8 +384,7 @@ public void clear() throws ChangelogException { db.clear(); oldestChangeNumber = getChangeNumber(db.readFirstRecord()); newestChangeNumber = getChangeNumber(db.readLastRecord()); newestChangeNumber = NO_KEY; } } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -385,11 +385,13 @@ if (indexer != null) { indexer.initiateShutdown(); indexer.interrupt(); } final ChangelogDBPurger purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); purger.interrupt(); } try @@ -417,6 +419,23 @@ if (dbEnv != null) { // wait for shutdown of the threads holding cursors try { if (indexer != null) { indexer.join(); } if (purger != null) { purger.join(); } } catch (InterruptedException e) { // do nothing: we are already shutting down } dbEnv.shutdown(); } opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,14 +28,10 @@ import java.util.ArrayList; import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.DirectoryThread; import org.opends.server.api.MonitorProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; @@ -52,8 +48,6 @@ import org.opends.server.types.InitializationException; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.util.StaticUtils.*; /** * This class is used for managing the replicationServer database for each @@ -67,7 +61,7 @@ * <p> * This class publish some monitoring information below cn=monitor. */ public class JEReplicaDB implements Runnable public class JEReplicaDB { /** @@ -89,29 +83,7 @@ } /** * The msgQueue holds all the updates not yet saved to stable storage. * <p> * This blocking queue is only used as a temporary placeholder so that the * write in the stable storage can be grouped for efficiency reason. Adding an * update synchronously add the update to this list. A dedicated thread * flushes this blocking queue. * <p> * Changes are not read back by replicationServer threads that are responsible * for pushing the changes to other replication server or to LDAP server */ private final LinkedBlockingQueue<UpdateMsg> msgQueue = new LinkedBlockingQueue<UpdateMsg>(); /** * Semaphore used to limit the number of bytes used in memory by the queue. * The threads calling {@link #add(UpdateMsg)} method will be blocked if the * size of msgQueue becomes larger than the available permits and will resume * only when the number of available permits allow it. */ private final Semaphore queueSizeBytes; private final int queueMaxBytes; private final AtomicBoolean shutdown = new AtomicBoolean(false); private ReplicationDB db; /** * Holds the oldest and newest CSNs for this replicaDB for fast retrieval. @@ -122,12 +94,6 @@ private int serverId; private DN baseDN; private DbMonitorProvider dbMonitor = new DbMonitorProvider(); private DirectoryThread thread; /** * Used to prevent race conditions between threads calling {@link #flush()}. * This can happen with the thread flushing the queue, or else on shutdown. */ private final Object flushLock = new Object(); private ReplicationServer replicationServer; /** @@ -147,15 +113,8 @@ this.replicationServer = replicationServer; this.serverId = serverId; this.baseDN = baseDN; queueMaxBytes = replicationServer.getQueueSize() * 200; queueSizeBytes = new Semaphore(queueMaxBytes); db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv); csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); thread = new DirectoryThread(this, "Replication server RS(" + replicationServer.getServerId() + ") flusher thread for Replica DS(" + serverId + ") for domain \"" + baseDN + "\""); thread.start(); DirectoryServer.deregisterMonitorProvider(dbMonitor); DirectoryServer.registerMonitorProvider(dbMonitor); @@ -174,34 +133,14 @@ */ public void add(UpdateMsg updateMsg) throws ChangelogException { if (thread.isShutdownInitiated()) if (shutdown.get()) { throw new ChangelogException( ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg .toString(), String.valueOf(baseDN), String.valueOf(serverId))); } final int msgSize = updateMsg.size(); if (msgSize < queueMaxBytes) { try { queueSizeBytes.acquire(msgSize); } catch (InterruptedException e) { throw new ChangelogException( ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg .toString(), String.valueOf(baseDN), String.valueOf(serverId), stackTraceToSingleLineString(e))); } } else { // edge case with a very large message collectAllPermits(); } msgQueue.add(updateMsg); db.addEntry(updateMsg); final CSNLimits limits = csnLimits; final boolean updateNew = limits.newestCSN == null @@ -215,22 +154,6 @@ } } /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */ private void collectAllPermits() { int collectedPermits = queueSizeBytes.drainPermits(); while (collectedPermits != queueMaxBytes) { // Do not use Thread.sleep() because: // 1) it is expected the permits will be released very soon // 2) we want to collect all the permits, so do not leave a chance to // other threads to steal them from us. // 3) we want to keep low latency Thread.yield(); collectedPermits += queueSizeBytes.drainPermits(); } } /** * Get the oldest CSN that has not been purged yet. * @@ -289,80 +212,10 @@ */ public void shutdown() { if (thread.isShutdownInitiated()) if (shutdown.compareAndSet(false, true)) { return; } thread.initiateShutdown(); while (msgQueue.size() != 0) { try { flush(); } catch (ChangelogException e) { // We are already shutting down logError(e.getMessageObject()); } } db.shutdown(); DirectoryServer.deregisterMonitorProvider(dbMonitor); } /** * Flushes the replicaDB queue from memory to stable storage. */ @Override public void run() { thread.startWork(); try { while (!thread.isShutdownInitiated()) { try { flush(); } catch (ChangelogException end) { stop(end); break; } } try { // call flush a last time before exiting to make sure that // no change was forgotten in the msgQueue flush(); } catch (ChangelogException e) { stop(e); } } finally { thread.stopWork(); } } private void stop(Exception e) { logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH .get(stackTraceToSingleLineString(e))); thread.initiateShutdown(); if (replicationServer != null) { replicationServer.shutdown(); db.shutdown(); DirectoryServer.deregisterMonitorProvider(dbMonitor); } } @@ -401,7 +254,7 @@ { for (int j = 0; j < 50; j++) { if (thread.isShutdownInitiated()) if (shutdown.get()) { return; } @@ -428,7 +281,7 @@ // mark shutdown for this db so that we don't try again to // stop it from cursor.close() or methods called by cursor.close() cursor.abort(); thread.initiateShutdown(); shutdown.set(true); throw e; } finally @@ -439,47 +292,6 @@ } /** * Flush a number of updates from the memory list to the stable storage. * <p> * Flush is done by chunk sized to 500 messages, starting from the beginning * of the list. * <p> * @GuardedBy("flushLock") * @throws ChangelogException * If a database problem happened */ private void flush() throws ChangelogException { try { synchronized (flushLock) { final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS); if (change == null) { // nothing to persist, check if shutdown was invoked return; } // Try to see if there are more changes and persist them all. final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); changes.add(change); msgQueue.drainTo(changes); int totalSize = db.addEntries(changes); // do not release more than queue max size permits // (be careful of the edge case with the very large message) queueSizeBytes.release(Math.min(totalSize, queueMaxBytes)); } } catch (InterruptedException e) { throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH .get(stackTraceToSingleLineString(e))); } } /** * This internal class is used to implement the Monitoring capabilities of the * ReplicaDB. */ @@ -501,9 +313,6 @@ { create(attributes, "last-change", encode(limits.newestCSN)); } create(attributes, "queue-size", String.valueOf(msgQueue.size())); create(attributes, "queue-size-bytes", String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits())); return attributes; } @@ -552,8 +361,6 @@ */ public void clear() throws ChangelogException { collectAllPermits(); msgQueue.clear(); // this call should not do anything at all db.clear(); csnLimits = new CSNLimits(null, null); } opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -28,7 +28,6 @@ import java.io.Closeable; import java.io.UnsupportedEncodingException; import java.util.List; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -182,15 +181,14 @@ } /** * add a list of changes to the underlying db. * add one change to the underlying db. * * @param changes * The list of changes to add to the underlying db. * @return the total size of all the changes * @param change * The change to add to the underlying db. * @throws ChangelogException * If a database problem happened */ public int addEntries(List<UpdateMsg> changes) throws ChangelogException public void addEntry(UpdateMsg change) throws ChangelogException { dbCloseLock.readLock().lock(); try @@ -198,26 +196,22 @@ // If the DB has been closed then return immediately. if (isDBClosed()) { return 0; return; } int totalSize = 0; for (UpdateMsg change : changes) { final DatabaseEntry key = createReplicationKey(change.getCSN()); final DatabaseEntry data = new ReplicationData(change); final DatabaseEntry key = createReplicationKey(change.getCSN()); final DatabaseEntry data = new ReplicationData(change); insertCounterRecordIfNeeded(change.getCSN()); db.put(null, key, data); counterCurrValue++; totalSize += change.size(); } return totalSize; insertCounterRecordIfNeeded(change.getCSN()); db.put(null, key, data); counterCurrValue++; } catch (DatabaseException e) { throw new ChangelogException(e); throw new ChangelogException( ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get( change.toString(), String.valueOf(baseDN), String.valueOf(serverId), stackTraceToSingleLineString(e))); } finally { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -48,8 +48,7 @@ import static org.testng.Assert.*; /** * Test the JEChangeNumberIndexDB class with 2 kinds of cleaning of the db : - * periodic trim - call to clear method() * Test the JEChangeNumberIndexDB class. */ @SuppressWarnings("javadoc") public class JEChangeNumberIndexDBTest extends ReplicationTestCase @@ -71,13 +70,13 @@ * <li>create the db</li> * <li>add records</li> * <li>read them with a cursor</li> * <li>set a very short trim period</li> * <li>wait for the db to be trimmed / here since the changes are not stored * <li>set a very short purge period</li> * <li>wait for the db to be purged / here since the changes are not stored * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li> * </ol> */ @Test void testPurge() throws Exception public void testPurge() throws Exception { ReplicationServer replicationServer = null; try @@ -172,8 +171,8 @@ * <li>clear the db</li> * </ol> */ @Test() void testClear() throws Exception @Test public void testClear() throws Exception { ReplicationServer replicationServer = null; try opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,8 +97,6 @@ //-- // Iterator tests with changes persisted waitChangesArePersisted(replicaDB, 3); assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]); assertNotFound(replicaDB, csns[4]); @@ -108,7 +106,6 @@ //-- // Cursor tests with changes persisted replicaDB.add(update4); waitChangesArePersisted(replicaDB, 4); assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]); // Test cursor from existing CSN @@ -141,29 +138,6 @@ } } private void waitChangesArePersisted(JEReplicaDB replicaDB, int nbRecordsInserted) throws Exception { waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000); } private void waitChangesArePersisted(JEReplicaDB replicaDB, int nbRecordsInserted, int counterWindow) throws Exception { // one counter record is inserted every time "counterWindow" // records have been inserted int expectedNbRecords = nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow; int count = 0; while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100) { Thread.sleep(10); count++; } assertEquals(replicaDB.getNumberRecords(), expectedNbRecords); } static CSN[] newCSNs(int serverId, long timestamp, int number) { CSNGenerator gen = new CSNGenerator(serverId, timestamp); @@ -301,7 +275,6 @@ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); } } waitChangesArePersisted(replicaDB, 4); cursor = replicaDB.generateCursorFrom(csns[0]); assertTrue(cursor.next()); @@ -378,7 +351,6 @@ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); mySeqnum+=2; } waitChangesArePersisted(replicaDB, max, counterWindow); assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); @@ -402,7 +374,6 @@ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); mySeqnum+=2; } waitChangesArePersisted(replicaDB, 2 * max, counterWindow); assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");