From 5e495b3c867de5b83117834386859da67cbbedc5 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 13 Aug 2013 13:32:13 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java | 28 +++------
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 29 ++++-----
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java | 4 +
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java | 60 ++++++++++++++++++++
opends/src/server/org/opends/server/replication/server/ReplicationBackend.java | 1
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java | 1
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 1
opends/src/server/org/opends/server/replication/server/DbHandler.java | 3
8 files changed, 91 insertions(+), 36 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 661a7ac..b5bae4e 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -42,6 +42,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -276,7 +277,7 @@
{
flush();
}
- return new ReplicationIterator(db, changeNumber, this);
+ return new JEReplicationIterator(db, changeNumber, this);
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java
similarity index 84%
rename from opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
rename to opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java
index 82b201f..16e6243 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/JEReplicationIterator.java
@@ -32,12 +32,12 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
/**
- * This class allows to iterate through the changes received from a given
- * LDAP Server Identifier.
+ * Berkeley DB JE implementation of IReplicationIterator.
*/
-public class ReplicationIterator
+public class JEReplicationIterator implements ReplicationIterator
{
private UpdateMsg currentChange = null;
private ReplServerDBCursor cursor = null;
@@ -55,7 +55,7 @@
* @param dbHandler The associated DbHandler.
* @throws ChangelogException if a database problem happened.
*/
- public ReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
+ public JEReplicationIterator(ReplicationDB db, ChangeNumber changeNumber,
DbHandler dbHandler) throws ChangelogException
{
this.db = db;
@@ -86,20 +86,15 @@
}
}
- /**
- * Get the UpdateMsg where the iterator is currently set.
- * @return The UpdateMsg where the iterator is currently set.
- */
+ /** {@inheritDoc} */
+ @Override
public UpdateMsg getChange()
{
return currentChange;
}
- /**
- * Go to the next change in the ReplicationDB or in the server Queue.
- * @return false if the iterator is already on the last change before
- * this call.
- */
+ /** {@inheritDoc} */
+ @Override
public boolean next()
{
currentChange = cursor.next();
@@ -136,11 +131,8 @@
return currentChange != null;
}
- /**
- * Release the resources and locks used by this Iterator.
- * This method must be called when the iterator is no longer used.
- * Failure to do it could cause DB deadlock.
- */
+ /** {@inheritDoc} */
+ @Override
public void releaseCursor()
{
synchronized (this)
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 626c08b..ed652c7 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -27,9 +27,6 @@
*/
package org.opends.server.replication.server;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
@@ -45,8 +42,12 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+
/**
* This class implements a buffering/producer/consumer mechanism of
* replication changes (UpdateMsg) used inside the replication server.
@@ -281,17 +282,15 @@
* load this change on the delayList
*
*/
- ReplicationIteratorComparator comparator =
- new ReplicationIteratorComparator();
SortedSet<ReplicationIterator> iteratorSortedSet =
- new TreeSet<ReplicationIterator>(comparator);
+ new TreeSet<ReplicationIterator>(
+ new ReplicationIteratorComparator());
try
{
/* fill the lateQueue */
for (int serverId : replicationServerDomain.getServers())
{
- ChangeNumber lastCsn = serverState
- .getChangeNumber(serverId);
+ ChangeNumber lastCsn = serverState.getChangeNumber(serverId);
ReplicationIterator iterator = replicationServerDomain
.getChangelogIterator(serverId, lastCsn);
if (iterator != null)
@@ -318,8 +317,7 @@
&& (lateQueue.count() < 100)
&& (lateQueue.bytesCount() < 50000))
{
- ReplicationIterator iterator = iteratorSortedSet
- .first();
+ ReplicationIterator iterator = iteratorSortedSet.first();
iteratorSortedSet.remove(iterator);
lateQueue.add(iterator.getChange());
if (iterator.next())
@@ -376,7 +374,7 @@
{
msg1 = msgQueue.removeFirst();
} while (!msg.getChangeNumber().equals(msg1.getChangeNumber()));
- this.updateServerState(msg);
+ updateServerState(msg);
return msg1;
}
}
@@ -388,7 +386,7 @@
{
msg = lateQueue.removeFirst();
}
- this.updateServerState(msg);
+ updateServerState(msg);
return msg;
}
}
@@ -412,7 +410,7 @@
}
msg = msgQueue.removeFirst();
- if (this.updateServerState(msg))
+ if (updateServerState(msg))
{
/*
* Only push the message if it has not yet been seen
@@ -462,10 +460,9 @@
there. So let's take the last change not sent directly from
the db.
*/
- ReplicationIteratorComparator comparator =
- new ReplicationIteratorComparator();
SortedSet<ReplicationIterator> iteratorSortedSet =
- new TreeSet<ReplicationIterator>(comparator);
+ new TreeSet<ReplicationIterator>(
+ new ReplicationIteratorComparator());
try
{
// Build a list of candidates iterator (i.e. db i.e. server)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index 0e31927..f1b4c56 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -72,6 +72,7 @@
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.AttributeType;
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 947fcbf..13c3b20 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -47,6 +47,7 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.opends.server.types.*;
import org.opends.server.util.TimeThread;
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java
new file mode 100644
index 0000000..b44c16c
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIterator.java
@@ -0,0 +1,60 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2013 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.api;
+
+import org.opends.server.replication.protocol.UpdateMsg;
+
+/**
+ * This interface allows to iterate through the changes received from a given
+ * LDAP Server Identifier.
+ */
+public interface ReplicationIterator
+{
+
+ /**
+ * Get the UpdateMsg where the iterator is currently set.
+ *
+ * @return The UpdateMsg where the iterator is currently set.
+ */
+ UpdateMsg getChange();
+
+ /**
+ * Go to the next change in the ReplicationDB or in the server Queue.
+ *
+ * @return false if the iterator is already on the last change before this
+ * call.
+ */
+ boolean next();
+
+ /**
+ * Release the resources and locks used by this Iterator. This method must be
+ * called when the iterator is no longer used. Failure to do it could cause DB
+ * deadlock.
+ */
+ void releaseCursor();
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java
similarity index 93%
rename from opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java
rename to opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java
index b805e30..e02d810 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationIteratorComparator.java
@@ -23,8 +23,9 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS
*/
-package org.opends.server.replication.server;
+package org.opends.server.replication.server.changelog.api;
import java.util.Comparator;
@@ -45,6 +46,7 @@
* @param o2 second ReplicationIterator.
* @return result of the comparison.
*/
+ @Override
public int compare(ReplicationIterator o1, ReplicationIterator o2)
{
ChangeNumber csn1 = o1.getChange().getChangeNumber();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
index 5da4f99..88001a7 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/DbHandlerTest.java
@@ -42,6 +42,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.server.changelog.api.ReplicationIterator;
import org.testng.annotations.Test;
/**
--
Gitblit v1.10.0