From d6a26e0aea658bf89f950e2255484fdffe343e58 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 15:43:59 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4261) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog
---
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 168 +++++++++++++++++++++++++++++++------------------------
1 files changed, 95 insertions(+), 73 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index afc94f0..c04ffef 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,12 +26,10 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
@@ -40,6 +38,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -244,15 +244,18 @@
private DatabaseEntry createReplicationKey(CSN csn)
{
- DatabaseEntry key = new DatabaseEntry();
- try
+ final DatabaseEntry key = new DatabaseEntry();
+ if (csn != null)
{
- key.setData(csn.toString().getBytes("UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- // Should never happens, UTF-8 is always supported
- // TODO : add better logging
+ try
+ {
+ key.setData(csn.toString().getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // Should never happens, UTF-8 is always supported
+ // TODO : add better logging
+ }
}
return key;
}
@@ -286,13 +289,15 @@
* @param startCSN
* The CSN from which the cursor must start.If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @return The ReplServerDBCursor.
* @throws ChangelogException
* If a database problem happened
*/
- ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
+ ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
{
- return new ReplServerDBCursor(startCSN);
+ return new ReplServerDBCursor(startCSN, positionStrategy);
}
/**
@@ -446,7 +451,7 @@
* This Class implements a cursor that can be used to browse a
* replicationServer database.
*/
- class ReplServerDBCursor implements Closeable
+ class ReplServerDBCursor implements DBCursor<UpdateMsg>
{
/**
* The transaction that will protect the actions done with the cursor.
@@ -455,12 +460,14 @@
* <p>
* Will be set non null for a write cursor
*/
- private final Transaction txn;
private final Cursor cursor;
private final DatabaseEntry key;
private final DatabaseEntry data;
+ /** \@Null for read cursors, \@NotNull for deleting cursors. */
+ private final Transaction txn;
+ private UpdateMsg currentRecord;
- private boolean isClosed = false;
+ private boolean isClosed;
/**
* Creates a ReplServerDBCursor that can be used for browsing a
@@ -468,21 +475,15 @@
*
* @param startCSN
* The CSN from which the cursor must start.
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @throws ChangelogException
* When the startCSN does not exist.
*/
- private ReplServerDBCursor(CSN startCSN) throws ChangelogException
+ private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
{
- if (startCSN != null)
- {
- key = createReplicationKey(startCSN);
- }
- else
- {
- key = new DatabaseEntry();
- }
+ key = createReplicationKey(startCSN);
data = new DatabaseEntry();
-
txn = null;
// Take the lock. From now on, whatever error that happen in the life
@@ -516,18 +517,25 @@
return;
}
- // We can move close to the startCSN.
- // Let's create a cursor from that point.
- DatabaseEntry aKey = new DatabaseEntry();
- DatabaseEntry aData = new DatabaseEntry();
- if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
+ if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
{
- localCursor.close();
- localCursor = db.openCursor(txn, null);
+ // We can move close to the startCSN.
+ // Let's create a cursor from that point.
+ key.setData(null);
+ if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
+ {
+ localCursor.close();
+ localCursor = db.openCursor(txn, null);
+ }
}
}
cursor = localCursor;
cursorHeld = cursor != null;
+
+ if (key.getData() != null)
+ {
+ computeCurrentRecord();
+ }
}
catch (DatabaseException e)
{
@@ -605,6 +613,7 @@
return;
}
isClosed = true;
+ currentRecord = null;
}
closeAndReleaseReadLock(cursor);
@@ -659,6 +668,7 @@
return null;
}
+ currentRecord = null;
try
{
if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -673,63 +683,75 @@
}
}
- /**
- * Get the next UpdateMsg from this cursor.
- *
- * @return the next UpdateMsg.
- */
- UpdateMsg next()
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
{
if (isClosed)
{
- return null;
+ return false;
}
- UpdateMsg currentChange = null;
- while (currentChange == null)
+ currentRecord = null;
+ while (currentRecord == null)
{
try
{
if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
{
- return null;
+ return false;
}
}
catch (DatabaseException e)
{
- return null;
+ throw new ChangelogException(e);
}
-
- CSN csn = null;
- try
- {
- csn = toCSN(key.getData());
- if (isACounterRecord(csn))
- {
- continue;
- }
- currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
- data.getData(), ProtocolVersion.getCurrentVersion());
- }
- catch (Exception e)
- {
- /*
- * An error happening trying to convert the data from the
- * replicationServer database to an Update Message. This can only
- * happen if the database is corrupted. There is not much more that we
- * can do at this point except trying to continue with the next
- * record. In such case, it is therefore possible that we miss some
- * changes.
- * TODO : This should be handled by the repair functionality.
- */
- Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
- .get(replicationServer.getServerId(),
- (csn != null ? csn.toString() : ""),
- e.getMessage());
- logError(message);
- }
+ computeCurrentRecord();
}
- return currentChange;
+ return currentRecord != null;
+ }
+
+ private void computeCurrentRecord()
+ {
+ CSN csn = null;
+ try
+ {
+ csn = toCSN(key.getData());
+ if (isACounterRecord(csn))
+ {
+ return;
+ }
+ currentRecord = toRecord(data.getData());
+ }
+ catch (Exception e)
+ {
+ /*
+ * An error happening trying to convert the data from the
+ * replicationServer database to an Update Message. This can only
+ * happen if the database is corrupted. There is not much more that we
+ * can do at this point except trying to continue with the next
+ * record. In such case, it is therefore possible that we miss some
+ * changes.
+ * TODO : This should be handled by the repair functionality.
+ */
+ logError(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD.get(
+ replicationServer.getServerId(),
+ (csn != null ? csn.toString() : ""),
+ e.getMessage()));
+ }
+ }
+
+ private UpdateMsg toRecord(final byte[] data) throws Exception
+ {
+ final short currentVersion = ProtocolVersion.getCurrentVersion();
+ return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return currentRecord;
}
/**
--
Gitblit v1.10.0