mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.08.2013 020a870af63f7407d3145feb74351bee3c2ce837
OPENDJ-1231 (CR-2724) Make the Medium Consistency Point support replica heartbeats and replicas shutting down


Replica heartbeats are now making the medium consistency point move forward.
This brings trunk implementation to be on par with OpenDJ 2.6.0.


ReplicationServerDomain.java:
In getEligibleCSN(), used ReplicationDomainDB.getDomainLastAliveCSNs().
Removed field ctHeartbeatState + methods getChangeTimeHeartbeatState() and storeReceivedCTHeartbeat(), all superseded by ChangeNumberIndexer.

ExternalChangeLogTest.java
Consequence of removing ReplicationServerDomain.getChangeTimeHeartbeatState().
Improved the code readability with waitOpResult() + moved calls to waitOpResult() inside searchOnChangelog().


ReplicationDomainDB.java, JEChangelogDB.java:
Added and implemented getDomainLastAliveCSNs().
In getDomainOldestCSNs() and getDomainNewestCSNs(), improved javadocs.

ChangeNumberIndexer.java:
Remain lastSeenUpdates field to lastAliveCSNs + improved javadoc.
Added getDomainLastAliveCSNs().


MultiDomainServerState.java
Added getServerState(DN).
6 files modified
196 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java 15 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 62 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 39 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 24 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 36 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -209,6 +209,19 @@
  }
  /**
   * Returns the ServerState associated to the provided replication domain's
   * baseDN.
   *
   * @param baseDN
   *          the replication domain's baseDN
   * @return the associated ServerState
   */
  public ServerState getServerState(DN baseDN)
  {
    return list.get(baseDN);
  }
  /**
   * Returns the CSN associated to the provided replication domain's baseDN and
   * serverId.
   *
@@ -216,7 +229,7 @@
   *          the replication domain's baseDN
   * @param serverId
   *          the serverId
   * @return the associated ServerState
   * @return the associated CSN
   */
  public CSN getCSN(DN baseDN, int serverId)
  {
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -167,8 +167,6 @@
   */
  private int assuredTimeoutTimerPurgeCounter = 0;
  private ServerState ctHeartbeatState;
  /**
   * Creates a new ReplicationServerDomain associated to the baseDN.
   *
@@ -2558,19 +2556,6 @@
  }
  /**
   * Return the state that contain for each server the time of eligibility.
   * @return the state.
   */
  public ServerState getChangeTimeHeartbeatState()
  {
    if (ctHeartbeatState == null)
    {
      ctHeartbeatState = getLatestServerState().duplicate();
    }
    return ctHeartbeatState;
  }
  /**
   * Returns the oldest known state for the domain, made of the oldest CSN
   * stored for each serverId.
   * <p>
@@ -2593,31 +2578,13 @@
   *
   * @return the eligible CSN.
   */
  public CSN getEligibleCSN()
  CSN getEligibleCSN()
  {
    CSN eligibleCSN = null;
    final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
    for (final CSN replicaNewestCSN : newestCSNs)
    for (final CSN lastAliveCSN : domainDB.getDomainLastAliveCSNs(baseDN))
    {
      // Should it be considered for eligibility ?
      int serverId = replicaNewestCSN.getServerId();
      CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the domain is considered down and not considered for eligibility
      /*
      if ((heartbeatLastDN != null) &&
          (TimeThread.getTime()- heartbeatLastDN.getTime() > 5000))
      {
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() +
            " Server " + sid
            + " is not considered for eligibility ... potentially down");
        continue;
      }
      */
      final int serverId = lastAliveCSN.getServerId();
      if (!isServerConnected(serverId))
      {
        if (debugEnabled())
@@ -2628,13 +2595,9 @@
        continue;
      }
      if (eligibleCSN == null || replicaNewestCSN.isNewerThan(eligibleCSN))
      if (eligibleCSN == null || lastAliveCSN.isNewerThan(eligibleCSN))
      {
        eligibleCSN = replicaNewestCSN;
      }
      if (heartbeatLastCSN != null && heartbeatLastCSN.isNewerThan(eligibleCSN))
      {
        eligibleCSN = heartbeatLastCSN;
        eligibleCSN = lastAliveCSN;
      }
    }
@@ -2671,7 +2634,7 @@
   * @param msg The message to process.
   */
  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg )
      ChangeTimeHeartbeatMsg msg)
  {
    try
    {
@@ -2689,7 +2652,7 @@
    try
    {
      storeReceivedCTHeartbeat(msg.getCSN());
      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
      if (senderHandler.isDataServer())
      {
        // If we are the first replication server warned,
@@ -2722,17 +2685,6 @@
  }
  /**
   * Store a change time value received from a data server.
   * @param csn The provided change time.
   */
  public void storeReceivedCTHeartbeat(CSN csn)
  {
    // TODO:Maybe we can spare processing by only storing CSN (timestamp)
    // instead of a server state.
    getChangeTimeHeartbeatState().update(csn);
  }
  /**
   * This methods count the changes, server by server :
   * - from a serverState start point
   * - to (inclusive) an end point (the provided endCSN).
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -47,8 +47,8 @@
  long getDomainChangesCount(DN baseDN);
  /**
   * Returns the oldest {@link CSN}s of each serverId for the specified
   * replication domain.
   * Returns the oldest {@link CSN}s from the replicaDBs for each serverId in
   * the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
@@ -59,8 +59,8 @@
  ServerState getDomainOldestCSNs(DN baseDN);
  /**
   * Returns the newest {@link CSN}s of each serverId for the specified
   * replication domain.
   * Returns the newest {@link CSN}s from the replicaDBs for each serverId in
   * the specified replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
@@ -71,6 +71,18 @@
  ServerState getDomainNewestCSNs(DN baseDN);
  /**
   * Returns the last time each serverId was seen alive for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a new ServerState object holding the {serverId => CSN} Map. Can be
   *         null if the config that computes change numbers is set to false or
   *         if domain is not replicated.
   */
  ServerState getDomainLastAliveCSNs(DN baseDN);
  /**
   * Retrieves the latest trim date for the specified replication domain.
   * <p>
   * FIXME will be removed when ECLServerHandler will not be responsible anymore
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -101,12 +101,13 @@
  private volatile CSN mediumConsistencyCSN;
  /**
   * Holds the most recent changes or heartbeats received for each serverIds
   * cross domain. changes are stored in the replicaDBs and hence persistent,
   * heartbeats are transient because they are easily constructed on normal
   * operations.
   * Holds the last time each replica was seen alive, whether via updates or
   * heartbeats received. Data is held for each serverId cross domain.
   * <p>
   * Updates are persistent and stored in the replicaDBs, heartbeats are
   * transient and are easily constructed on normal operations.
   */
  private final MultiDomainServerState lastSeenUpdates =
  private final MultiDomainServerState lastAliveCSNs =
      new MultiDomainServerState();
  private final MultiDomainServerState replicasOffline =
      new MultiDomainServerState();
@@ -174,7 +175,7 @@
   */
  public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    lastSeenUpdates.update(baseDN, heartbeatCSN);
    lastAliveCSNs.update(baseDN, heartbeatCSN);
    tryNotify(baseDN);
  }
@@ -192,13 +193,27 @@
      throws ChangelogException
  {
    final CSN csn = updateMsg.getCSN();
    lastSeenUpdates.update(baseDN, csn);
    lastAliveCSNs.update(baseDN, csn);
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    tryNotify(baseDN);
  }
  /**
   * Returns the last time each serverId was seen alive for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a new ServerState object holding the {serverId => CSN} Map. Can be
   *         null if domain is not replicated.
   */
  public ServerState getDomainLastAliveCSNs(DN baseDN)
  {
    return lastAliveCSNs.getServerState(baseDN);
  }
  /**
   * Signals a replica went offline.
   *
   * @param baseDN
@@ -208,7 +223,7 @@
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    lastSeenUpdates.update(baseDN, offlineCSN);
    lastAliveCSNs.update(baseDN, offlineCSN);
    replicasOffline.update(baseDN, offlineCSN);
    tryNotify(baseDN);
  }
@@ -234,8 +249,8 @@
    if (mcCSN != null)
    {
      final int serverId = mcCSN.getServerId();
      final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
      return mcCSN.isOlderThan(lastSeenSameServerId);
      CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
    }
    return true;
  }
@@ -269,7 +284,7 @@
      }
      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
      lastSeenUpdates.update(baseDN, latestKnownState);
      lastAliveCSNs.update(baseDN, latestKnownState);
    }
    resetNextChangeForInsertDBCursor();
@@ -491,7 +506,7 @@
    if (offlineCSN != null
        && offlineCSN.isOlderThan(mediumConsistencyCSN)
        // If no new updates has been seen for this replica
        && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
        && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
    {
      removeCursor(baseDN, csn);
      replicasOffline.removeCSN(baseDN, offlineCSN);
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -539,6 +539,23 @@
  /** {@inheritDoc} */
  @Override
  public ServerState getDomainLastAliveCSNs(DN baseDN)
  {
    final ChangeNumberIndexer indexer = this.cnIndexer.get();
    if (indexer != null)
    {
      final ServerState results = indexer.getDomainLastAliveCSNs(baseDN);
      if (results != null)
      {
        // return a copy to protect against concurrent modifications
        return results.duplicate();
      }
    }
    return null;
  }
  /** {@inheritDoc} */
  @Override
  public void removeDomain(DN baseDN) throws ChangelogException
  {
    // Remember the first exception because :
@@ -791,8 +808,11 @@
  @Override
  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
  {
    // TODO implement this when the changelogDB will be responsible for
    // maintaining the medium consistency point
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.publishHeartbeat(baseDN, heartbeatCSN);
    }
  }
  /** {@inheritDoc} */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -552,9 +552,8 @@
    debugInfo(tn, "Starting test\n\n");
    // root entry returned
    final InternalSearchOperation op = searchOnChangelog(
        "(objectclass=*)", Collections.<String>emptySet(), createControls(""), 1, tn);
    waitOpResult(op, ResultCode.SUCCESS);
    searchOnChangelog("(objectclass=*)", Collections.<String> emptySet(), createControls(""),
        1, ResultCode.SUCCESS, tn);
    debugInfo(tn, "Ending test successfully");
  }
@@ -585,8 +584,7 @@
  /** Add an entry in the database */
  private void addEntry(Entry entry) throws Exception
  {
    AddOperation addOp = connection.processAdd(entry);
    waitOpResult(addOp, ResultCode.SUCCESS);
    waitOpResult(connection.processAdd(entry), ResultCode.SUCCESS);
    assertNotNull(getEntry(entry.getDN(), 1000, true));
  }
@@ -868,10 +866,8 @@
      throws Exception
  {
    debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]");
    final InternalSearchOperation searchOp = searchOnChangelog(
        filterString, ALL_ATTRIBUTES, createControls(cookie), expectedNbEntries, testName);
    waitOpResult(searchOp, expectedResultCode);
    return searchOp;
    return searchOnChangelog(filterString, ALL_ATTRIBUTES, createControls(cookie),
        expectedNbEntries, expectedResultCode, testName);
  }
  private InternalSearchOperation searchOnChangelog(String filterString,
@@ -879,15 +875,13 @@
      throws Exception
  {
    debugInfo(testName, " Search: " + filterString);
    final InternalSearchOperation searchOp = searchOnChangelog(
        filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, testName);
    waitOpResult(searchOp, expectedResultCode);
    return searchOp;
    return searchOnChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL,
        expectedNbEntries, expectedResultCode, testName);
  }
  private InternalSearchOperation searchOnChangelog(String filterString,
      Set<String> attributes, List<Control> controls, int expectedNbEntries,
      String testName) throws Exception
      ResultCode expectedResultCode, String testName) throws Exception
  {
    InternalSearchOperation op = null;
    int cnt = 0;
@@ -912,6 +906,7 @@
    final List<SearchResultEntry> entries = op.getSearchEntries();
    assertThat(entries).hasSize(expectedNbEntries);
    debugAndWriteEntries(getLDIFWriter(), entries, testName);
    waitOpResult(op, expectedResultCode);
    return op;
  }
@@ -2104,7 +2099,6 @@
      ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
      debugInfo(tn, rsd1.getBaseDN()
          + " LatestServerState=" + rsd1.getLatestServerState()
          + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState()
          + " eligibleCSN=" + rsd1.getEligibleCSN()
          + " rs eligibleCSN=" + replicationServer.getEligibleCSN(null));
      // FIXME:ECL Enable this test by adding an assert on the right value
@@ -2112,7 +2106,6 @@
      ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2);
      debugInfo(tn, rsd2.getBaseDN()
          + " LatestServerState=" + rsd2.getLatestServerState()
          + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState()
          + " eligibleCSN=" + rsd2.getEligibleCSN()
          + " rs eligibleCSN=" + replicationServer.getEligibleCSN(null));
      // FIXME:ECL Enable this test by adding an assert on the right value
@@ -2882,13 +2875,10 @@
    }
    finally
    {
      final DeleteOperation delOp1 = connection.processDelete(
          DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2));
      waitOpResult(delOp1, ResultCode.SUCCESS);
      final DeleteOperation delOp2 = connection.processDelete(TEST_ROOT_DN2);
      waitOpResult(delOp2, ResultCode.SUCCESS);
      final DeleteOperation delOp3 = connection.processDelete(baseDN3);
      waitOpResult(delOp3, ResultCode.SUCCESS);
      final DN fionaDN = DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2);
      waitOpResult(connection.processDelete(fionaDN), ResultCode.SUCCESS);
      waitOpResult(connection.processDelete(TEST_ROOT_DN2), ResultCode.SUCCESS);
      waitOpResult(connection.processDelete(baseDN3), ResultCode.SUCCESS);
      remove(domain21, domain2, domain3);
      removeTestBackend(backend2, backend3);