mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
11.04.2014 37fd83b785c0993b0e1f0fb93777943a9ec46d83
opendj3-server-dev/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -26,7 +26,10 @@
 */
package org.opends.server.replication.common;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
@@ -166,7 +169,31 @@
  }
  /**
   * Returns a snapshot of this object.
   *
   * @return an unmodifiable Map representing a snapshot of this object.
   */
  public Map<DN, List<CSN>> getSnapshot()
  {
    if (list.isEmpty())
    {
      return Collections.emptyMap();
    }
    final Map<DN, List<CSN>> map = new HashMap<DN, List<CSN>>();
    for (Entry<DN, ServerState> entry : list.entrySet())
    {
      final List<CSN> l = entry.getValue().getSnapshot();
      if (!l.isEmpty())
      {
        map.put(entry.getKey(), l);
      }
    }
    return Collections.unmodifiableMap(map);
  }
  /**
   * Returns a string representation of this object.
   *
   * @return The string representation.
   */
  @Override
opendj3-server-dev/src/server/org/opends/server/replication/common/ServerState.java
@@ -250,6 +250,20 @@
  }
  /**
   * Returns a snapshot of this object.
   *
   * @return an unmodifiable List representing a snapshot of this object.
   */
  public List<CSN> getSnapshot()
  {
    if (serverIdToCSN.isEmpty())
    {
      return Collections.emptyList();
    }
    return Collections.unmodifiableList(new ArrayList<CSN>(serverIdToCSN.values()));
  }
  /**
   * Return the text representation of ServerState.
   * @return the text representation of ServerState
   */
opendj3-server-dev/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -25,12 +25,13 @@
 */
package org.opends.server.replication.server;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.types.DN;
/**
@@ -43,15 +44,16 @@
 * <p>
 * This class is used during replication initialization to decouple the code
 * that reads the changelogStateDB from the code that makes use of its data.
 *
 * @ThreadSafe
 */
public class ChangelogState
{
  private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>();
  private final Map<DN, List<Integer>> domainToServerIds =
      new HashMap<DN, List<Integer>>();
  private final Map<DN, List<CSN>> offlineReplicas =
      new HashMap<DN, List<CSN>>();
  private final ConcurrentSkipListMap<DN, Long> domainToGenerationId = new ConcurrentSkipListMap<DN, Long>();
  private final ConcurrentSkipListMap<DN, Set<Integer>> domainToServerIds =
      new ConcurrentSkipListMap<DN, Set<Integer>>();
  private final MultiDomainServerState offlineReplicas = new MultiDomainServerState();
  /**
   * Sets the generationId for the supplied replication domain.
@@ -76,11 +78,16 @@
   */
  public void addServerIdToDomain(int serverId, DN baseDN)
  {
    List<Integer> serverIds = domainToServerIds.get(baseDN);
    Set<Integer> serverIds = domainToServerIds.get(baseDN);
    if (serverIds == null)
    {
      serverIds = new LinkedList<Integer>();
      domainToServerIds.put(baseDN, serverIds);
      serverIds = new HashSet<Integer>();
      final Set<Integer> existingServerIds =
          domainToServerIds.putIfAbsent(baseDN, serverIds);
      if (existingServerIds != null)
      {
        serverIds = existingServerIds;
      }
    }
    serverIds.add(serverId);
  }
@@ -95,13 +102,25 @@
   */
  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
  {
    List<CSN> offlineCSNs = offlineReplicas.get(baseDN);
    if (offlineCSNs == null)
    offlineReplicas.update(baseDN, offlineCSN);
  }
  /**
   * Removes the following replica information from the offline list.
   *
   * @param baseDN
   *          the baseDN of the offline replica
   * @param serverId
   *          the serverId that is not offline anymore
   */
  public void removeOfflineReplica(DN baseDN, int serverId)
  {
    CSN csn;
    do
    {
      offlineCSNs = new LinkedList<CSN>();
      offlineReplicas.put(baseDN, offlineCSNs);
      csn = offlineReplicas.getCSN(baseDN, serverId);
    }
    offlineCSNs.add(offlineCSN);
    while (csn != null && !offlineReplicas.removeCSN(baseDN, csn));
  }
  /**
@@ -119,21 +138,51 @@
   *
   * @return a Map of domainBaseDN => List&lt;serverId&gt;.
   */
  public Map<DN, List<Integer>> getDomainToServerIds()
  public Map<DN, Set<Integer>> getDomainToServerIds()
  {
    return domainToServerIds;
  }
  /**
   * Returns the Map of domainBaseDN => List&lt;offlineCSN&gt;.
   * Returns the internal MultiDomainServerState for offline replicas.
   *
   * @return a Map of domainBaseDN => List&lt;offlineCSN&gt;.
   * @return the MultiDomainServerState for offline replicas.
   */
  public Map<DN, List<CSN>> getOfflineReplicas()
  public MultiDomainServerState getOfflineReplicas()
  {
    return offlineReplicas;
  }
  /**
   * Returns whether the current ChangelogState is equal to the provided
   * ChangelogState.
   * <p>
   * Note: Only use for tests!!<br>
   * This method should only be used by tests because it creates a lot of
   * intermediate objects which is not suitable for production.
   *
   * @param other
   *          the ChangelogState to compare with
   * @return true if the current ChangelogState is equal to the provided
   *         ChangelogState, false otherwise.
   */
  public boolean isEqualTo(ChangelogState other)
  {
    if (other == null)
    {
      return false;
    }
    if (this == other)
    {
      return true;
    }
    return domainToGenerationId.equals(other.domainToGenerationId)
        && domainToServerIds.equals(other.domainToServerIds)
        // Note: next line is not suitable for production
        // because it creates lots of Lists and Maps
        && offlineReplicas.getSnapshot().equals(other.offlineReplicas.getSnapshot());
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -25,8 +25,13 @@
 */
package org.opends.server.replication.server.changelog.je;
import java.util.*;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -356,8 +361,7 @@
    // initialize the DB cursor and the last seen updates
    // to ensure the medium consistency CSN can move forward
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
    for (Entry<DN, List<Integer>> entry
        : changelogState.getDomainToServerIds().entrySet())
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
      if (!isECLEnabledDomain(baseDN))
@@ -400,12 +404,10 @@
      nextChangeForInsertDBCursor.next();
    }
    for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
        .entrySet())
    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
    for (DN baseDN : offlineReplicas)
    {
      final DN baseDN = entry.getKey();
      final List<CSN> offlineCSNs = entry.getValue();
      for (CSN offlineCSN : offlineCSNs)
      for (CSN offlineCSN : offlineReplicas.getServerState(baseDN))
      {
        if (isECLEnabledDomain(baseDN))
        {
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -38,6 +38,7 @@
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -310,7 +311,7 @@
    {
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      dbEnv = new ReplicationDbEnv(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = dbEnv.readChangelogState();
      final ChangelogState changelogState = dbEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
@@ -333,7 +334,7 @@
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
    }
    for (Map.Entry<DN, List<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      for (int serverId : entry.getValue())
      {
@@ -634,7 +635,7 @@
  {
    if (computeChangeNumber)
    {
      startIndexer(dbEnv.readChangelogState());
      startIndexer(dbEnv.getChangelogState());
    }
    else
    {
@@ -697,14 +698,14 @@
      throws ChangelogException
  {
    final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
    final ChangelogState state = dbEnv.readChangelogState();
    final MultiDomainServerState offlineReplicas = dbEnv.getChangelogState().getOfflineReplicas();
    final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
    for (int serverId : serverIds)
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
      final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
    }
    // recycle exhausted cursors,
@@ -712,13 +713,13 @@
    return new CompositeDBCursor<Void>(cursors, true);
  }
  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
  private CSN getOfflineCSN(final MultiDomainServerState offlineReplicas, DN baseDN, int serverId,
      ServerState startAfterServerState)
  {
    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
    if (domain != null)
    final ServerState domainState = offlineReplicas.getServerState(baseDN);
    if (domainState != null)
    {
      for (CSN offlineCSN : domain)
      for (CSN offlineCSN : domainState)
      {
        if (serverId == offlineCSN.getServerId()
            && !startAfterServerState.cover(offlineCSN))
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -63,6 +63,16 @@
{
  private Environment dbEnvironment;
  private Database changelogStateDb;
  /**
   * The current changelogState. This is in-memory version of what is inside the
   * on-disk changelogStateDB. It improves performances in case the
   * changelogState is read often.
   *
   * @GuardedBy("stateLock")
   */
  private final ChangelogState changelogState;
  /** Exclusive lock to synchronize updates to in-memory and on-disk changelogState */
  private final Object stateLock = new Object();
  private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
  private ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
@@ -98,6 +108,7 @@
       * of all the servers that have been seen in the past.
       */
      changelogStateDb = openDatabase("changelogstate");
      changelogState = readOnDiskChangelogState();
    }
    catch (RuntimeException e)
    {
@@ -219,13 +230,23 @@
  }
  /**
   * Read and return the list of known servers from the database.
   * Return the current changelog state.
   *
   * @return the current {@link ChangelogState}
   */
  public ChangelogState getChangelogState()
  {
    return changelogState;
  }
  /**
   * Read and return the changelog state from the database.
   *
   * @return the {@link ChangelogState} read from the changelogState DB
   * @throws ChangelogException
   *           if a database problem occurs
   */
  public ChangelogState readChangelogState() throws ChangelogException
  protected ChangelogState readOnDiskChangelogState() throws ChangelogException
  {
    return decodeChangelogState(readWholeState());
  }
@@ -433,8 +454,13 @@
      // Opens the DB for the changes received from this server on this domain.
      final Database replicaDB = openDatabase(replicaEntry.getKey());
      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
      synchronized (stateLock)
      {
        putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
        changelogState.addServerIdToDomain(serverId, baseDN);
        putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
        changelogState.setDomainGenerationId(baseDN, generationId);
      }
      return replicaDB;
    }
    catch (RuntimeException e)
@@ -619,9 +645,13 @@
   */
  public void clearGenerationId(DN baseDN) throws ChangelogException
  {
    final int unusedGenId = 0;
    deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
        "clearGenerationId(baseDN=" + baseDN + ")");
    synchronized (stateLock)
    {
      final int unusedGenId = 0;
      deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
          "clearGenerationId(baseDN=" + baseDN + ")");
      changelogState.setDomainGenerationId(baseDN, unusedGenId);
    }
  }
  /**
@@ -637,8 +667,12 @@
   */
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
    synchronized (stateLock)
    {
      deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
          "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
      changelogState.setDomainGenerationId(baseDN, -1);
    }
  }
  private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
@@ -700,10 +734,14 @@
  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    // just overwrite any older entry as it is assumed a newly received offline
    // CSN is newer than the previous one
    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
    synchronized (stateLock)
    {
      // just overwrite any older entry as it is assumed a newly received offline
      // CSN is newer than the previous one
      putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
          "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
      changelogState.addOfflineReplica(baseDN, offlineCSN);
    }
  }
  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.HashSet;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -70,11 +71,16 @@
    }
    @Override
    protected Database openDatabase(String databaseName)
        throws ChangelogException, RuntimeException
    protected Database openDatabase(String databaseName) throws ChangelogException, RuntimeException
    {
      return null;
    }
    @Override
    protected ChangelogState readOnDiskChangelogState() throws ChangelogException
    {
      return new ChangelogState();
    }
  }
  @BeforeClass
@@ -129,8 +135,8 @@
        entry(baseDN, generationId));
    if (!replicas.isEmpty())
    {
      assertThat(state.getDomainToServerIds()).containsExactly(
          entry(baseDN, replicas));
      assertThat(state.getDomainToServerIds())
          .containsExactly(entry(baseDN, new HashSet<Integer>(replicas)));
    }
    else
    {
@@ -138,8 +144,8 @@
    }
    if (!offlineReplicas.isEmpty())
    {
      assertThat(state.getOfflineReplicas()).containsExactly(
          entry(baseDN, offlineReplicas));
      assertThat(state.getOfflineReplicas().getSnapshot())
          .containsExactly(entry(baseDN, offlineReplicas));
    }
    else
    {