mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.26.2014 de36fa06856d8d04652401bb24e49c3259aef154
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -35,7 +35,6 @@
import org.opends.server.replication.protocol.Session;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -76,15 +75,15 @@
   * @param session The session on which heartbeats are to be sent.
   * @param heartbeatInterval The interval between heartbeats sent
   *                          (in milliseconds).
   * @param serverId2 The serverId of the sender domain.
   * @param serverId The serverId of the sender domain.
   */
  public CTHeartbeatPublisherThread(String threadName, Session session,
                  long heartbeatInterval, int serverId2)
                  long heartbeatInterval, int serverId)
  {
    super(threadName);
    this.session = session;
    this.heartbeatInterval = heartbeatInterval;
    this.serverId = serverId2;
    this.serverId = serverId;
  }
  /**
@@ -93,6 +92,7 @@
  @Override
  public void run()
  {
    long lastHeartbeatTime = 0;
    try
    {
      if (debugEnabled())
@@ -103,13 +103,12 @@
      while (!shutdown)
      {
        long now = System.currentTimeMillis();
        final CSN csn = new CSN(TimeThread.getTime(), 0, serverId);
        ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
        final long now = System.currentTimeMillis();
        if (now > session.getLastPublishTime() + heartbeatInterval)
        {
          session.publish(ctHeartbeatMsg);
          final CSN csn = new CSN(now, 0, serverId);
          session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
          lastHeartbeatTime = csn.getTime();
        }
        long sleepTime = session.getLastPublishTime() + heartbeatInterval - now;
@@ -138,6 +137,26 @@
          }
        }
      }
      if (shutdown)
      {
        /*
         * Shortcoming: this thread is restarted each time the DS reconnects,
         * e.g. during load balancing. This is not that much of a problem
         * because the ChangeNumberIndexer tolerates receiving replica offline
         * heartbeats and then receiving messages back again.
         */
        /*
         * However, during shutdown we need to be sure that all pending client
         * operations have either completed or have been aborted before shutting
         * down replication. Otherwise, the medium consistency will move forward
         * without knowing about these changes.
         */
        final long now = System.currentTimeMillis();
        final int seqNum = lastHeartbeatTime == now ? 1 : 0;
        final CSN offlineCSN = new CSN(now, seqNum, serverId);
        session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
      }
    }
    catch (IOException e)
    {