From 053bd871061fe5a094c1c2c6bea65811c038c622 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 06 Jun 2014 14:19:37 +0000
Subject: [PATCH] (CR-3599) Convert all protocols message to use ByteArrayBuilder + ByteArrayScanner
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 137 +++++++++------------------------------------
1 files changed, 27 insertions(+), 110 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 0200812..27ea1a6 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -33,6 +33,8 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
@@ -54,17 +56,16 @@
* <p>
* This is the only class that should have code using the BDB interfaces.
*/
-public class ReplicationDB
+class ReplicationDB
{
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
-
private Database db;
- private ReplicationDbEnv dbEnv;
- private ReplicationServer replicationServer;
- private int serverId;
- private DN baseDN;
+ private final ReplicationDbEnv dbEnv;
+ private final ReplicationServer replicationServer;
+ private final int serverId;
+ private final DN baseDN;
/**
* The lock used to provide exclusive access to the thread that close the db
@@ -121,7 +122,7 @@
* @param dbEnv The Db environment to use to create the db.
* @throws ChangelogException If a database problem happened
*/
- public ReplicationDB(int serverId, DN baseDN,
+ ReplicationDB(int serverId, DN baseDN,
ReplicationServer replicationServer, ReplicationDbEnv dbEnv)
throws ChangelogException
{
@@ -189,7 +190,7 @@
* @throws ChangelogException
* If a database problem happened
*/
- public void addEntry(UpdateMsg change) throws ChangelogException
+ void addEntry(UpdateMsg change) throws ChangelogException
{
dbCloseLock.readLock().lock();
try
@@ -201,7 +202,9 @@
}
final DatabaseEntry key = createReplicationKey(change.getCSN());
- final DatabaseEntry data = new ReplicationData(change);
+ // Always keep messages in the replication DB with the current protocol
+ // version
+ final DatabaseEntry data = new DatabaseEntry(change.getBytes());
insertCounterRecordIfNeeded(change.getCSN());
db.put(null, key, data);
@@ -257,7 +260,7 @@
/**
* Shutdown the database.
*/
- public void shutdown()
+ void shutdown()
{
dbCloseLock.writeLock().lock();
try
@@ -286,8 +289,7 @@
* @throws ChangelogException
* If a database problem happened
*/
- public ReplServerDBCursor openReadCursor(CSN startCSN)
- throws ChangelogException
+ ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
{
return new ReplServerDBCursor(startCSN);
}
@@ -301,7 +303,7 @@
*
* @return The ReplServerDBCursor.
*/
- public ReplServerDBCursor openDeleteCursor() throws ChangelogException
+ ReplServerDBCursor openDeleteCursor() throws ChangelogException
{
return new ReplServerDBCursor();
}
@@ -325,7 +327,7 @@
* @throws ChangelogException
* If a database problem happened
*/
- public CSN readOldestCSN() throws ChangelogException
+ CSN readOldestCSN() throws ChangelogException
{
dbCloseLock.readLock().lock();
@@ -381,7 +383,7 @@
* @throws ChangelogException
* If a database problem happened
*/
- public CSN readNewestCSN() throws ChangelogException
+ CSN readNewestCSN() throws ChangelogException
{
dbCloseLock.readLock().lock();
@@ -432,93 +434,7 @@
}
}
- /**
- * Try to find in the DB, the CSN right before the one passed as a parameter.
- *
- * @param csn
- * The CSN from which we start searching.
- * @return the CSN right before the one passed as a parameter. Can return null
- * if there is none.
- * @throws ChangelogException
- * If a database problem happened
- */
- public CSN getPreviousCSN(CSN csn) throws ChangelogException
- {
- if (csn == null)
- {
- return null;
- }
-
- dbCloseLock.readLock().lock();
-
- Cursor cursor = null;
- try
- {
- // If the DB has been closed then return immediately.
- if (isDBClosed())
- {
- return null;
- }
-
- DatabaseEntry key = createReplicationKey(csn);
- DatabaseEntry data = new DatabaseEntry();
- cursor = db.openCursor(null, null);
- if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS)
- {
- // We can move close to the CSN.
- // Let's move to the previous change.
- if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
- {
- return getRegularRecord(cursor, key, data);
- }
- // else, there was no change previous to our CSN.
- }
- else
- {
- // We could not move the cursor past to the CSN
- // Check if the last change is older than CSN
- if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS)
- {
- return getRegularRecord(cursor, key, data);
- }
- }
- }
- catch (DatabaseException e)
- {
- throw new ChangelogException(e);
- }
- finally
- {
- closeAndReleaseReadLock(cursor);
- }
- return null;
- }
-
- private CSN getRegularRecord(Cursor cursor, DatabaseEntry key,
- DatabaseEntry data) throws DatabaseException
- {
- final CSN csn = toCSN(key.getData());
- if (!isACounterRecord(csn))
- {
- return csn;
- }
-
- // There cannot be 2 counter record next to each other,
- // it is safe to return previous record which must exist
- if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
- {
- return toCSN(key.getData());
- }
-
- // database only contain a counter record, which should not be possible
- // let's just say no CSN
- return null;
- }
-
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
@@ -529,7 +445,7 @@
* This Class implements a cursor that can be used to browse a
* replicationServer database.
*/
- public class ReplServerDBCursor implements Closeable
+ class ReplServerDBCursor implements Closeable
{
/**
* The transaction that will protect the actions done with the cursor.
@@ -713,7 +629,7 @@
* (per the Cursor documentation).
* This should not be used in any other case.
*/
- public void abort()
+ void abort()
{
synchronized (this)
{
@@ -735,7 +651,7 @@
* @throws ChangelogException
* In case of underlying database problem.
*/
- public CSN nextCSN() throws ChangelogException
+ CSN nextCSN() throws ChangelogException
{
if (isClosed)
{
@@ -761,7 +677,7 @@
*
* @return the next UpdateMsg.
*/
- public UpdateMsg next()
+ UpdateMsg next()
{
if (isClosed)
{
@@ -791,7 +707,8 @@
{
continue;
}
- currentChange = ReplicationData.generateChange(data.getData());
+ currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
+ data.getData(), ProtocolVersion.getCurrentVersion());
}
catch (Exception e)
{
@@ -816,7 +733,7 @@
*
* @throws ChangelogException In case of database problem.
*/
- public void delete() throws ChangelogException
+ void delete() throws ChangelogException
{
if (isClosed)
{
@@ -839,7 +756,7 @@
*
* @throws ChangelogException In case of database problem.
*/
- public void clear() throws ChangelogException
+ void clear() throws ChangelogException
{
// The coming users will be blocked until the clear is done
dbCloseLock.writeLock().lock();
@@ -906,7 +823,7 @@
* Encode the provided counter value in a database entry.
* @return The database entry with the counter value encoded inside.
*/
- static private DatabaseEntry encodeCounterValue(int value)
+ private static DatabaseEntry encodeCounterValue(int value)
{
DatabaseEntry entry = new DatabaseEntry();
entry.setData(getBytes(String.valueOf(value)));
--
Gitblit v1.10.0