mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.43.2013 40c19698a7c27ef73ae26439a962c62e373813a8
OPENDJ-1231 (CR-2724) Make the Medium Consistency Point support replica heartbeats


Checkpoint commit that:
- adds i18n
- improves comments and javadocs
- improved ServerState class to internally use a lock free implementation


replication.properties:
Added 2 error messages.

ServerState.java:
Made the implementation use ConcurrentMap + removed all synchronized blocks + significantly change the code in update(), removeCSN() and toString().
Removed getMaxCSN() (never used).

ServerStateTest.java:
Added asserts to testRemoveCSN().
4 files modified
271 ■■■■ changed files
opends/src/messages/messages/replication.properties 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java 207 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 41 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java 13 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -527,4 +527,12 @@
 for DS(%d) to connect to because it was the only one standing after all tests
NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
SEVERE_ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN_235=Could not \
 create replica database because the changelog database is shutting down
 create replica database because the changelog database is shutting down
FATAL_ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION_236=An unexpected error \
 forced the %s thread to shutdown: %s. \
 The changeNumber attribute will not move forward anymore. \
 You can reenable this thread by first setting the "compute-change-number" \
 property to false and then back to true
FATAL_ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ_237=Aborting initialization: \
 expected the newest change number index record CSN '%s' to be equal to \
 the CSN read from the replica DBs '%s'
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -30,11 +30,14 @@
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.types.ByteString;
import org.opends.server.util.StaticUtils;
/**
 * This class is used to associate serverIds with {@link CSN}s.
@@ -46,7 +49,8 @@
{
  /** Associates a serverId with a CSN. */
  private final Map<Integer, CSN> serverIdToCSN = new HashMap<Integer, CSN>();
  private final ConcurrentMap<Integer, CSN> serverIdToCSN =
      new ConcurrentSkipListMap<Integer, CSN>();
  /**
   * Whether the state has been saved to persistent storage. It starts at true,
   * and moves to false when an update is made to the current object.
@@ -68,10 +72,7 @@
   */
  public void clear()
  {
    synchronized (serverIdToCSN)
    {
      serverIdToCSN.clear();
    }
    serverIdToCSN.clear();
  }
@@ -154,14 +155,27 @@
    saved = false;
    synchronized (serverIdToCSN)
    final int serverId = csn.getServerId();
    while (true)
    {
      final int serverId = csn.getServerId();
      final CSN existingCSN = serverIdToCSN.get(serverId);
      if (existingCSN == null || csn.isNewerThan(existingCSN))
      if (existingCSN == null)
      {
        serverIdToCSN.put(serverId, csn);
        return true;
        if (serverIdToCSN.putIfAbsent(serverId, csn) == null)
        {
          return true;
        }
        // oops, a concurrent modification happened, run the same process again
        continue;
      }
      else if (csn.isNewerThan(existingCSN))
      {
        if (serverIdToCSN.replace(serverId, existingCSN, csn))
        {
          return true;
        }
        // oops, a concurrent modification happened, run the same process again
        continue;
      }
      return false;
    }
@@ -203,19 +217,10 @@
    if (expectedCSN == null)
      return false;
    synchronized (serverIdToCSN)
    if (serverIdToCSN.remove(expectedCSN.getServerId(), expectedCSN))
    {
      for (Iterator<CSN> iter = serverIdToCSN.values().iterator();
          iter.hasNext();)
      {
        final CSN csn = iter.next();
        if (expectedCSN.equals(csn))
        {
          iter.remove();
          saved = false;
          return true;
        }
      }
      saved = false;
      return true;
    }
    return false;
  }
@@ -232,11 +237,8 @@
      return false;
    }
    synchronized (serverIdToCSN)
    {
      clear();
      return update(serverState);
    }
    clear();
    return update(serverState);
  }
  /**
@@ -252,18 +254,13 @@
   */
  public Set<String> toStringSet()
  {
    Set<String> set = new HashSet<String>();
    synchronized (serverIdToCSN)
    final Set<String> result = new HashSet<String>();
    for (CSN change : serverIdToCSN.values())
    {
      for (CSN change : serverIdToCSN.values())
      {
        Date date = new Date(change.getTime());
        set.add(change + " " + date + " " + change.getTime());
      }
      Date date = new Date(change.getTime());
      result.add(change + " " + date + " " + change.getTime());
    }
    return set;
    return result;
  }
  /**
@@ -274,14 +271,10 @@
   */
  public ArrayList<ByteString> toASN1ArrayList()
  {
    ArrayList<ByteString> values = new ArrayList<ByteString>(0);
    synchronized (serverIdToCSN)
    final ArrayList<ByteString> values = new ArrayList<ByteString>(0);
    for (CSN csn : serverIdToCSN.values())
    {
      for (CSN csn : serverIdToCSN.values())
      {
        values.add(ByteString.valueOf(csn.toString()));
      }
      values.add(ByteString.valueOf(csn.toString()));
    }
    return values;
  }
@@ -301,21 +294,18 @@
  public void writeTo(ASN1Writer writer, short protocolVersion)
      throws IOException
  {
    synchronized (serverIdToCSN)
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
      for (CSN csn : serverIdToCSN.values())
      {
        for (CSN csn : serverIdToCSN.values())
        {
          writer.writeOctetString(csn.toByteString());
        }
        writer.writeOctetString(csn.toByteString());
      }
      else
    }
    else
    {
      for (CSN csn : serverIdToCSN.values())
      {
        for (CSN csn : serverIdToCSN.values())
        {
          writer.writeOctetString(csn.toString());
        }
        writer.writeOctetString(csn.toString());
      }
    }
  }
@@ -327,19 +317,7 @@
  @Override
  public String toString()
  {
    StringBuilder buffer = new StringBuilder();
    synchronized (serverIdToCSN)
    {
      for (CSN change : serverIdToCSN.values())
      {
        buffer.append(change).append(" ");
      }
      if (!serverIdToCSN.isEmpty())
        buffer.deleteCharAt(buffer.length() - 1);
    }
    return buffer.toString();
    return StaticUtils.collectionToString(serverIdToCSN.values(), " ");
  }
  /**
@@ -357,26 +335,6 @@
  }
  /**
   * Returns the largest (most recent) {@code CSN} in this server state.
   *
   * @return The largest (most recent) {@code CSN} in this server state.
   */
  public CSN getMaxCSN()
  {
    CSN maxCSN = null;
    synchronized (serverIdToCSN)
    {
      for (CSN csn : serverIdToCSN.values())
      {
        if (maxCSN == null || csn.isNewerThan(maxCSN))
          maxCSN = csn;
      }
    }
    return maxCSN;
  }
  /**
   * Add the tail into resultByteArray at position pos.
   */
  private int addByteArray(byte[] tail, byte[] resultByteArray, int pos)
@@ -397,37 +355,38 @@
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  {
    synchronized (serverIdToCSN)
    // copy to protect from concurrent updates
    // that could change the number of elements in the Map
    final Map<Integer, CSN> copy = new HashMap<Integer, CSN>(serverIdToCSN);
    final int size = copy.size();
    List<String> idList = new ArrayList<String>(size);
    List<String> csnList = new ArrayList<String>(size);
    // calculate the total length needed to allocate byte array
    int length = 0;
    for (Entry<Integer, CSN> entry : copy.entrySet())
    {
      final int size = serverIdToCSN.size();
      List<String> idList = new ArrayList<String>(size);
      List<String> csnList = new ArrayList<String>(size);
      // calculate the total length needed to allocate byte array
      int length = 0;
      for (Entry<Integer, CSN> entry : serverIdToCSN.entrySet())
      {
        // serverId is useless, see comment in ServerState ctor
        final String serverIdStr = String.valueOf(entry.getKey());
        idList.add(serverIdStr);
        length += serverIdStr.length() + 1;
      // serverId is useless, see comment in ServerState ctor
      final String serverIdStr = String.valueOf(entry.getKey());
      idList.add(serverIdStr);
      length += serverIdStr.length() + 1;
        final String csnStr = entry.getValue().toString();
        csnList.add(csnStr);
        length += csnStr.length() + 1;
      }
      byte[] result = new byte[length];
      // write the server state into the byte array
      int pos = 0;
      for (int i = 0; i < size; i++)
      {
        String str = idList.get(i);
        pos = addByteArray(str.getBytes("UTF-8"), result, pos);
        str = csnList.get(i);
        pos = addByteArray(str.getBytes("UTF-8"), result, pos);
      }
      return result;
      final String csnStr = entry.getValue().toString();
      csnList.add(csnStr);
      length += csnStr.length() + 1;
    }
    byte[] result = new byte[length];
    // write the server state into the byte array
    int pos = 0;
    for (int i = 0; i < size; i++)
    {
      String str = idList.get(i);
      pos = addByteArray(str.getBytes("UTF-8"), result, pos);
      str = csnList.get(i);
      pos = addByteArray(str.getBytes("UTF-8"), result, pos);
    }
    return result;
  }
  /**
@@ -488,11 +447,8 @@
   */
  public ServerState duplicate()
  {
    ServerState newState = new ServerState();
    synchronized (serverIdToCSN)
    {
      newState.serverIdToCSN.putAll(serverIdToCSN);
    }
    final ServerState newState = new ServerState();
    newState.serverIdToCSN.putAll(serverIdToCSN);
    return newState;
  }
@@ -571,14 +527,11 @@
  {
    final CSN csn = new CSN(timestamp, 0, 0);
    final ServerState newState = new ServerState();
    synchronized (serverIdToCSN)
    for (CSN change : serverIdToCSN.values())
    {
      for (CSN change : serverIdToCSN.values())
      if (change.isOlderThan(csn))
      {
        if (change.isOlderThan(csn))
        {
          newState.serverIdToCSN.put(change.getServerId(), change);
        }
        newState.serverIdToCSN.put(change.getServerId(), change);
      }
    }
    return newState;
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -41,13 +41,14 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * Thread responsible for inserting replicated changes into the ChangeNumber
@@ -101,7 +102,9 @@
  /**
   * Holds the most recent changes or heartbeats received for each serverIds
   * cross domain.
   * cross domain. changes are stored in the replicaDBs and hence persistent,
   * heartbeats are transient because they are easily constructed on normal
   * operations.
   */
  private final MultiDomainServerState lastSeenUpdates =
      new MultiDomainServerState();
@@ -276,9 +279,9 @@
      final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
      if (!record.getCSN().equals(newestRecord.getCSN()))
      {
        // TODO JNR i18n safety check, should never happen
        throw new ChangelogException(Message.raw("They do not equal! recordCSN="
            + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
        throw new ChangelogException(
            ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord
                .getCSN().toStringUI(), record.getCSN().toStringUI()));
      }
      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
      nextChangeForInsertDBCursor.next();
@@ -406,7 +409,7 @@
          final CSN csn = msg.getCSN();
          final DN baseDN = nextChangeForInsertDBCursor.getData();
          // FIXME problem: what if the serverId is not part of the ServerState?
          // right now, thread will be blocked
          // right now, change number will be blocked
          if (!canMoveForwardMediumConsistencyPoint(baseDN))
          {
            // the oldest record to insert is newer than the medium consistency
@@ -451,17 +454,27 @@
        }
      }
    }
    catch (ChangelogException e)
    catch (RuntimeException e)
    {
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      // TODO JNR error message i18n
      // Nothing can be done about it.
      // Rely on the DirectoryThread uncaught exceptions handler
      // for logging error + alert.
      // Message logged here gives corrective information to the administrator.
      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
          getClass().getSimpleName(), stackTraceToSingleLineString(e));
      TRACER.debugError(msg.toString());
      throw e;
    }
    catch (DirectoryException e)
    catch (Exception e)
    {
      if (debugEnabled())
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      // TODO JNR error message i18n
      // Nothing can be done about it.
      // Rely on the DirectoryThread uncaught exceptions handler
      // for logging error + alert.
      // Message logged here gives corrective information to the administrator.
      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
          getClass().getSimpleName(), stackTraceToSingleLineString(e));
      TRACER.debugError(msg.toString());
      throw new RuntimeException(e);
    }
    finally
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -162,9 +162,22 @@
    final ServerState state = new ServerState();
    assertTrue(state.update(csn1Server1));
    // test 1
    assertFalse(state.removeCSN(null));
    // test 2
    assertEquals(csn1Server1, state.getCSN(1));
    assertFalse(state.removeCSN(csn2Server1));
    assertEquals(csn1Server1, state.getCSN(1));
    // test 3
    assertNull(state.getCSN(2));
    assertFalse(state.removeCSN(csn1Server2));
    assertNull(state.getCSN(2));
    // test 4
    assertEquals(csn1Server1, state.getCSN(1));
    assertTrue(state.removeCSN(csn1Server1));
    assertNull(state.getCSN(1));
  }
}