From 0a9135e3444bbefde6188f456b9c9772a816096d Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 18 Sep 2013 15:17:14 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 82 ++++++++++++++++++++++-------------------
1 files changed, 44 insertions(+), 38 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index bffdd04..577c766 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -50,6 +50,7 @@
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.types.Attribute;
+import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
@@ -79,7 +80,7 @@
* and which can start receiving updates.
* <p>
* When updates are received the Replication Service calls the
- * {@link #processUpdate(UpdateMsg)} method.
+ * {@link #processUpdate(UpdateMsg, AtomicBoolean)} method.
* ReplicationDomain implementation should implement the appropriate code
* for replaying the update on the local repository.
* When fully done the subclass must call the
@@ -121,7 +122,7 @@
* All Replication Domain using this baseDN will be connected
* through the Replication Service.
*/
- private final String baseDN;
+ private final DN baseDN;
/**
* The identifier of this Replication Domain inside the
@@ -161,8 +162,8 @@
/**
* A Map used to store all the ReplicationDomains created on this server.
*/
- private static Map<String, ReplicationDomain> domains =
- new HashMap<String, ReplicationDomain>();
+ private static Map<DN, ReplicationDomain> domains =
+ new HashMap<DN, ReplicationDomain>();
/*
* Assured mode properties
@@ -335,7 +336,7 @@
* is participating to a given Replication Domain.
* @param initWindow Window used during initialization.
*/
- public ReplicationDomain(String baseDN, int serverID,int initWindow)
+ public ReplicationDomain(DN baseDN, int serverID, int initWindow)
{
this.baseDN = baseDN;
this.serverID = serverID;
@@ -358,8 +359,7 @@
* is participating to a given Replication Domain.
* @param serverState The serverState to use
*/
- public ReplicationDomain(String baseDN, int serverID,
- ServerState serverState)
+ public ReplicationDomain(DN baseDN, int serverID, ServerState serverState)
{
this.baseDN = baseDN;
this.serverID = serverID;
@@ -397,7 +397,7 @@
if (!isValidInitialStatus(initStatus))
{
Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
- baseDN, Integer.toString(serverID));
+ getBaseDNString(), Integer.toString(serverID));
logError(msg);
} else
{
@@ -426,7 +426,7 @@
if (event == StatusMachineEvent.INVALID_EVENT)
{
Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
- baseDN, Integer.toString(serverID));
+ getBaseDNString(), Integer.toString(serverID));
logError(msg);
return;
}
@@ -482,13 +482,23 @@
}
/**
+ * Returns the base DN of this ReplicationDomain.
+ *
+ * @return The base DN of this ReplicationDomain
+ */
+ public DN getBaseDN()
+ {
+ return baseDN;
+ }
+
+ /**
* Gets the baseDN of this domain.
*
* @return The baseDN for this domain.
*/
public String getBaseDNString()
{
- return baseDN;
+ return baseDN.toNormalizedString();
}
/**
@@ -943,7 +953,7 @@
requested servers. Log problem
*/
Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(
- baseDN, Integer.toString(serverID),
+ getBaseDNString(), Integer.toString(serverID),
update.toString(), ack.errorsToString());
logError(errorMsg);
@@ -1387,7 +1397,7 @@
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
- countEntries(), baseDN, serverID);
+ countEntries(), getBaseDNString(), serverID);
logError(msg);
for (DSInfo dsi : getReplicasList())
@@ -1403,7 +1413,7 @@
else
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
- countEntries(), baseDN, serverID, serverToInitialize);
+ countEntries(), getBaseDNString(), serverID, serverToInitialize);
logError(msg);
ieContext.startList.add(serverToInitialize);
@@ -1434,8 +1444,8 @@
// Send start message to the peer
InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
- baseDN, serverID, serverToInitialize, serverRunningTheTask,
- ieContext.entryCount, initWindow);
+ getBaseDNString(), serverID, serverToInitialize,
+ serverRunningTheTask, ieContext.entryCount, initWindow);
broker.publish(initTargetMsg);
@@ -1457,7 +1467,6 @@
// Notify the peer of the success
DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
broker.publish(doneMsg);
-
}
catch(DirectoryException exportException)
{
@@ -1558,14 +1567,14 @@
.getLocalizedMessage() : "";
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
- Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL
- .get(baseDN, serverID, cause);
+ Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
+ getBaseDNString(), serverID, cause);
logError(msg);
}
else
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
- baseDN, serverID, serverToInitialize, cause);
+ getBaseDNString(), serverID, serverToInitialize, cause);
logError(msg);
}
@@ -1922,7 +1931,7 @@
Message errMsg =
Message.raw(Category.SYNC, Severity.NOTICE,
ERR_INIT_EXPORTER_DISCONNECTION.get(
- this.baseDN,
+ getBaseDNString(),
Integer.toString(this.serverID),
Integer.toString(ieContext.importSource)));
if (ieContext.getException()==null)
@@ -2200,7 +2209,7 @@
ieContext.initializeTask = initTask;
ieContext.attemptCnt = 0;
ieContext.initReqMsgSent = new InitializeRequestMsg(
- baseDN, serverID, source, this.initWindow);
+ getBaseDNString(), serverID, source, this.initWindow);
// Publish Init request msg
broker.publish(ieContext.initReqMsgSent);
@@ -2261,7 +2270,7 @@
{
// Log starting
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
- baseDN, initTargetMsgReceived.getSenderID(), serverID);
+ getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID);
logError(msg);
// Go into full update status
@@ -2395,7 +2404,7 @@
finally
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
- baseDN, initTargetMsgReceived.getSenderID(), serverID,
+ getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
(ieContext.getException() != null ? ieContext
.getException().getLocalizedMessage() : ""));
logError(msg);
@@ -2436,7 +2445,7 @@
if (newStatus == ServerStatus.INVALID_STATUS)
{
- Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDN,
+ Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(),
Integer.toString(serverID), status.toString(), event.toString());
logError(msg);
return;
@@ -2514,10 +2523,8 @@
}
if (!allSet)
{
- ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_RESET_GENERATION_ID_FAILED.get(baseDN);
- throw new DirectoryException(
- resultCode, message);
+ Message message = ERR_RESET_GENERATION_ID_FAILED.get(getBaseDNString());
+ throw new DirectoryException(ResultCode.OTHER, message);
}
}
@@ -2592,7 +2599,7 @@
if (!isConnected())
{
ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(baseDN,
+ Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
Integer.toString(serverID),
Long.toString(genIdMessage.getGenerationId()));
throw new DirectoryException(
@@ -2987,11 +2994,10 @@
* Starts the receiver side of the Replication Service.
* <p>
* After this method has been called, the Replication Service will start
- * calling the {@link #processUpdate(UpdateMsg)}.
+ * calling the {@link #processUpdate(UpdateMsg, AtomicBoolean)}.
* <p>
* This method must be called once and must be called after the
* {@link #startPublishService(Collection, int, long, long)}.
- *
*/
public void startListenService()
{
@@ -3234,8 +3240,8 @@
} else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
{
Message errorMsg = ERR_DS_UNKNOWN_ASSURED_MODE.get(
- Integer.toString(serverID), msgAssuredMode.toString(), baseDN,
- msg.toString());
+ Integer.toString(serverID), msgAssuredMode.toString(),
+ getBaseDNString(), msg.toString());
logError(errorMsg);
}
// Nothing to do in Assured safe data mode, only RS ack updates.
@@ -3401,9 +3407,9 @@
* Publish an {@link UpdateMsg} to the Replication Service.
* <p>
* The Replication Service will handle the delivery of this {@link UpdateMsg}
- * to all the participants of this Replication Domain.
- * These members will be receive this {@link UpdateMsg} through a call
- * of the {@link #processUpdate(UpdateMsg)} message.
+ * to all the participants of this Replication Domain. These members will be
+ * receive this {@link UpdateMsg} through a call of the
+ * {@link #processUpdate(UpdateMsg, AtomicBoolean)} message.
*
* @param msg The UpdateMsg that should be pushed.
*/
@@ -3449,8 +3455,8 @@
{
// This exception may only be raised if assured replication is
// enabled
- Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(baseDN, Long.toString(
- assuredTimeout), update.toString());
+ Message errorMsg = NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
+ Long.toString(assuredTimeout), update.toString());
logError(errorMsg);
}
}
--
Gitblit v1.10.0