From 40c19698a7c27ef73ae26439a962c62e373813a8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Dec 2013 10:43:35 +0000
Subject: [PATCH] OPENDJ-1231 (CR-2724) Make the Medium Consistency Point support replica heartbeats
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java | 13 ++
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 41 +++++--
opends/src/messages/messages/replication.properties | 10 +
opends/src/server/org/opends/server/replication/common/ServerState.java | 207 ++++++++++++++++-------------------------
4 files changed, 129 insertions(+), 142 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index bc9d697..5d66ce7 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -527,4 +527,12 @@
for DS(%d) to connect to because it was the only one standing after all tests
NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
SEVERE_ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN_235=Could not \
- create replica database because the changelog database is shutting down
\ No newline at end of file
+ create replica database because the changelog database is shutting down
+FATAL_ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION_236=An unexpected error \
+ forced the %s thread to shut down: %s. \
+ The changeNumber attribute will not move forward anymore. \
+ You can re-enable this thread by first setting the "compute-change-number" \
+ property to false and then back to true
+FATAL_ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ_237=Aborting initialization: \
+ expected the newest change number index record CSN '%s' to be equal to \
+ the CSN read from the replica DBs '%s'
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 761d21c..81cb59d 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -30,11 +30,14 @@
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.types.ByteString;
+import org.opends.server.util.StaticUtils;
/**
* This class is used to associate serverIds with {@link CSN}s.
@@ -46,7 +49,8 @@
{
/** Associates a serverId with a CSN. */
- private final Map<Integer, CSN> serverIdToCSN = new HashMap<Integer, CSN>();
+ private final ConcurrentMap<Integer, CSN> serverIdToCSN =
+ new ConcurrentSkipListMap<Integer, CSN>();
/**
* Whether the state has been saved to persistent storage. It starts at true,
* and moves to false when an update is made to the current object.
@@ -68,10 +72,7 @@
*/
public void clear()
{
- synchronized (serverIdToCSN)
- {
- serverIdToCSN.clear();
- }
+ serverIdToCSN.clear();
}
@@ -154,14 +155,27 @@
saved = false;
- synchronized (serverIdToCSN)
+ final int serverId = csn.getServerId();
+ while (true)
{
- final int serverId = csn.getServerId();
final CSN existingCSN = serverIdToCSN.get(serverId);
- if (existingCSN == null || csn.isNewerThan(existingCSN))
+ if (existingCSN == null)
{
- serverIdToCSN.put(serverId, csn);
- return true;
+ if (serverIdToCSN.putIfAbsent(serverId, csn) == null)
+ {
+ return true;
+ }
+ // oops, a concurrent modification happened, run the same process again
+ continue;
+ }
+ else if (csn.isNewerThan(existingCSN))
+ {
+ if (serverIdToCSN.replace(serverId, existingCSN, csn))
+ {
+ return true;
+ }
+ // oops, a concurrent modification happened, run the same process again
+ continue;
}
return false;
}
@@ -203,19 +217,10 @@
if (expectedCSN == null)
return false;
- synchronized (serverIdToCSN)
+ if (serverIdToCSN.remove(expectedCSN.getServerId(), expectedCSN))
{
- for (Iterator<CSN> iter = serverIdToCSN.values().iterator();
- iter.hasNext();)
- {
- final CSN csn = iter.next();
- if (expectedCSN.equals(csn))
- {
- iter.remove();
- saved = false;
- return true;
- }
- }
+ saved = false;
+ return true;
}
return false;
}
@@ -232,11 +237,8 @@
return false;
}
- synchronized (serverIdToCSN)
- {
- clear();
- return update(serverState);
- }
+ clear();
+ return update(serverState);
}
/**
@@ -252,18 +254,13 @@
*/
public Set<String> toStringSet()
{
- Set<String> set = new HashSet<String>();
-
- synchronized (serverIdToCSN)
+ final Set<String> result = new HashSet<String>();
+ for (CSN change : serverIdToCSN.values())
{
- for (CSN change : serverIdToCSN.values())
- {
- Date date = new Date(change.getTime());
- set.add(change + " " + date + " " + change.getTime());
- }
+ Date date = new Date(change.getTime());
+ result.add(change + " " + date + " " + change.getTime());
}
-
- return set;
+ return result;
}
/**
@@ -274,14 +271,10 @@
*/
public ArrayList<ByteString> toASN1ArrayList()
{
- ArrayList<ByteString> values = new ArrayList<ByteString>(0);
-
- synchronized (serverIdToCSN)
+ final ArrayList<ByteString> values = new ArrayList<ByteString>(0);
+ for (CSN csn : serverIdToCSN.values())
{
- for (CSN csn : serverIdToCSN.values())
- {
- values.add(ByteString.valueOf(csn.toString()));
- }
+ values.add(ByteString.valueOf(csn.toString()));
}
return values;
}
@@ -301,21 +294,18 @@
public void writeTo(ASN1Writer writer, short protocolVersion)
throws IOException
{
- synchronized (serverIdToCSN)
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
{
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ for (CSN csn : serverIdToCSN.values())
{
- for (CSN csn : serverIdToCSN.values())
- {
- writer.writeOctetString(csn.toByteString());
- }
+ writer.writeOctetString(csn.toByteString());
}
- else
+ }
+ else
+ {
+ for (CSN csn : serverIdToCSN.values())
{
- for (CSN csn : serverIdToCSN.values())
- {
- writer.writeOctetString(csn.toString());
- }
+ writer.writeOctetString(csn.toString());
}
}
}
@@ -327,19 +317,7 @@
@Override
public String toString()
{
- StringBuilder buffer = new StringBuilder();
-
- synchronized (serverIdToCSN)
- {
- for (CSN change : serverIdToCSN.values())
- {
- buffer.append(change).append(" ");
- }
- if (!serverIdToCSN.isEmpty())
- buffer.deleteCharAt(buffer.length() - 1);
- }
-
- return buffer.toString();
+ return StaticUtils.collectionToString(serverIdToCSN.values(), " ");
}
/**
@@ -357,26 +335,6 @@
}
/**
- * Returns the largest (most recent) {@code CSN} in this server state.
- *
- * @return The largest (most recent) {@code CSN} in this server state.
- */
- public CSN getMaxCSN()
- {
- CSN maxCSN = null;
-
- synchronized (serverIdToCSN)
- {
- for (CSN csn : serverIdToCSN.values())
- {
- if (maxCSN == null || csn.isNewerThan(maxCSN))
- maxCSN = csn;
- }
- }
- return maxCSN;
- }
-
- /**
* Add the tail into resultByteArray at position pos.
*/
private int addByteArray(byte[] tail, byte[] resultByteArray, int pos)
@@ -397,37 +355,38 @@
*/
public byte[] getBytes() throws UnsupportedEncodingException
{
- synchronized (serverIdToCSN)
+ // copy to protect from concurrent updates
+ // that could change the number of elements in the Map
+ final Map<Integer, CSN> copy = new HashMap<Integer, CSN>(serverIdToCSN);
+
+ final int size = copy.size();
+ List<String> idList = new ArrayList<String>(size);
+ List<String> csnList = new ArrayList<String>(size);
+ // calculate the total length needed to allocate byte array
+ int length = 0;
+ for (Entry<Integer, CSN> entry : copy.entrySet())
{
- final int size = serverIdToCSN.size();
- List<String> idList = new ArrayList<String>(size);
- List<String> csnList = new ArrayList<String>(size);
- // calculate the total length needed to allocate byte array
- int length = 0;
- for (Entry<Integer, CSN> entry : serverIdToCSN.entrySet())
- {
- // serverId is useless, see comment in ServerState ctor
- final String serverIdStr = String.valueOf(entry.getKey());
- idList.add(serverIdStr);
- length += serverIdStr.length() + 1;
+ // serverId is useless, see comment in ServerState ctor
+ final String serverIdStr = String.valueOf(entry.getKey());
+ idList.add(serverIdStr);
+ length += serverIdStr.length() + 1;
- final String csnStr = entry.getValue().toString();
- csnList.add(csnStr);
- length += csnStr.length() + 1;
- }
- byte[] result = new byte[length];
-
- // write the server state into the byte array
- int pos = 0;
- for (int i = 0; i < size; i++)
- {
- String str = idList.get(i);
- pos = addByteArray(str.getBytes("UTF-8"), result, pos);
- str = csnList.get(i);
- pos = addByteArray(str.getBytes("UTF-8"), result, pos);
- }
- return result;
+ final String csnStr = entry.getValue().toString();
+ csnList.add(csnStr);
+ length += csnStr.length() + 1;
}
+ byte[] result = new byte[length];
+
+ // write the server state into the byte array
+ int pos = 0;
+ for (int i = 0; i < size; i++)
+ {
+ String str = idList.get(i);
+ pos = addByteArray(str.getBytes("UTF-8"), result, pos);
+ str = csnList.get(i);
+ pos = addByteArray(str.getBytes("UTF-8"), result, pos);
+ }
+ return result;
}
/**
@@ -488,11 +447,8 @@
*/
public ServerState duplicate()
{
- ServerState newState = new ServerState();
- synchronized (serverIdToCSN)
- {
- newState.serverIdToCSN.putAll(serverIdToCSN);
- }
+ final ServerState newState = new ServerState();
+ newState.serverIdToCSN.putAll(serverIdToCSN);
return newState;
}
@@ -571,14 +527,11 @@
{
final CSN csn = new CSN(timestamp, 0, 0);
final ServerState newState = new ServerState();
- synchronized (serverIdToCSN)
+ for (CSN change : serverIdToCSN.values())
{
- for (CSN change : serverIdToCSN.values())
+ if (change.isOlderThan(csn))
{
- if (change.isOlderThan(csn))
- {
- newState.serverIdToCSN.put(change.getServerId(), change);
- }
+ newState.serverIdToCSN.put(change.getServerId(), change);
}
}
return newState;
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 9979535..4c46759 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -41,13 +41,14 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.DN;
-import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
+import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
/**
* Thread responsible for inserting replicated changes into the ChangeNumber
@@ -101,7 +102,9 @@
/**
* Holds the most recent changes or heartbeats received for each serverIds
- * cross domain.
+ * cross domain. Changes are stored in the replicaDBs and are hence
+ * persistent; heartbeats are transient because they are easily constructed
+ * during normal operations.
*/
private final MultiDomainServerState lastSeenUpdates =
new MultiDomainServerState();
@@ -276,9 +279,9 @@
final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
if (!record.getCSN().equals(newestRecord.getCSN()))
{
- // TODO JNR i18n safety check, should never happen
- throw new ChangelogException(Message.raw("They do not equal! recordCSN="
- + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
+ throw new ChangelogException(
+ ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(newestRecord
+ .getCSN().toStringUI(), record.getCSN().toStringUI()));
}
mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
nextChangeForInsertDBCursor.next();
@@ -406,7 +409,7 @@
final CSN csn = msg.getCSN();
final DN baseDN = nextChangeForInsertDBCursor.getData();
// FIXME problem: what if the serverId is not part of the ServerState?
- // right now, thread will be blocked
+ // right now, change number will be blocked
if (!canMoveForwardMediumConsistencyPoint(baseDN))
{
// the oldest record to insert is newer than the medium consistency
@@ -451,17 +454,27 @@
}
}
}
- catch (ChangelogException e)
+ catch (RuntimeException e)
{
- if (debugEnabled())
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- // TODO JNR error message i18n
+ // Nothing can be done about it.
+ // Rely on the DirectoryThread uncaught exception handler
+ // to log the error and raise the alert.
+ // Message logged here gives corrective information to the administrator.
+ Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
+ getClass().getSimpleName(), stackTraceToSingleLineString(e));
+ TRACER.debugError(msg.toString());
+ throw e;
}
- catch (DirectoryException e)
+ catch (Exception e)
{
- if (debugEnabled())
- TRACER.debugCaught(DebugLogLevel.ERROR, e);
- // TODO JNR error message i18n
+ // Nothing can be done about it.
+ // Rely on the DirectoryThread uncaught exception handler
+ // to log the error and raise the alert.
+ // Message logged here gives corrective information to the administrator.
+ Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
+ getClass().getSimpleName(), stackTraceToSingleLineString(e));
+ TRACER.debugError(msg.toString());
+ throw new RuntimeException(e);
}
finally
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
index f491e04..68cf2c1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -162,9 +162,22 @@
final ServerState state = new ServerState();
assertTrue(state.update(csn1Server1));
+ // test 1
assertFalse(state.removeCSN(null));
+
+ // test 2
+ assertEquals(csn1Server1, state.getCSN(1));
assertFalse(state.removeCSN(csn2Server1));
+ assertEquals(csn1Server1, state.getCSN(1));
+
+ // test 3
+ assertNull(state.getCSN(2));
assertFalse(state.removeCSN(csn1Server2));
+ assertNull(state.getCSN(2));
+
+ // test 4
+ assertEquals(csn1Server1, state.getCSN(1));
assertTrue(state.removeCSN(csn1Server1));
+ assertNull(state.getCSN(1));
}
}
--
Gitblit v1.10.0