From 355365ea3f95fdfec5cac95588044b91354096c8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 11:11:46 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4303) Persistent searches on external changelog do not return changes for new replicas and new domains
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 35 +++++------------
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java | 9 ----
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java | 13 ------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 9 ----
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 45 +++++++++++-----------
5 files changed, 34 insertions(+), 77 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index eee6fe4..d70e7a1 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -25,11 +25,6 @@
*/
package org.opends.server.replication.server.changelog.je;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
-import static org.opends.server.util.StaticUtils.*;
-
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -50,6 +45,11 @@
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
+
/**
* Thread responsible for inserting replicated changes into the ChangeNumber
* Index DB (CNIndexDB for short). Only changes older than the medium
@@ -430,26 +430,13 @@
{
try
{
- if (!domainsToClear.isEmpty())
+ while (!domainsToClear.isEmpty())
{
- final DN cursorData = nextChangeForInsertDBCursor.getData();
- final boolean callNextOnCursor =
- cursorData != null && domainsToClear.contains(cursorData);
- while (!domainsToClear.isEmpty())
- {
- final DN baseDNToClear = domainsToClear.first();
- nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
- // Only release the waiting thread
- // once this domain's state has been cleared.
- domainsToClear.remove(baseDNToClear);
- }
-
- if (callNextOnCursor)
- {
- // The next change to consume comes from a domain to be removed.
- // Call DBCursor.next() to ensure this domain is removed
- nextChangeForInsertDBCursor.next();
- }
+ final DN baseDNToClear = domainsToClear.first();
+ nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
+ // Only release the waiting thread
+ // once this domain's state has been cleared.
+ domainsToClear.remove(baseDNToClear);
}
// Do not call DBCursor.next() here
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index ae54930..748619c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -107,7 +107,6 @@
addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
}
- removeNoLongerNeededCursors();
incorporateNewCursors();
return !cursors.isEmpty();
}
@@ -127,31 +126,31 @@
}
}
- private void removeNoLongerNeededCursors()
+ /**
+ * Removes the cursor matching the provided data.
+ *
+ * @param dataToFind
+ * the data for which the cursor must be found and removed
+ */
+ protected void removeCursor(final Data dataToFind)
{
- for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
- {
- final Data dataToFind = iter.next();
- for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
- cursors.entrySet().iterator(); cursorIter.hasNext();)
- {
- final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
- if (dataToFind.equals(entry.getValue()))
- {
- entry.getKey().close();
- cursorIter.remove();
- }
- }
- iter.remove();
- }
+ removeCursor(this.cursors, dataToFind);
+ removeCursor(this.exhaustedCursors, dataToFind);
}
- /**
- * Returns an Iterator over the data associated to cursors that must be removed.
- *
- * @return an Iterator over the data associated to cursors that must be removed.
- */
- protected abstract Iterator<Data> removedCursorsIterator();
+ private void removeCursor(Map<DBCursor<UpdateMsg>, Data> cursors, Data dataToFind)
+ {
+ for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
+ cursors.entrySet().iterator(); cursorIter.hasNext();)
+ {
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
+ if (dataToFind.equals(entry.getValue()))
+ {
+ entry.getKey().close();
+ cursorIter.remove();
+ }
+ }
+ }
/**
* Adds a cursor to this composite cursor. It first calls
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
index 26c40f7..780b19b 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -24,7 +24,6 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -118,14 +117,6 @@
/** {@inheritDoc} */
@Override
- @SuppressWarnings("unchecked")
- protected Iterator<Void> removedCursorsIterator()
- {
- return Collections.EMPTY_LIST.iterator(); // nothing to remove
- }
-
- /** {@inheritDoc} */
- @Override
public void close()
{
super.close();
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
index a7f067f..690e551 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -27,7 +27,6 @@
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -47,8 +46,6 @@
private final ConcurrentSkipListMap<DN, ServerState> newDomains =
new ConcurrentSkipListMap<DN, ServerState>();
- private final ConcurrentSkipListSet<DN> removeDomains =
- new ConcurrentSkipListSet<DN>();
private final PositionStrategy positionStrategy;
@@ -108,14 +105,7 @@
*/
public void removeDomain(DN baseDN)
{
- removeDomains.add(baseDN);
- }
-
- /** {@inheritDoc} */
- @Override
- protected Iterator<DN> removedCursorsIterator()
- {
- return removeDomains.iterator();
+ removeCursor(baseDN);
}
/** {@inheritDoc} */
@@ -125,7 +115,6 @@
super.close();
domainDB.unregisterCursor(this);
newDomains.clear();
- removeDomains.clear();
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index e5b022a..05eb5eb 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,9 +25,6 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.util.Collections;
-import java.util.Iterator;
-
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -51,12 +48,6 @@
protected void incorporateNewCursors() throws ChangelogException
{
}
-
- @Override
- protected Iterator<String> removedCursorsIterator()
- {
- return Collections.EMPTY_LIST.iterator();
- }
}
private UpdateMsg msg1;
--
Gitblit v1.10.0