mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Fabio Pistolesi
26.45.2015 ea9e4fc1b5d3c481a65c5929eb1b4fedfc04a0be
OPENDJ-2182 CR-7977 Change number indexer prevent server from shutting down

Additional fix to take care of domains being re-enabled.
ECL cursoring now maintains which domain has been removed from the cursor, so that the indexer can reinitialize the cursor when a domain becames re-enabled from the current state.
1 files added
8 files modified
197 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java 41 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java 37 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java 23 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 8 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java 11 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 8 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java 9 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java 9 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java 51 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/api/ChangelogStateProvider.java
New file
@@ -0,0 +1,41 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2015 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.api;
import org.opends.server.replication.server.ChangelogState;
/**
 * Small interface for common Replication Environment operations.
 */
public interface ChangelogStateProvider
{
  /**
   * Returns the current state of the replication changelog.
   *
   * @return current in-memory {@link ChangelogState}.
   */
  ChangelogState getChangelogState();
}
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -37,13 +37,13 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
import org.opends.server.types.DN;
import static org.opends.messages.ReplicationMessages.*;
@@ -73,8 +73,7 @@
   */
  private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
  private final ChangelogDB changelogDB;
  /** Only used for initialization, and then discarded. */
  private ChangelogState changelogState;
  private final ChangelogStateProvider changelogStateProvider;
  private final ECLEnabledDomainPredicate predicate;
  /*
@@ -111,33 +110,30 @@
  /**
   * Builds a ChangeNumberIndexer object.
   *
   * @param changelogDB
   *  @param changelogDB
   *          the changelogDB
   * @param changelogState
   *          the changelog state used for initialization
   * @param changelogStateProvider
   *          the replication environment information for access to changelog state
   */
  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider)
  {
    this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
    this(changelogDB, changelogStateProvider, new ECLEnabledDomainPredicate());
  }
  /**
   * Builds a ChangeNumberIndexer object.
   *
   * @param changelogDB
   *          the changelogDB
   * @param changelogState
   * @param changelogStateProvider
   *          the changelog state used for initialization
   * @param predicate
   *          tells whether a domain is enabled for the external changelog
   */
  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState,
  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogStateProvider changelogStateProvider,
      ECLEnabledDomainPredicate predicate)
  {
    super("Change number indexer");
    this.changelogDB = changelogDB;
    this.changelogState = changelogState;
    this.changelogStateProvider = changelogStateProvider;
    this.predicate = predicate;
  }
@@ -310,9 +306,6 @@
    initializeLastAliveCSNs(domainDB);
    initializeNextChangeCursor(domainDB);
    initializeOfflineReplicas();
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
  private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
@@ -331,7 +324,7 @@
  private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
  {
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    for (Entry<DN, Set<Integer>> entry : changelogStateProvider.getChangelogState().getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (predicate.isECLEnabledDomain(baseDN))
@@ -353,7 +346,7 @@
  private void initializeOfflineReplicas()
  {
    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
    final MultiDomainServerState offlineReplicas = changelogStateProvider.getChangelogState().getOfflineReplicas();
    for (DN baseDN : offlineReplicas)
    {
      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
@@ -409,7 +402,11 @@
            // once this domain's state has been cleared.
            domainsToClear.remove(baseDNToClear);
          }
          if (nextChangeForInsertDBCursor.shouldReInitialize())
          {
            nextChangeForInsertDBCursor.close();
            initialize();
          }
          // Do not call DBCursor.next() here
          // because we might not have consumed the last record,
          // for example if we could not move the MCP forward
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
@@ -29,6 +29,9 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import java.util.ArrayList;
import java.util.List;
/**
 * Multi domain DB cursor that only returns updates for the domains which have
 * been enabled for the external changelog.
@@ -37,6 +40,7 @@
{
  private final ECLEnabledDomainPredicate predicate;
  private final MultiDomainDBCursor cursor;
  private final List<DN> eclDisabledDomains = new ArrayList<>();
  /**
   * Builds an instance of this class filtering updates from the provided cursor.
@@ -82,6 +86,24 @@
    cursor.removeDomain(baseDN);
  }
  /**
   * Returns whether the cursor should be reinitialized because a domain became re-enabled.
   *
   * @return whether the cursor should be reinitialized
   */
  public boolean shouldReInitialize()
  {
    for (DN domainDN : eclDisabledDomains)
    {
      if (predicate.isECLEnabledDomain(domainDN))
      {
        eclDisabledDomains.clear();
        return true;
      }
    }
    return false;
  }
  @Override
  public boolean next() throws ChangelogException
  {
@@ -94,6 +116,7 @@
    while (domain != null && !predicate.isECLEnabledDomain(domain))
    {
      cursor.removeDomain(domain);
      eclDisabledDomains.add(domain);
      domain = cursor.getData();
    }
    return domain != null;
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -297,7 +297,7 @@
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
        startIndexer(changelogState);
        startIndexer();
      }
      setPurgeDelay(replicationServer.getPurgeDelay());
    }
@@ -610,7 +610,7 @@
  {
    if (computeChangeNumber)
    {
      startIndexer(replicationEnv.getChangelogState());
      startIndexer();
    }
    else
    {
@@ -622,9 +622,9 @@
    }
  }
  private void startIndexer(final ChangelogState changelogState)
  private void startIndexer()
  {
    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
    if (cnIndexer.compareAndSet(null, indexer))
    {
      indexer.start();
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -59,6 +59,7 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
import org.opends.server.replication.server.changelog.file.Log.LogRotationParameters;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -124,7 +125,7 @@
 * |           \---head.log [contains last records written]
 * </pre>
 */
class ReplicationEnvironment
class ReplicationEnvironment implements ChangelogStateProvider
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
@@ -318,12 +319,8 @@
    return state;
  }
  /**
   * Returns the current state of the replication changelog.
   *
   * @return the current {@link ChangelogState}
   */
  ChangelogState getChangelogState()
  @Override
  public ChangelogState getChangelogState()
  {
    return changelogState;
  }
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -323,7 +323,7 @@
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
        startIndexer(changelogState);
        startIndexer();
      }
      setPurgeDelay(replicationServer.getPurgeDelay());
    }
@@ -652,7 +652,7 @@
  {
    if (computeChangeNumber)
    {
      startIndexer(replicationEnv.getChangelogState());
      startIndexer();
    }
    else
    {
@@ -664,9 +664,9 @@
    }
  }
  private void startIndexer(final ChangelogState changelogState)
  private void startIndexer()
  {
    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, replicationEnv);
    if (cnIndexer.compareAndSet(null, indexer))
    {
      indexer.start();
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -57,7 +58,7 @@
 * This class represents a DB environment that acts as a factory for
 * ReplicationDBs.
 */
public class ReplicationDbEnv
public class ReplicationDbEnv implements ChangelogStateProvider
{
  private Environment dbEnvironment;
  private Database changelogStateDb;
@@ -227,11 +228,7 @@
    return db;
  }
  /**
   * Return the current changelog state.
   *
   * @return the current {@link ChangelogState}
   */
  @Override
  public ChangelogState getChangelogState()
  {
    return changelogState;
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexerTest.java
@@ -49,10 +49,7 @@
import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions;
import org.opends.server.replication.server.changelog.api.ReplicaId;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.file.DomainDBCursor;
import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate;
import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogStateProvider;
import org.opends.server.types.DN;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -593,7 +590,9 @@
        return eclEnabledDomains.contains(baseDN);
      }
    };
    cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate)
    ChangelogStateProvider changeLogState = mock(ChangelogStateProvider.class);
    when(changeLogState.getChangelogState()).thenReturn(initialState);
    cnIndexer = new ChangeNumberIndexer(changelogDB, changeLogState, predicate)
    {
      /** {@inheritDoc} */
      @Override
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursorTest.java
@@ -125,43 +125,36 @@
    final DN baseDN1 = DN.valueOf("dc=example,dc=com");
    final DN baseDN2 = DN.valueOf("cn=admin data");
    eclEnabledDomains.add(baseDN1);
    final UpdateMsg msgs[] = newUpdateMsgs(13);
    // At least two updates in an enabled domain
    final UpdateMsg msg1 = new FakeUpdateMsg(1);
    final UpdateMsg msg2 = new FakeUpdateMsg(2);
    final UpdateMsg msg3 = new FakeUpdateMsg(3);
    final UpdateMsg msg4 = new FakeUpdateMsg(4);
    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg1, msg4));
    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg2, msg3));
    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[0], msgs[3]));
    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[1], msgs[2]));
    assertMessagesInOrder(baseDN1, msg1, msg4);
    assertMessagesInOrder(baseDN1, msgs[0], msgs[3]);
    assertEmpty();
    //Only one update in an enabled domain
    final UpdateMsg msg5 = new FakeUpdateMsg(5);
    final UpdateMsg msg6 = new FakeUpdateMsg(6);
    final UpdateMsg msg7 = new FakeUpdateMsg(7);
    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg5));
    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg6, msg7));
    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[4]));
    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[5], msgs[6]));
    assertMessagesInOrder(baseDN1, msg5, null);
    assertMessagesInOrder(baseDN1, msgs[4], null);
    assertEmpty();
    // Two disabled domains
    final DN baseDN3 = DN.valueOf("cn=schema");
    final UpdateMsg msg8 = new FakeUpdateMsg(8);
    final UpdateMsg msg9 = new FakeUpdateMsg(9);
    final UpdateMsg msg10 = new FakeUpdateMsg(10);
    final UpdateMsg msg11 = new FakeUpdateMsg(11);
    final UpdateMsg msg12 = new FakeUpdateMsg(12);
    final UpdateMsg msg13 = new FakeUpdateMsg(13);
    final DN baseDN3 = DN.valueOf("dc=example,dc=net");
    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msg8, msg10));
    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msg9, msg11));
    addDomainCursorToCursor(baseDN3, new SequentialDBCursor(msg12, msg13));
    addDomainCursorToCursor(baseDN1, new SequentialDBCursor(msgs[7], msgs[9]));
    addDomainCursorToCursor(baseDN2, new SequentialDBCursor(msgs[8], msgs[10]));
    addDomainCursorToCursor(baseDN3, new SequentialDBCursor(msgs[11], msgs[12]));
    assertMessagesInOrder(baseDN1, msg8, msg10);
    assertMessagesInOrder(baseDN1, msgs[7], msgs[9]);
    assertEmpty();
    // Test disable/enable domain tracking
    eclEnabledDomains.add(baseDN3);
    assertThat(eclCursor.shouldReInitialize()).isTrue();
    assertThat(eclCursor.shouldReInitialize()).isFalse();
  }
  private void assertEmpty() throws Exception
@@ -174,6 +167,16 @@
    assertMessagesInOrder(baseDN, msg1, null);
  }
  private UpdateMsg[] newUpdateMsgs(int num)
  {
    UpdateMsg[] results = new UpdateMsg[num];
    for (int i = 0; i < num; i++)
    {
      results[i] = new FakeUpdateMsg(i + 1);
    }
    return results;
  }
  private void assertMessagesInOrder(DN baseDN, UpdateMsg msg1, UpdateMsg msg2) throws Exception
  {
    assertThat(eclCursor.getRecord()).isNull();