| | |
| | | import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; |
| | | import org.opends.server.replication.protocol.Session; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | /** |
| | | * This thread publishes a {@link ChangeTimeHeartbeatMsg} on a given protocol |
| | |
| | | * @param session The session on which heartbeats are to be sent. |
| | | * @param heartbeatInterval The interval between heartbeats sent |
| | | * (in milliseconds). |
| | | * @param serverId The serverId of the sender domain. |
| | | */ |
| | | public CTHeartbeatPublisherThread(String threadName, Session session, |
| | | long heartbeatInterval, int serverId) |
| | | { |
| | | super(threadName); |
| | | this.session = session; |
| | | this.heartbeatInterval = heartbeatInterval; |
| | | // Sender id; it becomes the server id component of every CSN this thread builds. |
| | | this.serverId = serverId; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public void run() |
| | | { |
| | | long lastHeartbeatTime = 0; |
| | | try |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | |
| | | |
| | | while (!shutdown) |
| | | { |
| | | long now = System.currentTimeMillis(); |
| | | final CSN csn = new CSN(TimeThread.getTime(), 0, serverId); |
| | | ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn); |
| | | |
| | | final long now = System.currentTimeMillis(); |
| | | if (now > session.getLastPublishTime() + heartbeatInterval) |
| | | { |
| | | session.publish(ctHeartbeatMsg); |
| | | final CSN csn = new CSN(now, 0, serverId); |
| | | session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn)); |
| | | lastHeartbeatTime = csn.getTime(); |
| | | } |
| | | |
| | | long sleepTime = session.getLastPublishTime() + heartbeatInterval - now; |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (shutdown) |
| | | { |
| | | /* |
| | | * Shortcoming: this thread is restarted each time the DS reconnects, |
| | | * e.g. during load balancing. This is not that much of a problem |
| | | * because the ChangeNumberIndexer tolerates receiving replica offline |
| | | * heartbeats and then receiving messages back again. |
| | | */ |
| | | /* |
| | | * However, during shutdown we need to be sure that all pending client |
| | | * operations have either completed or have been aborted before shutting |
| | | * down replication. Otherwise, the medium consistency will move forward |
| | | * without knowing about these changes. |
| | | */ |
| | | final long now = System.currentTimeMillis(); |
| | | final int seqNum = lastHeartbeatTime == now ? 1 : 0; |
| | | final CSN offlineCSN = new CSN(now, seqNum, serverId); |
| | | session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN)); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |