mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
08.03.2008 7adb93986ace907531875e25be1f94d735fbb068
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -40,7 +40,6 @@
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -67,7 +66,6 @@
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
@@ -95,16 +93,12 @@
  private ProtocolSession session;
  private final MsgQueue msgQueue = new MsgQueue();
  private MsgQueue lateQueue = new MsgQueue();
  private final Map<ChangeNumber, AckMessageList> waitingAcks =
    new HashMap<ChangeNumber, AckMessageList>();
  private ReplicationServerDomain replicationServerDomain = null;
  private String serverURL;
  private int outCount = 0; // number of update sent to the server
  private int inCount = 0;  // number of updates received from the server
  private int inAckCount = 0;
  private int outAckCount = 0;
  private int maxReceiveQueue = 0;
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
@@ -120,7 +114,7 @@
  private ServerState serverState;
  private boolean activeWriter = true;
  private ServerWriter writer = null;
  private DN baseDn = null;
  private String baseDn = null;
  private int rcvWindow;
  private int rcvWindowSizeHalf;
  private int maxRcvWindow;
@@ -143,7 +137,7 @@
   * Properties filled only if remote server is a DS
   */
  // Status of this DS
  // Status of this DS (only used if this server handler represents a DS)
  private ServerStatus status = ServerStatus.INVALID_STATUS;
  // Referrals URLs this DS is exporting
  private List<String> refUrls = new ArrayList<String>();
@@ -182,9 +176,6 @@
   * Set when ServerHandler is stopping.
   */
  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
  private static final Map<ChangeNumber, ReplServerAckMessageList>
    changelogsWaitingAcks =
    new HashMap<ChangeNumber, ReplServerAckMessageList>();
  /**
   * Creates a new server handler instance with the provided socket.
@@ -266,7 +257,7 @@
   * @param replicationServer the ReplicationServer that created this server
   *                          handler.
   */
  public void start(DN baseDn, short replicationServerId,
  public void start(String baseDn, short replicationServerId,
    String replicationServerURL,
    int windowSize, boolean sslEncryption,
    ReplicationServer replicationServer)
@@ -571,7 +562,7 @@
              {
                // Timeout
                Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
                  this.baseDn.toNormalizedString(),
                  this.baseDn,
                  Short.toString(serverId),
                  Short.toString(replicationServer.getServerId()));
                closeSession(message);
@@ -732,7 +723,7 @@
                    //     gen ID and so won't change without a reset
                    // then  we are just degrading the peer.
                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                      this.baseDn.toNormalizedString(),
                      this.baseDn,
                      Short.toString(serverId),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
@@ -759,7 +750,7 @@
                    // replicationServerDomain.
                    // setGenerationId(generationId, false);
                    Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                      this.baseDn.toNormalizedString(),
                      this.baseDn,
                      Short.toString(serverId),
                      Long.toString(generationId),
                      Long.toString(localGenerationId));
@@ -859,7 +850,7 @@
            if (generationId != localGenerationId)
            {
              Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
                this.baseDn.toNormalizedString(),
                this.baseDn,
                Short.toString(serverId),
                Long.toString(generationId),
                Long.toString(localGenerationId));
@@ -873,7 +864,7 @@
              // If the LDAP server has already sent changes
              // it is not expected to connect to an empty RS
              Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
                this.baseDn.toNormalizedString(),
                this.baseDn,
                Short.toString(serverId),
                Long.toString(generationId),
                Long.toString(localGenerationId));
@@ -957,7 +948,7 @@
                  //     gen ID and so won't change without a reset
                  // then  we are just degrading the peer.
                  Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                    this.baseDn.toNormalizedString(),
                    this.baseDn,
                    Short.toString(serverId),
                    Long.toString(generationId),
                    Long.toString(localGenerationId));
@@ -984,7 +975,7 @@
                  // replicationServerDomain.
                  // setGenerationId(generationId, false);
                  Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
                    this.baseDn.toNormalizedString(),
                    this.baseDn,
                    Short.toString(serverId),
                    Long.toString(generationId),
                    Long.toString(localGenerationId));
@@ -1195,26 +1186,6 @@
  }
  /**
   * Get the number of Ack received from the server managed by this handler.
   *
   * @return Returns the inAckCount.
   */
  public int getInAckCount()
  {
    return inAckCount;
  }
  /**
   * Get the number of Ack sent to the server managed by this handler.
   *
   * @return Returns the outAckCount.
   */
  public int getOutAckCount()
  {
    return outAckCount;
  }
  /**
   * Check is this server is saturated (this server has already been
   * sent a bunch of updates and has not processed them so they are staying
   * in the message queue for this server an the size of the queue
@@ -1763,145 +1734,14 @@
  }
  /**
   * Send the ack to the server that did the original modification.
   * Sends an ack message to the server represented by this object.
   *
   * @param changeNumber The ChangeNumber of the update that is acked.
   * @param ack The ack message to be sent.
   * @throws IOException In case of Exception thrown sending the ack.
   */
  public void sendAck(ChangeNumber changeNumber) throws IOException
  public void sendAck(AckMsg ack) throws IOException
  {
    AckMsg ack = new AckMsg(changeNumber);
    session.publish(ack);
    outAckCount++;
  }
  /**
   * Do the work when an ack message has been received from another server.
   *
   * @param message The ack message that was received.
   * @param ackingServerId The  id of the server that acked the change.
   */
  public void ack(AckMsg message, short ackingServerId)
  {
    ChangeNumber changeNumber = message.getChangeNumber();
    AckMessageList ackList;
    boolean completedFlag;
    synchronized (waitingAcks)
    {
      ackList = waitingAcks.get(changeNumber);
      if (ackList == null)
        return;
      ackList.addAck(ackingServerId);
      completedFlag = ackList.completed();
      if (completedFlag)
      {
        waitingAcks.remove(changeNumber);
      }
    }
    if (completedFlag)
    {
      replicationServerDomain.sendAck(changeNumber, true);
    }
  }
  /**
   * Process reception of an for an update that was received from a
   * ReplicationServer.
   *
   * @param message the ack message that was received.
   * @param ackingServerId The  id of the server that acked the change.
   */
  public static void ackChangelog(AckMsg message, short ackingServerId)
  {
    ChangeNumber changeNumber = message.getChangeNumber();
    ReplServerAckMessageList ackList;
    boolean completedFlag;
    synchronized (changelogsWaitingAcks)
    {
      ackList = changelogsWaitingAcks.get(changeNumber);
      if (ackList == null)
        return;
      ackList.addAck(ackingServerId);
      completedFlag = ackList.completed();
      if (completedFlag)
      {
        changelogsWaitingAcks.remove(changeNumber);
      }
    }
    if (completedFlag)
    {
      ReplicationServerDomain replicationServerDomain =
        ackList.getChangelogCache();
      replicationServerDomain.sendAck(changeNumber, false,
        ackList.getReplicationServerId());
    }
  }
  /**
   * Add an update to the list of update waiting for acks.
   *
   * @param update the update that must be added to the list
   * @param nbWaitedAck  The number of ack that must be received before
   *               the update is fully acked.
   */
  public void addWaitingAck(UpdateMsg update, int nbWaitedAck)
  {
    AckMessageList ackList = new AckMessageList(update.getChangeNumber(),
      nbWaitedAck);
    synchronized (waitingAcks)
    {
      waitingAcks.put(update.getChangeNumber(), ackList);
    }
  }
  /**
   * Add an update to the list of update received from a replicationServer and
   * waiting for acks.
   *
   * @param update The update that must be added to the list.
   * @param ChangelogServerId The identifier of the replicationServer that sent
   *                          the update.
   * @param replicationServerDomain The ReplicationServerDomain from which the
   *                                change was processed and to which the ack
   *                                must later be sent.
   * @param nbWaitedAck The number of ack that must be received before
   *                    the update is fully acked.
   */
  public static void addWaitingAck(
    UpdateMsg update,
    short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
    int nbWaitedAck)
  {
    ReplServerAckMessageList ackList =
      new ReplServerAckMessageList(update.getChangeNumber(),
      nbWaitedAck,
      ChangelogServerId,
      replicationServerDomain);
    synchronized (changelogsWaitingAcks)
    {
      changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
    }
  }
  /**
   * Get the size of the list of update waiting for acks.
   *
   * @return the size of the list of update waiting for acks.
   */
  public int getWaitingAckSize()
  {
    synchronized (waitingAcks)
    {
      return waitingAcks.size();
    }
  }
  /**
   * Increment the count of Acks received from this server.
   */
  public void incrementInAckCount()
  {
    inAckCount++;
  }
  /**
@@ -2001,13 +1841,13 @@
        .valueOf(serverId)));
    attributes.add(Attributes.create("base-dn", baseDn.toString()));
    if (serverIsLDAPserver)
    try
    {
      MonitorData md;
      try
      {
        md = replicationServerDomain.getMonitorData();
      md = replicationServerDomain.getMonitorData();
      if (serverIsLDAPserver)
      {
        // Oldest missing update
        Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
        if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
@@ -2017,7 +1857,7 @@
              "approx-older-change-not-synchronized", date.toString()));
          attributes.add(Attributes.create(
              "approx-older-change-not-synchronized-millis", String
                  .valueOf(approxFirstMissingDate)));
              .valueOf(approxFirstMissingDate)));
        }
        // Missing changes
@@ -2030,14 +1870,21 @@
        attributes.add(Attributes.create("approximate-delay", String
            .valueOf(delay)));
      }
      catch (Exception e)
      else
      {
        // TODO: improve the log
        // We failed retrieving the remote monitor data.
        attributes.add(Attributes.create("error",
            stackTraceToSingleLineString(e)));
        // Missing changes
        long missingChanges = md.getMissingChangesRS(serverId);
        attributes.add(Attributes.create("missing-changes", String
            .valueOf(missingChanges)));
      }
    }
    catch (Exception e)
    {
      // TODO: improve the log
      // We failed retrieving the remote monitor data.
      attributes.add(Attributes.create("error",
          stackTraceToSingleLineString(e)));
    }
    attributes.add(
        Attributes.create("queue-size", String.valueOf(msgQueue.count())));
@@ -2056,14 +1903,6 @@
    attributes.add(Attributes.create("update-received", String
        .valueOf(getInCount())));
    // Deprecated as long as assured is not exposed
    attributes.add(Attributes.create("update-waiting-acks", String
        .valueOf(getWaitingAckSize())));
    attributes.add(Attributes.create("ack-sent", String
        .valueOf(getOutAckCount())));
    attributes.add(Attributes.create("ack-received", String
        .valueOf(getInAckCount())));
    // Window stats
    attributes.add(Attributes.create("max-send-window", String
        .valueOf(sendWindowSize)));
@@ -2125,11 +1964,14 @@
    /*
     * Stop the remote LSHandler
     */
    for (LightweightServerHandler lsh : directoryServers.values())
    synchronized (directoryServers)
    {
      lsh.stopHandler();
      for (LightweightServerHandler lsh : directoryServers.values())
      {
        lsh.stopHandler();
      }
      directoryServers.clear();
    }
    directoryServers.clear();
    /*
     * Stop the heartbeat thread.
@@ -2217,7 +2059,6 @@
      {
        WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
        session.publish(msg);
        outAckCount++;
        rcvWindow += rcvWindowSizeHalf;
      }
    }
@@ -2302,23 +2143,26 @@
     */
    List<DSInfo> dsInfos = topoMsg.getDsList();
    // Removes the existing structures
    for (LightweightServerHandler lsh : directoryServers.values())
    synchronized (directoryServers)
    {
      lsh.stopHandler();
    }
    directoryServers.clear();
      // Removes the existing structures
      for (LightweightServerHandler lsh : directoryServers.values())
      {
        lsh.stopHandler();
      }
      directoryServers.clear();
    // Creates the new structure according to the message received.
    for (DSInfo dsInfo : dsInfos)
    {
      LightweightServerHandler lsh = new LightweightServerHandler(this,
        serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
        dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
        dsInfo.isAssured(), dsInfo.getAssuredMode(),
        dsInfo.getSafeDataLevel());
      lsh.startHandler();
      directoryServers.put(lsh.getServerId(), lsh);
      // Creates the new structure according to the message received.
      for (DSInfo dsInfo : dsInfos)
      {
        LightweightServerHandler lsh = new LightweightServerHandler(this,
            serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
            dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
            dsInfo.isAssured(), dsInfo.getAssuredMode(),
            dsInfo.getSafeDataLevel());
        lsh.startHandler();
        directoryServers.put(lsh.getServerId(), lsh);
      }
    }
  }
@@ -2445,7 +2289,10 @@
   */
  public boolean hasRemoteLDAPServers()
  {
    return !directoryServers.isEmpty();
    synchronized (directoryServers)
    {
      return !directoryServers.isEmpty();
    }
  }
  /**
@@ -2496,7 +2343,6 @@
      // TODO also log an error message.
      WindowMsg msg = new WindowMsg(rcvWindow);
      session.publish(msg);
      outAckCount++;
    } else
    {
      // Both the LDAP server and the replication server believes that the
@@ -2556,17 +2402,10 @@
   */
  public Set<Short> getConnectedDirectoryServerIds()
  {
    return directoryServers.keySet();
  }
  /**
   * Get the map of connected DSs
   * (to the RS represented by this server handler).
   * @return The map of connected DSs
   */
  public Map<Short, LightweightServerHandler> getConnectedDSs()
  {
    return directoryServers;
    synchronized (directoryServers)
    {
      return directoryServers.keySet();
    }
  }
  /**
@@ -2716,4 +2555,32 @@
  {
    return protocolVersion;
  }
  /**
   * Add the DSinfos of the connected Directory Servers
   * to the List of DSInfo provided as a parameter.
   *
   * @param dsInfos The List of DSInfo that should be updated
   *                with the DSInfo for the directoryServers
   *                connected to this ServerHandler.
   */
  public void addDSInfos(List<DSInfo> dsInfos)
  {
    synchronized (directoryServers)
    {
      for (LightweightServerHandler ls : directoryServers.values())
      {
        dsInfos.add(ls.toDSInfo());
      }
    }
  }
  /**
   * Gets the group id of the server represented by this object.
   * @return The group id of the server represented by this object.
   */
  public byte getGroupId()
  {
    return groupId;
  }
}