mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
15.16.2014 64d7cf576026e5cfdfca8a6a51f363b2d14d26b4
OPENDJ-1541 (CR-4516) Persistent search on cn=changelog can return duplicates

Persistent searches are registered before initial search ends (which is correct).
Because a new change can be added to the changelog before the "initial search" phase is over, the "persistent search" phase can return this change before the "initial search" phase returns it later.

To avoid this problem, each persistent search is marked with an enum indicating which phase is currently running. The phases are the following:
1. INITIAL: The "initial search" phase is running; the "persistent search" phase does not return any entries.
2. TRANSITIONING: The "initial search" phase has completed and blocks the currently running "persistent search" phase while the former verifies that no new updates were persisted to the DB.
3. PERSISTENT: The "initial search" phase is finished and completed the transition to the "persistent search" phase. The "persistent search" phase can return all entries.
For the change-number-based persistent searches, only the last changeNumber sent by the "initial search" phase is recorded. For cookie-based persistent searches, for each replica, the last CSN sent by the "initial search" phase is recorded.

The problem is that the transitioning phase has the potential to block the whole server if the client of the persistent search does not consume changes fast enough.
This will be addressed separately.


ChangelogBackend.java:
Added constants COOKIE_ATTACHMENT and ENTRY_SENDER_ATTACHMENT.
Added cookieBasedPersistentSearches and changeNumberBasedPersistentSearches fields.
Added SearchPhase enum.
Added CookieEntrySender, ChangeNumberEntrySender and SendEntryData static inner classes + made several methods static to call them from these classes.
In initialSearchFromCookie(), initialSearchFromChangeNumber(), notifyEntryAdded() and registerPersistentSearch(), set or retrieved attachments + used entrySender.
Extracted methods sendCookieEntriesFromCursor(), sendChangeNumberEntriesFromCursors().
Added initializeAttachements().
Split notifyEntryAdded() in two: notifyCookieEntryAdded() and notifyChangeNumberEntryAdded().
3 files modified
34 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -26,7 +26,13 @@
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -46,8 +52,12 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
@@ -790,7 +800,7 @@
    final FileReplicaDB replicaDB = pair.getFirst();
    replicaDB.add(updateMsg);
    ChangelogBackend.getInstance().notifyEntryAdded(baseDN, 0, null, updateMsg);
    ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -543,7 +543,7 @@
  protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
      String cookieString, UpdateMsg msg) throws ChangelogException
  {
    ChangelogBackend.getInstance().notifyEntryAdded(baseDN, changeNumber, cookieString, msg);
    ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookieString, msg);
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -26,7 +26,13 @@
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -46,8 +52,12 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
@@ -839,7 +849,7 @@
    final JEReplicaDB replicaDB = pair.getFirst();
    replicaDB.add(updateMsg);
    ChangelogBackend.getInstance().notifyEntryAdded(baseDN, 0, null, updateMsg);
    ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)