mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
04.46.2013 2fc933d03d5961fa75fc4a8ffef75d3ec3b125d6
OPENDJ-1116 Introduce abstraction for the changelog DB

* convert ServerState to implement Iterable<CSN> instead of Iterable<Integer>
6 files modified
91 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/CSNGenerator.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitorData.java 11 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 39 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/CSNGenerator.java
@@ -67,15 +67,18 @@
 public CSNGenerator(int id, ServerState state)
 {
   this.lastTime = TimeThread.getTime();
   for (int stateId : state)
    for (CSN csn : state)
   {
     if (this.lastTime < state.getCSN(stateId).getTime())
       this.lastTime = state.getCSN(stateId).getTime();
     if (stateId == id)
       this.seqnum = state.getCSN(id).getSeqnum();
      if (this.lastTime < csn.getTime())
      {
        this.lastTime = csn.getTime();
      }
      if (csn.getServerId() == id)
      {
        this.seqnum = csn.getSeqnum();
      }
   }
   this.serverId = id;
 }
  /**
@@ -160,9 +163,9 @@
   */
  public void adjust(ServerState state)
  {
    for (int localServerId : state)
    for (CSN csn : state)
    {
      adjust(state.getCSN(localServerId));
      adjust(csn);
    }
  }
}
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -43,7 +43,7 @@
 * For example, it is exchanged with the replication servers at connection
 * establishment time to communicate "which CSNs was last seen by a serverId".
 */
public class ServerState implements Iterable<Integer>
public class ServerState implements Iterable<CSN>
{
  /** Associates a serverId with a CSN. */
@@ -404,9 +404,9 @@
   * {@inheritDoc}
   */
  @Override
  public Iterator<Integer> iterator()
  public Iterator<CSN> iterator()
  {
    return serverIdToCSN.keySet().iterator();
    return serverIdToCSN.values().iterator();
  }
  /**
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -868,13 +868,10 @@
    changelog db has been trimmed and the cookie is not valid
    anymore.
    */
    for (int serverId : rsDomain.getStartState())
    for (CSN dbOldestChange : rsDomain.getStartState())
    {
      CSN dbOldestChange =
          rsDomain.getStartState().getCSN(serverId);
      CSN providedChange = cookie.getCSN(serverId);
      if (providedChange != null
          && providedChange.older(dbOldestChange))
      CSN providedChange = cookie.getCSN(dbOldestChange.getServerId());
      if (providedChange != null && providedChange.older(dbOldestChange))
      {
        return true;
      }
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitor.java
@@ -279,7 +279,7 @@
        // This directly connected LS has never produced any change
        maxCSN = new CSN(0, 0, serverId);
      }
      pendingMonitorData.setMaxCSN(serverId, maxCSN);
      pendingMonitorData.setMaxCSN(maxCSN);
      pendingMonitorData.setLDAPServerState(serverId, dsState);
      pendingMonitorData.setFirstMissingDate(serverId,
          ds.getApproxFirstMissingDate());
@@ -290,10 +290,9 @@
    // - whatever they are directly or indirectly connected
    final ServerState dbServerState = domain.getLatestServerState();
    pendingMonitorData.setRSState(domain.getLocalRSServerId(), dbServerState);
    for (int serverId : dbServerState)
    for (CSN storedCSN : dbServerState)
    {
      CSN storedCSN = dbServerState.getCSN(serverId);
      pendingMonitorData.setMaxCSN(serverId, storedCSN);
      pendingMonitorData.setMaxCSN(storedCSN);
    }
  }
opends/src/server/org/opends/server/replication/server/ReplicationDomainMonitorData.java
@@ -294,22 +294,21 @@
   */
  public void setMaxCSNs(ServerState state)
  {
    for (Integer serverId : state) {
      CSN newCSN = state.getCSN(serverId);
      setMaxCSN(serverId, newCSN);
    for (CSN newCSN : state)
    {
      setMaxCSN(newCSN);
    }
  }
  /**
   * For the provided serverId, sets the provided CSN as the max if
   * it is newer than the current max.
   * @param serverId the provided serverId
   * @param newCSN the provided new CSN
   */
  public void setMaxCSN(int serverId, CSN newCSN)
  public void setMaxCSN(CSN newCSN)
  {
    if (newCSN==null) return;
    int serverId = newCSN.getServerId();
    CSN currentMaxCSN = maxCSNs.get(serverId);
    if (currentMaxCSN == null)
    {
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1307,14 +1307,13 @@
 /**
  * Count the number of changes in the replication changelog for the provided
  * serverID, between 2 provided CSNs.
  * @param serverId Identifier of the server for which to compute the count.
  * @param from lower limit CSN.
  * @param to   upper limit CSN.
  * @return the number of changes.
  */
  public long getCount(int serverId, CSN from, CSN to)
  public long getCount(CSN from, CSN to)
  {
    return domainDB.getCount(baseDN, serverId, from, to);
    return domainDB.getCount(baseDN, from.getServerId(), from, to);
  }
  /**
@@ -2593,9 +2592,8 @@
    if (eligibleCSN != null)
    {
      for (int serverId : latestState)
      for (CSN mostRecentDbCSN : latestState)
      {
        CSN mostRecentDbCSN = latestState.getCSN(serverId);
        try {
          // Is the most recent change in the Db newer than eligible CSN ?
          // if yes (like csn15 in the example above, then we have to go back
@@ -2603,9 +2601,12 @@
          if (eligibleCSN.olderOrEqual(mostRecentDbCSN))
          {
            // let's try to seek the first change <= eligibleCSN
            CSN newCSN = domainDB.getCSNAfter(baseDN, serverId, eligibleCSN);
            CSN newCSN = domainDB.getCSNAfter(baseDN,
                mostRecentDbCSN.getServerId(), eligibleCSN);
            result.update(newCSN);
          } else {
          }
          else
          {
            // for this serverId, all changes in the ChangelogDb are holder
            // than eligibleCSN, the most recent in the db is our guy.
            result.update(mostRecentDbCSN);
@@ -2653,14 +2654,11 @@
    CSN eligibleCSN = null;
    final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
    for (final int serverId : newestCSNs)
    for (final CSN changelogNewestCSN : newestCSNs)
    {
      // Consider this producer (DS/db).
      final CSN changelogNewestCSN = newestCSNs.getCSN(serverId);
      // Should it be considered for eligibility ?
      CSN heartbeatLastCSN =
        getChangeTimeHeartbeatState().getCSN(serverId);
      int serverId = changelogNewestCSN.getServerId();
      CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the domain is considered down and not considered for eligibility
@@ -2804,10 +2802,9 @@
  {
    long res = 0;
    for (int serverId : getLatestServerState())
    for (CSN startCSN : getLatestServerState())
    {
      CSN startCSN = startState.getCSN(serverId);
      long serverIdRes = getCount(serverId, startCSN, endCSN);
      long serverIdRes = getCount(startCSN, endCSN);
      // The startPoint is excluded when counting the ECL eligible changes
      if (startCSN != null && serverIdRes > 0)
@@ -2831,10 +2828,12 @@
  public long getEligibleCount(CSN startCSN, CSN endCSN)
  {
    long res = 0;
    for (int serverId : getLatestServerState()) {
      CSN lStartCSN =
          new CSN(startCSN.getTime(), startCSN.getSeqnum(), serverId);
      res += getCount(serverId, lStartCSN, endCSN);
    for (CSN csn : getLatestServerState())
    {
      int serverId = csn.getServerId();
      CSN lStartCSN = new CSN(startCSN.getTime(), startCSN.getSeqnum(),
          serverId);
      res += getCount(lStartCSN, endCSN);
    }
    return res;
  }