mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
25.26.2014 881c7d3d5c53debdd4a1bb5f63146f0f73d56957
OPENDJ-1441 (CR-4082) Persistent searches on external changelog do not return changes for new replicas and new domains

Created ECLMultiDomainDBCursor to only let ECL enabled domains reach the ChangeNumberIndexer.

ChangeNumberIndexer.java:
Wrapped the MultiDomainDBCursor into an ECLMultiDomainDBCursor.

ECLMultiDomainDBCursor.java: ADDED

JEChangelogDB.java, FileChangelogDB.java:
Removed unnecessary calls to MultimasterREplication.isECLEnabledDomain().
3 files added
4 files modified
440 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 15 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 54 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java 55 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java 113 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 16 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 17 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java 170 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -41,7 +41,6 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -230,8 +229,7 @@
    }
    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another
    // thread
    // newValue, even though the same could be done concurrently by another thread
    final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<Integer, FileReplicaDB>();
    final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
@@ -240,15 +238,10 @@
      return previousValue;
    }
    // When called at replication startup, the isECLEnabledDomain() method blocks on STARTING state.
    // Checking cursors list ensure that it is never called in the startup case.
    if (!registeredMultiDomainCursors.isEmpty() && MultimasterReplication.isECLEnabledDomain(baseDN))
    // we just created a new domain => update all cursors
    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
    {
      // we just created a new domain => update all cursors
      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
      {
        cursor.addDomain(baseDN, null);
      }
      cursor.addDomain(baseDN, null);
    }
    return newValue;
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -27,6 +27,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.Map.Entry;
@@ -39,7 +40,6 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -47,7 +47,6 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -73,6 +72,7 @@
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
  private final ECLEnabledDomainPredicate predicate;
  /*
   * The following MultiDomainServerState fields must be thread safe, because
@@ -121,7 +121,7 @@
   *
   * @NonNull
   */
  private MultiDomainDBCursor nextChangeForInsertDBCursor;
  private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
  /**
   * Builds a ChangeNumberIndexer object.
@@ -133,9 +133,26 @@
   */
  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
  {
    this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
  }
  /**
   * Builds a ChangeNumberIndexer object.
   *
   * @param changelogDB
   *          the changelogDB
   * @param changelogState
   *          the changelog state used for initialization
   * @param predicate
   *          tells whether a domain is enabled for the external changelog
   */
  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState,
      ECLEnabledDomainPredicate predicate)
  {
    super("Change number indexer");
    this.changelogDB = changelogDB;
    this.changelogState = changelogState;
    this.predicate = predicate;
  }
  /**
@@ -148,7 +165,7 @@
   */
  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    if (!isECLEnabledDomain(baseDN))
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -186,7 +203,7 @@
  public void publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
      throws ChangelogException
  {
    if (!isECLEnabledDomain(baseDN))
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -197,24 +214,6 @@
  }
  /**
   * Returns whether the provided baseDN represents a replication domain enabled
   * for the external changelog.
   * <p>
   * This method is a test seam that break the dependency on a static method.
   *
   * @param baseDN
   *          the replication domain to check
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   * @see MultimasterReplication#isECLEnabledDomain(DN)
   */
  protected boolean isECLEnabledDomain(DN baseDN)
  {
    return MultimasterReplication.isECLEnabledDomain(baseDN);
  }
  /**
   * Signals a replica went offline.
   *
   * @param baseDN
@@ -224,7 +223,7 @@
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    if (!isECLEnabledDomain(baseDN))
    if (!predicate.isECLEnabledDomain(baseDN))
    {
      return;
    }
@@ -337,7 +336,7 @@
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (isECLEnabledDomain(baseDN))
      if (predicate.isECLEnabledDomain(baseDN))
      {
        for (Integer serverId : entry.getValue())
        {
@@ -353,7 +352,8 @@
      }
    }
    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY);
    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate,
        domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY));
    nextChangeForInsertDBCursor.next();
    if (newestRecord != null)
@@ -384,7 +384,7 @@
    {
      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
      {
        if (isECLEnabledDomain(baseDN))
        if (predicate.isECLEnabledDomain(baseDN))
        {
          replicasOffline.update(baseDN, offlineCSN);
          // a replica offline message could also be the very last time
opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
New file
@@ -0,0 +1,55 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.types.DN;
/**
 * Returns whether a domain is enabled for the external changelog.
 *
 * @FunctionalInterface
 */
class ECLEnabledDomainPredicate
{
  /**
   * Returns whether the provided baseDN represents a replication domain enabled
   * for the external changelog.
   * <p>
   * This method is a test seam that break the dependency on a static method.
   *
   * @param baseDN
   *          the replication domain to check
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   * @see MultimasterReplication#isECLEnabledDomain(DN)
   */
  public boolean isECLEnabledDomain(DN baseDN)
  {
    return MultimasterReplication.isECLEnabledDomain(baseDN);
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
New file
@@ -0,0 +1,113 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
/**
 * Multi domain DB cursor that only returns updates for the domains which have
 * been enabled for the external changelog.
 */
class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
{
  private final ECLEnabledDomainPredicate predicate;
  private final MultiDomainDBCursor cursor;
  /**
   * Builds an instance of this class filtering updates from the provided cursor.
   *
   * @param predicate
   *          tells whether a domain is enabled for the external changelog
   * @param cursor
   *          the cursor whose updates will be filtered
   */
  public ECLMultiDomainDBCursor(ECLEnabledDomainPredicate predicate, MultiDomainDBCursor cursor)
  {
    this.predicate = predicate;
    this.cursor = cursor;
  }
  /** {@inheritDoc} */
  @Override
  public UpdateMsg getRecord()
  {
    return cursor.getRecord();
  }
  /**
   * Returns the data associated to the cursor that returned the current record.
   *
   * @return the data associated to the cursor that returned the current record.
   */
  public DN getData()
  {
    return cursor.getData();
  }
  /**
   * Removes a replication domain from this cursor and stops iterating over it.
   * Removed cursors will be effectively removed on the next call to
   * {@link #next()}.
   *
   * @param baseDN
   *          the replication domain's baseDN
   */
  public void removeDomain(DN baseDN)
  {
    cursor.removeDomain(baseDN);
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
    // discard updates from non ECL enabled domains
    boolean hasNext;
    do
    {
      hasNext = cursor.next();
    }
    while (hasNext && !predicate.isECLEnabledDomain(cursor.getData()));
    return hasNext;
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    cursor.close();
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " cursor=[" + cursor + ']';
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -42,7 +42,6 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
@@ -58,7 +57,6 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -256,8 +254,7 @@
    }
    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another
    // thread
    // newValue, even though the same could be done concurrently by another thread
    final ConcurrentMap<Integer, JEReplicaDB> newValue = new ConcurrentHashMap<Integer, JEReplicaDB>();
    final ConcurrentMap<Integer, JEReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
@@ -266,15 +263,10 @@
      return previousValue;
    }
    // When called at replication startup, the isECLEnabledDomain() method blocks on STARTING state.
    // Checking cursors list ensure that it is never called in the startup case.
    if (!registeredMultiDomainCursors.isEmpty() && MultimasterReplication.isECLEnabledDomain(baseDN))
    // we just created a new domain => update all cursors
    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
    {
      // we just created a new domain => update all cursors
      for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
      {
        cursor.addDomain(baseDN, null);
      }
      cursor.addDomain(baseDN, null);
    }
    return newValue;
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -44,7 +44,6 @@
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.testng.annotations.*;
@@ -136,6 +135,7 @@
  private Map<DN, DomainDBCursor> domainDBCursors;
  private ChangelogState initialState;
  private Map<DN, ServerState> domainNewestCSNs;
  private ECLEnabledDomainPredicate predicate;
  private ChangeNumberIndexer cnIndexer;
  private MultiDomainServerState initialCookie;
@@ -592,7 +592,7 @@
    final SequentialDBCursor replicaDBCursor = new SequentialDBCursor();
    replicaDBCursors.put(Pair.of(baseDN, serverId), replicaDBCursor);
    if (isECLEnabledDomain2(baseDN))
    if (predicate.isECLEnabledDomain(baseDN))
    {
      DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
      if (domainDBCursor == null)
@@ -627,24 +627,19 @@
  private void startCNIndexer()
  {
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
    predicate = new ECLEnabledDomainPredicate()
    {
      @Override
      protected boolean isECLEnabledDomain(DN baseDN)
      public boolean isECLEnabledDomain(DN baseDN)
      {
        return isECLEnabledDomain2(baseDN);
        return eclEnabledDomains.contains(baseDN);
      }
    };
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate);
    cnIndexer.start();
    waitForWaitingState(cnIndexer);
  }
  private boolean isECLEnabledDomain2(DN baseDN)
  {
    return eclEnabledDomains.contains(baseDN);
  }
  private void stopCNIndexer() throws Exception
  {
    if (cnIndexer != null)
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java
New file
@@ -0,0 +1,170 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
import java.util.HashSet;
import java.util.Set;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
@SuppressWarnings("javadoc")
public class ECLMultiDomainDBCursorTest extends DirectoryServerTestCase
{
  @Mock
  private ReplicationDomainDB domainDB;
  private MultiDomainDBCursor multiDomainCursor;
  private ECLMultiDomainDBCursor eclCursor;
  private final Set<DN> eclEnabledDomains = new HashSet<DN>();
  private ECLEnabledDomainPredicate predicate = new ECLEnabledDomainPredicate()
  {
    @Override
    public boolean isECLEnabledDomain(DN baseDN)
    {
      return eclEnabledDomains.contains(baseDN);
    }
  };
  @BeforeMethod
  public void setup() throws Exception
  {
    TestCaseUtils.startFakeServer();
    MockitoAnnotations.initMocks(this);
    multiDomainCursor = new MultiDomainDBCursor(domainDB, ON_MATCHING_KEY);
    eclCursor = new ECLMultiDomainDBCursor(predicate, multiDomainCursor);
  }
  @AfterMethod
  public void teardown() throws Exception
  {
    TestCaseUtils.shutdownFakeServer();
    domainDB = null;
    multiDomainCursor = null;
    eclCursor.close();
    eclCursor = null;
    eclEnabledDomains.clear();
  }
  @Test
  public void testEmptyCursor() throws Exception
  {
    assertEmpty();
  }
  @Test
  public void testECLDisabledDomainWithCursor() throws Exception
  {
    final DN baseDN = DN.decode("dc=example,dc=com");
    final UpdateMsg msg1 = new FakeUpdateMsg(1);
    addDomainCursorToCursor(baseDN, new SequentialDBCursor(msg1));
    assertEmpty();
  }
  @Test
  public void testECLEnabledDomainWithCursor() throws Exception
  {
    final DN baseDN = DN.decode("dc=example,dc=com");
    eclEnabledDomains.add(baseDN);
    final UpdateMsg msg1 = new FakeUpdateMsg(1);
    addDomainCursorToCursor(baseDN, new SequentialDBCursor(msg1));
    assertSingleMessage(baseDN, msg1);
  }
  @Test(dependsOnMethods = { "testECLEnabledDomainWithCursor", "testECLDisabledDomainWithCursor" })
  public void testECLEnabledAndDisabledDomainCursors() throws Exception
  {
    final DN baseDN1 = DN.decode("dc=example,dc=com");
    final DN baseDN2 = DN.decode("cn=admin data");
    eclEnabledDomains.add(baseDN1);
    final UpdateMsg msg1 = new FakeUpdateMsg(1);
    final UpdateMsg msg2 = new FakeUpdateMsg(2);
    final UpdateMsg msg3 = new FakeUpdateMsg(3);
    final UpdateMsg msg4 = new FakeUpdateMsg(4);
    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg1, msg4));
    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg2, msg3));
    assertMessagesInOrder(baseDN1, msg1, msg4);
  }
  private void assertEmpty() throws Exception
  {
    assertMessagesInOrder(null, null, null);
  }
  private void assertSingleMessage(DN baseDN, UpdateMsg msg1) throws Exception
  {
    assertMessagesInOrder(baseDN, msg1, null);
  }
  private void assertMessagesInOrder(DN baseDN, UpdateMsg msg1, UpdateMsg msg2) throws Exception
  {
    assertThat(eclCursor.getRecord()).isNull();
    assertThat(eclCursor.getData()).isNull();
    if (msg1 != null)
    {
      assertThat(eclCursor.next()).isTrue();
      assertThat(eclCursor.getRecord()).isEqualTo(msg1);
      assertThat(eclCursor.getData()).isEqualTo(baseDN);
    }
    if (msg2 != null)
    {
      assertThat(eclCursor.next()).isTrue();
      assertThat(eclCursor.getRecord()).isEqualTo(msg2);
      assertThat(eclCursor.getData()).isEqualTo(baseDN);
    }
    assertThat(eclCursor.next()).isFalse();
    assertThat(eclCursor.getRecord()).isNull();
    assertThat(eclCursor.getData()).isNull();
  }
  private void addDomainCursorToCursor(DN baseDN, SequentialDBCursor cursor) throws ChangelogException
  {
    final ServerState state = new ServerState();
    when(domainDB.getCursorFrom(baseDN, state, ON_MATCHING_KEY)).thenReturn(cursor);
    multiDomainCursor.addDomain(baseDN, state);
  }
}