From 12db845ee284503024cd2ebd62e6549d5cc42b77 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 20 Aug 2014 10:57:29 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4261) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 162 +++++++++++++++++++++++++++++++-----------------------
1 files changed, 93 insertions(+), 69 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 27ea1a6..abbe49c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.io.Closeable;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReadWriteLock;
@@ -39,6 +38,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -244,15 +245,18 @@
private DatabaseEntry createReplicationKey(CSN csn)
{
- DatabaseEntry key = new DatabaseEntry();
- try
+ final DatabaseEntry key = new DatabaseEntry();
+ if (csn != null)
{
- key.setData(csn.toString().getBytes("UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- // Should never happens, UTF-8 is always supported
- // TODO : add better logging
+ try
+ {
+ key.setData(csn.toString().getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // Should never happens, UTF-8 is always supported
+ // TODO : add better logging
+ }
}
return key;
}
@@ -285,13 +289,15 @@
* @param startCSN
* The CSN from which the cursor must start.If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @return The ReplServerDBCursor.
* @throws ChangelogException
* If a database problem happened
*/
- ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
+ ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
{
- return new ReplServerDBCursor(startCSN);
+ return new ReplServerDBCursor(startCSN, positionStrategy);
}
/**
@@ -445,7 +451,7 @@
* This Class implements a cursor that can be used to browse a
* replicationServer database.
*/
- class ReplServerDBCursor implements Closeable
+ class ReplServerDBCursor implements DBCursor<UpdateMsg>
{
/**
* The transaction that will protect the actions done with the cursor.
@@ -454,12 +460,14 @@
* <p>
* Will be set non null for a write cursor
*/
- private final Transaction txn;
private final Cursor cursor;
private final DatabaseEntry key;
private final DatabaseEntry data;
+ /** \@Null for read cursors, \@NotNull for deleting cursors. */
+ private final Transaction txn;
+ private UpdateMsg currentRecord;
- private boolean isClosed = false;
+ private boolean isClosed;
/**
* Creates a ReplServerDBCursor that can be used for browsing a
@@ -467,21 +475,15 @@
*
* @param startCSN
* The CSN from which the cursor must start.
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @throws ChangelogException
* When the startCSN does not exist.
*/
- private ReplServerDBCursor(CSN startCSN) throws ChangelogException
+ private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
{
- if (startCSN != null)
- {
- key = createReplicationKey(startCSN);
- }
- else
- {
- key = new DatabaseEntry();
- }
+ key = createReplicationKey(startCSN);
data = new DatabaseEntry();
-
txn = null;
// Take the lock. From now on, whatever error that happen in the life
@@ -515,18 +517,25 @@
return;
}
- // We can move close to the startCSN.
- // Let's create a cursor from that point.
- DatabaseEntry aKey = new DatabaseEntry();
- DatabaseEntry aData = new DatabaseEntry();
- if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
+ if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
{
- localCursor.close();
- localCursor = db.openCursor(txn, null);
+ // We can move close to the startCSN.
+ // Let's create a cursor from that point.
+ key.setData(null);
+ if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
+ {
+ localCursor.close();
+ localCursor = db.openCursor(txn, null);
+ }
}
}
cursor = localCursor;
cursorHeld = cursor != null;
+
+ if (key.getData() != null)
+ {
+ computeCurrentRecord();
+ }
}
catch (DatabaseException e)
{
@@ -604,6 +613,7 @@
return;
}
isClosed = true;
+ currentRecord = null;
}
closeAndReleaseReadLock(cursor);
@@ -658,6 +668,7 @@
return null;
}
+ currentRecord = null;
try
{
if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -672,60 +683,73 @@
}
}
- /**
- * Get the next UpdateMsg from this cursor.
- *
- * @return the next UpdateMsg.
- */
- UpdateMsg next()
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
{
if (isClosed)
{
- return null;
+ return false;
}
- UpdateMsg currentChange = null;
- while (currentChange == null)
+ currentRecord = null;
+ while (currentRecord == null)
{
try
{
if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
{
- return null;
+ return false;
}
}
catch (DatabaseException e)
{
- return null;
+ throw new ChangelogException(e);
}
-
- CSN csn = null;
- try
- {
- csn = toCSN(key.getData());
- if (isACounterRecord(csn))
- {
- continue;
- }
- currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
- data.getData(), ProtocolVersion.getCurrentVersion());
- }
- catch (Exception e)
- {
- /*
- * An error happening trying to convert the data from the
- * replicationServer database to an Update LocalizableMessage. This can only
- * happen if the database is corrupted. There is not much more that we
- * can do at this point except trying to continue with the next
- * record. In such case, it is therefore possible that we miss some
- * changes.
- * TODO : This should be handled by the repair functionality.
- */
- logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
- csn, e.getMessage());
- }
+ computeCurrentRecord();
}
- return currentChange;
+ return currentRecord != null;
+ }
+
+ private void computeCurrentRecord()
+ {
+ CSN csn = null;
+ try
+ {
+ csn = toCSN(key.getData());
+ if (isACounterRecord(csn))
+ {
+ return;
+ }
+ currentRecord = toRecord(data.getData());
+ }
+ catch (Exception e)
+ {
+ /*
+ * An error happening trying to convert the data from the
+ * replicationServer database to an Update Message. This can only
+ * happen if the database is corrupted. There is not much more that we
+ * can do at this point except trying to continue with the next
+ * record. In such case, it is therefore possible that we miss some
+ * changes.
+ * TODO : This should be handled by the repair functionality.
+ */
+ logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
+ csn, e.getMessage());
+ }
+ }
+
+ private UpdateMsg toRecord(final byte[] data) throws Exception
+ {
+ final short currentVersion = ProtocolVersion.getCurrentVersion();
+ return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return currentRecord;
}
/**
--
Gitblit v1.10.0