From 71ebb3724c79a7d1218c36f080acd6ee162b9cd2 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 26 Apr 2007 06:31:01 +0000
Subject: [PATCH] Rename the class with names containing synchronization or changelog. Replace most of the changelog occurences with replication server. (issue 1090)

---
 opends/src/server/org/opends/server/replication/server/ServerHandler.java |  163 +++++++++++++++++++++++++++++-------------------------
 1 files changed, 87 insertions(+), 76 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 0f918a1..386591d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,7 +50,7 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.AckMessage;
-import org.opends.server.replication.protocol.ChangelogStartMessage;
+import org.opends.server.replication.protocol.ReplServerStartMessage;
 import org.opends.server.replication.protocol.HeartbeatThread;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.RoutableMessage;
@@ -69,7 +69,7 @@
 
 /**
  * This class defines a server handler, which handles all interaction with a
- * changelog server.
+ * replication server.
  */
 public class ServerHandler extends MonitorProvider
 {
@@ -79,7 +79,7 @@
   private MsgQueue lateQueue = new MsgQueue();
   private final Map<ChangeNumber, AckMessageList> waitingAcks  =
           new HashMap<ChangeNumber, AckMessageList>();
-  private ChangelogCache changelogCache = null;
+  private ReplicationCache replicationCache = null;
   private String serverURL;
   private int outCount = 0; // number of update sent to the server
   private int inCount = 0;  // number of updates received from the server
@@ -111,7 +111,7 @@
                                        // flow controled and should
                                        // be stopped from sending messsages.
   private int saturationCount = 0;
-  private short changelogId;
+  private short replicationServerId;
 
   /**
    * The time in milliseconds between heartbeats from the replication
@@ -124,8 +124,9 @@
    */
   HeartbeatThread heartbeatThread = null;
 
-  private static final Map<ChangeNumber, ChangelogAckMessageList>
-   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
+  private static final Map<ChangeNumber, ReplServerAckMessageList>
+   changelogsWaitingAcks =
+       new HashMap<ChangeNumber, ReplServerAckMessageList>();
 
   /**
    * Creates a new server handler instance with the provided socket.
@@ -144,22 +145,24 @@
 
   /**
    * Do the exchange of start messages to know if the remote
-   * server is an LDAP or changelog server and to exchange serverID.
+   * server is an LDAP or replication server and to exchange serverID.
    * Then create the reader and writer thread.
    *
    * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
    *               null if this is an incoming connection.
-   * @param changelogId The identifier of the changelog that creates this
-   *                    server handler.
-   * @param changelogURL The URL of the changelog that creates this
-   *                    server handler.
+   * @param replicationServerId The identifier of the replicationServer that
+   *                            creates this server handler.
+   * @param replicationServerURL The URL of the replicationServer that creates
+   *                             this server handler.
    * @param windowSize the window size that this server handler must use.
-   * @param changelog the Changelog that created this server handler.
+   * @param replicationServer the ReplicationServer that created this server
+   *                          handler.
    */
-  public void start(DN baseDn, short changelogId, String changelogURL,
-                    int windowSize, Changelog changelog)
+  public void start(DN baseDn, short replicationServerId,
+                    String replicationServerURL,
+                    int windowSize, ReplicationServer replicationServer)
   {
-    this.changelogId = changelogId;
+    this.replicationServerId = replicationServerId;
     rcvWindowSizeHalf = windowSize/2;
     maxRcvWindow = windowSize;
     rcvWindow = windowSize;
@@ -168,10 +171,10 @@
       if (baseDn != null)
       {
         this.baseDn = baseDn;
-        changelogCache = changelog.getChangelogCache(baseDn);
-        ServerState localServerState = changelogCache.getDbServerState();
-        ChangelogStartMessage msg =
-          new ChangelogStartMessage(changelogId, changelogURL,
+        replicationCache = replicationServer.getReplicationCache(baseDn);
+        ServerState localServerState = replicationCache.getDbServerState();
+        ReplServerStartMessage msg =
+          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                     baseDn, windowSize, localServerState);
 
         session.publish(msg);
@@ -225,17 +228,17 @@
 
         serverIsLDAPserver = true;
 
-        changelogCache = changelog.getChangelogCache(this.baseDn);
-        ServerState localServerState = changelogCache.getDbServerState();
-        ChangelogStartMessage myStartMsg =
-          new ChangelogStartMessage(changelogId, changelogURL,
+        replicationCache = replicationServer.getReplicationCache(this.baseDn);
+        ServerState localServerState = replicationCache.getDbServerState();
+        ReplServerStartMessage myStartMsg =
+          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                     this.baseDn, windowSize, localServerState);
         session.publish(myStartMsg);
         sendWindowSize = receivedMsg.getWindowSize();
       }
-      else if (msg instanceof ChangelogStartMessage)
+      else if (msg instanceof ReplServerStartMessage)
       {
-        ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
+        ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
         serverId = receivedMsg.getServerId();
         serverURL = receivedMsg.getServerURL();
         String[] splittedURL = serverURL.split(":");
@@ -244,11 +247,12 @@
         this.baseDn = receivedMsg.getBaseDn();
         if (baseDn == null)
         {
-          changelogCache = changelog.getChangelogCache(this.baseDn);
-          ServerState serverState = changelogCache.getDbServerState();
-          ChangelogStartMessage outMsg =
-            new ChangelogStartMessage(changelogId, changelogURL,
-                                      this.baseDn, windowSize, serverState);
+          replicationCache = replicationServer.getReplicationCache(this.baseDn);
+          ServerState serverState = replicationCache.getDbServerState();
+          ReplServerStartMessage outMsg =
+            new ReplServerStartMessage(replicationServerId,
+                                       replicationServerURL,
+                                       this.baseDn, windowSize, serverState);
           session.publish(outMsg);
         }
         else
@@ -262,21 +266,21 @@
         return;   // we did not recognize the message, ignore it
       }
 
-      changelogCache = changelog.getChangelogCache(this.baseDn);
+      replicationCache = replicationServer.getReplicationCache(this.baseDn);
 
       if (serverIsLDAPserver)
       {
-        changelogCache.startServer(this);
+        replicationCache.startServer(this);
       }
       else
       {
-        changelogCache.startChangelog(this);
+        replicationCache.startReplicationServer(this);
       }
 
-      writer = new ServerWriter(session, serverId, this, changelogCache);
+      writer = new ServerWriter(session, serverId, this, replicationCache);
 
       reader = new ServerReader(session, serverId, this,
-                                             changelogCache);
+                                             replicationCache);
 
       reader.start();
       writer.start();
@@ -486,11 +490,12 @@
   }
 
   /**
-   * Check if the server associated to this ServerHandler is a changelog server.
+   * Check if the server associated to this ServerHandler is a replication
+   * server.
    * @return true if the server associated to this ServerHandler is a
-   *         changelog server.
+   *         replication server.
    */
-  public boolean isChangelogServer()
+  public boolean isReplicationServer()
   {
     return (!serverIsLDAPserver);
   }
@@ -520,7 +525,7 @@
         * the sum of the number of missing changes for every dbHandler.
         */
        int totalCount = 0;
-       ServerState dbState = changelogCache.getDbServerState();
+       ServerState dbState = replicationCache.getDbServerState();
        for (short id : dbState)
        {
          int max = dbState.getMaxChangeNumber(id).getSeqnum();
@@ -554,7 +559,7 @@
    * Get an approximation of the delay by looking at the age of the odest
    * message that has not been sent to this server.
    * This is an approximation because the age is calculated using the
-   * clock of the servee where the changelog is currently running
+   * clock of the servee where the replicationServer is currently running
    * while it should be calculated using the clock of the server
    * that originally processed the change.
    *
@@ -686,7 +691,7 @@
       saturationCount = 0;
       try
       {
-        changelogCache.checkAllSaturation();
+        replicationCache.checkAllSaturation();
       }
       catch (IOException e)
       {
@@ -747,16 +752,16 @@
            *   load this change on the delayList
            *
            */
-          ChangelogIteratorComparator comparator =
-            new ChangelogIteratorComparator();
-          SortedSet<ChangelogIterator> iteratorSortedSet =
-            new TreeSet<ChangelogIterator>(comparator);
+          ReplicationIteratorComparator comparator =
+            new ReplicationIteratorComparator();
+          SortedSet<ReplicationIterator> iteratorSortedSet =
+            new TreeSet<ReplicationIterator>(comparator);
           /* fill the lateQueue */
-          for (short serverId : changelogCache.getServers())
+          for (short serverId : replicationCache.getServers())
           {
             ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
-            ChangelogIterator iterator =
-              changelogCache.getChangelogIterator(serverId, lastCsn);
+            ReplicationIterator iterator =
+              replicationCache.getChangelogIterator(serverId, lastCsn);
             if ((iterator != null) && (iterator.getChange() != null))
             {
               iteratorSortedSet.add(iterator);
@@ -764,7 +769,7 @@
           }
           while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
           {
-            ChangelogIterator iterator = iteratorSortedSet.first();
+            ReplicationIterator iterator = iteratorSortedSet.first();
             iteratorSortedSet.remove(iterator);
             lateQueue.add(iterator.getChange());
             if (iterator.next())
@@ -772,7 +777,7 @@
             else
               iterator.releaseCursor();
           }
-          for (ChangelogIterator iterator : iteratorSortedSet)
+          for (ReplicationIterator iterator : iteratorSortedSet)
           {
             iterator.releaseCursor();
           }
@@ -928,13 +933,13 @@
     }
     if (completedFlag)
     {
-      changelogCache.sendAck(changeNumber, true);
+      replicationCache.sendAck(changeNumber, true);
     }
   }
 
   /**
    * Process reception of an for an update that was received from a
-   * Changelog Server.
+   * ReplicationServer.
    *
    * @param message the ack message that was received.
    * @param ackingServerId The  id of the server that acked the change.
@@ -942,7 +947,7 @@
   public static void ackChangelog(AckMessage message, short ackingServerId)
   {
     ChangeNumber changeNumber = message.getChangeNumber();
-    ChangelogAckMessageList ackList;
+    ReplServerAckMessageList ackList;
     boolean completedFlag;
     synchronized (changelogsWaitingAcks)
     {
@@ -958,9 +963,9 @@
     }
     if (completedFlag)
     {
-      ChangelogCache changelogCache = ackList.getChangelogCache();
-      changelogCache.sendAck(changeNumber, false,
-                             ackList.getChangelogServerId());
+      ReplicationCache replicationCache = ackList.getChangelogCache();
+      replicationCache.sendAck(changeNumber, false,
+                             ackList.getReplicationServerId());
     }
   }
 
@@ -982,24 +987,26 @@
   }
 
   /**
-   * Add an update to the list of update received from a changelog server and
+   * Add an update to the list of update received from a replicationServer and
    * waiting for acks.
    *
    * @param update The update that must be added to the list.
-   * @param ChangelogServerId The identifier of the changelog that sent the
-   *                          update.
-   * @param changelogCache The ChangelogCache from which the change was
-   *                       processed and to which the ack must later be sent.
+   * @param ChangelogServerId The identifier of the replicationServer that sent
+   *                          the update.
+   * @param replicationCache The ReplicationCache from which the change was
+   *                         processed and to which the ack must later be sent.
    * @param nbWaitedAck The number of ack that must be received before
    *                    the update is fully acked.
    */
-  public static void addWaitingAck(UpdateMessage update,
-      short ChangelogServerId, ChangelogCache changelogCache, int nbWaitedAck)
+  public static void addWaitingAck(
+      UpdateMessage update,
+      short ChangelogServerId, ReplicationCache replicationCache,
+      int nbWaitedAck)
   {
-    ChangelogAckMessageList ackList =
-          new ChangelogAckMessageList(update.getChangeNumber(),
+    ReplServerAckMessageList ackList =
+          new ReplServerAckMessageList(update.getChangeNumber(),
                                       nbWaitedAck,
-                                      ChangelogServerId, changelogCache);
+                                      ChangelogServerId, replicationCache);
     synchronized(changelogsWaitingAcks)
     {
       changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
@@ -1031,7 +1038,7 @@
    * Check type of server handled.
    *
    * @return true if the handled server is an LDAP server.
-   *         false if the handled server is a changelog server
+   *         false if the handled server is a replicationServer
    */
   public boolean isLDAPserver()
   {
@@ -1074,7 +1081,7 @@
     if (serverIsLDAPserver)
       return "LDAP Server " + str;
     else
-      return "Changelog Server " + str;
+      return "Replication Server " + str;
   }
 
   /**
@@ -1122,7 +1129,7 @@
     if (serverIsLDAPserver)
       attributes.add(new Attribute("LDAP-Server", serverURL));
     else
-      attributes.add(new Attribute("Changelog-Server", serverURL));
+      attributes.add(new Attribute("ReplicationServer-Server", serverURL));
     attributes.add(new Attribute("server-id",
                                  String.valueOf(serverId)));
     attributes.add(new Attribute("base-dn",
@@ -1199,7 +1206,7 @@
       if (serverIsLDAPserver)
         localString = "Directory Server ";
       else
-        localString = "Changelog Server ";
+        localString = "Replication Server ";
 
 
       localString += serverId + " " + serverURL + " " + baseDn;
@@ -1233,7 +1240,7 @@
     {
       if (flowControl)
       {
-        if (changelogCache.restartAfterSaturation(this))
+        if (replicationCache.restartAfterSaturation(this))
         {
           flowControl = false;
         }
@@ -1277,13 +1284,15 @@
   public void process(RoutableMessage msg)
   {
     if (debugEnabled())
-      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+      debugInfo("SH(" + replicationServerId + ") forwards " +
+                 msg + " to " + serverId);
 
     logError(ErrorLogCategory.SYNCHRONIZATION,
         ErrorLogSeverity.SEVERE_ERROR,
-        "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
+        "SH(" + replicationServerId + ") receives " + msg +
+            " from " + serverId, 1);
 
-    changelogCache.process(msg, this);
+    replicationCache.process(msg, this);
   }
 
   /**
@@ -1296,11 +1305,13 @@
   public void send(RoutableMessage msg) throws IOException
   {
     if (debugEnabled())
-      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
+      debugInfo("SH(" + replicationServerId + ") forwards " +
+                 msg + " to " + serverId);
 
     logError(ErrorLogCategory.SYNCHRONIZATION,
         ErrorLogSeverity.SEVERE_ERROR,
-        "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
+        "SH(" + replicationServerId + ") forwards " +
+             msg + " to " + serverId, 1);
 
     session.publish(msg);
   }

--
Gitblit v1.10.0