From d04fb0f282e0fd9a4bc80d3f9d5ee15506a3b83b Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |  313 +++++++++++++++-------------------------------------
 1 files changed, 90 insertions(+), 223 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 238a243..df7211a 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -40,7 +40,6 @@
 import java.util.Date;
 import java.util.List;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -67,7 +66,6 @@
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeBuilder;
 import org.opends.server.types.Attributes;
-import org.opends.server.types.DN;
 import org.opends.server.types.InitializationException;
 import org.opends.server.util.TimeThread;
 
@@ -95,16 +93,12 @@
   private ProtocolSession session;
   private final MsgQueue msgQueue = new MsgQueue();
   private MsgQueue lateQueue = new MsgQueue();
-  private final Map<ChangeNumber, AckMessageList> waitingAcks =
-    new HashMap<ChangeNumber, AckMessageList>();
   private ReplicationServerDomain replicationServerDomain = null;
   private String serverURL;
   private int outCount = 0; // number of update sent to the server
 
   private int inCount = 0;  // number of updates received from the server
 
-  private int inAckCount = 0;
-  private int outAckCount = 0;
   private int maxReceiveQueue = 0;
   private int maxSendQueue = 0;
   private int maxReceiveDelay = 0;
@@ -120,7 +114,7 @@
   private ServerState serverState;
   private boolean activeWriter = true;
   private ServerWriter writer = null;
-  private DN baseDn = null;
+  private String baseDn = null;
   private int rcvWindow;
   private int rcvWindowSizeHalf;
   private int maxRcvWindow;
@@ -143,7 +137,7 @@
    * Properties filled only if remote server is a DS
    */
 
-  // Status of this DS
+  // Status of this DS (only used if this server handler represents a DS)
   private ServerStatus status = ServerStatus.INVALID_STATUS;
   // Referrals URLs this DS is exporting
   private List<String> refUrls = new ArrayList<String>();
@@ -182,9 +176,6 @@
    * Set when ServerHandler is stopping.
    */
   private AtomicBoolean shuttingDown = new AtomicBoolean(false);
-  private static final Map<ChangeNumber, ReplServerAckMessageList>
-    changelogsWaitingAcks =
-    new HashMap<ChangeNumber, ReplServerAckMessageList>();
 
   /**
    * Creates a new server handler instance with the provided socket.
@@ -266,7 +257,7 @@
    * @param replicationServer the ReplicationServer that created this server
    *                          handler.
    */
-  public void start(DN baseDn, short replicationServerId,
+  public void start(String baseDn, short replicationServerId,
     String replicationServerURL,
     int windowSize, boolean sslEncryption,
     ReplicationServer replicationServer)
@@ -571,7 +562,7 @@
               {
                 // Timeout
                 Message message = NOTE_TIMEOUT_WHEN_CROSS_CONNECTION.get(
-                  this.baseDn.toNormalizedString(),
+                  this.baseDn,
                   Short.toString(serverId),
                   Short.toString(replicationServer.getServerId()));
                 closeSession(message);
@@ -732,7 +723,7 @@
                     //     gen ID and so won't change without a reset
                     // then  we are just degrading the peer.
                     Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
-                      this.baseDn.toNormalizedString(),
+                      this.baseDn,
                       Short.toString(serverId),
                       Long.toString(generationId),
                       Long.toString(localGenerationId));
@@ -759,7 +750,7 @@
                     // replicationServerDomain.
                     // setGenerationId(generationId, false);
                     Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
-                      this.baseDn.toNormalizedString(),
+                      this.baseDn,
                       Short.toString(serverId),
                       Long.toString(generationId),
                       Long.toString(localGenerationId));
@@ -859,7 +850,7 @@
             if (generationId != localGenerationId)
             {
               Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
-                this.baseDn.toNormalizedString(),
+                this.baseDn,
                 Short.toString(serverId),
                 Long.toString(generationId),
                 Long.toString(localGenerationId));
@@ -873,7 +864,7 @@
               // If the LDAP server has already sent changes
               // it is not expected to connect to an empty RS
               Message message = NOTE_BAD_GENERATION_ID_FROM_DS.get(
-                this.baseDn.toNormalizedString(),
+                this.baseDn,
                 Short.toString(serverId),
                 Long.toString(generationId),
                 Long.toString(localGenerationId));
@@ -957,7 +948,7 @@
                   //     gen ID and so won't change without a reset
                   // then  we are just degrading the peer.
                   Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
-                    this.baseDn.toNormalizedString(),
+                    this.baseDn,
                     Short.toString(serverId),
                     Long.toString(generationId),
                     Long.toString(localGenerationId));
@@ -984,7 +975,7 @@
                   // replicationServerDomain.
                   // setGenerationId(generationId, false);
                   Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
-                    this.baseDn.toNormalizedString(),
+                    this.baseDn,
                     Short.toString(serverId),
                     Long.toString(generationId),
                     Long.toString(localGenerationId));
@@ -1195,26 +1186,6 @@
   }
 
   /**
-   * Get the number of Ack received from the server managed by this handler.
-   *
-   * @return Returns the inAckCount.
-   */
-  public int getInAckCount()
-  {
-    return inAckCount;
-  }
-
-  /**
-   * Get the number of Ack sent to the server managed by this handler.
-   *
-   * @return Returns the outAckCount.
-   */
-  public int getOutAckCount()
-  {
-    return outAckCount;
-  }
-
-  /**
    * Check is this server is saturated (this server has already been
    * sent a bunch of updates and has not processed them so they are staying
    * in the message queue for this server an the size of the queue
@@ -1763,145 +1734,14 @@
   }
 
   /**
-   * Send the ack to the server that did the original modification.
+   * Sends an ack message to the server represented by this object.
    *
-   * @param changeNumber The ChangeNumber of the update that is acked.
+   * @param ack The ack message to be sent.
    * @throws IOException In case of Exception thrown sending the ack.
    */
-  public void sendAck(ChangeNumber changeNumber) throws IOException
+  public void sendAck(AckMsg ack) throws IOException
   {
-    AckMsg ack = new AckMsg(changeNumber);
     session.publish(ack);
-    outAckCount++;
-  }
-
-  /**
-   * Do the work when an ack message has been received from another server.
-   *
-   * @param message The ack message that was received.
-   * @param ackingServerId The  id of the server that acked the change.
-   */
-  public void ack(AckMsg message, short ackingServerId)
-  {
-    ChangeNumber changeNumber = message.getChangeNumber();
-    AckMessageList ackList;
-    boolean completedFlag;
-    synchronized (waitingAcks)
-    {
-      ackList = waitingAcks.get(changeNumber);
-      if (ackList == null)
-        return;
-      ackList.addAck(ackingServerId);
-      completedFlag = ackList.completed();
-      if (completedFlag)
-      {
-        waitingAcks.remove(changeNumber);
-      }
-    }
-    if (completedFlag)
-    {
-      replicationServerDomain.sendAck(changeNumber, true);
-    }
-  }
-
-  /**
-   * Process reception of an for an update that was received from a
-   * ReplicationServer.
-   *
-   * @param message the ack message that was received.
-   * @param ackingServerId The  id of the server that acked the change.
-   */
-  public static void ackChangelog(AckMsg message, short ackingServerId)
-  {
-    ChangeNumber changeNumber = message.getChangeNumber();
-    ReplServerAckMessageList ackList;
-    boolean completedFlag;
-    synchronized (changelogsWaitingAcks)
-    {
-      ackList = changelogsWaitingAcks.get(changeNumber);
-      if (ackList == null)
-        return;
-      ackList.addAck(ackingServerId);
-      completedFlag = ackList.completed();
-      if (completedFlag)
-      {
-        changelogsWaitingAcks.remove(changeNumber);
-      }
-    }
-    if (completedFlag)
-    {
-      ReplicationServerDomain replicationServerDomain =
-        ackList.getChangelogCache();
-      replicationServerDomain.sendAck(changeNumber, false,
-        ackList.getReplicationServerId());
-    }
-  }
-
-  /**
-   * Add an update to the list of update waiting for acks.
-   *
-   * @param update the update that must be added to the list
-   * @param nbWaitedAck  The number of ack that must be received before
-   *               the update is fully acked.
-   */
-  public void addWaitingAck(UpdateMsg update, int nbWaitedAck)
-  {
-    AckMessageList ackList = new AckMessageList(update.getChangeNumber(),
-      nbWaitedAck);
-    synchronized (waitingAcks)
-    {
-      waitingAcks.put(update.getChangeNumber(), ackList);
-    }
-  }
-
-  /**
-   * Add an update to the list of update received from a replicationServer and
-   * waiting for acks.
-   *
-   * @param update The update that must be added to the list.
-   * @param ChangelogServerId The identifier of the replicationServer that sent
-   *                          the update.
-   * @param replicationServerDomain The ReplicationServerDomain from which the
-   *                                change was processed and to which the ack
-   *                                must later be sent.
-   * @param nbWaitedAck The number of ack that must be received before
-   *                    the update is fully acked.
-   */
-  public static void addWaitingAck(
-    UpdateMsg update,
-    short ChangelogServerId, ReplicationServerDomain replicationServerDomain,
-    int nbWaitedAck)
-  {
-    ReplServerAckMessageList ackList =
-      new ReplServerAckMessageList(update.getChangeNumber(),
-      nbWaitedAck,
-      ChangelogServerId,
-      replicationServerDomain);
-    synchronized (changelogsWaitingAcks)
-    {
-      changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
-    }
-  }
-
-  /**
-   * Get the size of the list of update waiting for acks.
-   *
-   * @return the size of the list of update waiting for acks.
-   */
-  public int getWaitingAckSize()
-  {
-    synchronized (waitingAcks)
-    {
-      return waitingAcks.size();
-    }
-  }
-
-  /**
-   * Increment the count of Acks received from this server.
-   */
-  public void incrementInAckCount()
-  {
-    inAckCount++;
   }
 
   /**
@@ -2001,13 +1841,13 @@
         .valueOf(serverId)));
     attributes.add(Attributes.create("base-dn", baseDn.toString()));
 
-    if (serverIsLDAPserver)
+    try
     {
       MonitorData md;
-      try
-      {
-        md = replicationServerDomain.getMonitorData();
+      md = replicationServerDomain.getMonitorData();
 
+      if (serverIsLDAPserver)
+      {
         // Oldest missing update
         Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
         if ((approxFirstMissingDate != null) && (approxFirstMissingDate > 0))
@@ -2017,7 +1857,7 @@
               "approx-older-change-not-synchronized", date.toString()));
           attributes.add(Attributes.create(
               "approx-older-change-not-synchronized-millis", String
-                  .valueOf(approxFirstMissingDate)));
+              .valueOf(approxFirstMissingDate)));
         }
 
         // Missing changes
@@ -2030,14 +1870,21 @@
         attributes.add(Attributes.create("approximate-delay", String
             .valueOf(delay)));
       }
-      catch (Exception e)
+      else
       {
-        // TODO: improve the log
-        // We failed retrieving the remote monitor data.
-        attributes.add(Attributes.create("error",
-            stackTraceToSingleLineString(e)));
+        // Missing changes
+        long missingChanges = md.getMissingChangesRS(serverId);
+        attributes.add(Attributes.create("missing-changes", String
+            .valueOf(missingChanges)));
       }
     }
+    catch (Exception e)
+    {
+      // TODO: improve the log
+      // We failed retrieving the remote monitor data.
+      attributes.add(Attributes.create("error",
+          stackTraceToSingleLineString(e)));
+    }
 
     attributes.add(
         Attributes.create("queue-size", String.valueOf(msgQueue.count())));
@@ -2056,14 +1903,6 @@
     attributes.add(Attributes.create("update-received", String
         .valueOf(getInCount())));
 
-    // Deprecated as long as assured is not exposed
-    attributes.add(Attributes.create("update-waiting-acks", String
-        .valueOf(getWaitingAckSize())));
-    attributes.add(Attributes.create("ack-sent", String
-        .valueOf(getOutAckCount())));
-    attributes.add(Attributes.create("ack-received", String
-        .valueOf(getInAckCount())));
-
     // Window stats
     attributes.add(Attributes.create("max-send-window", String
         .valueOf(sendWindowSize)));
@@ -2125,11 +1964,14 @@
     /*
      * Stop the remote LSHandler
      */
-    for (LightweightServerHandler lsh : directoryServers.values())
+    synchronized (directoryServers)
     {
-      lsh.stopHandler();
+      for (LightweightServerHandler lsh : directoryServers.values())
+      {
+        lsh.stopHandler();
+      }
+      directoryServers.clear();
     }
-    directoryServers.clear();
 
     /*
      * Stop the heartbeat thread.
@@ -2217,7 +2059,6 @@
       {
         WindowMsg msg = new WindowMsg(rcvWindowSizeHalf);
         session.publish(msg);
-        outAckCount++;
         rcvWindow += rcvWindowSizeHalf;
       }
     }
@@ -2302,23 +2143,26 @@
      */
     List<DSInfo> dsInfos = topoMsg.getDsList();
 
-    // Removes the existing structures
-    for (LightweightServerHandler lsh : directoryServers.values())
+    synchronized (directoryServers)
     {
-      lsh.stopHandler();
-    }
-    directoryServers.clear();
+      // Removes the existing structures
+      for (LightweightServerHandler lsh : directoryServers.values())
+      {
+        lsh.stopHandler();
+      }
+      directoryServers.clear();
 
-    // Creates the new structure according to the message received.
-    for (DSInfo dsInfo : dsInfos)
-    {
-      LightweightServerHandler lsh = new LightweightServerHandler(this,
-        serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
-        dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
-        dsInfo.isAssured(), dsInfo.getAssuredMode(),
-        dsInfo.getSafeDataLevel());
-      lsh.startHandler();
-      directoryServers.put(lsh.getServerId(), lsh);
+      // Creates the new structure according to the message received.
+      for (DSInfo dsInfo : dsInfos)
+      {
+        LightweightServerHandler lsh = new LightweightServerHandler(this,
+            serverId, dsInfo.getDsId(), dsInfo.getGenerationId(),
+            dsInfo.getGroupId(), dsInfo.getStatus(), dsInfo.getRefUrls(),
+            dsInfo.isAssured(), dsInfo.getAssuredMode(),
+            dsInfo.getSafeDataLevel());
+        lsh.startHandler();
+        directoryServers.put(lsh.getServerId(), lsh);
+      }
     }
   }
 
@@ -2445,7 +2289,10 @@
    */
   public boolean hasRemoteLDAPServers()
   {
-    return !directoryServers.isEmpty();
+    synchronized (directoryServers)
+    {
+      return !directoryServers.isEmpty();
+    }
   }
 
   /**
@@ -2496,7 +2343,6 @@
       // TODO also log an error message.
       WindowMsg msg = new WindowMsg(rcvWindow);
       session.publish(msg);
-      outAckCount++;
     } else
     {
       // Both the LDAP server and the replication server believes that the
@@ -2556,17 +2402,10 @@
    */
   public Set<Short> getConnectedDirectoryServerIds()
   {
-    return directoryServers.keySet();
-  }
-
-  /**
-   * Get the map of connected DSs
-   * (to the RS represented by this server handler).
-   * @return The map of connected DSs
-   */
-  public Map<Short, LightweightServerHandler> getConnectedDSs()
-  {
-    return directoryServers;
+    synchronized (directoryServers)
+    {
+      return directoryServers.keySet();
+    }
   }
 
   /**
@@ -2716,4 +2555,32 @@
   {
     return protocolVersion;
   }
+
+  /**
+   * Add the DSinfos of the connected Directory Servers
+   * to the List of DSInfo provided as a parameter.
+   *
+   * @param dsInfos The List of DSInfo that should be updated
+   *                with the DSInfo for the directoryServers
+   *                connected to this ServerHandler.
+   */
+  public void addDSInfos(List<DSInfo> dsInfos)
+  {
+    synchronized (directoryServers)
+    {
+      for (LightweightServerHandler ls : directoryServers.values())
+      {
+        dsInfos.add(ls.toDSInfo());
+      }
+    }
+  }
+
+  /**
+   * Gets the group id of the server represented by this object.
+   * @return The group id of the server represented by this object.
+   */
+  public byte getGroupId()
+  {
+    return groupId;
+  }
 }

--
Gitblit v1.10.0