mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
19.56.2009 ed847e95ab009b3f8a7b57636aa3bbe977bf875d
Fix #4270 ECL Should not establish connections between RSes
13 files modified
348 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 33 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 3 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/MessageHandler.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 46 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 96 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 88 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 35 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -4533,6 +4533,7 @@
      {
        AttributeType atype = DirectoryServer.getAttributeType(name);
        List<Attribute> attrs = entry.getAttribute(atype);
        if (attrs != null)
        for (Attribute a : attrs)
          newattrs.add(a);
      }
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -28,6 +28,7 @@
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.io.InputStream;
@@ -103,7 +104,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("Closing SocketSession." +
          Thread.currentThread().getStackTrace());
          stackTraceToSingleLineString(new Exception("Stack:")));
    }
    if (plainSocket != null && !plainSocket.isClosed())
    {
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -780,4 +780,14 @@
    }
    return startSessionMsg;
  }
  /**
   * Process message of a remote server changing his status.
   * @param csMsg The message containing the new status
   */
  public void receiveNewStatus(ChangeStatusMsg csMsg)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.processNewStatus(this, csMsg);
  }
}
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -371,19 +371,17 @@
          replicationServerURL,
          getServiceId(),
          maxRcvWindow,
          replicationServerDomain.getDbServerState(),
          new ServerState(),
          protocolVersion,
          localGenerationId,
          sslEncryption,
          getLocalGroupId(),
          replicationServerDomain.
          getReplicationServer().getDegradedStatusThreshold(),
          0,
          replicationServer.getWeight(),
          replicationServerDomain.getConnectedLDAPservers().size());
          0);
      session.publish(outReplServerStartDSMsg);
      return outReplServerStartDSMsg;
    }
  }
@@ -462,9 +460,11 @@
        processStartFromRemote(inECLStartMsg);
      // lock with timeout
      if (this.replicationServerDomain != null)
      lockDomain(true);
      this.localGenerationId = replicationServerDomain.getGenerationId();
//    this.localGenerationId = replicationServerDomain.getGenerationId();
      this.localGenerationId = -1;
      // send start to remote
      StartMsg outStartMsg =
@@ -708,7 +708,7 @@
  {
    HashMap<String,ServerState> startStates = new HashMap<String,ServerState>();
    ReplicationServer rs = replicationServerDomain.getReplicationServer();
    ReplicationServer rs = this.replicationServer;
    // Parse the provided cookie and overwrite startState from it.
    if ((providedCookie != null) && (providedCookie.length()!=0))
@@ -740,6 +740,10 @@
          if (excludedServiceIDs.contains(rsd.getBaseDn()))
            continue;
          // skip unused domains
          if (rsd.getDbServerState().isEmpty())
            continue;
          // Creates the new domain context
          DomainContext newDomainCtxt = new DomainContext();
          newDomainCtxt.active = true;
@@ -826,6 +830,7 @@
   */
  private void registerIntoDomain()
  {
    if (replicationServerDomain!=null)
    replicationServerDomain.registerHandler(this);
  }
@@ -877,7 +882,7 @@
    String str = serverURL + " " + String.valueOf(serverId);
    return "Connected External Changelog Server " + str +
    ",cn=" + replicationServerDomain.getMonitorInstanceName();
    ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT;
  }
  /**
@@ -982,8 +987,7 @@
      sendWindow = new Semaphore(sendWindowSize);
      // create reader
      reader = new ServerReader(session, serverId,
          this, replicationServerDomain);
      reader = new ServerReader(session, serverId, this);
      reader.start();
      if (writer == null)
@@ -1132,8 +1136,7 @@
    ECLUpdateMsg oldestChange = null;
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + "," + this +
      TRACER.debugInfo("In cn=changelog" + this +
          " getNextECLUpdate starts: " + dumpState());
    try
@@ -1443,8 +1446,7 @@
    // starvation of changelog messages
    // all domain have been unactived means are covered
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + "," + this + " closeInitPhase(): "
      TRACER.debugInfo("In cn=changelog" + "," + this + " closeInitPhase(): "
          + dumpState());
    // go to persistent phase if one
@@ -1503,8 +1505,7 @@
    }
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName()
      TRACER.debugInfo("In cn=changelog"
          + "," + this + " getOldestChangeFromDomainCtxts() returns " +
          ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1"));
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -193,6 +193,7 @@
          // Can't do much more : ignore
        }
      }
      if (replicationServerDomain!=null)
      replicationServerDomain.stopServer(handler);
    }
  }
@@ -243,7 +244,7 @@
            // session is null in pusherOnly mode
            // Done is used to end phase 1
            session.publish(new DoneMsg(
                replicationServerDomain.getReplicationServer().getServerId(),
                handler.getReplicationServerId(),
                handler.getServerId()), protocolVersion);
          }
        }
opends/src/server/org/opends/server/replication/server/ExternalChangeLogSessionImpl.java
@@ -92,6 +92,7 @@
   */
  public void close()
  {
    if (handler.getDomain() != null)
    handler.getDomain().stopServer(handler);
  }
}
opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -747,6 +747,7 @@
    else
    {
      this.serviceId = serviceId;
      if (!serviceId.equalsIgnoreCase("cn=changelog"))
      this.replicationServerDomain = getDomain(true, isDataServer);
    }
  }
@@ -802,4 +803,12 @@
    return replicationServer.getGroupId();
  }
  /**
   * Get the serverId of the hosting replication server.
   * @return the replication serverId.
   */
  public int getReplicationServerId()
  {
    return this.replicationServerId;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -1596,11 +1596,9 @@
      while (rsdi.hasNext())
      {
        ReplicationServerDomain domain = rsdi.next();
        if (excludedServiceIDs.contains(domain.getBaseDn()))
        {
        if ((excludedServiceIDs != null) &&
            excludedServiceIDs.contains(domain.getBaseDn()))
          continue;
        }
        ChangeNumber domainEligibleCN = domain.getEligibleCN();
        String dates = "";
@@ -1830,6 +1828,9 @@
            && (excludedServiceIDs.contains(rsd.getBaseDn())))
          continue;
        if (rsd.getDbServerState().isEmpty())
          continue;
        result.update(rsd.getBaseDn(), rsd.getEligibleState(
            getEligibleCN()));
      }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -194,7 +194,6 @@
  {
    super("Replication Server " + replicationServer.getReplicationPort() + " "
        + baseDn + " " + replicationServer.getServerId());
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
    this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " +
@@ -2316,7 +2315,7 @@
    /*
     * Store DS connected to remote RS and update information about the peer RS
     */
    handler.receiveTopoInfoFromRS(topoMsg);
    handler.processTopoInfoFromRS(topoMsg);
    /*
     * Handle generation id
@@ -2904,8 +2903,19 @@
  /**
   * Computes the eligible server state for the domain.
   * Consists in taking the most recent change from the dbServerState and the
   * eligibleCN.
   *
   *     s1               s2          s3
   *     --               --          --
   *                                 cn31
   *     cn15
   *
   *  ----------------------------------------- eligibleCN
   *     cn14
   *                     cn26
   *     cn13
   *
   * The eligibleState is : s1;cn14 / s2;cn26 / s3;cn31
   *
   * @param eligibleCN The provided eligibleCN.
   * @return The computed eligible server state.
   */
@@ -2915,6 +2925,8 @@
    ServerState dbState = this.getDbServerState();
    // The result is initialized from the dbState.
    // From it, we don't want to keep the changes newer than eligibleCN.
    result = dbState.duplicate();
    if (eligibleCN != null)
@@ -2924,32 +2936,44 @@
      {
        int sid = it.next();
        DbHandler h = sourceDbHandlers.get(sid);
        ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
        ChangeNumber mostRecentDbCN = dbState.getMaxChangeNumber(sid);
        try
        {
          if (eligibleCN.older(dbCN))
          // Is the most recent change in the Db newer than eligibleCN?
          // If yes (like cn15 in the example above), then we have to go back
          // to the Db and look for the change older than eligibleCN (cn14).
          if (eligibleCN.olderOrEqual(mostRecentDbCN))
          {
            // some CN exist in the db newer than eligible CN
            // let's get it
            ReplicationIterator ri = h.generateIterator(eligibleCN);
            // let's try to seek the first change <= eligibleCN
            ReplicationIterator ri = null;
            try
            {
              ri = h.generateIterator(eligibleCN);
              if ((ri != null) && (ri.getChange()!=null))
              {
                ChangeNumber newCN = ri.getChange().getChangeNumber();
                result.update(newCN);
              }
            }
            catch(Exception e)
            {
              // there's no change older than eligibleCN (case of s3/cn31)
              result.update(new ChangeNumber(0,0,sid));
            }
            finally
            {
              if (ri != null)
              {
              ri.releaseCursor();
              ri = null;
            }
          }
          }
          else
          {
            // no CN exist in the db newer than elligible CN
            result.update(dbCN);
            // For this serverid, all changes in the ChangelogDb are older
            // than eligibleCN, so the most recent in the db is our guy.
            result.update(mostRecentDbCN);
          }
        }
        catch(Exception e)
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -638,7 +638,7 @@
   *
   * @param topoMsg The received topology message
   */
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
  public void processTopoInfoFromRS(TopologyMsg topoMsg)
  {
    // Store info for remote RS
    List<RSInfo> rsInfos = topoMsg.getRsList();
@@ -836,4 +836,16 @@
    session.publish(msg);
  }
  /**
   * Receives a topology msg.
   * @param topoMsg The message received.
   * @throws DirectoryException when it occurs.
   * @throws IOException when it occurs.
   */
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg)
  throws DirectoryException, IOException
  {
    if (replicationServerDomain != null)
      replicationServerDomain.receiveTopoInfoFromRS(topoMsg, this, true);
  }
}
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -46,13 +46,15 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
@@ -297,6 +299,7 @@
    // replication server domain
    if (oldGenerationId != -100)
    {
      if (replicationServerDomain!=null)
      replicationServerDomain.changeGenerationId(oldGenerationId, false);
    }
  }
@@ -363,8 +366,7 @@
      writer = new ServerWriter(session, serverId,
          this, replicationServerDomain);
      reader = new ServerReader(session, serverId,
          this, replicationServerDomain);
      reader = new ServerReader(session, serverId, this);
      reader.start();
      writer.start();
@@ -947,6 +949,20 @@
  }
  /**
   * Processes a change time heartbeat msg.
   *
   * @param msg The message to be processed.
   */
  public void process(ChangeTimeHeartbeatMsg msg)
  {
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + this +
          " processes received msg:\n" + msg);
    replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
  }
  /**
   * Process the reception of a WindowProbeMsg message.
   *
   * @param  windowProbeMsg The message to process.
@@ -1231,8 +1247,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
        replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() + ", " +
        this.replicationServer.getMonitorInstanceName() + ", " +
        this.getClass().getSimpleName() + " " + this + ":" +
        "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
        "\nAND REPLIED:\n" + outStartMsg.toString());
@@ -1251,8 +1266,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
        replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() + ", " +
        this.replicationServer.getMonitorInstanceName() + ", " +
        this.getClass().getSimpleName() + " " + this + ":" +
        "\nSH START HANDSHAKE SENT("+ this +
        "):\n" + outStartMsg.toString()+
@@ -1272,8 +1286,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + ":" +
          "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1292,8 +1305,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + ":" +
          "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
          "\nAND RECEIVED:\n" + inTopoMsg.toString());
@@ -1312,8 +1324,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
          "\nAND REPLIED:\n" + outTopoMsg.toString());
@@ -1328,8 +1339,7 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
    }
@@ -1345,11 +1355,63 @@
    if (debugEnabled())
    {
      TRACER.debugInfo("In " +
          replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + ", " +
          this.replicationServer.getMonitorInstanceName() + ", " +
          this.getClass().getSimpleName() + " " + this + " :" +
          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
          inStartECLSessionMsg.toString());
    }
  }
  /**
   * Process a Ack message received.
   * @param ack the message received.
   */
  public void processAck(AckMsg ack)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.processAck(ack, this);
  }
  /**
   * Get the reference generation id (associated with the changes in the db).
   * @return the reference generation id.
   */
  public long getReferenceGenId()
  {
    long refgenid = -1;
    if (replicationServerDomain!=null)
      refgenid = replicationServerDomain.getGenerationId();
    return refgenid;
  }
  /**
   * Process a ResetGenerationIdMsg message received.
   * @param msg the message received.
   */
  public void processResetGenId(ResetGenerationIdMsg msg)
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.resetGenerationId(this, msg);
  }
  /**
   * Put a new update message received.
   * @param update the update message received.
   * @throws IOException when it occurs.
   */
  public void put(UpdateMsg update)
  throws IOException
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.put(update, this);
  }
  /**
   * Stop this handler.
   */
  public void doStop()
  {
    if (replicationServerDomain!=null)
      replicationServerDomain.stopServer(this);
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -60,7 +60,6 @@
  private int serverId;
  private ProtocolSession session;
  private ServerHandler handler;
  private ReplicationServerDomain replicationServerDomain;
  /**
   * Constructor for the LDAP server reader part of the replicationServer.
@@ -68,20 +67,15 @@
   * @param session The ProtocolSession from which to read the data.
   * @param serverId The server ID of the server from which we read messages.
   * @param handler The server handler for this server reader.
   * @param replicationServerDomain The ReplicationServerDomain for this server
   *        reader.
   */
  public ServerReader(ProtocolSession session, int serverId,
    ServerHandler handler,
    ReplicationServerDomain replicationServerDomain)
      ServerHandler handler)
  {
    super("Replication Reader Thread for handler of " +
        handler.toString() +
        " in " + replicationServerDomain);
    super("Replication Reader Thread for RS handler " +
        handler.getMonitorInstanceName());
    this.session = session;
    this.serverId = serverId;
    this.handler = handler;
    this.replicationServerDomain = replicationServerDomain;
  }
  /**
@@ -109,15 +103,14 @@
          if (debugEnabled())
          {
            TRACER.debugInfo("In " + replicationServerDomain + " " +
                getName() + " receives " + msg);
            TRACER.debugInfo("In " + getName() + " receives " + msg);
          }
          if (msg instanceof AckMsg)
          {
            AckMsg ack = (AckMsg) msg;
            handler.checkWindow();
            replicationServerDomain.processAck(ack, handler);
            handler.processAck(ack);
          } else if (msg instanceof UpdateMsg)
          {
            boolean filtered = false;
@@ -141,22 +134,19 @@
              if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
                (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
              {
                long referenceGenerationId =
                  replicationServerDomain.getGenerationId();
                long referenceGenerationId = handler.getReferenceGenId();
                if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
                  logError(ERR_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                    Integer.toString(replicationServerDomain.
                    getReplicationServer().getServerId()),
                    replicationServerDomain.getBaseDn(),
                    Integer.toString(handler.getReplicationServerId()),
                    handler.getServiceId(),
                    ((UpdateMsg) msg).getChangeNumber().toString(),
                    Integer.toString(handler.getServerId()),
                    Long.toString(referenceGenerationId),
                    Long.toString(handler.getGenerationId())));
                if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
                  logError(ERR_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                    Integer.toString(replicationServerDomain.
                    getReplicationServer().getServerId()),
                    replicationServerDomain.getBaseDn(),
                    Integer.toString(handler.getReplicationServerId()),
                    handler.getServiceId(),
                    ((UpdateMsg) msg).getChangeNumber().toString(),
                    Integer.toString(handler.getServerId())));
                filtered = true;
@@ -167,17 +157,15 @@
               * Ignore updates from RS with bad gen id
               * (no system managed status for a RS)
               */
              long referenceGenerationId =
                replicationServerDomain.getGenerationId();
              long referenceGenerationId =handler.getReferenceGenId();
              if ((referenceGenerationId > 0) &&
                (referenceGenerationId != handler.getGenerationId()))
              {
                logError(
                    ERR_IGNORING_UPDATE_FROM_RS.get(
                        Integer.toString(
                            replicationServerDomain.getReplicationServer().
                            getServerId()),
                        replicationServerDomain.getBaseDn(),
                            handler.getReplicationServerId()),
                        handler.getServiceId(),
                        ((UpdateMsg) msg).getChangeNumber().toString(),
                        Integer.toString(handler.getServerId()),
                        Long.toString(referenceGenerationId),
@@ -190,7 +178,7 @@
            {
              UpdateMsg update = (UpdateMsg) msg;
              handler.decAndCheckWindow();
              replicationServerDomain.put(update, handler);
              handler.put(update);
            }
          } else if (msg instanceof WindowMsg)
          {
@@ -220,7 +208,7 @@
          } else if (msg instanceof ResetGenerationIdMsg)
          {
            ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
            replicationServerDomain.resetGenerationId(handler, genIdMsg);
            handler.processResetGenId(genIdMsg);
          } else if (msg instanceof WindowProbeMsg)
          {
            WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
@@ -231,8 +219,7 @@
            try
            {
              ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
              replicationServerDomain.receiveTopoInfoFromRS(topoMsg,
                  rsh, true);
              rsh.receiveTopoInfoFromRS(topoMsg);
            }
            catch(Exception e)
            {
@@ -247,13 +234,13 @@
            try
            {
              DataServerHandler dsh = (DataServerHandler)handler;
              replicationServerDomain.processNewStatus(dsh, csMsg);
              dsh.receiveNewStatus(csMsg);
            }
            catch(Exception e)
            {
              errMessage =
                ERR_RECEIVED_CHANGE_STATUS_NOT_FROM_DS.get(
                    replicationServerDomain.getBaseDn(),
                    handler.getServiceId(),
                    Integer.toString(handler.getServerId()),
                    csMsg.toString());
              logError(errMessage);
@@ -270,8 +257,7 @@
          } else if (msg instanceof ChangeTimeHeartbeatMsg)
          {
            ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
            replicationServerDomain.processChangeTimeHeartbeatMsg(handler,
                cthbMsg);
            handler.process(cthbMsg);
          } else if (msg instanceof StopMsg)
          {
            // Peer server is properly disconnecting: go out of here to
@@ -280,8 +266,7 @@
            {
              TRACER.debugInfo(handler.toString() + " has properly " +
                "disconnected from this replication server " +
                Integer.toString(replicationServerDomain.getReplicationServer().
                getServerId()));
                Integer.toString(handler.getReplicationServerId()));
            }
            return;
          } else if (msg == null)
@@ -300,9 +285,8 @@
          // we just trash the message and log the event for debug purpose,
          // then continue receiving messages.
          if (debugEnabled())
            TRACER.debugInfo("In " + replicationServerDomain.
              getReplicationServer().
              getMonitorInstanceName() + ":" + e.getMessage());
            TRACER.debugInfo(
                "In " + this.getName() + " " + stackTraceToSingleLineString(e));
        }
      }
    }
@@ -315,24 +299,16 @@
       */
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader IO EXCEPTION for serverID=" + serverId + " " +
          this + " " +
          stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage());
            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      errMessage = ERR_SERVER_BADLY_DISCONNECTED.get(handler.toString(),
        Integer.toString(replicationServerDomain.
        getReplicationServer().getServerId()));
        Integer.toString(handler.getReplicationServerId()));
      logError(errMessage);
    }
    catch (ClassNotFoundException e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " reader CNF EXCEPTION serverID=" + serverId +
          stackTraceToSingleLineString(e));
            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the connection.
@@ -344,10 +320,7 @@
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS <" + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() +
          " server reader EXCEPTION serverID=" + serverId +
          " " + stackTraceToSingleLineString(e));
          "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the connection.
@@ -364,11 +337,6 @@
       */
      try
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "In RS " + replicationServerDomain.getReplicationServer().
            getMonitorInstanceName() +
            this + " is closing the session");
        if (handler.getProtocolVersion() >=
          ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
@@ -382,12 +350,14 @@
            // Anyway, going to close session, so nothing to do
          }
        }
        if (debugEnabled())
          TRACER.debugInfo("In " + this.getName() + " closing the session");
        session.close();
      } catch (IOException e)
      {
      // ignore
      }
      replicationServerDomain.stopServer(handler);
      handler.doStop();
      if (debugEnabled())
      {
        TRACER.debugInfo(this.getName() + " stopped " + errMessage);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -37,6 +37,7 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.opends.server.loggers.ErrorLogger.logError;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
@@ -230,7 +231,7 @@
    // Write additional changes and read ECL from a provided draft change number
    ts = ECLCompatWriteReadAllOps(5);replicationServer.clearDb();
    // ECLIncludeAttributes();replicationServer.clearDb();
    ECLIncludeAttributes();replicationServer.clearDb();
  }
  @Test(enabled=true, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
@@ -260,7 +261,7 @@
    ECLRemoteNonEmpty();replicationServer.clearDb();
    // Test with a mix of domains, a mix of DSes
    //ECLTwoDomains();
    ECLTwoDomains();
    // changelogDb required NOT empty for the next test
    // Test ECL after changelog trimming
@@ -1340,10 +1341,10 @@
      // test success
      waitOpResult(searchOp, ResultCode.SUCCESS);
      // test 4 entries returned
      String cookie1 = "o=test:"+cn1.toString()+";o=test2:;";
      String cookie2 = "o=test:"+cn2.toString()+";o=test2:;";
      String cookie3 = "o=test:"+cn3.toString()+";o=test2:;";
      String cookie4 = "o=test:"+cn4.toString()+";o=test2:;";
      String cookie1 = "o=test:"+cn1.toString()+";";
      String cookie2 = "o=test:"+cn2.toString()+";";
      String cookie3 = "o=test:"+cn3.toString()+";";
      String cookie4 = "o=test:"+cn4.toString()+";";
      assertEquals(searchOp.getSearchEntries().size(), 4);
      LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
@@ -2017,7 +2018,7 @@
      s2 = new Socket("127.0.0.1", TestCaseUtils.getServerLdapPort());
      org.opends.server.tools.LDAPReader r2 = new org.opends.server.tools.LDAPReader(s2);
      LDAPWriter w2 = new LDAPWriter(s2);
      s2.setSoTimeout(15000);
      s2.setSoTimeout(30000);
      bindAsManager(w2, r2);
      // Connects and bind
@@ -2483,6 +2484,7 @@
  private static void removeTestBackend2(Backend backend)
  {
    MemoryBackend memoryBackend = (MemoryBackend)backend;
    memoryBackend.clearMemoryBackend();
    memoryBackend.finalizeBackend();
    DirectoryServer.deregisterBackend(memoryBackend);
  }
@@ -2757,7 +2759,7 @@
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","delete");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+0));
            checkValue(resultEntry,"targetuniqueid","11111111-11121113-11141111-11111115");
@@ -2775,7 +2777,7 @@
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","add");
            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+1));
          } else if (i==3)
@@ -2790,7 +2792,7 @@
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","modify");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+2));
          } else if (i==4)
@@ -2802,7 +2804,7 @@
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","modrdn");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"newrdn","uid="+tn+"new4");
            checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2);
@@ -2849,7 +2851,7 @@
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"1," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","delete");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";o=test2:;");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn1.toString()+";");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+0));
            checkValue(resultEntry,"targetuniqueid","11111111-11121113-11141111-11111115");
@@ -2867,7 +2869,7 @@
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"2," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","add");
            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
            checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+1));
          } else if (i==3)
@@ -2882,7 +2884,7 @@
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"3," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","modify");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";o=test2:;");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn3.toString()+";");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"changenumber",String.valueOf(firstDraftChangeNumber+2));
          } else if (i==4)
@@ -2894,7 +2896,7 @@
            checkValue(resultEntry,"replicaidentifier","1201");
            checkValue(resultEntry,"targetdn","uid="+tn+"4," + TEST_ROOT_DN_STRING);
            checkValue(resultEntry,"changetype","modrdn");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";o=test2:;");
            checkValue(resultEntry,"changelogcookie","o=test:"+cn4.toString()+";");
            checkValue(resultEntry,"targetentryuuid",user1entryUUID);
            checkValue(resultEntry,"newrdn","uid="+tn+"new4");
            checkValue(resultEntry,"newsuperior",TEST_ROOT_DN_STRING2);
@@ -2967,7 +2969,7 @@
          checkValue(resultEntry,"replicationcsn",gblCN.toString());
          checkValue(resultEntry,"replicaidentifier","1201");
          checkValue(resultEntry,"changetype","add");
          checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";o=test2:;");
          checkValue(resultEntry,"changelogcookie","o=test:"+gblCN.toString()+";");
          checkValue(resultEntry,"targetentryuuid",user1entryUUID);
          checkValue(resultEntry,"changenumber","6");
        }
@@ -3603,6 +3605,7 @@
      waitOpResult(searchOp, ResultCode.SUCCESS);
      LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
      sleep(2000);
      assertTrue(entries != null);
      String s = tn + " entries returned= ";