From b4f8838b15342670c31753a484abf0129e3c9653 Mon Sep 17 00:00:00 2001
From: jcduff <jcduff@localhost>
Date: Thu, 23 Oct 2008 14:04:24 +0000
Subject: [PATCH] The commit will bring the following features : - An updated version of the underlying database. BDB JE 3.3 is now used. - Attribute API refactoring providing a better abstraction and offering improved performances. - A new GUI called the Control-Panel to replace the Status-Panel: the specifications for this GUI are available on OpenDS Wiki and contains a link to a mockup. See <https://www.opends.org/wiki/page/ControlPanelUISpecification>. - Some changes in the replication protocol to implement "Assured Replication Mode". The specifications are on OpenDS Wiki at <https://www.opends.org/wiki/page/AssuredMode> and section 7 described some of the replication changes required to support this. Assured Replication is not finished, but the main replication protocol changes to support it are done. As explained by Gilles on an email on the Dev mailing list (http://markmail.org/message/46rgo3meq3vriy4a), with these changes the newer versions of OpenDS may not be able to replicate with OpenDS 1.0 instances. - Support for Service Tags on the platforms where the functionality is available and enabled. Specifications are published at <https://www.opends.org/wiki/page/OpenDSServiceTagEnabled>. For more information on Service Tags see <http://wikis.sun.com/display/ServiceTag/Sun+Service+Tag+FAQ>. - The Admin Connector service. In order to provide agentry of the OpenDS server at any time, a new service has been added, dedicated to the administration, configuration and monitoring of the server. An overview of the Admin Connector service and it's use is available on the OpenDS wiki <https://www.opends.org/wiki/page/ManagingAdministrationTrafficToTheServer> - Updates to the various command line tools to support the Admin Connector service. - Some internal re-architecting of the server to put the foundation of future developments such as virtual directory services. The new NetworkGroups and WorkFlow internal services which have been specified in <https://www.opends.org/wiki/page/BasicOperationRoutingThroughNetworkGroup> are now implemented. - Many bug fixes...
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 875 ++++++++++++++++++++++++++++++++++++++++++---------------
1 files changed, 641 insertions(+), 234 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index fb7b9ee..ed583c6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -36,6 +36,7 @@
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.replication.common.StatusMachine.*;
import java.io.File;
import java.io.IOException;
@@ -50,10 +51,10 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedMap;
+import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
@@ -85,31 +86,42 @@
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
+import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.AckMessage;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachine;
+import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.DeleteContext;
-import org.opends.server.replication.protocol.DoneMessage;
-import org.opends.server.replication.protocol.EntryMessage;
-import org.opends.server.replication.protocol.ErrorMessage;
-import org.opends.server.replication.protocol.HeartbeatMessage;
-import org.opends.server.replication.protocol.InitializeRequestMessage;
-import org.opends.server.replication.protocol.InitializeTargetMessage;
+import org.opends.server.replication.protocol.DoneMsg;
+import org.opends.server.replication.protocol.EntryMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.HeartbeatMsg;
+import org.opends.server.replication.protocol.InitializeRequestMsg;
+import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
+import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMessage;
-import org.opends.server.replication.protocol.ResetGenerationId;
-import org.opends.server.replication.protocol.RoutableMessage;
-import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
+import org.opends.server.types.AttributeBuilder;
+import org.opends.server.types.Attributes;
import org.opends.server.types.ExistingFileBehavior;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.Attribute;
@@ -183,8 +195,8 @@
// The update to replay message queue where the listener thread is going to
// push incoming update messages.
private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
- private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
- new TreeMap<ChangeNumber, UpdateMessage>();
+ private SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
+ new TreeMap<ChangeNumber, UpdateMsg>();
private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
private AtomicInteger numSentUpdates = new AtomicInteger(0);
private AtomicInteger numProcessedUpdates = new AtomicInteger();
@@ -208,7 +220,7 @@
/**
* This object is used to store the list of update currently being
* done on the local database.
- * Is is usefull to make sure that the local operations are sent in a
+ * Is is useful to make sure that the local operations are sent in a
* correct order to the replication server and that the ServerState
* is not updated too early.
*/
@@ -217,8 +229,8 @@
/**
* It contain the updates that were done on other servers, transmitted
* by the replication server and that are currently replayed.
- * It is usefull to make sure that dependencies between operations
- * are correctly fullfilled and to to make sure that the ServerState is
+ * It is useful to make sure that dependencies between operations
+ * are correctly fulfilled and to to make sure that the ServerState is
* not updated too early.
*/
private RemotePendingChanges remotePendingChanges;
@@ -228,7 +240,7 @@
* server. Zero means heartbeats are off.
*/
private long heartbeatInterval = 0;
- short serverId;
+ private short serverId;
// The context related to an import or export being processed
// Null when none is being processed.
@@ -236,7 +248,7 @@
private Collection<String> replicationServers;
- private DN baseDN;
+ private DN baseDn;
private boolean shutdown = false;
@@ -250,12 +262,42 @@
private int window = 100;
+ /*
+ * Assured mode properties
+ */
+ // Is assured mode enabled or not for this domain ?
+ private boolean assured = false;
+ // Assured sub mode (used when assured is true)
+ private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
+ // Safe Data level (used when assuredMode is SAFE_DATA)
+ private byte assuredSdLevel = (byte)1;
+ // Timeout (in milliseconds) when waiting for acknowledgments
+ private long assuredTimeout = 1000;
+
+ // Group id
+ private byte groupId = (byte)1;
+ // Referrals urls to be published to other servers of the topology
+ // TODO: fill that with all currently opened urls if no urls configured
+ private List<String> refUrls = new ArrayList<String>();
+
+ // Current status for this replicated domain
+ private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
+
+ /*
+ * Properties for the last topology info received from the network.
+ */
+ // Info for other DSs.
+ // Warning: does not contain info for us (for our server id)
+ private List<DSInfo> dsList = new ArrayList<DSInfo>();
+ // Info for other RSs.
+ private List<RSInfo> rsList = new ArrayList<RSInfo>();
+
/**
* The isolation policy that this domain is going to use.
* This field describes the behavior of the domain when an update is
* attempted and the domain could not connect to any Replication Server.
* Possible values are accept-updates or deny-updates, but other values
- * may be added in the futur.
+ * may be added in the future.
*/
private IsolationPolicy isolationpolicy;
@@ -281,9 +323,9 @@
// The input stream for the import
ReplLDIFInputStream ldifImportInputStream = null;
// The target in the case of an export
- short exportTarget = RoutableMessage.UNKNOWN_SERVER;
+ short exportTarget = RoutableMsg.UNKNOWN_SERVER;
// The source in the case of an import
- short importSource = RoutableMessage.UNKNOWN_SERVER;
+ short importSource = RoutableMsg.UNKNOWN_SERVER;
// The total entry count expected to be processed
long entryCount = 0;
@@ -295,7 +337,9 @@
/**
* Initializes the import/export counters with the provider value.
- * @param count The value with which to initialize the counters.
+ * @param total
+ * @param left
+ * @throws DirectoryException
*/
public void setCounters(long total, long left)
throws DirectoryException
@@ -321,6 +365,7 @@
/**
* Update the counters of the task for each entry processed during
* an import or export.
+ * @throws DirectoryException
*/
public void updateCounters()
throws DirectoryException
@@ -366,7 +411,7 @@
*/
public ExportThread(short target)
{
- super("Export thread");
+ super("Export thread " + serverId);
this.target = target;
}
@@ -406,12 +451,13 @@
LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
throws ConfigException
{
- super("replicationDomain_" + configuration.getBaseDN());
+ super("Replication State Saver for server id " + configuration.getServerId()
+ + " and domain " + configuration.getBaseDN());
// Read the configuration parameters.
replicationServers = configuration.getReplicationServer();
serverId = (short) configuration.getServerId();
- baseDN = configuration.getBaseDN();
+ baseDn = configuration.getBaseDN();
window = configuration.getWindowSize();
heartbeatInterval = configuration.getHeartbeatInterval();
isolationpolicy = configuration.getIsolationPolicy();
@@ -419,13 +465,43 @@
this.updateToReplayQueue = updateToReplayQueue;
/*
+ * Fill assured configuration properties
+ */
+ AssuredType assuredType = configuration.getAssuredType();
+ switch (assuredType)
+ {
+ case NOT_ASSURED:
+ assured = false;
+ break;
+ case SAFE_DATA:
+ assured = true;
+ this.assuredMode = AssuredMode.SAFE_DATA_MODE;
+ break;
+ case SAFE_READ:
+ assured = true;
+ this.assuredMode = AssuredMode.SAFE_READ_MODE;
+ break;
+ }
+ this.assuredSdLevel = (byte)configuration.getAssuredSdLevel();
+ this.groupId = (byte)configuration.getGroupId();
+ this.assuredTimeout = configuration.getAssuredTimeout();
+ SortedSet<String> urls = configuration.getReferralsUrl();
+ if (urls != null)
+ {
+ for (String url : urls)
+ {
+ this.refUrls.add(url);
+ }
+ }
+
+ /*
* Modify conflicts are solved for all suffixes but the schema suffix
* because we don't want to store extra information in the schema
* ldif files.
* This has no negative impact because the changes on schema should
* not produce conflicts.
*/
- if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
+ if (baseDn.compareTo(DirectoryServer.getSchemaDN()) == 0)
{
solveConflictFlag = false;
}
@@ -438,7 +514,7 @@
* Create a new Persistent Server State that will be used to store
* the last ChangeNmber seen from all LDAP servers in the topology.
*/
- state = new PersistentServerState(baseDN, serverId);
+ state = new PersistentServerState(baseDn, serverId);
/*
* Create a replication monitor object responsible for publishing
@@ -447,11 +523,11 @@
monitor = new ReplicationMonitor(this);
DirectoryServer.registerMonitorProvider(monitor);
- Backend backend = retrievesBackend(baseDN);
+ Backend backend = retrievesBackend(baseDn);
if (backend == null)
{
throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
- baseDN.toNormalizedString()));
+ baseDn.toNormalizedString()));
}
try
@@ -461,16 +537,16 @@
catch (DirectoryException e)
{
logError(ERR_LOADING_GENERATION_ID.get(
- baseDN.toNormalizedString(), e.getLocalizedMessage()));
+ baseDn.toNormalizedString(), e.getLocalizedMessage()));
}
/*
* create the broker object used to publish and receive changes
*/
- broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
- maxReceiveDelay, maxSendQueue, maxSendDelay, window,
+ broker = new ReplicationBroker(this, state, baseDn, serverId,
+ maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window,
heartbeatInterval, generationId,
- new ReplSessionSecurity(configuration));
+ new ReplSessionSecurity(configuration),getGroupId());
broker.start(replicationServers);
@@ -505,7 +581,7 @@
*/
public DN getBaseDN()
{
- return baseDN;
+ return baseDn;
}
/**
@@ -521,7 +597,7 @@
if ((!deleteOperation.isSynchronizationOperation())
&& (!brokerIsConnected(deleteOperation)))
{
- Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
+ Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
return new SynchronizationProviderResult.StopProcessing(
ResultCode.UNWILLING_TO_PERFORM, msg);
}
@@ -535,7 +611,7 @@
/*
* This is a replication operation
* Check that the modified entry has the same entryuuid
- * has was in the original message.
+ * as it was in the original message.
*/
String operationEntryUUID = ctx.getEntryUid();
String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
@@ -581,7 +657,7 @@
if ((!addOperation.isSynchronizationOperation())
&& (!brokerIsConnected(addOperation)))
{
- Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
+ Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
return new SynchronizationProviderResult.StopProcessing(
ResultCode.UNWILLING_TO_PERFORM, msg);
}
@@ -686,7 +762,7 @@
if ((!modifyDNOperation.isSynchronizationOperation())
&& (!brokerIsConnected(modifyDNOperation)))
{
- Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
+ Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
return new SynchronizationProviderResult.StopProcessing(
ResultCode.UNWILLING_TO_PERFORM, msg);
}
@@ -725,7 +801,7 @@
* parent is the same as when the operation was performed.
*/
String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
- if ((newParentId != null) &&
+ if ((newParentId != null) && (ctx.getNewParentId() != null) &&
(!newParentId.equals(ctx.getNewParentId())))
{
return new SynchronizationProviderResult.StopProcessing(
@@ -765,7 +841,7 @@
if ((!modifyOperation.isSynchronizationOperation())
&& (!brokerIsConnected(modifyOperation)))
{
- Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
+ Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
return new SynchronizationProviderResult.StopProcessing(
ResultCode.UNWILLING_TO_PERFORM, msg);
}
@@ -851,6 +927,7 @@
findEntryId(addOperation.getEntryDN().getParentDNInSuffix()));
addOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ Historical.generateState(addOperation);
}
/**
@@ -858,14 +935,14 @@
* also responsible for updating the list of pending changes
* @return the received message - null if none
*/
- public UpdateMessage receive()
+ public UpdateMsg receive()
{
- UpdateMessage update = null;
+ UpdateMsg update = null;
- while (update == null)
+ while ( (update == null) && (!shutdown) )
{
- InitializeRequestMessage initMsg = null;
- ReplicationMessage msg;
+ InitializeRequestMsg initMsg = null;
+ ReplicationMsg msg;
try
{
msg = broker.receive();
@@ -876,24 +953,24 @@
}
if (debugEnabled())
- if (!(msg instanceof HeartbeatMessage))
+ if (!(msg instanceof HeartbeatMsg))
TRACER.debugVerbose("Message received <" + msg + ">");
- if (msg instanceof AckMessage)
+ if (msg instanceof AckMsg)
{
- AckMessage ack = (AckMessage) msg;
+ AckMsg ack = (AckMsg) msg;
receiveAck(ack);
}
- else if (msg instanceof InitializeRequestMessage)
+ else if (msg instanceof InitializeRequestMsg)
{
// Another server requests us to provide entries
// for a total update
- initMsg = (InitializeRequestMessage)msg;
+ initMsg = (InitializeRequestMsg)msg;
}
- else if (msg instanceof InitializeTargetMessage)
+ else if (msg instanceof InitializeTargetMsg)
{
// Another server is exporting its entries to us
- InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
+ InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
try
{
@@ -907,8 +984,8 @@
catch(DirectoryException de)
{
// Returns an error message to notify the sender
- ErrorMessage errorMsg =
- new ErrorMessage(importMsg.getsenderID(),
+ ErrorMsg errorMsg =
+ new ErrorMsg(importMsg.getsenderID(),
de.getMessageObject());
MessageBuilder mb = new MessageBuilder();
mb.append(de.getMessageObject());
@@ -916,7 +993,7 @@
broker.publish(errorMsg);
}
}
- else if (msg instanceof ErrorMessage)
+ else if (msg instanceof ErrorMsg)
{
if (ieContext != null)
{
@@ -925,22 +1002,31 @@
// - or before an import really started
// For example, when we publish a request and the
// replicationServer did not find any import source.
- abandonImportExport((ErrorMessage)msg);
+ abandonImportExport((ErrorMsg)msg);
}
else
{
- /* We can receive an error message from the replication server
- * in the following cases :
- * - we connected with an incorrect generation id
+ /*
+ * Log error message
*/
- ErrorMessage errorMsg = (ErrorMessage)msg;
+ ErrorMsg errorMsg = (ErrorMsg)msg;
logError(ERR_ERROR_MSG_RECEIVED.get(
errorMsg.getDetails()));
}
}
- else if (msg instanceof UpdateMessage)
+ if (msg instanceof TopologyMsg)
{
- update = (UpdateMessage) msg;
+ TopologyMsg topoMsg = (TopologyMsg)msg;
+ receiveTopo(topoMsg);
+ }
+ if (msg instanceof ChangeStatusMsg)
+ {
+ ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
+ receiveChangeStatus(csMsg);
+ }
+ else if (msg instanceof UpdateMsg)
+ {
+ update = (UpdateMsg) msg;
receiveUpdate(update);
}
}
@@ -968,24 +1054,183 @@
}
/**
- * Do the necessary processing when an UpdateMessage was received.
+ * Processes an incoming TopologyMsg.
+ * Updates the structures for the local view of the topology.
*
- * @param update The received UpdateMessage.
+ * @param topoMsg The topology information received from RS.
*/
- public void receiveUpdate(UpdateMessage update)
+ public void receiveTopo(TopologyMsg topoMsg)
+ {
+
+ if (debugEnabled())
+ TRACER.debugInfo("Replication domain " + baseDn
+ + " received topology info update:\n" + topoMsg);
+
+ // Store new lists
+ synchronized(getDsList())
+ {
+ synchronized(getRsList())
+ {
+ dsList = topoMsg.getDsList();
+ rsList = topoMsg.getRsList();
+ }
+ }
+ }
+
+ /**
+ * Set the initial status of the domain, once he is connected to the topology.
+ * @param initStatus The status to enter the state machine with
+ */
+ public void setInitialStatus(ServerStatus initStatus)
+ {
+ // Sanity check: is it a valid initial status?
+ if (!isValidInitialStatus(initStatus))
+ {
+ Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
+ baseDn.toString(), Short.toString(serverId));
+ logError(msg);
+ } else
+ {
+ status = initStatus;
+ }
+ }
+
+ /**
+ * Processes an incoming ChangeStatusMsg. Compute new status according to
+ * given order. Then update domain for being compliant with new status
+ * definition.
+ * @param csMsg The received status message
+ */
+ private void receiveChangeStatus(ChangeStatusMsg csMsg)
+ {
+ if (debugEnabled())
+ TRACER.debugInfo("Replication domain " + baseDn +
+ " received change status message:\n" + csMsg);
+
+ ServerStatus reqStatus = csMsg.getRequestedStatus();
+
+ // Translate requested status to a state machine event
+ StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
+ if (event == StatusMachineEvent.INVALID_EVENT)
+ {
+ Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
+ baseDn.toString(), Short.toString(serverId));
+ logError(msg);
+ return;
+ }
+
+ // Compute new status and do matching tasks
+ // Use synchronized as admin task (thread) could order to go in admin status
+ // for instance (concurrent with receive thread).
+ synchronized (status)
+ {
+ ServerStatus newStatus =
+ StatusMachine.computeNewStatus(status, event);
+
+ if (newStatus == ServerStatus.INVALID_STATUS)
+ {
+ Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
+ Short.toString(serverId), status.toString(), event.toString());
+ logError(msg);
+ return;
+ }
+
+ // Store new status
+ status = newStatus;
+
+ if (debugEnabled())
+ TRACER.debugInfo("Replication domain " + baseDn +
+ " new status is: " + status);
+
+ // Perform whatever actions are needed to apply properties for being
+ // compliant with new status
+ updateDomainForNewStatus();
+ }
+ }
+
+ /**
+ * Called when first connection or disconnection detected.
+ */
+ public void toNotConnectedStatus()
+ {
+ // Go into not connected status
+ // Use synchronized as somebody could ask another status change at the same
+ // time
+ synchronized (status)
+ {
+ StatusMachineEvent event =
+ StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT;
+ ServerStatus newStatus =
+ StatusMachine.computeNewStatus(status, event);
+
+ if (newStatus == ServerStatus.INVALID_STATUS)
+ {
+ Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
+ Short.toString(serverId), status.toString(), event.toString());
+ logError(msg);
+ return;
+ }
+
+ // Store new status
+ status = newStatus;
+
+ if (debugEnabled())
+ TRACER.debugInfo("Replication domain " + baseDn +
+ " new status is: " + status);
+
+ // Perform whatever actions are needed to apply properties for being
+ // compliant with new status
+ updateDomainForNewStatus();
+ }
+ }
+
+ /**
+ * Perform whatever actions are needed to apply properties for being
+ * compliant with new status. Must be called in synchronized section for
+ * status. The new status is already set in status variable.
+ */
+ private void updateDomainForNewStatus()
+ {
+ switch (status)
+ {
+ case NOT_CONNECTED_STATUS:
+ break;
+ case NORMAL_STATUS:
+ break;
+ case DEGRADED_STATUS:
+ break;
+ case FULL_UPDATE_STATUS:
+ // Signal RS we just entered the full update status
+ broker.signalStatusChange(status);
+ break;
+ case BAD_GEN_ID_STATUS:
+ break;
+ default:
+ if (debugEnabled())
+ TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " +
+ status);
+ }
+ }
+
+ /**
+ * Do the necessary processing when an UpdateMsg was received.
+ *
+ * @param update The received UpdateMsg.
+ */
+ public void receiveUpdate(UpdateMsg update)
{
remotePendingChanges.putRemoteUpdate(update);
numRcvdUpdates.incrementAndGet();
}
/**
- * Do the necessary processing when an AckMessage is received.
+ * Do the necessary processing when an AckMsg is received.
*
- * @param ack The AckMessage that was received.
+ * @param ack The AckMsg that was received.
*/
- public void receiveAck(AckMessage ack)
+ public void receiveAck(AckMsg ack)
{
- UpdateMessage update;
+ UpdateMsg update;
ChangeNumber changeNumber = ack.getChangeNumber();
synchronized (waitingAckMsgs)
@@ -1013,7 +1258,7 @@
{
numReplayedPostOpCalled++;
}
- UpdateMessage msg = null;
+ UpdateMsg msg = null;
// Note that a failed non-replication operation might not have a change
// number.
@@ -1025,7 +1270,7 @@
{
// Generate a replication message for a successful non-replication
// operation.
- msg = UpdateMessage.generateMsg(op, isAssured);
+ msg = UpdateMsg.generateMsg(op);
if (msg == null)
{
@@ -1209,7 +1454,7 @@
*/
public void ack(ChangeNumber changeNumber)
{
- broker.publish(new AckMessage(changeNumber));
+ broker.publish(new AckMsg(changeNumber));
}
/**
@@ -1304,18 +1549,17 @@
}
/**
- * Create and replay a synchronized Operation from an UpdateMessage.
+ * Create and replay a synchronized Operation from an UpdateMsg.
*
- * @param msg The UpdateMessage to be replayed.
+ * @param msg The UpdateMsg to be replayed.
*/
- public void replay(UpdateMessage msg)
+ public void replay(UpdateMsg msg)
{
Operation op = null;
boolean done = false;
boolean dependency = false;
ChangeNumber changeNumber = null;
int retryCount = 10;
- boolean firstTry = true;
// Try replay the operation, then flush (replaying) any pending operation
// whose dependency has been replayed until no more left.
@@ -1323,10 +1567,11 @@
{
try
{
+ op = msg.createOperation(conn);
+ dependency = remotePendingChanges.checkDependencies(op, msg);
+
while ((!dependency) && (!done) && (retryCount-- > 0))
{
- op = msg.createOperation(conn);
-
op.setInternalOperation(true);
op.setSynchronizationOperation(true);
changeNumber = OperationContext.getChangeNumber(op);
@@ -1341,36 +1586,23 @@
{
ModifyOperation newOp = (ModifyOperation) op;
dependency = remotePendingChanges.checkDependencies(newOp);
- if ((!dependency) && (!firstTry))
- {
- done = solveNamingConflict(newOp, msg);
- }
+ ModifyMsg modifyMsg = (ModifyMsg) msg;
+ done = solveNamingConflict(newOp, modifyMsg);
} else if (op instanceof DeleteOperation)
{
DeleteOperation newOp = (DeleteOperation) op;
dependency = remotePendingChanges.checkDependencies(newOp);
- if ((!dependency) && (!firstTry))
- {
- done = solveNamingConflict(newOp, msg);
- }
+ done = solveNamingConflict(newOp, msg);
} else if (op instanceof AddOperation)
{
AddOperation newOp = (AddOperation) op;
AddMsg addMsg = (AddMsg) msg;
dependency = remotePendingChanges.checkDependencies(newOp);
- if ((!dependency) && (!firstTry))
- {
- done = solveNamingConflict(newOp, addMsg);
- }
+ done = solveNamingConflict(newOp, addMsg);
} else if (op instanceof ModifyDNOperationBasis)
{
- ModifyDNMsg newMsg = (ModifyDNMsg) msg;
- dependency = remotePendingChanges.checkDependencies(newMsg);
- if ((!dependency) && (!firstTry))
- {
- ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
- done = solveNamingConflict(newOp, msg);
- }
+ ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
+ done = solveNamingConflict(newOp, msg);
} else
{
done = true; // unknown type of operation ?!
@@ -1382,11 +1614,19 @@
// however we still need to push this change to the serverState
updateError(changeNumber);
}
- } else
+ else
+ {
+ /*
+ * Create a new operation as the ConflictResolution
+ * different operation.
+ */
+ op = msg.createOperation(conn);
+ }
+ }
+ else
{
done = true;
}
- firstTry = false;
}
if (!done && !dependency)
@@ -1457,7 +1697,6 @@
dependency = false;
changeNumber = null;
retryCount = 10;
- firstTry = true;
} while (msg != null);
}
@@ -1494,14 +1733,16 @@
*
* @param dn The dn of the entry for which the unique Id is searched.
*
- * @return The unique Id of the entry whith the provided DN.
+ * @return The unique Id of the entry with the provided DN.
*/
- private String findEntryId(DN dn)
+ static String findEntryId(DN dn)
{
if (dn == null)
return null;
try
{
+ InternalClientConnection conn =
+ InternalClientConnection.getRootConnection();
LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
attrs.add(ENTRYUIDNAME);
InternalSearchOperation search = conn.processSearch(dn,
@@ -1530,17 +1771,17 @@
}
/**
- * find the current dn of an entry from its entry uuid.
+ * find the current DN of an entry from its entry UUID.
*
* @param uuid the Entry Unique ID.
- * @return The curernt dn of the entry or null if there is no entry with
- * the specified uuid.
+ * @return The current DN of the entry or null if there is no entry with
+ * the specified UUID.
*/
private DN findEntryDN(String uuid)
{
try
{
- InternalSearchOperation search = conn.processSearch(baseDN,
+ InternalSearchOperation search = conn.processSearch(baseDn,
SearchScope.WHOLE_SUBTREE,
SearchFilter.createFilterFromString("entryuuid="+uuid));
if (search.getResultCode() == ResultCode.SUCCESS)
@@ -1570,7 +1811,7 @@
* @return true if the process is completed, false if it must continue..
*/
private boolean solveNamingConflict(ModifyOperation op,
- UpdateMessage msg)
+ ModifyMsg msg)
{
ResultCode result = op.getResultCode();
ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1601,6 +1842,57 @@
return true;
}
}
+ else if (result == ResultCode.NOT_ALLOWED_ON_RDN)
+ {
+ DN currentDN = findEntryDN(entryUid);
+ RDN currentRDN = null;
+ if (currentDN != null)
+ {
+ currentRDN = currentDN.getRDN();
+ }
+ else
+ {
+ // The entry does not exist anymore.
+ numResolvedNamingConflicts.incrementAndGet();
+ return true;
+ }
+
+ // The modify operation is trying to delete the value that is
+ // currently used in the RDN. We need to alter the modify so that it does
+ // not remove the current RDN value(s).
+
+ List<Modification> mods = op.getModifications();
+ for (Modification mod : mods)
+ {
+ AttributeType modAttrType = mod.getAttribute().getAttributeType();
+ if ((mod.getModificationType() == ModificationType.DELETE) ||
+ (mod.getModificationType() == ModificationType.REPLACE))
+ {
+ if (currentRDN.hasAttributeType(modAttrType))
+ {
+ // the attribute can't be deleted because it is used
+ // in the RDN, turn this operation is a replace with the
+ // current RDN value(s);
+ mod.setModificationType(ModificationType.REPLACE);
+ Attribute newAttribute = mod.getAttribute();
+ AttributeBuilder attrBuilder;
+ if (newAttribute == null)
+ {
+ attrBuilder = new AttributeBuilder(modAttrType);
+ }
+ else
+ {
+ attrBuilder = new AttributeBuilder(newAttribute);
+ }
+ attrBuilder.add(currentRDN.getAttributeValue(modAttrType));
+ mod.setAttribute(attrBuilder.toAttribute());
+ }
+ }
+ }
+ msg.setMods(mods);
+ numResolvedNamingConflicts.incrementAndGet();
+ return false;
+ }
else
{
// The other type of errors can not be caused by naming conflicts.
@@ -1621,7 +1913,7 @@
* @return true if the process is completed, false if it must continue..
*/
private boolean solveNamingConflict(DeleteOperation op,
- UpdateMessage msg)
+ UpdateMsg msg)
{
ResultCode result = op.getResultCode();
DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1691,7 +1983,7 @@
* @throws Exception When the operation is not valid.
*/
private boolean solveNamingConflict(ModifyDNOperation op,
- UpdateMessage msg) throws Exception
+ UpdateMsg msg) throws Exception
{
ResultCode result = op.getResultCode();
ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1710,40 +2002,43 @@
* - don't do anything if the operation is replayed.
*/
- // Construct the new DN to use for the entry.
- DN entryDN = op.getEntryDN();
- DN newSuperior = findEntryDN(newSuperiorID);
- RDN newRDN = op.getNewRDN();
- DN parentDN;
-
- if (newSuperior == null)
- {
- parentDN = entryDN.getParent();
- }
- else
- {
- parentDN = newSuperior;
- }
-
- if ((parentDN == null) || parentDN.isNullDN())
- {
- /* this should never happen
- * can't solve any conflict in this case.
- */
- throw new Exception("operation parameters are invalid");
- }
-
- DN newDN = parentDN.concat(newRDN);
-
// get the current DN of this entry in the database.
DN currentDN = findEntryDN(entryUid);
+ // Construct the new DN to use for the entry.
+ DN entryDN = op.getEntryDN();
+ DN newSuperior = null;
+ RDN newRDN = op.getNewRDN();
+
+ if (newSuperiorID != null)
+ {
+ newSuperior = findEntryDN(newSuperiorID);
+ }
+ else
+ {
+ newSuperior = entryDN.getParent();
+ }
+
+ //If we could not find the new parent entry, we missed this entry
+ // earlier or it has disappeared from the database
+ // Log this information for the repair tool and mark the entry
+ // as conflicting.
+ // stop the processing.
+ if (newSuperior == null)
+ {
+ markConflictEntry(op, currentDN, currentDN.getParent().concat(newRDN));
+ numUnresolvedNamingConflicts.incrementAndGet();
+ return true;
+ }
+
+ DN newDN = newSuperior.concat(newRDN);
+
if (currentDN == null)
{
- // The entry targetted by the Modify DN is not in the database
+ // The entry targeted by the Modify DN is not in the database
// anymore.
// This is a conflict between a delete and this modify DN.
- // The entry has been deleted anymore so we can safely assume
+ // The entry has been deleted, we can safely assume
// that the operation is completed.
numResolvedNamingConflicts.incrementAndGet();
return true;
@@ -1758,19 +2053,8 @@
return true;
}
- // If we could not find the new parent entry, we missed this entry
- // earlier or it has disappeared from the database
- // Log this information for the repair tool and mark the entry
- // as conflicting.
- // stop the processing.
- if (newSuperior == null)
- {
- markConflictEntry(op, currentDN, newDN);
- numUnresolvedNamingConflicts.incrementAndGet();
- return true;
- }
-
if ((result == ResultCode.NO_SUCH_OBJECT) ||
+ (result == ResultCode.UNWILLING_TO_PERFORM) ||
(result == ResultCode.OBJECTCLASS_VIOLATION))
{
/*
@@ -1858,7 +2142,7 @@
msg.setDn(generateConflictRDN(entryUid,
op.getEntryDN().getRDN().toString()) + ","
- + baseDN);
+ + baseDn);
// reset the parent uid so that the check done is the handleConflict
// phase does not fail.
msg.setParentUid(null);
@@ -1964,7 +2248,7 @@
}
} catch (DirectoryException e)
{
- // log errror and information for the REPAIR tool.
+ // log error and information for the REPAIR tool.
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_EXCEPTION_RENAME_CONFLICT_ENTRY.get());
mb.append(String.valueOf(entryDN));
@@ -1979,7 +2263,7 @@
/**
* Rename an entry that was conflicting so that it stays below the
- * baseDN of the replicationDomain.
+ * baseDn of the replicationDomain.
*
* @param conflictOp The Operation that caused the conflict.
* @param dn The DN of the entry to be renamed.
@@ -1991,7 +2275,7 @@
InternalClientConnection.getRootConnection();
ModifyDNOperation newOp = conn.processModifyDN(
- dn, generateDeleteConflictDn(uid, dn),false, baseDN);
+ dn, generateDeleteConflictDn(uid, dn),false, baseDn);
if (newOp.getResultCode() != ResultCode.SUCCESS)
{
@@ -2022,11 +2306,10 @@
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
- AttributeType attrType =
- DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true);
- LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
- values.add(new AttributeValue(attrType, conflictDN.toString()));
- Attribute attr = new Attribute(attrType, DS_SYNC_CONFLICT, values);
+ AttributeType attrType = DirectoryServer.getAttributeType(DS_SYNC_CONFLICT,
+ true);
+ Attribute attr = Attributes.create(attrType, new AttributeValue(
+ attrType, conflictDN.toString()));
List<Modification> mods = new ArrayList<Modification>();
Modification mod = new Modification(ModificationType.REPLACE, attr);
mods.add(mod);
@@ -2042,7 +2325,7 @@
logError(mb.toMessage());
}
- // Generate an alert to let the administratot know that some
+ // Generate an alert to let the administration know that some
// conflict could not be solved.
Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN.toString());
DirectoryServer.sendAlertNotification(this,
@@ -2055,12 +2338,12 @@
*
* @param msg The conflicting Add Operation.
*
- * @throws ASN1Exception When an encoding error happenned manipulating the
+ * @throws ASN1Exception When an encoding error happened manipulating the
* msg.
*/
private void addConflict(AddMsg msg) throws ASN1Exception
{
- // Generate an alert to let the administratot know that some
+ // Generate an alert to let the administrator know that some
// conflict could not be solved.
Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn());
DirectoryServer.sendAlertNotification(this,
@@ -2215,7 +2498,7 @@
* Get the server ID.
* @return The server ID.
*/
- public int getServerId()
+ public short getServerId()
{
return serverId;
}
@@ -2273,7 +2556,7 @@
/**
* Enable back the domain after a previous disable.
* The domain will connect back to a replication Server and
- * will recreate threads to listen for messages from the Sycnhronization
+ * will recreate threads to listen for messages from the Synchronization
* server.
* The generationId will be retrieved or computed if necessary.
* The ServerState will also be read again from the local database.
@@ -2291,7 +2574,7 @@
* should we stop the modifications ?
*/
logError(ERR_LOADING_GENERATION_ID.get(
- baseDN.toNormalizedString(), e.getLocalizedMessage()));
+ baseDn.toNormalizedString(), e.getLocalizedMessage()));
return;
}
@@ -2349,7 +2632,7 @@
public ResultCode saveGenerationId(long generationId)
{
// The generationId is stored in the root entry of the domain.
- ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
+ ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDn.toString());
ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
@@ -2383,7 +2666,7 @@
Message message = ERR_UPDATING_GENERATION_ID.get(
op.getResultCode().getResultCodeName() + " " +
op.getErrorMessage(),
- baseDN.toString());
+ baseDn.toString());
logError(message);
}
}
@@ -2410,9 +2693,9 @@
if (debugEnabled())
TRACER.debugInfo(
- "Attempt to read generation ID from DB " + baseDN.toString());
+ "Attempt to read generation ID from DB " + baseDn.toString());
- ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
+ ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDn.toString());
boolean found = false;
LDAPFilter filter;
try
@@ -2442,7 +2725,7 @@
Message message = ERR_SEARCHING_GENERATION_ID.get(
search.getResultCode().getResultCodeName() + " " +
search.getErrorMessage(),
- baseDN.toString());
+ baseDn.toString());
logError(message);
}
@@ -2460,27 +2743,26 @@
if (attrs != null)
{
Attribute attr = attrs.get(0);
- LinkedHashSet<AttributeValue> values = attr.getValues();
- if (values.size()>1)
+ if (attr.size()>1)
{
Message message = ERR_LOADING_GENERATION_ID.get(
- baseDN.toString(), "#Values=" + values.size() +
+ baseDn.toString(), "#Values=" + attr.size() +
" Must be exactly 1 in entry " +
resultEntry.toLDIFString());
logError(message);
}
- else if (values.size() == 1)
+ else if (attr.size() == 1)
{
found=true;
try
{
- generationId = Long.decode(values.iterator().next().
+ generationId = Long.decode(attr.iterator().next().
getStringValue());
}
catch(Exception e)
{
Message message = ERR_LOADING_GENERATION_ID.get(
- baseDN.toString(), e.getLocalizedMessage());
+ baseDn.toString(), e.getLocalizedMessage());
logError(message);
}
}
@@ -2495,7 +2777,7 @@
if (debugEnabled())
TRACER.debugInfo("Generation ID created for domain base DN=" +
- baseDN.toString() +
+ baseDn.toString() +
" generationId=" + generationId);
}
else
@@ -2503,7 +2785,7 @@
generationIdSavedStatus = true;
if (debugEnabled())
TRACER.debugInfo(
- "Generation ID successfully read from domain base DN=" + baseDN +
+ "Generation ID successfully read from domain base DN=" + baseDn +
" generationId=" + generationId);
}
return generationId;
@@ -2528,19 +2810,20 @@
{
ResultCode resultCode = ResultCode.OTHER;
Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
- baseDN.toNormalizedString());
+ baseDn.toNormalizedString());
throw new DirectoryException(
resultCode, message);
}
- ResetGenerationId genIdMessage = null;
+ ResetGenerationIdMsg genIdMessage = null;
+
if (generationIdNewValue == null)
{
- genIdMessage = new ResetGenerationId(this.generationId);
+ genIdMessage = new ResetGenerationIdMsg(this.generationId);
}
else
{
- genIdMessage = new ResetGenerationId(generationIdNewValue);
+ genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
}
broker.publish(genIdMessage);
}
@@ -2574,7 +2857,7 @@
*/
public byte[] receiveEntryBytes()
{
- ReplicationMessage msg;
+ ReplicationMsg msg;
while (true)
{
try
@@ -2584,7 +2867,7 @@
if (debugEnabled())
TRACER.debugVerbose(
" sid:" + this.serverId +
- " base DN:" + this.baseDN +
+ " base DN:" + this.baseDn +
" Import EntryBytes received " + msg);
if (msg == null)
{
@@ -2592,26 +2875,26 @@
return null;
}
- if (msg instanceof EntryMessage)
+ if (msg instanceof EntryMsg)
{
- EntryMessage entryMsg = (EntryMessage)msg;
+ EntryMsg entryMsg = (EntryMsg)msg;
byte[] entryBytes = entryMsg.getEntryBytes();
ieContext.updateCounters();
return entryBytes;
}
- else if (msg instanceof DoneMessage)
+ else if (msg instanceof DoneMsg)
{
// This is the normal termination of the import
// No error is stored and the import is ended
// by returning null
return null;
}
- else if (msg instanceof ErrorMessage)
+ else if (msg instanceof ErrorMsg)
{
// This is an error termination during the import
// The error is stored and the import is ended
// by returning null
- ErrorMessage errorMsg = (ErrorMessage)msg;
+ ErrorMsg errorMsg = (ErrorMsg)msg;
ieContext.exception = new DirectoryException(
ResultCode.OTHER,
errorMsg.getDetails());
@@ -2637,7 +2920,7 @@
* on going.
* @param errorMsg The error message received.
*/
- protected void abandonImportExport(ErrorMessage errorMsg)
+ protected void abandonImportExport(ErrorMsg errorMsg)
{
// FIXME TBD Treat the case where the error happens while entries
// are being exported
@@ -2645,7 +2928,7 @@
if (debugEnabled())
TRACER.debugVerbose(
" abandonImportExport:" + this.serverId +
- " base DN:" + this.baseDN +
+ " base DN:" + this.baseDn +
" Error Msg received " + errorMsg);
if (ieContext != null)
@@ -2730,8 +3013,8 @@
throws DirectoryException
{
long genID = 0;
- Backend backend = retrievesBackend(this.baseDN);
- long bec = backend.numSubordinates(baseDN, true) + 1;
+ Backend backend = retrievesBackend(this.baseDn);
+ long bec = backend.numSubordinates(baseDn, true) + 1;
long entryCount = (bec<1000?bec:1000);
// Acquire a shared lock for the backend.
@@ -2764,10 +3047,10 @@
if (checksumOutput)
{
ros = new ReplLDIFOutputStream(this, entryCount);
- os = new CheckedOutputStream(ros, new Adler32());
+ os = new CheckedOutputStream(ros, new GenerationIdChecksum());
try
{
- os.write((Long.toString(backend.numSubordinates(baseDN, true) + 1)).
+ os.write((Long.toString(backend.numSubordinates(baseDn, true) + 1)).
getBytes());
}
catch(Exception e)
@@ -2782,9 +3065,9 @@
}
LDIFExportConfig exportConfig = new LDIFExportConfig(os);
- // baseDN branch is the only one included in the export
+ // baseDn branch is the only one included in the export
List<DN> includeBranches = new ArrayList<DN>(1);
- includeBranches.add(this.baseDN);
+ includeBranches.add(this.baseDn);
exportConfig.setIncludeBranches(includeBranches);
// For the checksum computing mode, only consider the 'stable' attributes
@@ -2838,19 +3121,15 @@
}
finally
{
+ // Clean up after the export by closing the export config.
+ // Will also flush the export and export the remaining entries.
+ exportConfig.close();
if (checksumOutput)
{
genID =
((CheckedOutputStream)os).getChecksum().getValue();
}
- else
- {
- // Clean up after the export by closing the export config.
- // Will also flush the export and export the remaining entries.
- // This is a real export where writer has been initialized.
- exportConfig.close();
- }
// Release the shared lock on the backend.
try
@@ -2882,12 +3161,12 @@
* Retrieves the backend related to the domain.
*
* @return The backend of that domain.
- * @param baseDN The baseDN to retrieve the backend
+ * @param baseDn The baseDn to retrieve the backend
*/
- protected static Backend retrievesBackend(DN baseDN)
+ protected static Backend retrievesBackend(DN baseDn)
{
// Retrieves the backend related to this domain
- return DirectoryServer.getBackend(baseDN);
+ return DirectoryServer.getBackend(baseDn);
}
/**
@@ -2909,7 +3188,7 @@
*/
public void exportLDIFEntry(String lDIFEntry) throws IOException
{
- // If an error was raised - like receiving an ErrorMessage
+ // If an error was raised - like receiving an ErrorMsg
// we just let down the export.
if (ieContext.exception != null)
{
@@ -2918,7 +3197,7 @@
throw ioe;
}
- EntryMessage entryMessage = new EntryMessage(
+ EntryMsg entryMessage = new EntryMsg(
serverId, ieContext.exportTarget, lDIFEntry.getBytes());
broker.publish(entryMessage);
@@ -2949,8 +3228,8 @@
acquireIEContext();
ieContext.initializeTask = initTask;
- InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
- baseDN, serverId, source);
+ InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
+ baseDn, serverId, source);
// Publish Init request msg
broker.publish(initializeMsg);
@@ -3014,7 +3293,7 @@
Throwable cause;
if (targetString.equalsIgnoreCase("all"))
{
- return RoutableMessage.ALL_SERVERS;
+ return RoutableMsg.ALL_SERVERS;
}
// So should be a serverID
@@ -3092,10 +3371,17 @@
public void initializeRemote(short target, short requestorID, Task initTask)
throws DirectoryException
{
+ Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
+ Short.toString(serverId),
+ baseDn.toString(),
+ Short.toString(requestorID));
+ logError(msg);
+
boolean contextAcquired=false;
+
try
{
- Backend backend = retrievesBackend(this.baseDN);
+ Backend backend = retrievesBackend(this.baseDn);
if (!backend.supportsLDIFExport())
{
@@ -3110,7 +3396,7 @@
// The number of entries to be exported is the number of entries under
// the base DN entry and the base entry itself.
- long entryCount = backend.numSubordinates(baseDN, true) + 1;
+ long entryCount = backend.numSubordinates(baseDn, true) + 1;
ieContext.exportTarget = target;
if (initTask != null)
{
@@ -3119,15 +3405,15 @@
ieContext.setCounters(entryCount, entryCount);
// Send start message to the peer
- InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
- baseDN, serverId, ieContext.exportTarget, requestorID, entryCount);
+ InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
+ baseDn, serverId, ieContext.exportTarget, requestorID, entryCount);
broker.publish(initializeMessage);
exportBackend(false);
// Notify the peer of the success
- DoneMessage doneMsg = new DoneMessage(serverId,
+ DoneMsg doneMsg = new DoneMsg(serverId,
initializeMessage.getDestination());
broker.publish(doneMsg);
@@ -3136,8 +3422,8 @@
catch(DirectoryException de)
{
// Notify the peer of the failure
- ErrorMessage errorMsg =
- new ErrorMessage(target,
+ ErrorMsg errorMsg =
+ new ErrorMsg(target,
de.getMessageObject());
broker.publish(errorMsg);
@@ -3146,6 +3432,12 @@
throw(de);
}
+
+ msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
+ Short.toString(serverId),
+ baseDn.toString(),
+ Short.toString(requestorID));
+ logError(msg);
}
/**
@@ -3180,13 +3472,48 @@
* @param initializeMessage The message that initiated the import.
* @exception DirectoryException Thrown when an error occurs.
*/
- protected void initialize(InitializeTargetMessage initializeMessage)
+ protected void initialize(InitializeTargetMsg initializeMessage)
throws DirectoryException
{
LDIFImportConfig importConfig = null;
DirectoryException de = null;
- Backend backend = retrievesBackend(baseDN);
+ Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
+ Short.toString(serverId),
+ baseDn.toString(),
+ Long.toString(initializeMessage.getRequestorID()));
+ logError(msg);
+
+ // Go into full update status
+ // Use synchronized as somebody could ask another status change at the same
+ // time
+ synchronized (status)
+ {
+ StatusMachineEvent event = StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT;
+ ServerStatus newStatus =
+ StatusMachine.computeNewStatus(status, event);
+
+ if (newStatus == ServerStatus.INVALID_STATUS)
+ {
+ msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
+ Short.toString(serverId), status.toString(), event.toString());
+ logError(msg);
+ return;
+ }
+
+ // Store new status
+ status = newStatus;
+
+ if (debugEnabled())
+ TRACER.debugInfo("Replication domain " + baseDn +
+ " new status is: " + status);
+
+ // Perform whatever actions are needed to apply properties for being
+ // compliant with new status
+ updateDomainForNewStatus();
+ }
+
+ Backend backend = retrievesBackend(baseDn);
try
{
@@ -3220,7 +3547,7 @@
importConfig =
new LDIFImportConfig(ieContext.ldifImportInputStream);
List<DN> includeBranches = new ArrayList<DN>();
- includeBranches.add(this.baseDN);
+ includeBranches.add(this.baseDn);
importConfig.setIncludeBranches(includeBranches);
importConfig.setAppendToExistingData(false);
@@ -3254,7 +3581,7 @@
// Re-enable backend
closeBackendImport(backend);
- backend = retrievesBackend(baseDN);
+ backend = retrievesBackend(baseDn);
}
try
@@ -3294,6 +3621,12 @@
{
throw de;
}
+
+ msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
+ Short.toString(serverId),
+ baseDn.toString(),
+ Long.toString(initializeMessage.getRequestorID()));
+ logError(msg);
}
/**
@@ -3320,21 +3653,21 @@
}
/**
- * Retrieves a replication domain based on the baseDN.
+ * Retrieves a replication domain based on the baseDn.
*
- * @param baseDN The baseDN of the domain to retrieve
+ * @param baseDn The baseDn of the domain to retrieve
* @return The domain retrieved
* @throws DirectoryException When an error occurred or no domain
- * match the provided baseDN.
+ * match the provided baseDn.
*/
- public static ReplicationDomain retrievesReplicationDomain(DN baseDN)
+ public static ReplicationDomain retrievesReplicationDomain(DN baseDn)
throws DirectoryException
{
ReplicationDomain replicationDomain = null;
// Retrieves the domain
DirectoryServer.getSynchronizationProviders();
- for (SynchronizationProvider provider :
+ for (SynchronizationProvider<?> provider :
DirectoryServer.getSynchronizationProviders())
{
if (!( provider instanceof MultimasterReplication))
@@ -3346,7 +3679,7 @@
// From the domainDN retrieves the replication domain
ReplicationDomain sdomain =
- MultimasterReplication.findDomain(baseDN, null);
+ MultimasterReplication.findDomain(baseDn, null);
if (sdomain == null)
{
break;
@@ -3365,7 +3698,7 @@
{
MessageBuilder mb = new MessageBuilder(ERR_NO_MATCHING_DOMAIN.get());
mb.append(" ");
- mb.append(String.valueOf(baseDN));
+ mb.append(String.valueOf(baseDn));
throw new DirectoryException(ResultCode.OTHER,
mb.toMessage());
}
@@ -3378,11 +3711,11 @@
*/
public Backend getBackend()
{
- return retrievesBackend(baseDN);
+ return retrievesBackend(baseDn);
}
/**
- * Returns a boolean indiciating if an import or export is currently
+ * Returns a boolean indicating if an import or export is currently
* processed.
* @return The status
*/
@@ -3396,7 +3729,7 @@
/**
- * Push the modifications contain the in given parameter has
+ * Push the modifications contained in the given parameter as
* a modification that would happen on a local server.
* The modifications are not applied to the local database,
* historical information is not updated but a ChangeNumber
@@ -3427,7 +3760,7 @@
* @param configuration The configuration to check.
* @param unacceptableReasons When the configuration is not acceptable, this
* table is use to return the reasons why this
- * configuration is not acceptbale.
+ * configuration is not acceptable.
*
* @return true if the configuration is acceptable, false other wise.
*/
@@ -3437,7 +3770,7 @@
// Check that there is not already a domain with the same DN
DN dn = configuration.getBaseDN();
ReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
- if ((domain != null) && (domain.baseDN.equals(dn)))
+ if ((domain != null) && (domain.baseDn.equals(dn)))
{
Message message = ERR_SYNC_INVALID_DN.get();
unacceptableReasons.add(message);
@@ -3557,4 +3890,78 @@
else
return false;
}
+
+ /**
+ * Gets the info for DSs in the topology (except us).
+ * @return The info for DSs in the topology (except us)
+ */
+ public List<DSInfo> getDsList()
+ {
+ return dsList;
+ }
+
+ /**
+ * Gets the info for RSs in the topology (except the one we are connected
+ * to).
+ * @return The info for RSs in the topology (except the one we are connected
+ * to)
+ */
+ public List<RSInfo> getRsList()
+ {
+ return rsList;
+ }
+
+ /**
+ * Tells if assured replication is enabled for this domain.
+ * @return True if assured replication is enabled for this domain.
+ */
+ public boolean isAssured()
+ {
+ return assured;
+ }
+
+ /**
+ * Gives the mode for the assured replication of the domain.
+ * @return The mode for the assured replication of the domain.
+ */
+ public AssuredMode getAssuredMode()
+ {
+ return assuredMode;
+ }
+
+ /**
+ * Gives the assured level of the replication of the domain.
+ * @return The assured level of the replication of the domain.
+ */
+ public byte getAssuredSdLevel()
+ {
+ return assuredSdLevel;
+ }
+
+ /**
+ * Gets the group id for this domain.
+ * @return The group id for this domain.
+ */
+ public byte getGroupId()
+ {
+ return groupId;
+ }
+
+ /**
+ * Gets the referrals URLs this domain publishes.
+ * @return The referrals URLs this domain publishes.
+ */
+ public List<String> getRefUrls()
+ {
+ return refUrls;
+ }
+
+ /**
+ * Gets the status for this domain.
+ * @return The status for this domain.
+ */
+ public ServerStatus getStatus()
+ {
+ return status;
+ }
}
--
Gitblit v1.10.0