mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.12.2015 2b89a960a5dcfb1d6ee803eafd944f4ea5ce5c71
OPENDJ-2106 : Entry is not replicated on second instance in 2 RS topology

Code review: Matthew Swift



New failing use case (Ludovic Poitou raised the question, and Christophe Sovant added a functional test for it):
1. 2 DS-RSs
2. backup DS1
3. add entry1 to DS1
4. add entry2 to DS2
5. stop DS-RS2
6. restore DS2 from DS1 backup
7. start DS-RS2
8. ldapsearch entry1 on DS2 - OK
9. ldapsearch entry2 on DS2 - Not OK

Removing the "if (record.getCSN().getServerId() != sendToServerId)" condition in MessageHandler.fillLateQueue() makes the test pass.
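The effect of that condition on the use case above can be illustrated with a minimal, self-contained sketch. This is not OpenDJ code: the class `LateQueueSketch`, the `Change` type, the `skipOwnChanges` flag and the reduction of a CSN to a (serverId, seqNum) pair are all assumptions made for illustration, as is the choice of server ids 1 and 2 for DS1 and DS2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Simplified model of the late queue filling; hypothetical names, not OpenDJ code. */
final class LateQueueSketch {
  /** Hypothetical stand-in for UpdateMsg: a CSN reduced to (serverId, seqNum) plus an entry name. */
  static final class Change {
    final int serverId;
    final int seqNum;
    final String entry;

    Change(int serverId, int seqNum, String entry) {
      this.serverId = serverId;
      this.seqNum = seqNum;
      this.entry = entry;
    }
  }

  /**
   * Collects the changes the consumer does not know yet, according to its state
   * (serverId -> highest seqNum known). When skipOwnChanges is true, changes that
   * originated from the consumer itself are dropped, mimicking the removed condition.
   */
  static List<String> fillLateQueue(List<Change> changelog, Map<Integer, Integer> consumerState,
      int sendToServerId, boolean skipOwnChanges) {
    List<String> lateQueue = new ArrayList<>();
    for (Change change : changelog) {
      boolean alreadyKnown = change.seqNum <= consumerState.getOrDefault(change.serverId, 0);
      if (alreadyKnown) {
        continue;
      }
      if (skipOwnChanges && change.serverId == sendToServerId) {
        continue; // the behaviour removed by this commit
      }
      lateQueue.add(change.entry);
    }
    return lateQueue;
  }

  public static void main(String[] args) {
    // entry1 was added on DS1 (serverId 1), entry2 on DS2 (serverId 2), both after the backup.
    List<Change> changelog = Arrays.asList(new Change(1, 1, "entry1"), new Change(2, 1, "entry2"));
    // DS2 was restored from the backup, so it knows neither change.
    Map<Integer, Integer> restoredState = Collections.emptyMap();

    System.out.println(fillLateQueue(changelog, restoredState, 2, true));  // prints [entry1]
    System.out.println(fillLateQueue(changelog, restoredState, 2, false)); // prints [entry1, entry2]
  }
}
```

With the filter enabled, entry2 is never resent to the replica that originally produced it, which matches step 9 of the use case.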



Matthew Swift: "have you verified that the DS state is immediately updated on receipt of a change from the DS?"
JNR: Reading and writing are decoupled.
The RS thread reading from the DS updates the serverState in memory before the change is persisted in the changelogDb.
Then the RS thread writing to the DS (or RS) reads from the changelogDb and checks whether the remote DS (or RS) already knows the change before sending it. The check is performed against the same in-memory serverState, which prevents sending the change back to the server it came from.
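The answer above can be sketched as follows. This is a simplified, single-threaded model, not OpenDJ code: `HandlerStateSketch`, `onChangeReceived` and `pendingForRemote` are hypothetical names, a change is reduced to a {serverId, seqNum} pair, and the real reader and writer run on separate threads.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of the RS-side handler for one remote replica; hypothetical, not OpenDJ code. */
final class HandlerStateSketch {
  /** In-memory view of what the remote replica knows: serverId -> highest seqNum. */
  private final Map<Integer, Integer> serverState = new HashMap<>();
  /** Stand-in for the changelogDb; each element is {serverId, seqNum}. */
  private final List<int[]> changelogDb;

  HandlerStateSketch(List<int[]> changelogDb) {
    this.changelogDb = changelogDb;
  }

  /** Reader side: the in-memory state is updated first, then the change is persisted. */
  void onChangeReceived(int serverId, int seqNum) {
    serverState.merge(serverId, seqNum, Integer::max);
    changelogDb.add(new int[] {serverId, seqNum});
  }

  /** Writer side: reads the changelog and keeps only changes the state does not cover. */
  List<String> pendingForRemote() {
    List<String> pending = new ArrayList<>();
    for (int[] change : changelogDb) {
      if (change[1] > serverState.getOrDefault(change[0], 0)) {
        pending.add(change[0] + ":" + change[1]);
      }
    }
    return pending;
  }

  public static void main(String[] args) {
    List<int[]> changelogDb = new ArrayList<>(Arrays.asList(new int[] {1, 1})); // change from replica 1
    HandlerStateSketch handlerForReplica2 = new HandlerStateSketch(changelogDb);
    handlerForReplica2.onChangeReceived(2, 1); // replica 2 publishes its own change
    System.out.println(handlerForReplica2.pendingForRemote()); // prints [1:1]
  }
}
```

In this model the state check alone keeps a replica's own change from being echoed back, without any comparison on the originating server id.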

Matthew Swift: "why is the DS not resent missing changes as part of recovery?"
JNR: The DS has been restored from an older backup so it no longer knows about changes performed after the backup.
So it cannot send back its own changes that happened after the backup was taken.
The RS does not resend the DS's own changes back to it as part of recovery because of the condition "if (record.getCSN().getServerId() != sendToServerId)" in MessageHandler.fillLateQueue().




MessageHandler.java:
In fillLateQueue(), removed the if statement and the now-useless "sendToServerId" parameter, then updated the callers accordingly.
2 files modified, 21 lines changed
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java | 19
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java | 2
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -233,16 +233,13 @@
   }
   /**
-   * Get the next update that must be sent to the consumer from the message queue or from the
-   * database.
+   * Get the next update that must be sent to the consumer from the message queue or from the database.
    *
-   * @param sendToServerId
-   *          serverId of the replica where changes will be sent
    * @return The next update that must be sent to the consumer, or {@code null} when queue is empty
    * @throws ChangelogException
    *            If a problem occurs when reading the changelog
    */
-  protected UpdateMsg getNextMessage(int sendToServerId) throws ChangelogException
+  protected UpdateMsg getNextMessage() throws ChangelogException
   {
     while (activeConsumer)
     {
@@ -275,7 +272,7 @@
            *           restart as usual
            *   load this change on the delayList
            */
-          fillLateQueue(sendToServerId);
+          fillLateQueue();
           if (lateQueue.isEmpty())
           {
             // we could not find any messages in the changelog
@@ -373,17 +370,13 @@
    * Fills the late queue with the most recent changes, accepting only the
    * messages from provided replica ids.
    */
-  private void fillLateQueue(int sendToServerId) throws ChangelogException
+  private void fillLateQueue() throws ChangelogException
   {
     try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
     {
       while (cursor.next() && isLateQueueBelowThreshold())
       {
-        final UpdateMsg record = cursor.getRecord();
-        if (record.getCSN().getServerId() != sendToServerId)
-        {
-          lateQueue.add(record);
-        }
+        lateQueue.add(cursor.getRecord());
       }
     }
   }
@@ -433,7 +426,7 @@
   private CSN findOldestCSNFromReplicaDBs()
   {
-    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
+    try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState))
     {
       while (cursor.next())
       {
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
@@ -931,7 +931,7 @@
    */
   public UpdateMsg take() throws ChangelogException
   {
-    final UpdateMsg msg = getNextMessage(serverId);
+    final UpdateMsg msg = getNextMessage();
     acquirePermitInSendWindow();