mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
14.37.2009 5ec0cb08889c9f1a24fd4cc8b139dcdb942dd92a
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -63,6 +63,7 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.MonitorMsg;
@@ -75,9 +76,11 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
/**
@@ -173,6 +176,8 @@
  // every n number of treated assured messages
  private int assuredTimeoutTimerPurgeCounter = 0;
  ServerState ctHeartbeatState = null;
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   *
@@ -360,8 +365,7 @@
        if ( (generationId>0) && (generationId != handler.getGenerationId()) )
        {
          if (debugEnabled())
            TRACER.debugInfo("In RS " +
              replicationServer.getServerId() +
            TRACER.debugInfo("In " + this.getName() +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to replication server " +
@@ -426,8 +430,7 @@
        if (debugEnabled())
        {
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
            TRACER.debugInfo("In RS " +
              replicationServer.getServerId() +
            TRACER.debugInfo("In " + this +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to directory server " +
@@ -1024,10 +1027,9 @@
  {
      if (debugEnabled())
        TRACER.debugInfo(
            "In RS " + this.replicationServer.getMonitorInstanceName() +
            " domain=" + this +
            " stopServer(SH)" + handler.getMonitorInstanceName() +
          " " + stackTraceToSingleLineString(new Exception()));
            "In " + this.replicationServer.getMonitorInstanceName() +
            " domain=" + this + " stopServer() on the server handler " +
            handler.getMonitorInstanceName());
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
@@ -1119,10 +1121,9 @@
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In RS " + this.replicationServer.getMonitorInstanceName() +
          " domain=" + this +
          " stopServer(MH)" + handler.getMonitorInstanceName() +
          " " + stackTraceToSingleLineString(new Exception()));
          "In " + this.replicationServer.getMonitorInstanceName()
          + " domain=" + this + " stopServer() on the message handler "
          + handler.getMonitorInstanceName());
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
@@ -1363,7 +1364,40 @@
    try
    {
      return handler.generateIterator(changeNumber);
      ReplicationIterator it = handler.generateIterator(changeNumber);
      if (it.next()==false)
      {
        it.releaseCursor();
        throw new Exception("no new change");
      }
      return it;
    } catch (Exception e)
    {
      return null;
    }
  }
  /**
   * Creates and returns an iterator.
   * When the iterator is not used anymore, the caller MUST call the
   * ReplicationIterator.releaseCursor() method to free the resources
   * and locks used by the ReplicationIterator.
   *
   * @param serverId Identifier of the server for which the iterator is created.
   * @param changeNumber Starting point for the iterator.
   * @return the created ReplicationIterator. Null when no DB is available
   * for the provided server Id.
   */
  public ReplicationIterator getIterator(short serverId,
    ChangeNumber changeNumber)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
    if (handler == null)
      return null;
    try
    {
      ReplicationIterator it = handler.generateIterator(changeNumber);
      return it;
    } catch (Exception e)
    {
      return null;
@@ -1955,12 +1989,10 @@
    ResetGenerationIdMsg genIdMsg)
  {
    if (debugEnabled())
    {
      TRACER.debugInfo(
        "In RS " + getReplicationServer().getServerId() +
        "In " + this +
        " Receiving ResetGenerationIdMsg from " + senderHandler.getServerId() +
        " for baseDn " + baseDn + ":\n" + genIdMsg);
    }
    try
    {
@@ -1982,14 +2014,12 @@
    {
      // Order to take a gen id we already have, just ignore
      if (debugEnabled())
      {
        TRACER.debugInfo(
          "In RS " + getReplicationServer().getServerId()
          "In " + this
          + " Reset generation id requested for baseDn " + baseDn
          + " but generation id was already " + this.generationId
          + ":\n" + genIdMsg);
      }
    }
    // If we are the first replication server warned,
    // then forwards the reset message to the remote replication servers
@@ -2002,7 +2032,7 @@
        rsHandler.setGenerationId(newGenId);
        if (senderHandler.isDataServer())
        {
          rsHandler.forwardGenerationIdToRS(genIdMsg);
          rsHandler.send(genIdMsg);
        }
      } catch (IOException e)
      {
@@ -2158,7 +2188,7 @@
  }
  /**
   * Clears the Db associated with that cache.
   * Clears the Db associated with that domain.
   */
  public void clearDbs()
  {
@@ -2181,12 +2211,6 @@
        }
      }
      stopDbHandlers();
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + this.replicationServer.getMonitorInstanceName() +
          " baseDN=" + baseDn +
          " The source db handler has been cleared");
    }
    try
    {
@@ -2471,11 +2495,6 @@
   */
  public void receivesMonitorDataResponse(MonitorMsg msg)
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        "Receiving " + msg + " from " + msg.getsenderID());
    try
    {
      synchronized (monitorDataLock)
@@ -2543,7 +2562,7 @@
        {
          if (debugEnabled())
            TRACER.debugInfo(
              "In " + this.replicationServer.getMonitorInstanceName() +
              "In " + this +
              " baseDn=" + baseDn +
              " Processed msg from " + msg.getsenderID() +
              " New monitor data: " + wrkMonitorData.toString());
@@ -2819,24 +2838,29 @@
   * Return the state that contain for each server the time of eligibility.
   * @return the state.
   */
  public ServerState getHeartbeatState()
  public ServerState getChangeTimeHeartbeatState()
  {
    // TODO:ECL Eligility must be supported
    return this.getDbServerState();
    if (ctHeartbeatState == null)
    {
      ctHeartbeatState = this.getDbServerState().duplicate();
    }
    return ctHeartbeatState;
  }
  /**
   * TODO: code cleaning - remove this method.
   * Computes the change number eligible to the ECL.
   * @return null if the domain does not play in eligibility.
   */
  public ChangeNumber computeEligibleCN()
  public ChangeNumber computeEligibleCN2()
  {
    ChangeNumber elligibleCN = null;
    ServerState heartbeatState = getHeartbeatState();
    ChangeNumber eligibleCN = null;
    ServerState heartbeatState = getChangeTimeHeartbeatState();
    if (heartbeatState==null)
      return null;
    // compute elligible CN
    // compute eligible CN
    ServerState hbState = heartbeatState.duplicate();
    Iterator<Short> it = hbState.iterator();
@@ -2850,70 +2874,87 @@
      if (TimeThread.getTime()-storedCN.getTime()>2000)
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "For RSD." + this.baseDn + " Server " + sid
          TRACER.debugInfo("In " + this.getName() +
            " Server " + sid
            + " is not considered for eligibility ... potentially down");
        continue;
      }
      if ((elligibleCN == null) || (storedCN.older(elligibleCN)))
      if ((eligibleCN == null) || (storedCN.older(eligibleCN)))
      {
        elligibleCN = storedCN;
        eligibleCN = storedCN;
      }
    }
    if (debugEnabled())
      TRACER.debugInfo(
        "For RSD." + this.baseDn + " ElligibleCN()=" + elligibleCN);
    return elligibleCN;
      TRACER.debugInfo("In " + this.getName() +
        " computeEligibleCN() returns " + eligibleCN);
    return eligibleCN;
  }
  /**
   * Computes the eligible server state by minimizing the dbServerState and the
   * elligibleCN.
   * Computes the eligible server state for the domain.
   * Consists in taking the most recent change from the dbServerState and the
   * eligibleCN.
   * @param eligibleCN The provided eligibleCN.
   * @return The computed eligible server state.
   */
  public ServerState getCLElligibleState()
  public ServerState getEligibleState(ChangeNumber eligibleCN)
  {
    // ChangeNumber elligibleCN = computeEligibleCN();
    ServerState res = new ServerState();
    ServerState dbState = this.getDbServerState();
    res = dbState;
    ServerState result = new ServerState();
    /* TODO:ECL Eligibility is not yet implemented
    Iterator<Short> it = dbState.iterator();
    while (it.hasNext())
    ServerState dbState = this.getDbServerState();
    result = dbState.duplicate();
    if (eligibleCN != null)
    {
      Short sid = it.next();
      DbHandler h = sourceDbHandlers.get(sid);
      ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
      try
      Iterator<Short> it = dbState.iterator();
      while (it.hasNext())
      {
        if ((elligibleCN!=null)&&(elligibleCN.older(dbCN)))
        Short sid = it.next();
        DbHandler h = sourceDbHandlers.get(sid);
        ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
        try
        {
          // some CN exist in the db newer than elligible CN
          ReplicationIterator ri = h.generateIterator(elligibleCN);
          ChangeNumber newCN = ri.getCurrentCN();
          res.update(newCN);
          ri.releaseCursor();
          if (eligibleCN.older(dbCN))
          {
            // some CN exist in the db newer than eligible CN
            // let's get it
            ReplicationIterator ri = h.generateIterator(eligibleCN);
            try
            {
              if ((ri != null) && (ri.getChange()!=null))
              {
                ChangeNumber newCN = ri.getChange().getChangeNumber();
                result.update(newCN);
              }
            }
            finally
            {
              ri.releaseCursor();
              ri = null;
            }
          }
          else
          {
            // no CN exist in the db newer than elligible CN
            result.update(dbCN);
          }
        }
        else
        catch(Exception e)
        {
          // no CN exist in the db newer than elligible CN
          res.update(dbCN);
          Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
              " " +  stackTraceToSingleLineString(e));
          logError(errMessage);
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    */
    if (debugEnabled())
      TRACER.debugInfo("In " + this.getName()
        + " getCLElligibleState returns:" + res);
    return res;
      TRACER.debugInfo("In " + this
        + " getEligibleState() result is " + result);
    return result;
  }
  /**
@@ -2930,4 +2971,206 @@
    }
    return domainStartState;
  }
  /**
   * Returns the eligibleCN for that domain - relies on the ChangeTimeHeartbeat
   * state.
   * For each DS, take the oldest CN from the changetime hearbeat state
   * and from the changelog db last CN. Can be null.
   * @return the eligible CN.
   */
  public ChangeNumber getEligibleCN()
  {
    ChangeNumber eligibleCN = null;
    for (DbHandler db : sourceDbHandlers.values())
    {
      // Consider this producer (DS/db).
      short sid = db.getServerId();
      ChangeNumber changelogLastCN = db.getLastChange();
      if (changelogLastCN != null)
      {
        if ((eligibleCN == null) || (changelogLastCN.newer(eligibleCN)))
        {
          eligibleCN = changelogLastCN;
        }
      }
      ChangeNumber heartbeatLastDN =
        getChangeTimeHeartbeatState().getMaxChangeNumber(sid);
      if ((heartbeatLastDN != null) &&
       ((eligibleCN == null) || (heartbeatLastDN.newer(eligibleCN))))
      {
        eligibleCN = heartbeatLastDN;
      }
    }
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.getName() + " getEligibleCN() returns result ="
        + eligibleCN);
    return eligibleCN;
  }
  /**
   * Processes a ChangeTimeHeartbeatMsg received, by storing the CN (timestamp)
   * value received, and forwarding the message to the other RSes.
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
   */
  public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg )
  {
    try
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
    } catch (InterruptedException ex)
    {
      // Try doing job anyway...
    }
    try
    {
      storeReceivedCTHeartbeat(msg.getChangeNumber());
      // If we are the first replication server warned,
      // then forwards the reset message to the remote replication servers
      for (ReplicationServerHandler rsHandler : replicationServers.values())
      {
        try
        {
          // After we'll have sent the message , the remote RS will adopt
          // the new genId
          if (senderHandler.isDataServer())
          {
            rsHandler.send(msg);
          }
        } catch (IOException e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(rsHandler.getName()));
          stopServer(rsHandler);
        }
      }
    }
    finally
    {
      release();
    }
  }
  /**
   * Store a change time value received from a data server.
   * @param cn The provided change time.
   */
  public void storeReceivedCTHeartbeat(ChangeNumber cn)
  {
    // TODO:May be we can spare processing by only storing CN (timestamp)
    // instead of a server state.
    getChangeTimeHeartbeatState().update(cn);
    /*
    if (debugEnabled())
    {
      Set<String> ss = ctHeartbeatState.toStringSet();
      String dss = "";
      for (String s : ss)
      {
        dss = dss + " \\ " + s;
      }
      TRACER.debugInfo("In " + this.getName() + " " + dss);
    }
    */
  }
  /**
   * This methods count the changes, server by server :
   * - from a start point (cn taken from the provided startState)
   * - to an end point (the provided endCN).
   * @param startState The provided start server state.
   * @param endCN The provided end change number.
   * @return The number of changes between startState and endCN.
   */
  public long getEligibleCount(ServerState startState, ChangeNumber endCN)
  {
    long res = 0;
    ReplicationIterator ri=null;
    // Parses the dbState of the domain , server by server
    ServerState dbState = this.getDbServerState();
    Iterator<Short> it = dbState.iterator();
    while (it.hasNext())
    {
      // for each server
      Short sid = it.next();
      DbHandler h = sourceDbHandlers.get(sid);
      try
      {
        // Set on the change related to the startState
        ChangeNumber startCN = null;
        try
        {
          ri = h.generateIterator(startState.getMaxChangeNumber(sid));
          startCN = ri.getChange().getChangeNumber();
        }
        catch(Exception e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          // no change found (purge from CL)
          startCN = null;
        }
        finally
        {
          if (ri!=null)
          {
            ri.releaseCursor();
            ri = null;
          }
        }
        if (startCN != null)
        {
          // Set on the change related to the endCN
          ChangeNumber upperCN;
          try
          {
            // Build a changenumber for this very server, with the timestamp
            // of the endCN
            ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid);
            ri = h.generateIterator(f);
            upperCN = ri.getChange().getChangeNumber();
          }
          catch(Exception e)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            // no new change
            upperCN = h.getLastChange();
          }
          finally
          {
            if (ri!=null)
            {
              ri.releaseCursor();
              ri = null;
            }
          }
          long diff = upperCN.getSeqnum() - startCN.getSeqnum() + 1;
          res += diff;
        }
        // TODO:ECL We should compute if changenumber.seqnum has turned !
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    return res;
  }
}