mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.29.2014 e4ae0bdaa33ad0568be2e3b7f1e6a8d5695513e8
OPENDJ-1453 Make the Medium Consistency Point support replicas temporarily leaving the topology


Partly reverted r10688: using ChangeTimeHeartbeatMsg to convey replica offline messages is not a good approach.
A future change will introduce a new message type, ReplicaOfflineMsg, instead.
4 files modified
107 ■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java 52 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 9 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java 22 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 24 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -39,48 +39,22 @@
 */
public class ChangeTimeHeartbeatMsg extends ReplicationMsg
{
  private static final byte NORMAL_HEARTBEAT = 0;
  private static final byte REPLICA_OFFLINE_HEARTBEAT = 1;
  /**
   * The CSN containing the change time.
   */
  private final CSN csn;
  /**
   * The CSN containing the change time.
   */
  private final byte eventType;
  private ChangeTimeHeartbeatMsg(CSN csn, byte eventType)
  {
    this.csn = csn;
    this.eventType = eventType;
  }
  /**
   * Factory method that builds a change time heartbeat message providing the
   * change time value in a CSN.
   * Constructor of a Change Time Heartbeat message providing the change time
   * value in a CSN.
   *
   * @param csn
   *          The provided CSN.
   * @return a new ChangeTimeHeartbeatMsg
   */
  public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn)
  public ChangeTimeHeartbeatMsg(CSN csn)
  {
    return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT);
  }
  /**
   * Factory method that builds a change time heartbeat message for a replica
   * going offline.
   *
   * @param offlineCSN
   *          the serverId and timestamp of the replica going offline
   * @return a new ChangeTimeHeartbeatMsg
   */
  public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN)
  {
    return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT);
    this.csn = csn;
  }
  /**
@@ -94,17 +68,6 @@
  }
  /**
   * Returns whether this is a replica offline message.
   *
   * @return true if this is a replica offline message, false if this is a
   *         regular heartbeat message.
   */
  public boolean isReplicaOfflineMsg()
  {
    return eventType == REPLICA_OFFLINE_HEARTBEAT;
  }
  /**
   * Creates a message from a provided byte array.
   *
   * @param in
@@ -130,9 +93,6 @@
      csn = version >= REPLICATION_PROTOCOL_V7
          ? scanner.nextCSN()
          : scanner.nextCSNUTF8();
      eventType = version >= REPLICATION_PROTOCOL_V8
          ? scanner.nextByte()
          : NORMAL_HEARTBEAT;
      if (!scanner.isEmpty())
      {
@@ -163,10 +123,6 @@
    final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1));
    builder.appendByte(MSG_TYPE_CT_HEARTBEAT);
    builder.appendCSN(csn);
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8)
    {
      builder.appendByte(eventType);
    }
    return builder.toByteArray();
  }
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2508,14 +2508,7 @@
  {
    try
    {
      if (msg.isReplicaOfflineMsg())
      {
        domainDB.notifyReplicaOffline(baseDN, msg.getCSN());
      }
      else
      {
        domainDB.replicaHeartbeat(baseDN, msg.getCSN());
      }
      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
    }
    catch (ChangelogException e)
    {
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -107,7 +107,7 @@
        if (now > session.getLastPublishTime() + heartbeatInterval)
        {
          final CSN csn = new CSN(now, 0, serverId);
          session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
          session.publish(new ChangeTimeHeartbeatMsg(csn));
          lastHeartbeatTime = csn.getTime();
        }
@@ -137,26 +137,6 @@
          }
        }
      }
      if (shutdown)
      {
        /*
         * Shortcoming: this thread is restarted each time the DS reconnects,
         * e.g. during load balancing. This is not that much of a problem
         * because the ChangeNumberIndexer tolerates receiving replica offline
         * heartbeats and then receiving messages back again.
         */
        /*
         * However, during shutdown we need to be sure that all pending client
         * operations have either completed or have been aborted before shutting
         * down replication. Otherwise, the medium consistency will move forward
         * without knowing about these changes.
         */
        final long now = System.currentTimeMillis();
        final int seqNum = lastHeartbeatTime == now ? 1 : 0;
        final CSN offlineCSN = new CSN(now, seqNum, serverId);
        session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
      }
    }
    catch (IOException e)
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -829,28 +829,18 @@
  @Test
  public void changeTimeHeartbeatMsgTest() throws Exception
  {
    final short v1 = REPLICATION_PROTOCOL_V1;
    final short v7 = REPLICATION_PROTOCOL_V7;
    final short v8 = REPLICATION_PROTOCOL_V8;
    final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
    final ChangeTimeHeartbeatMsg heartbeatMsg = ChangeTimeHeartbeatMsg.heartbeatMsg(csn);
    assertCTHearbeatMsg(heartbeatMsg, v1, false);
    assertCTHearbeatMsg(heartbeatMsg, v7, false);
    assertCTHearbeatMsg(heartbeatMsg, v8, false);
    final ChangeTimeHeartbeatMsg offlineMsg = ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn);
    assertCTHearbeatMsg(offlineMsg, v1, false);
    assertCTHearbeatMsg(offlineMsg, v7, false);
    assertCTHearbeatMsg(offlineMsg, v8, true);
    final ChangeTimeHeartbeatMsg heartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
    assertCTHearbeatMsg(heartbeatMsg, REPLICATION_PROTOCOL_V1);
    assertCTHearbeatMsg(heartbeatMsg, REPLICATION_PROTOCOL_V7);
  }
  private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg,
      short version, boolean expected) throws DataFormatException
  private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg expectedMsg,
      short version) throws DataFormatException
  {
    final byte[] bytes = heartbeatMsg.getBytes(version);
    final byte[] bytes = expectedMsg.getBytes(version);
    ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version);
    assertEquals(decodedMsg.isReplicaOfflineMsg(), expected);
    assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
  }
  /**