OPENDJ-1453 Make the Medium Consistency Point support replicas temporarily leaving the topology
Partly reverted r10688: Using ChangeTimeHeartbeatMsg to convey replica offline messages is not a good approach.
A future change will introduce a new message type: ReplicaOfflineMsg.
| | |
| | | */ |
| | | public class ChangeTimeHeartbeatMsg extends ReplicationMsg |
| | | { |
| | | private static final byte NORMAL_HEARTBEAT = 0; |
| | | private static final byte REPLICA_OFFLINE_HEARTBEAT = 1; |
| | | |
| | | /** |
| | | * The CSN containing the change time. |
| | | */ |
| | | private final CSN csn; |
| | | /** |
| | | * The type of heartbeat: either NORMAL_HEARTBEAT or REPLICA_OFFLINE_HEARTBEAT. |
| | | */ |
| | | private final byte eventType; |
| | | |
| | | private ChangeTimeHeartbeatMsg(CSN csn, byte eventType) |
| | | { |
| | | this.csn = csn; |
| | | this.eventType = eventType; |
| | | } |
| | | |
| | | /** |
| | | * Factory method that builds a change time heartbeat message providing the |
| | | * change time value in a CSN. |
| | | * Constructor of a Change Time Heartbeat message providing the change time |
| | | * value in a CSN. |
| | | * |
| | | * @param csn |
| | | * The provided CSN. |
| | | * @return a new ChangeTimeHeartbeatMsg |
| | | */ |
| | | public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn) |
| | | public ChangeTimeHeartbeatMsg(CSN csn) |
| | | { |
| | | return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT); |
| | | } |
| | | |
| | | /** |
| | | * Factory method that builds a change time heartbeat message for a replica |
| | | * going offline. |
| | | * |
| | | * @param offlineCSN |
| | | * the serverId and timestamp of the replica going offline |
| | | * @return a new ChangeTimeHeartbeatMsg |
| | | */ |
| | | public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN) |
| | | { |
| | | return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT); |
| | | this.csn = csn; |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns whether this is a replica offline message. |
| | | * |
| | | * @return true if this is a replica offline message, false if this is a |
| | | * regular heartbeat message. |
| | | */ |
| | | public boolean isReplicaOfflineMsg() |
| | | { |
| | | return eventType == REPLICA_OFFLINE_HEARTBEAT; |
| | | } |
| | | |
| | | /** |
| | | * Creates a message from a provided byte array. |
| | | * |
| | | * @param in |
| | |
| | | csn = version >= REPLICATION_PROTOCOL_V7 |
| | | ? scanner.nextCSN() |
| | | : scanner.nextCSNUTF8(); |
| | | eventType = version >= REPLICATION_PROTOCOL_V8 |
| | | ? scanner.nextByte() |
| | | : NORMAL_HEARTBEAT; |
| | | |
| | | if (!scanner.isEmpty()) |
| | | { |
| | |
| | | final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1)); |
| | | builder.appendByte(MSG_TYPE_CT_HEARTBEAT); |
| | | builder.appendCSN(csn); |
| | | if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8) |
| | | { |
| | | builder.appendByte(eventType); |
| | | } |
| | | return builder.toByteArray(); |
| | | } |
| | | |
| | |
| | | { |
| | | try |
| | | { |
| | | if (msg.isReplicaOfflineMsg()) |
| | | { |
| | | domainDB.notifyReplicaOffline(baseDN, msg.getCSN()); |
| | | } |
| | | else |
| | | { |
| | | domainDB.replicaHeartbeat(baseDN, msg.getCSN()); |
| | | } |
| | | domainDB.replicaHeartbeat(baseDN, msg.getCSN()); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | if (now > session.getLastPublishTime() + heartbeatInterval) |
| | | { |
| | | final CSN csn = new CSN(now, 0, serverId); |
| | | session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn)); |
| | | session.publish(new ChangeTimeHeartbeatMsg(csn)); |
| | | lastHeartbeatTime = csn.getTime(); |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | if (shutdown) |
| | | { |
| | | /* |
| | | * Shortcoming: this thread is restarted each time the DS reconnects, |
| | | * e.g. during load balancing. This is not that much of a problem |
| | | * because the ChangeNumberIndexer tolerates receiving replica offline |
| | | * heartbeats and then receiving messages back again. |
| | | */ |
| | | /* |
| | | * However, during shutdown we need to be sure that all pending client |
| | | * operations have either completed or have been aborted before shutting |
| | | * down replication. Otherwise, the medium consistency will move forward |
| | | * without knowing about these changes. |
| | | */ |
| | | final long now = System.currentTimeMillis(); |
| | | final int seqNum = lastHeartbeatTime == now ? 1 : 0; |
| | | final CSN offlineCSN = new CSN(now, seqNum, serverId); |
| | | session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN)); |
| | | } |
| | | } |
| | | catch (IOException e) |
| | | { |
| | |
| | | @Test |
| | | public void changeTimeHeartbeatMsgTest() throws Exception |
| | | { |
| | | final short v1 = REPLICATION_PROTOCOL_V1; |
| | | final short v7 = REPLICATION_PROTOCOL_V7; |
| | | final short v8 = REPLICATION_PROTOCOL_V8; |
| | | |
| | | final CSN csn = new CSN(System.currentTimeMillis(), 0, 42); |
| | | final ChangeTimeHeartbeatMsg heartbeatMsg = ChangeTimeHeartbeatMsg.heartbeatMsg(csn); |
| | | assertCTHearbeatMsg(heartbeatMsg, v1, false); |
| | | assertCTHearbeatMsg(heartbeatMsg, v7, false); |
| | | assertCTHearbeatMsg(heartbeatMsg, v8, false); |
| | | |
| | | final ChangeTimeHeartbeatMsg offlineMsg = ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn); |
| | | assertCTHearbeatMsg(offlineMsg, v1, false); |
| | | assertCTHearbeatMsg(offlineMsg, v7, false); |
| | | assertCTHearbeatMsg(offlineMsg, v8, true); |
| | | final ChangeTimeHeartbeatMsg heartbeatMsg = new ChangeTimeHeartbeatMsg(csn); |
| | | assertCTHearbeatMsg(heartbeatMsg, REPLICATION_PROTOCOL_V1); |
| | | assertCTHearbeatMsg(heartbeatMsg, REPLICATION_PROTOCOL_V7); |
| | | } |
| | | |
| | | private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg, |
| | | short version, boolean expected) throws DataFormatException |
| | | private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg expectedMsg, |
| | | short version) throws DataFormatException |
| | | { |
| | | final byte[] bytes = heartbeatMsg.getBytes(version); |
| | | final byte[] bytes = expectedMsg.getBytes(version); |
| | | ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version); |
| | | assertEquals(decodedMsg.isReplicaOfflineMsg(), expected); |
| | | assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN()); |
| | | } |
| | | |
| | | /** |