From ed847e95ab009b3f8a7b57636aa3bbe977bf875d Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 19 Oct 2009 07:56:29 +0000
Subject: [PATCH] Fix #4270 ECL Should not establish connections between RSes

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java               |   14 +
 opends/src/server/org/opends/server/replication/server/MessageHandler.java                         |   11 +
 opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java           |    3 
 opends/src/server/org/opends/server/replication/server/ServerReader.java                           |   88 ++++--------
 opends/src/server/org/opends/server/replication/server/DataServerHandler.java                      |   10 +
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                      |    9 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java |   35 ++--
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                       |   37 ++--
 opends/src/server/org/opends/server/replication/server/ECLServerWriter.java                        |    5 
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                          |   98 +++++++++++--
 opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java                     |    3 
 opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                  |    5 
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                |   50 +++++-
 13 files changed, 232 insertions(+), 136 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 78178b7..d1531e8 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4533,8 +4533,9 @@
       {
         AttributeType atype = DirectoryServer.getAttributeType(name);
         List<Attribute> attrs = entry.getAttribute(atype);
-        for (Attribute a : attrs)
-          newattrs.add(a);
+        if (attrs != null)
+          for (Attribute a : attrs)
+            newattrs.add(a);
       }
       ((DeleteMsg)msg).setEclIncludes(newattrs);
 
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index c20d895..ff6ea07 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -28,6 +28,7 @@
 
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -103,7 +104,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("Closing SocketSession." +
-          Thread.currentThread().getStackTrace());
+          stackTraceToSingleLineString(new Exception("Stack:")));
     }
     if (plainSocket != null && !plainSocket.isClosed())
     {
diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 40fe57d..cbc6bad 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -780,4 +780,14 @@
     }
     return startSessionMsg;
   }
+
+  /**
+   * Process message of a remote server changing his status.
+   * @param csMsg The message containing the new status
+   */
+  public void receiveNewStatus(ChangeStatusMsg csMsg)
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.processNewStatus(this, csMsg);
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 923e1b6..d9e67d6 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -371,19 +371,17 @@
           replicationServerURL,
           getServiceId(),
           maxRcvWindow,
-          replicationServerDomain.getDbServerState(),
+          new ServerState(),
           protocolVersion,
           localGenerationId,
           sslEncryption,
           getLocalGroupId(),
-          replicationServerDomain.
-          getReplicationServer().getDegradedStatusThreshold(),
+          0,
           replicationServer.getWeight(),
-          replicationServerDomain.getConnectedLDAPservers().size());
+          0);
 
 
       session.publish(outReplServerStartDSMsg);
-
       return outReplServerStartDSMsg;
     }
   }
@@ -462,9 +460,11 @@
         processStartFromRemote(inECLStartMsg);
 
       // lock with timeout
-      lockDomain(true);
+      if (this.replicationServerDomain != null)
+        lockDomain(true);
 
-      this.localGenerationId = replicationServerDomain.getGenerationId();
+//    this.localGenerationId = replicationServerDomain.getGenerationId();
+      this.localGenerationId = -1;
 
       // send start to remote
       StartMsg outStartMsg =
@@ -708,7 +708,7 @@
   {
     HashMap<String,ServerState> startStates = new HashMap<String,ServerState>();
 
-    ReplicationServer rs = replicationServerDomain.getReplicationServer();
+    ReplicationServer rs = this.replicationServer;
 
     // Parse the provided cookie and overwrite startState from it.
     if ((providedCookie != null) && (providedCookie.length()!=0))
@@ -740,6 +740,10 @@
           if (excludedServiceIDs.contains(rsd.getBaseDn()))
             continue;
 
+          // skip unused domains
+          if (rsd.getDbServerState().isEmpty())
+            continue;
+
           // Creates the new domain context
           DomainContext newDomainCtxt = new DomainContext();
           newDomainCtxt.active = true;
@@ -826,7 +830,8 @@
    */
   private void registerIntoDomain()
   {
-    replicationServerDomain.registerHandler(this);
+    if (replicationServerDomain!=null)
+      replicationServerDomain.registerHandler(this);
   }
 
   /**
@@ -877,7 +882,7 @@
     String str = serverURL + " " + String.valueOf(serverId);
 
     return "Connected External Changelog Server " + str +
-    ",cn=" + replicationServerDomain.getMonitorInstanceName();
+    ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
   }
 
   /**
@@ -982,8 +987,7 @@
       sendWindow = new Semaphore(sendWindowSize);
 
       // create reader
-      reader = new ServerReader(session, serverId,
-          this, replicationServerDomain);
+      reader = new ServerReader(session, serverId, this);
       reader.start();
 
       if (writer == null)
@@ -1132,8 +1136,7 @@
     ECLUpdateMsg oldestChange = null;
 
     if (debugEnabled())
-      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + "," + this +
+      TRACER.debugInfo("In cn=changelog" + this +
           " getNextECLUpdate starts: " + dumpState());
 
     try
@@ -1443,8 +1446,7 @@
     // starvation of changelog messages
     // all domain have been unactived means are covered
     if (debugEnabled())
-      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + "," + this + " closeInitPhase(): "
+      TRACER.debugInfo("In cn=changelog" + "," + this + " closeInitPhase(): "
           + dumpState());
 
     // go to persistent phase if one
@@ -1503,8 +1505,7 @@
     }
 
     if (debugEnabled())
-      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName()
+      TRACER.debugInfo("In cn=changelog"
           + "," + this + " getOldestChangeFromDomainCtxts() returns " +
           ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1"));
 
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 2567751..31ce932 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -193,7 +193,8 @@
           // Can't do much more : ignore
         }
       }
-      replicationServerDomain.stopServer(handler);
+      if (replicationServerDomain!=null)
+        replicationServerDomain.stopServer(handler);
     }
   }
 
@@ -243,7 +244,7 @@
             // session is null in pusherOnly mode
             // Done is used to end phase 1
             session.publish(new DoneMsg(
-                replicationServerDomain.getReplicationServer().getServerId(),
+                handler.getReplicationServerId(),
                 handler.getServerId()), protocolVersion);
           }
         }
diff --git a/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java b/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
index 6dddacd..b66e8d0 100644
--- a/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
+++ b/opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -92,6 +92,7 @@
    */
   public void close()
   {
-    handler.getDomain().stopServer(handler);
+    if (handler.getDomain() != null)
+      handler.getDomain().stopServer(handler);
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 96cdaa6..4cb7198 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -747,7 +747,8 @@
     else
     {
       this.serviceId = serviceId;
-      this.replicationServerDomain = getDomain(true, isDataServer);
+      if (!serviceId.equalsIgnoreCase("cn=changelog"))
+        this.replicationServerDomain = getDomain(true, isDataServer);
     }
   }
 
@@ -802,4 +803,12 @@
     return replicationServer.getGroupId();
   }
 
+  /**
+   * Get the serverId of the hosting replication server.
+   * @return the replication serverId.
+   */
+  public int getReplicationServerId()
+  {
+    return this.replicationServerId;
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index b24c5e5..c0c6094 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1596,11 +1596,9 @@
       while (rsdi.hasNext())
       {
         ReplicationServerDomain domain = rsdi.next();
-
-        if (excludedServiceIDs.contains(domain.getBaseDn()))
-        {
+        if ((excludedServiceIDs != null) &&
+            excludedServiceIDs.contains(domain.getBaseDn()))
           continue;
-        }
 
         ChangeNumber domainEligibleCN = domain.getEligibleCN();
         String dates = "";
@@ -1830,6 +1828,9 @@
             && (excludedServiceIDs.contains(rsd.getBaseDn())))
           continue;
 
+        if (rsd.getDbServerState().isEmpty())
+          continue;
+
         result.update(rsd.getBaseDn(), rsd.getEligibleState(
             getEligibleCN()));
       }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index a301fc3..3531d5a 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -194,7 +194,6 @@
   {
     super("Replication Server " + replicationServer.getReplicationPort() + " "
         + baseDn + " " + replicationServer.getServerId());
-
     this.baseDn = baseDn;
     this.replicationServer = replicationServer;
     this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " +
@@ -2316,7 +2315,7 @@
     /*
      * Store DS connected to remote RS and update information about the peer RS
      */
-    handler.receiveTopoInfoFromRS(topoMsg);
+    handler.processTopoInfoFromRS(topoMsg);
 
     /*
      * Handle generation id
@@ -2904,8 +2903,19 @@
 
   /**
    * Computes the eligible server state for the domain.
-   * Consists in taking the most recent change from the dbServerState and the
-   * eligibleCN.
+   *
+   *     s1               s2          s3
+   *     --               --          --
+   *                                 cn31
+   *     cn15
+   *
+   *  ----------------------------------------- eligibleCN
+   *     cn14
+   *                     cn26
+   *     cn13
+   *
+   * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
+   *
    * @param eligibleCN The provided eligibleCN.
    * @return The computed eligible server state.
    */
@@ -2915,6 +2925,8 @@
 
     ServerState dbState = this.getDbServerState();
 
+    // The result is initialized from the dbState.
+    // From it, we don't want to kepp the changes newer than eligibleCN.
     result = dbState.duplicate();
 
     if (eligibleCN != null)
@@ -2924,32 +2936,44 @@
       {
         int sid = it.next();
         DbHandler h = sourceDbHandlers.get(sid);
-        ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
+        ChangeNumber mostRecentDbCN = dbState.getMaxChangeNumber(sid);
         try
         {
-          if (eligibleCN.older(dbCN))
+          // Is the most recent change in the Db newer than eligible CN ?
+          // if yes (like cn15 in the example above, then we have to go back
+          // to the Db and look for the change older than  eligible CN (cn14)
+          if (eligibleCN.olderOrEqual(mostRecentDbCN))
           {
-            // some CN exist in the db newer than eligible CN
-            // let's get it
-            ReplicationIterator ri = h.generateIterator(eligibleCN);
+            // let's try to seek the first change <= eligibleCN
+            ReplicationIterator ri = null;
             try
             {
+              ri = h.generateIterator(eligibleCN);
               if ((ri != null) && (ri.getChange()!=null))
               {
                 ChangeNumber newCN = ri.getChange().getChangeNumber();
                 result.update(newCN);
               }
             }
+            catch(Exception e)
+            {
+              // there's no change older than eligibleCN (case of s3/cn31)
+              result.update(new ChangeNumber(0,0,sid));
+            }
             finally
             {
-              ri.releaseCursor();
-              ri = null;
+              if (ri != null)
+              {
+                ri.releaseCursor();
+                ri = null;
+              }
             }
           }
           else
           {
-            // no CN exist in the db newer than elligible CN
-            result.update(dbCN);
+            // for this serverid, all changes in the ChangelogDb are holder
+            // than eligibleCN , the most recent in the db is our guy.
+            result.update(mostRecentDbCN);
           }
         }
         catch(Exception e)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 6ce4627..91015f3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -638,7 +638,7 @@
    *
    * @param topoMsg The received topology message
    */
-  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
+  public void processTopoInfoFromRS(TopologyMsg topoMsg)
   {
     // Store info for remote RS
     List<RSInfo> rsInfos = topoMsg.getRsList();
@@ -836,4 +836,16 @@
     session.publish(msg);
   }
 
+  /**
+   * Receives a topology msg.
+   * @param topoMsg The message received.
+   * @throws DirectoryException when it occurs.
+   * @throws IOException when it occurs.
+   */
+  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
+  throws DirectoryException, IOException
+  {
+    if (replicationServerDomain != null)
+      replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 489dbae..784f01b 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -46,13 +46,15 @@
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
 import org.opends.server.replication.protocol.HeartbeatThread;
 import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartMsg;
@@ -297,7 +299,8 @@
     // replication server domain
     if (oldGenerationId != -100)
     {
-      replicationServerDomain.changeGenerationId(oldGenerationId, false);
+      if (replicationServerDomain!=null)
+        replicationServerDomain.changeGenerationId(oldGenerationId, false);
     }
   }
 
@@ -363,8 +366,7 @@
 
       writer = new ServerWriter(session, serverId,
           this, replicationServerDomain);
-      reader = new ServerReader(session, serverId,
-          this, replicationServerDomain);
+      reader = new ServerReader(session, serverId, this);
 
       reader.start();
       writer.start();
@@ -947,6 +949,20 @@
   }
 
   /**
+   * Processes a change time heartbeat msg.
+   *
+   * @param msg The message to be processed.
+   */
+  public void process(ChangeTimeHeartbeatMsg msg)
+  {
+    if (debugEnabled())
+      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
+          getMonitorInstanceName() + this +
+          " processes received msg:\n" + msg);
+    replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
+  }
+
+  /**
    * Process the reception of a WindowProbeMsg message.
    *
    * @param  windowProbeMsg The message to process.
@@ -1231,8 +1247,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-        replicationServerDomain.getReplicationServer().
-        getMonitorInstanceName() + ", " +
+        this.replicationServer.getMonitorInstanceName() + ", " +
         this.getClass().getSimpleName() + " " + this + ":" +
         "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
         "\nAND REPLIED:\n" + outStartMsg.toString());
@@ -1251,8 +1266,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-        replicationServerDomain.getReplicationServer().
-        getMonitorInstanceName() + ", " +
+        this.replicationServer.getMonitorInstanceName() + ", " +
         this.getClass().getSimpleName() + " " + this + ":" +
         "\nSH START HANDSHAKE SENT("+ this +
         "):\n" + outStartMsg.toString()+
@@ -1272,8 +1286,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + ":" +
           "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
           "\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1292,8 +1305,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + ":" +
           "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
           "\nAND RECEIVED:\n" + inTopoMsg.toString());
@@ -1312,8 +1324,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + " :" +
           "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
           "\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1328,8 +1339,7 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + " :" +
           "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
     }
@@ -1345,11 +1355,63 @@
     if (debugEnabled())
     {
       TRACER.debugInfo("In " +
-          replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() + ", " +
+          this.replicationServer.getMonitorInstanceName() + ", " +
           this.getClass().getSimpleName() + " " + this + " :" +
           "\nSH SESSION HANDSHAKE RECEIVED:\n" +
           inStartECLSessionMsg.toString());
     }
   }
+
+  /**
+   * Process a Ack message received.
+   * @param ack the message received.
+   */
+  public void processAck(AckMsg ack)
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.processAck(ack, this);
+  }
+
+  /**
+   * Get the reference generation id (associated with the changes in the db).
+   * @return the reference generation id.
+   */
+  public long getReferenceGenId()
+  {
+    long refgenid = -1;
+    if (replicationServerDomain!=null)
+      refgenid = replicationServerDomain.getGenerationId();
+    return refgenid;
+  }
+
+  /**
+   * Process a ResetGenerationIdMsg message received.
+   * @param msg the message received.
+   */
+  public void processResetGenId(ResetGenerationIdMsg msg)
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.resetGenerationId(this, msg);
+  }
+
+  /**
+   * Put a new update message received.
+   * @param update the update message received.
+   * @throws IOException when it occurs.
+   */
+  public void put(UpdateMsg update)
+  throws IOException
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.put(update, this);
+  }
+
+  /**
+   * Stop this handler.
+   */
+  public void doStop()
+  {
+    if (replicationServerDomain!=null)
+      replicationServerDomain.stopServer(this);
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 4b5037d..934fe4d 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -60,7 +60,6 @@
   private int serverId;
   private ProtocolSession session;
   private ServerHandler handler;
-  private ReplicationServerDomain replicationServerDomain;
 
   /**
    * Constructor for the LDAP server reader part of the replicationServer.
@@ -68,20 +67,15 @@
    * @param session The ProtocolSession from which to read the data.
    * @param serverId The server ID of the server from which we read messages.
    * @param handler The server handler for this server reader.
-   * @param replicationServerDomain The ReplicationServerDomain for this server
-   *        reader.
    */
   public ServerReader(ProtocolSession session, int serverId,
-    ServerHandler handler,
-    ReplicationServerDomain replicationServerDomain)
+      ServerHandler handler)
   {
-    super("Replication Reader Thread for handler of " +
-        handler.toString() +
-        " in " + replicationServerDomain);
+    super("Replication Reader Thread for RS handler " +
+        handler.getMonitorInstanceName());
     this.session = session;
     this.serverId = serverId;
     this.handler = handler;
-    this.replicationServerDomain = replicationServerDomain;
   }
 
   /**
@@ -109,15 +103,14 @@
 
           if (debugEnabled())
           {
-            TRACER.debugInfo("In " + replicationServerDomain + " " +
-                getName() + " receives " + msg);
+            TRACER.debugInfo("In " + getName() + " receives " + msg);
           }
 
           if (msg instanceof AckMsg)
           {
             AckMsg ack = (AckMsg) msg;
             handler.checkWindow();
-            replicationServerDomain.processAck(ack, handler);
+            handler.processAck(ack);
           } else if (msg instanceof UpdateMsg)
           {
             boolean filtered = false;
@@ -141,22 +134,19 @@
               if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
                 (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
               {
-                long referenceGenerationId =
-                  replicationServerDomain.getGenerationId();
+                long referenceGenerationId = handler.getReferenceGenId();
                 if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                   logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
-                    Integer.toString(replicationServerDomain.
-                    getReplicationServer().getServerId()),
-                    replicationServerDomain.getBaseDn(),
+                    Integer.toString(handler.getReplicationServerId()),
+                    handler.getServiceId(),
                     ((UpdateMsg) msg).getChangeNumber().toString(),
                     Integer.toString(handler.getServerId()),
                     Long.toString(referenceGenerationId),
                     Long.toString(handler.getGenerationId())));
                 if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                   logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
-                    Integer.toString(replicationServerDomain.
-                    getReplicationServer().getServerId()),
-                    replicationServerDomain.getBaseDn(),
+                    Integer.toString(handler.getReplicationServerId()),
+                    handler.getServiceId(),
                     ((UpdateMsg) msg).getChangeNumber().toString(),
                     Integer.toString(handler.getServerId())));
                 filtered = true;
@@ -167,17 +157,15 @@
                * Ignore updates from RS with bad gen id
                * (no system managed status for a RS)
                */
-              long referenceGenerationId =
-                replicationServerDomain.getGenerationId();
+              long referenceGenerationId =handler.getReferenceGenId();
               if ((referenceGenerationId > 0) &&
                 (referenceGenerationId != handler.getGenerationId()))
               {
                 logError(
                     ERR_IGNORING_UPDATE_FROM_RS.get(
                         Integer.toString(
-                            replicationServerDomain.getReplicationServer().
-                            getServerId()),
-                        replicationServerDomain.getBaseDn(),
+                            handler.getReplicationServerId()),
+                        handler.getServiceId(),
                         ((UpdateMsg) msg).getChangeNumber().toString(),
                         Integer.toString(handler.getServerId()),
                         Long.toString(referenceGenerationId),
@@ -190,7 +178,7 @@
             {
               UpdateMsg update = (UpdateMsg) msg;
               handler.decAndCheckWindow();
-              replicationServerDomain.put(update, handler);
+              handler.put(update);
             }
           } else if (msg instanceof WindowMsg)
           {
@@ -220,7 +208,7 @@
           } else if (msg instanceof ResetGenerationIdMsg)
           {
             ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
-            replicationServerDomain.resetGenerationId(handler, genIdMsg);
+            handler.processResetGenId(genIdMsg);
           } else if (msg instanceof WindowProbeMsg)
           {
             WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
@@ -231,8 +219,7 @@
             try
             {
               ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
-              replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
-                  rsh, true);
+              rsh.receiveTopoInfoFromRS(topoMsg);
             }
             catch(Exception e)
             {
@@ -247,13 +234,13 @@
             try
             {
               DataServerHandler dsh = (DataServerHandler)handler;
-              replicationServerDomain.processNewStatus(dsh, csMsg);
+              dsh.receiveNewStatus(csMsg);
             }
             catch(Exception e)
             {
               errMessage =
                 ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(
-                    replicationServerDomain.getBaseDn(),
+                    handler.getServiceId(),
                     Integer.toString(handler.getServerId()),
                     csMsg.toString());
               logError(errMessage);
@@ -270,8 +257,7 @@
           } else if (msg instanceof ChangeTimeHeartbeatMsg)
           {
             ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
-            replicationServerDomain.processChangeTimeHeartbeatMsg(handler,
-                cthbMsg);
+            handler.process(cthbMsg);
           } else if (msg instanceof StopMsg)
           {
             // Peer server is properly disconnecting: go out of here to
@@ -280,8 +266,7 @@
             {
               TRACER.debugInfo(handler.toString() + " has properly " +
                 "disconnected from this replication server " +
-                Integer.toString(replicationServerDomain.getReplicationServer().
-                getServerId()));
+                Integer.toString(handler.getReplicationServerId()));
             }
             return;
           } else if (msg == null)
@@ -300,9 +285,8 @@
           // we just trash the message and log the event for debug purpose,
           // then continue receiving messages.
           if (debugEnabled())
-            TRACER.debugInfo("In " + replicationServerDomain.
-              getReplicationServer().
-              getMonitorInstanceName() + ":" + e.getMessage());
+            TRACER.debugInfo(
+                "In " + this.getName() + " " + stackTraceToSingleLineString(e));
         }
       }
     }
@@ -315,24 +299,16 @@
        */
       if (debugEnabled())
         TRACER.debugInfo(
-          "In RS " + replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() +
-          " reader IO EXCEPTION for serverID=" + serverId + " " +
-          this + " " +
-          stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
+            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
       errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
-        Integer.toString(replicationServerDomain.
-        getReplicationServer().getServerId()));
+        Integer.toString(handler.getReplicationServerId()));
       logError(errMessage);
     }
     catch (ClassNotFoundException e)
     {
       if (debugEnabled())
         TRACER.debugInfo(
-          "In RS <" + replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() +
-          " reader CNF EXCEPTION serverID=" + serverId +
-          stackTraceToSingleLineString(e));
+            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
       /*
        * The remote server has sent an unknown message,
        * close the connection.
@@ -344,10 +320,7 @@
     {
       if (debugEnabled())
         TRACER.debugInfo(
-          "In RS <" + replicationServerDomain.getReplicationServer().
-          getMonitorInstanceName() +
-          " server reader EXCEPTION serverID=" + serverId +
-          " " + stackTraceToSingleLineString(e));
+          "In " + this.getName() + " " + stackTraceToSingleLineString(e));
       /*
        * The remote server has sent an unknown message,
        * close the connection.
@@ -364,11 +337,6 @@
        */
       try
       {
-        if (debugEnabled())
-          TRACER.debugInfo(
-            "In RS " + replicationServerDomain.getReplicationServer().
-            getMonitorInstanceName() +
-            this + " is closing the session");
         if (handler.getProtocolVersion() >=
           ProtocolVersion.REPLICATION_PROTOCOL_V4)
         {
@@ -382,12 +350,14 @@
             // Anyway, going to close session, so nothing to do
           }
         }
+        if (debugEnabled())
+          TRACER.debugInfo("In " + this.getName() + " closing the session");
         session.close();
       } catch (IOException e)
       {
       // ignore
       }
-      replicationServerDomain.stopServer(handler);
+      handler.doStop();
       if (debugEnabled())
       {
         TRACER.debugInfo(this.getName() + " stopped " + errMessage);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index e646e29..2be61af 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -37,6 +37,7 @@
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import static org.opends.server.loggers.ErrorLogger.logError;
 
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
@@ -230,7 +231,7 @@
     // Write additional changes and read ECL from a provided draft change number
     ts = ECLCompatWriteReadAllOps(5);replicationServer.clearDb();
 
-    // ECLIncludeAttributes();replicationServer.clearDb();
+    ECLIncludeAttributes();replicationServer.clearDb();
   }
 
   @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
@@ -260,7 +261,7 @@
     ECLRemoteNonEmpty();replicationServer.clearDb();
 
     // Test with a mix of domains, a mix of DSes
-    //ECLTwoDomains();
+    ECLTwoDomains();
     // changelogDb required NOT empty for the next test
 
     // Test ECL after changelog triming
@@ -1340,10 +1341,10 @@
       // test success
       waitOpResult(searchOp, ResultCode.SUCCESS);
       // test 4 entries returned
-      String cookie1 = "o=test:"+cn1.toString()+";o=test2:;";
-      String cookie2 = "o=test:"+cn2.toString()+";o=test2:;";
-      String cookie3 = "o=test:"+cn3.toString()+";o=test2:;";
-      String cookie4 = "o=test:"+cn4.toString()+";o=test2:;";
+      String cookie1 = "o=test:"+cn1.toString()+";";
+      String cookie2 = "o=test:"+cn2.toString()+";";
+      String cookie3 = "o=test:"+cn3.toString()+";";
+      String cookie4 = "o=test:"+cn4.toString()+";";
 
       assertEquals(searchOp.getSearchEntries().size(), 4);
       LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
@@ -2017,7 +2018,7 @@
       s2 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort());
       org.opends.server.tools.LDAPReader r2 = new org.opends.server.tools.LDAPReader(s2);
       LDAPWriter w2 = new LDAPWriter(s2);
-      s2.setSoTimeout(15000);
+      s2.setSoTimeout(30000);
       bindAsManager(w2, r2);
 
       // Connects and bind
@@ -2483,6 +2484,7 @@
   private static void removeTestBackend2(Backend backend)
   {
     MemoryBackend memoryBackend = (MemoryBackend)backend;
+    memoryBackend.clearMemoryBackend();
     memoryBackend.finalizeBackend();
     DirectoryServer.deregisterBackend(memoryBackend);
   }
@@ -2757,7 +2759,7 @@
             checkValue(resultEntry,"replicaidentifier","1201");
             checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING);
             checkValue(resultEntry,"changetype","delete");
-            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;");
+            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";");
             checkValue(resultEntry,"targetentryuuid",user1entryUUID);
             checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+0));
             checkValue(resultEntry,"targetuniqueid","11111111-11121113-11141111-11111115");
@@ -2775,7 +2777,7 @@
             checkValue(resultEntry,"replicaidentifier","1201");
             checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING);
             checkValue(resultEntry,"changetype","add");
-            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
+            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
             checkValue(resultEntry,"targetentryuuid",user1entryUUID);
             checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+1));
           } else if (i==3)
@@ -2790,7 +2792,7 @@
             checkValue(resultEntry,"replicaidentifier","1201");
             checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING);
             checkValue(resultEntry,"changetype","modify");
-            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;");
+            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";");
             checkValue(resultEntry,"targetentryuuid",user1entryUUID);
             checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+2));
           } else if (i==4)
@@ -2802,7 +2804,7 @@
             checkValue(resultEntry,"replicaidentifier","1201");
             checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING);
             checkValue(resultEntry,"changetype","modrdn");
-            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;");
+            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";");
             checkValue(resultEntry,"targetentryuuid",user1entryUUID);
             checkValue(resultEntry,"newrdn","uid="+tn+"new4");
             checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2);
@@ -2849,7 +2851,7 @@
             checkValue(resultEntry,"replicaidentifier","1201");
             checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING);
             checkValue(resultEntry,"changetype","delete");
-            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;");
+            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";");
             checkValue(resultEntry,"targetentryuuid",user1entryUUID);
             checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+0));
             checkValue(resultEntry,"targetuniqueid","11111111-11121113-11141111-11111115");
@@ -2867,7 +2869,7 @@
             checkValue(resultEntry,"replicaidentifier","1201");
             checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING);
             checkValue(resultEntry,"changetype","add");
-            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
+            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
             checkValue(resultEntry,"targetentryuuid",user1entryUUID);
             checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+1));
           } else if (i==3)
@@ -2882,7 +2884,7 @@
             checkValue(resultEntry,"replicaidentifier","1201");
             checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING);
             checkValue(resultEntry,"changetype","modify");
-            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;");
+            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";");
             checkValue(resultEntry,"targetentryuuid",user1entryUUID);
             checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+2));
           } else if (i==4)
@@ -2894,7 +2896,7 @@
             checkValue(resultEntry,"replicaidentifier","1201");
             checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING);
             checkValue(resultEntry,"changetype","modrdn");
-            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;");
+            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";");
             checkValue(resultEntry,"targetentryuuid",user1entryUUID);
             checkValue(resultEntry,"newrdn","uid="+tn+"new4");
             checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2);
@@ -2967,7 +2969,7 @@
           checkValue(resultEntry,"replicationcsn",gblCN.toString());
           checkValue(resultEntry,"replicaidentifier","1201");
           checkValue(resultEntry,"changetype","add");
-          checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
+          checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
           checkValue(resultEntry,"targetentryuuid",user1entryUUID);
           checkValue(resultEntry,"changenumber","6");
         }
@@ -3603,6 +3605,7 @@
       waitOpResult(searchOp, ResultCode.SUCCESS);
       LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
 
+      sleep(2000);
 
       assertTrue(entries != null);
       String s = tn + " entries returned= ";

--
Gitblit v1.10.0