From 443a7bb2635912474bb086501b53553528e6e09f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 27 Jun 2013 09:43:42 +0000
Subject: [PATCH] DbHandler.java, ReplicationDB.java: Javadoc / code cleanups.
---
opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 79 +++++++++----------
opends/src/server/org/opends/server/replication/server/DbHandler.java | 103 ++++++++++++++-----------
2 files changed, 96 insertions(+), 86 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index c175b78..7a7e594 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -26,29 +26,29 @@
* Portions copyright 2011-2013 ForgeRock AS
*/
package org.opends.server.replication.server;
-import org.opends.messages.MessageBuilder;
-import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
import java.util.Date;
-import java.util.List;
import java.util.LinkedList;
+import java.util.List;
+import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.InitializationException;
-import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.InitializationException;
+import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
@@ -58,45 +58,54 @@
* It is responsible for efficiently saving the updates that is received from
* each master server into stable storage.
* This class is also able to generate a ReplicationIterator that can be
- * used to read all changes from a given ChangeNUmber.
+ * used to read all changes from a given ChangeNumber.
*
* This class publish some monitoring information below cn=monitor.
- *
*/
public class DbHandler implements Runnable
{
- // The msgQueue holds all the updates not yet saved to stable storage.
- // This list is only used as a temporary placeholder so that the write
- // in the stable storage can be grouped for efficiency reason.
- // Adding an update synchronously add the update to this list.
- // A dedicated thread loops on flush() and trim().
- // flush() : get a number of changes from the in memory list by block
- // and write them to the db.
- // trim() : deletes from the DB a number of changes that are older than a
- // certain date.
- //
- // Changes are not read back by replicationServer threads that are responsible
- // for pushing the changes to other replication server or to LDAP server
- //
+ /**
+ * The msgQueue holds all the updates not yet saved to stable storage.
+ * <p>
+ * This list is only used as a temporary placeholder so that the write in the
+ * stable storage can be grouped for efficiency reason. Adding an update
+ * synchronously add the update to this list. A dedicated thread loops on
+ * flush() and trim().
+ * <dl>
+ * <dt>flush()</dt>
+ * <dd>get a number of changes from the in memory list by block and write them
+ * to the db.</dd>
+ * <dt>trim()</dt>
+ * <dd>deletes from the DB a number of changes that are older than a certain
+ * date.</dd>
+ * </dl>
+ * <p>
+ * Changes are not read back by replicationServer threads that are responsible
+ * for pushing the changes to other replication server or to LDAP server
+ */
private final LinkedList<UpdateMsg> msgQueue =
new LinkedList<UpdateMsg>();
- // The High and low water mark for the max size of the msgQueue.
- // the threads calling add() method will be blocked if the size of
- // msgQueue becomes larger than the queueHimark and will resume
- // only when the size of the msgQueue goes below queueLowmark.
- int queueMaxSize = 5000;
- int queueLowmark = 1000;
- int queueHimark = 4000;
+ /**
+ * The High and low water mark for the max size of the msgQueue. The threads
+ * calling add() method will be blocked if the size of msgQueue becomes larger
+ * than the queueHimark and will resume only when the size of the msgQueue
+ * goes below queueLowmark.
+ */
+ private int queueMaxSize = 5000;
+ private int queueLowmark = 1000;
+ private int queueHimark = 4000;
- // The queue himark and lowmark in bytes, this is set to 100 times the
- // himark and lowmark in number of updates.
- int queueMaxBytes = 100 * queueMaxSize;
- int queueLowmarkBytes = 100 * queueLowmark;
- int queueHimarkBytes = 100 * queueHimark;
+ /**
+ * The queue himark and lowmark in bytes, this is set to 100 times the himark
+ * and lowmark in number of updates.
+ */
+ private int queueMaxBytes = 100 * queueMaxSize;
+ private int queueLowmarkBytes = 100 * queueLowmark;
+ private int queueHimarkBytes = 100 * queueHimark;
- // The number of bytes currently in the queue
- int queueByteSize = 0;
+ /** The number of bytes currently in the queue */
+ private int queueByteSize = 0;
private ReplicationDB db;
private ChangeNumber firstChange = null;
@@ -113,10 +122,8 @@
private long latestTrimDate = 0;
/**
- *
* The trim age in milliseconds. Changes record in the change DB that
* are older than this age are removed.
- *
*/
private long trimAge;
@@ -195,7 +202,9 @@
lastChange = update.getChangeNumber();
}
if (firstChange == null)
+ {
firstChange = update.getChangeNumber();
+ }
}
}
@@ -299,9 +308,11 @@
queueByteSize -= msg.size();
current++;
}
- if ((msgQueue.size() < queueLowmark) &&
- (queueByteSize < queueLowmarkBytes))
+ if ((msgQueue.size() < queueLowmark)
+ && (queueByteSize < queueLowmarkBytes))
+ {
msgQueue.notifyAll();
+ }
}
}
@@ -334,7 +345,9 @@
}
while (msgQueue.size() != 0)
+ {
flush();
+ }
db.shutdown();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -345,6 +358,7 @@
* Periodically Flushes the ReplicationServerDomain cache from memory to the
* stable storage and trims the old updates.
*/
+ @Override
public void run()
{
while (!shutdown)
@@ -650,16 +664,15 @@
*/
public int getCount(ChangeNumber from, ChangeNumber to)
{
- int c=0;
// Now that we always keep the last ChangeNumber in the DB to avoid
- // expiring cookies to quickly, we need to check if the "to"
+ // expiring cookies too quickly, we need to check if the "to"
// is older than the trim date.
if ((to == null) || !to.older(new ChangeNumber(latestTrimDate, 0, 0)))
{
flush();
- c = db.count(from, to);
+ return db.count(from, to);
}
- return c;
+ return 0;
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 2536033..ea5612a 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -27,21 +27,19 @@
*/
package org.opends.server.replication.server;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
-
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.decodeUTF8;
-import static org.opends.server.util.StaticUtils.getBytes;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-
-import java.util.List;
-
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
-
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.opends.server.util.StaticUtils;
import com.sleepycat.je.*;
@@ -58,8 +56,10 @@
private int serverId;
private String baseDn;
- // The lock used to provide exclusive access to the thread that
- // close the db (shutdown or clear).
+ /**
+ * The lock used to provide exclusive access to the thread that close the db
+ * (shutdown or clear).
+ */
private ReentrantReadWriteLock dbCloseLock;
// Change counter management
@@ -85,20 +85,22 @@
// since the previous counter record
// 2/- the change to be stored has a new timestamp - so that the counter
// record is the first record for this timestamp.
- //
+ /** Current value of the counter. */
private int counterCurrValue = 1;
- // Current value of the counter.
+ /**
+ * When not null, the next change with a ts different from
+ * tsForNewCounterRecord will lead to store a new counterRecord.
+ */
private long counterTsLimit = 0;
- // When not null,
- // the next change with a ts different from tsForNewCounterRecord will lead
- // to store a new counterRecord.
+ /**
+ * The counter record will never be written to the db more often than each
+ * counterWindowSize changes.
+ */
private int counterWindowSize = 1000;
- // The counter record will never be written to the db more often than each
- // counterWindowSize changes.
/**
* Creates a new database or open existing database that will be used
@@ -126,7 +128,6 @@
dbCloseLock = new ReentrantReadWriteLock(true);
- //
Cursor cursor;
Transaction txn = null;
DatabaseEntry key = new DatabaseEntry();
@@ -292,17 +293,7 @@
{
try
{
- if (cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- // Ignore.
- }
- }
+ StaticUtils.close(cursor);
}
finally
{
@@ -458,9 +449,10 @@
*/
public ChangeNumber getPreviousChangeNumber(ChangeNumber changeNumber)
{
-
if (changeNumber == null)
+ {
return null;
+ }
Cursor cursor = null;
ChangeNumber cn = null;
@@ -565,11 +557,15 @@
* This Class implements a cursor that can be used to browse a
* replicationServer database.
*/
- public class ReplServerDBCursor
+ public class ReplServerDBCursor implements Closeable
{
- // The transaction that will protect the actions done with the cursor
- // Will be let null for a read cursor
- // Will be set non null for a write cursor
+ /**
+ * The transaction that will protect the actions done with the cursor
+ * <p>
+ * Will be let null for a read cursor
+ * <p>
+ * Will be set non null for a write cursor
+ */
private final Transaction txn;
private final Cursor cursor;
private final DatabaseEntry key;
@@ -706,6 +702,7 @@
/**
* Close the ReplicationServer Cursor.
*/
+ @Override
public void close()
{
synchronized (this)
@@ -1111,10 +1108,10 @@
this.counterWindowSize = size;
}
-
-
- // Returns {@code true} if the DB is closed. This method assumes that either
- // the db read/write lock has been taken.
+ /**
+ * Returns {@code true} if the DB is closed. This method assumes that either
+ * the db read/write lock has been taken.
+ */
private boolean isDBClosed()
{
return db == null;
--
Gitblit v1.10.0