From d04fb0f282e0fd9a4bc80d3f9d5ee15506a3b83b Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk
---
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 1874 ++++++++++++----------------------------------------------
1 files changed, 399 insertions(+), 1,475 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
similarity index 65%
rename from opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
rename to opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index ed583c6..87cb100 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -25,6 +25,7 @@
* Copyright 2006-2008 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
+
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -36,28 +37,31 @@
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.opends.server.replication.common.StatusMachine.*;
+
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
+
+import org.opends.server.replication.service.ReplicationMonitor;
+
+import java.util.Collection;
+
+import org.opends.server.types.Attributes;
import java.io.File;
-import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
-import java.net.SocketTimeoutException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import java.util.TreeMap;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
-
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
@@ -68,7 +72,6 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
-import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
@@ -82,49 +85,31 @@
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
+import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachine;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.DeleteContext;
-import org.opends.server.replication.protocol.DoneMsg;
-import org.opends.server.replication.protocol.EntryMsg;
-import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.HeartbeatMsg;
-import org.opends.server.replication.protocol.InitializeRequestMsg;
-import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.OperationContext;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.tasks.InitializeTargetTask;
-import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
-import org.opends.server.types.AttributeBuilder;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.ExistingFileBehavior;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ConfigChangeResult;
@@ -133,6 +118,7 @@
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
+import org.opends.server.types.ExistingFileBehavior;
import org.opends.server.types.LDAPException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
@@ -144,6 +130,7 @@
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
+import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SynchronizationProviderResult;
import org.opends.server.types.operation.PluginOperation;
@@ -163,9 +150,9 @@
* handle conflict resolution,
* handle protocol messages from the replicationServer.
*/
-public class ReplicationDomain extends DirectoryThread
+public class LDAPReplicationDomain extends ReplicationDomain
implements ConfigurationChangeListener<ReplicationDomainCfg>,
- AlertGenerator
+ AlertGenerator, InternalSearchListener
{
/**
* The fully-qualified name of this class.
@@ -185,37 +172,20 @@
*/
private static final DebugTracer TRACER = getTracer();
- private ReplicationMonitor monitor;
-
- private ReplicationBroker broker;
- // Thread waiting for incoming update messages for this domain and pushing
- // them to the global incoming update message queue for later processing by
- // replay threads.
- private ListenerThread listenerThread;
// The update to replay message queue where the listener thread is going to
// push incoming update messages.
private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
- private SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
- new TreeMap<ChangeNumber, UpdateMsg>();
- private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
- private AtomicInteger numSentUpdates = new AtomicInteger(0);
- private AtomicInteger numProcessedUpdates = new AtomicInteger();
private AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
private AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
private AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger();
private int debugCount = 0;
- private PersistentServerState state;
+ private final PersistentServerState state;
private int numReplayedPostOpCalled = 0;
- private int maxReceiveQueue = 0;
- private int maxSendQueue = 0;
- private int maxReceiveDelay = 0;
- private int maxSendDelay = 0;
-
private long generationId = -1;
private boolean generationIdSavedStatus = false;
- ChangeNumberGenerator generator;
+ private ChangeNumberGenerator generator;
/**
* This object is used to store the list of update currently being
@@ -235,19 +205,8 @@
*/
private RemotePendingChanges remotePendingChanges;
- /**
- * The time in milliseconds between heartbeats from the replication
- * server. Zero means heartbeats are off.
- */
- private long heartbeatInterval = 0;
private short serverId;
- // The context related to an import or export being processed
- // Null when none is being processed.
- private IEContext ieContext = null;
-
- private Collection<String> replicationServers;
-
private DN baseDn;
private boolean shutdown = false;
@@ -260,37 +219,10 @@
private boolean disabled = false;
private boolean stateSavingDisabled = false;
- private int window = 100;
-
- /*
- * Assured mode properties
- */
- // Is assured mode enabled or not for this domain ?
- private boolean assured = false;
- // Assured sub mode (used when assured is true)
- private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
- // Safe Data level (used when assuredMode is SAFE_DATA)
- private byte assuredSdLevel = (byte)1;
- // Timeout (in milliseconds) when waiting for acknowledgments
- private long assuredTimeout = 1000;
-
- // Group id
- private byte groupId = (byte)1;
- // Referrals urls to be published to other servers of the topology
- // TODO: fill that with all currently opened urls if no urls configured
- private List<String> refUrls = new ArrayList<String>();
-
- // Current status for this replicated domain
- private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
-
- /*
- * Properties for the last topology info received from the network.
- */
- // Info for other DSs.
- // Warning: does not contain info for us (for our server id)
- private List<DSInfo> dsList = new ArrayList<DSInfo>();
- // Info for other RSs.
- private List<RSInfo> rsList = new ArrayList<RSInfo>();
+ // This list is used to temporary store operations that needs
+ // to be replayed at session establishment time.
+ private TreeSet<FakeOperation> replayOperations =
+ new TreeSet<FakeOperation>(new FakeOperationComparator());;
/**
* The isolation policy that this domain is going to use.
@@ -312,131 +244,47 @@
*/
private boolean done = true;
+ private ServerStateFlush flushThread;
+
/**
- * This class contain the context related to an import or export
- * launched on the domain.
+ * The thread that periodically saves the ServerState of this
+ * LDAPReplicationDomain in the database.
*/
- private class IEContext
+ private class ServerStateFlush extends DirectoryThread
{
- // The task that initiated the operation.
- Task initializeTask;
- // The input stream for the import
- ReplLDIFInputStream ldifImportInputStream = null;
- // The target in the case of an export
- short exportTarget = RoutableMsg.UNKNOWN_SERVER;
- // The source in the case of an import
- short importSource = RoutableMsg.UNKNOWN_SERVER;
-
- // The total entry count expected to be processed
- long entryCount = 0;
- // The count for the entry not yet processed
- long entryLeftCount = 0;
-
- // The exception raised when any
- DirectoryException exception = null;
-
- /**
- * Initializes the import/export counters with the provider value.
- * @param total
- * @param left
- * @throws DirectoryException
- */
- public void setCounters(long total, long left)
- throws DirectoryException
+ protected ServerStateFlush()
{
- entryCount = total;
- entryLeftCount = left;
-
- if (initializeTask != null)
- {
- if (initializeTask instanceof InitializeTask)
- {
- ((InitializeTask)initializeTask).setTotal(entryCount);
- ((InitializeTask)initializeTask).setLeft(entryCount);
- }
- else if (initializeTask instanceof InitializeTargetTask)
- {
- ((InitializeTargetTask)initializeTask).setTotal(entryCount);
- ((InitializeTargetTask)initializeTask).setLeft(entryCount);
- }
- }
- }
-
- /**
- * Update the counters of the task for each entry processed during
- * an import or export.
- * @throws DirectoryException
- */
- public void updateCounters()
- throws DirectoryException
- {
- entryLeftCount--;
-
- if (initializeTask != null)
- {
- if (initializeTask instanceof InitializeTask)
- {
- ((InitializeTask)initializeTask).setLeft(entryLeftCount);
- }
- else if (initializeTask instanceof InitializeTargetTask)
- {
- ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
- }
- }
+ super("Replication State Saver for server id " +
+ serverId + " and domain " + baseDn.toString());
}
/**
* {@inheritDoc}
*/
- public String toString()
- {
- return new String("[ Entry count=" + this.entryCount +
- ", Entry left count=" + this.entryLeftCount + "]");
- }
- }
-
- /**
- * This thread is launched when we want to export data to another server that
- * has requested to be initialized with the data of our backend.
- */
- private class ExportThread extends DirectoryThread
- {
- // Id of server that will receive updates
- private short target;
-
- /**
- * Constructor for the ExportThread.
- *
- * @param target Id of server that will receive updates
- */
- public ExportThread(short target)
- {
- super("Export thread " + serverId);
- this.target = target;
- }
-
- /**
- * Run method for this class.
- */
+ @Override
public void run()
{
- if (debugEnabled())
- {
- TRACER.debugInfo("Export thread starting.");
- }
+ done = false;
- try
+ while (shutdown == false)
{
- initializeRemote(target, target, null);
- } catch (DirectoryException de)
- {
- // An error message has been sent to the peer
- // Nothing more to do locally
+ try
+ {
+ synchronized (this)
+ {
+ this.wait(1000);
+ if (!disabled && !stateSavingDisabled )
+ {
+ // save the ServerState
+ state.save();
+ }
+ }
+ } catch (InterruptedException e)
+ { }
}
- if (debugEnabled())
- {
- TRACER.debugInfo("Export thread stopping.");
- }
+ state.save();
+
+ done = true;
}
}
@@ -447,18 +295,24 @@
* @param updateToReplayQueue The queue for update messages to replay.
* @throws ConfigException In case of invalid configuration.
*/
- public ReplicationDomain(ReplicationDomainCfg configuration,
+ public LDAPReplicationDomain(ReplicationDomainCfg configuration,
LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
throws ConfigException
{
- super("Replication State Saver for server id " + configuration.getServerId()
- + " and domain " + configuration.getBaseDN());
+ super(configuration.getBaseDN().toNormalizedString(),
+ (short) configuration.getServerId());
+
+ /**
+ * The time in milliseconds between heartbeats from the replication
+ * server. Zero means heartbeats are off.
+ */
+ long heartbeatInterval = 0;
// Read the configuration parameters.
- replicationServers = configuration.getReplicationServer();
+ Set<String> replicationServers = configuration.getReplicationServer();
serverId = (short) configuration.getServerId();
baseDn = configuration.getBaseDN();
- window = configuration.getWindowSize();
+ int window = configuration.getWindowSize();
heartbeatInterval = configuration.getHeartbeatInterval();
isolationpolicy = configuration.getIsolationPolicy();
configDn = configuration.dn();
@@ -471,28 +325,22 @@
switch (assuredType)
{
case NOT_ASSURED:
- assured = false;
+ setAssured(false);
break;
case SAFE_DATA:
- assured = true;
- this.assuredMode = AssuredMode.SAFE_DATA_MODE;
+ setAssured(true);
+ setAssuredMode(AssuredMode.SAFE_DATA_MODE);
break;
case SAFE_READ:
- assured = true;
- this.assuredMode = AssuredMode.SAFE_READ_MODE;
+ setAssured(true);
+ setAssuredMode(AssuredMode.SAFE_READ_MODE);
break;
}
- this.assuredSdLevel = (byte)configuration.getAssuredSdLevel();
- this.groupId = (byte)configuration.getGroupId();
- this.assuredTimeout = configuration.getAssuredTimeout();
- SortedSet<String> urls = configuration.getReferralsUrl();
- if (urls != null)
- {
- for (String url : urls)
- {
- this.refUrls.add(url);
- }
- }
+ setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
+ setAssuredTimeout(configuration.getAssuredTimeout());
+
+ setGroupId((byte)configuration.getGroupId());
+ setURLs(configuration.getReferralsUrl());
/*
* Modify conflicts are solved for all suffixes but the schema suffix
@@ -510,19 +358,6 @@
solveConflictFlag = true;
}
- /*
- * Create a new Persistent Server State that will be used to store
- * the last ChangeNmber seen from all LDAP servers in the topology.
- */
- state = new PersistentServerState(baseDn, serverId);
-
- /*
- * Create a replication monitor object responsible for publishing
- * monitoring information below cn=monitor.
- */
- monitor = new ReplicationMonitor(this);
- DirectoryServer.registerMonitorProvider(monitor);
-
Backend backend = retrievesBackend(baseDn);
if (backend == null)
{
@@ -541,14 +376,12 @@
}
/*
- * create the broker object used to publish and receive changes
+ * Create a new Persistent Server State that will be used to store
+ * the last ChangeNmber seen from all LDAP servers in the topology.
*/
- broker = new ReplicationBroker(this, state, baseDn, serverId,
- maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window,
- heartbeatInterval, generationId,
- new ReplSessionSecurity(configuration),getGroupId());
+ state = new PersistentServerState(baseDn, serverId, getServerState());
- broker.start(replicationServers);
+ startPublishService(replicationServers, window, heartbeatInterval);
/*
* ChangeNumberGenerator is used to create new unique ChangeNumbers
@@ -557,14 +390,12 @@
* The generator time is adjusted to the time of the last CN received from
* remote other servers.
*/
- generator =
- new ChangeNumberGenerator(serverId, state);
+ generator = getGenerator();
pendingChanges =
- new PendingChanges(generator,
- broker, state);
+ new PendingChanges(generator, this);
- remotePendingChanges = new RemotePendingChanges(generator, state);
+ remotePendingChanges = new RemotePendingChanges(getServerState());
// listen for changes on the configuration
configuration.addChangeListener(this);
@@ -573,7 +404,6 @@
DirectoryServer.registerAlertGenerator(this);
}
-
/**
* Returns the base DN of this ReplicationDomain.
*
@@ -741,7 +571,7 @@
{
// this isolation policy specifies that the updates are denied
// when the broker is not connected.
- return broker.isConnected();
+ return isConnected();
}
// we should never get there as the only possible policies are
// ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
@@ -931,322 +761,6 @@
}
/**
- * Receives an update message from the replicationServer.
- * also responsible for updating the list of pending changes
- * @return the received message - null if none
- */
- public UpdateMsg receive()
- {
- UpdateMsg update = null;
-
- while ( (update == null) && (!shutdown) )
- {
- InitializeRequestMsg initMsg = null;
- ReplicationMsg msg;
- try
- {
- msg = broker.receive();
- if (msg == null)
- {
- // The server is in the shutdown process
- return null;
- }
-
- if (debugEnabled())
- if (!(msg instanceof HeartbeatMsg))
- TRACER.debugVerbose("Message received <" + msg + ">");
-
- if (msg instanceof AckMsg)
- {
- AckMsg ack = (AckMsg) msg;
- receiveAck(ack);
- }
- else if (msg instanceof InitializeRequestMsg)
- {
- // Another server requests us to provide entries
- // for a total update
- initMsg = (InitializeRequestMsg)msg;
- }
- else if (msg instanceof InitializeTargetMsg)
- {
- // Another server is exporting its entries to us
- InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
-
- try
- {
- // This must be done while we are still holding the
- // broker lock because we are now going to receive a
- // bunch of entries from the remote server and we
- // want the import thread to catch them and
- // not the ListenerThread.
- initialize(importMsg);
- }
- catch(DirectoryException de)
- {
- // Returns an error message to notify the sender
- ErrorMsg errorMsg =
- new ErrorMsg(importMsg.getsenderID(),
- de.getMessageObject());
- MessageBuilder mb = new MessageBuilder();
- mb.append(de.getMessageObject());
- TRACER.debugInfo(Message.toString(mb.toMessage()));
- broker.publish(errorMsg);
- }
- }
- else if (msg instanceof ErrorMsg)
- {
- if (ieContext != null)
- {
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find any import source.
- abandonImportExport((ErrorMsg)msg);
- }
- else
- {
- /*
- * Log error message
- */
- ErrorMsg errorMsg = (ErrorMsg)msg;
- logError(ERR_ERROR_MSG_RECEIVED.get(
- errorMsg.getDetails()));
- }
- }
- if (msg instanceof TopologyMsg)
- {
- TopologyMsg topoMsg = (TopologyMsg)msg;
- receiveTopo(topoMsg);
- }
- if (msg instanceof ChangeStatusMsg)
- {
- ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
- receiveChangeStatus(csMsg);
- }
- else if (msg instanceof UpdateMsg)
- {
- update = (UpdateMsg) msg;
- receiveUpdate(update);
- }
- }
- catch (SocketTimeoutException e)
- {
- // just retry
- }
- // Test if we have received and export request message and
- // if that's the case handle it now.
- // This must be done outside of the portion of code protected
- // by the broker lock so that we keep receiveing update
- // when we are doing and export and so that a possible
- // closure of the socket happening when we are publishing the
- // entries to the remote can be handled by the other
- // replay thread when they call this method and therefore the
- // broker.receive() method.
- if (initMsg != null)
- {
- // Do this work in a thread to allow replay thread continue working
- ExportThread exportThread = new ExportThread(initMsg.getsenderID());
- exportThread.start();
- }
- }
- return update;
- }
-
- /**
- * Processes an incoming TopologyMsg.
- * Updates the structures for the local view of the topology.
- *
- * @param topoMsg The topology information received from RS.
- */
- public void receiveTopo(TopologyMsg topoMsg)
- {
-
- if (debugEnabled())
- TRACER.debugInfo("Replication domain " + baseDn
- + " received topology info update:\n" + topoMsg);
-
- // Store new lists
- synchronized(getDsList())
- {
- synchronized(getRsList())
- {
- dsList = topoMsg.getDsList();
- rsList = topoMsg.getRsList();
- }
- }
- }
-
- /**
- * Set the initial status of the domain, once he is connected to the topology.
- * @param initStatus The status to enter the state machine with
- */
- public void setInitialStatus(ServerStatus initStatus)
- {
- // Sanity check: is it a valid initial status?
- if (!isValidInitialStatus(initStatus))
- {
- Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
- baseDn.toString(), Short.toString(serverId));
- logError(msg);
- } else
- {
- status = initStatus;
- }
- }
-
- /**
- * Processes an incoming ChangeStatusMsg. Compute new status according to
- * given order. Then update domain for being compliant with new status
- * definition.
- * @param csMsg The received status message
- */
- private void receiveChangeStatus(ChangeStatusMsg csMsg)
- {
- if (debugEnabled())
- TRACER.debugInfo("Replication domain " + baseDn +
- " received change status message:\n" + csMsg);
-
- ServerStatus reqStatus = csMsg.getRequestedStatus();
-
- // Translate requested status to a state machine event
- StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
- if (event == StatusMachineEvent.INVALID_EVENT)
- {
- Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
- baseDn.toString(), Short.toString(serverId));
- logError(msg);
- return;
- }
-
- // Compute new status and do matching tasks
- // Use synchronized as admin task (thread) could order to go in admin status
- // for instance (concurrent with receive thread).
- synchronized (status)
- {
- ServerStatus newStatus =
- StatusMachine.computeNewStatus(status, event);
-
- if (newStatus == ServerStatus.INVALID_STATUS)
- {
- Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
- Short.toString(serverId), status.toString(), event.toString());
- logError(msg);
- return;
- }
-
- // Store new status
- status = newStatus;
-
- if (debugEnabled())
- TRACER.debugInfo("Replication domain " + baseDn +
- " new status is: " + status);
-
- // Perform whatever actions are needed to apply properties for being
- // compliant with new status
- updateDomainForNewStatus();
- }
- }
-
- /**
- * Called when first connection or disconnection detected.
- */
- public void toNotConnectedStatus()
- {
- // Go into not connected status
- // Use synchronized as somebody could ask another status change at the same
- // time
- synchronized (status)
- {
- StatusMachineEvent event =
- StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT;
- ServerStatus newStatus =
- StatusMachine.computeNewStatus(status, event);
-
- if (newStatus == ServerStatus.INVALID_STATUS)
- {
- Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
- Short.toString(serverId), status.toString(), event.toString());
- logError(msg);
- return;
- }
-
- // Store new status
- status = newStatus;
-
- if (debugEnabled())
- TRACER.debugInfo("Replication domain " + baseDn +
- " new status is: " + status);
-
- // Perform whatever actions are needed to apply properties for being
- // compliant with new status
- updateDomainForNewStatus();
- }
- }
-
- /**
- * Perform whatever actions are needed to apply properties for being
- * compliant with new status. Must be called in synchronized section for
- * status. The new status is already set in status variable.
- */
- private void updateDomainForNewStatus()
- {
- switch (status)
- {
- case NOT_CONNECTED_STATUS:
- break;
- case NORMAL_STATUS:
- break;
- case DEGRADED_STATUS:
- break;
- case FULL_UPDATE_STATUS:
- // Signal RS we just entered the full update status
- broker.signalStatusChange(status);
- break;
- case BAD_GEN_ID_STATUS:
- break;
- default:
- if (debugEnabled())
- TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " +
- status);
- }
- }
-
- /**
- * Do the necessary processing when an UpdateMsg was received.
- *
- * @param update The received UpdateMsg.
- */
- public void receiveUpdate(UpdateMsg update)
- {
- remotePendingChanges.putRemoteUpdate(update);
- numRcvdUpdates.incrementAndGet();
- }
-
- /**
- * Do the necessary processing when an AckMsg is received.
- *
- * @param ack The AckMsg that was received.
- */
- public void receiveAck(AckMsg ack)
- {
- UpdateMsg update;
- ChangeNumber changeNumber = ack.getChangeNumber();
-
- synchronized (waitingAckMsgs)
- {
- update = waitingAckMsgs.remove(changeNumber);
- }
- if (update != null)
- {
- synchronized (update)
- {
- update.notify();
- }
- }
- }
-
- /**
* Check if an operation must be synchronized.
* Also update the list of pending changes and the server RUV
* @param op the operation
@@ -1258,19 +772,17 @@
{
numReplayedPostOpCalled++;
}
- UpdateMsg msg = null;
+ LDAPUpdateMsg msg = null;
// Note that a failed non-replication operation might not have a change
// number.
ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
- boolean isAssured = isAssured(op);
-
if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
{
// Generate a replication message for a successful non-replication
// operation.
- msg = UpdateMsg.generateMsg(op);
+ msg = LDAPUpdateMsg.generateMsg(op);
if (msg == null)
{
@@ -1307,16 +819,6 @@
return;
}
- if (msg != null && isAssured)
- {
- synchronized (waitingAckMsgs)
- {
- // Add the assured message to the list of update that are
- // waiting acknowledgements
- waitingAckMsgs.put(curChangeNumber, msg);
- }
- }
-
if (generationIdSavedStatus != true)
{
this.saveGenerationId(generationId);
@@ -1334,53 +836,8 @@
if (!op.isSynchronizationOperation())
{
- int pushedChanges = pendingChanges.pushCommittedChanges();
- numSentUpdates.addAndGet(pushedChanges);
+ pendingChanges.pushCommittedChanges();
}
-
- // Wait for acknowledgement of an assured message.
- if (msg != null && isAssured)
- {
- synchronized (msg)
- {
- while (waitingAckMsgs.containsKey(msg.getChangeNumber()))
- {
- // TODO : should have a configurable timeout to get
- // out of this loop
- try
- {
- msg.wait(1000);
- } catch (InterruptedException e)
- { }
- }
- }
- }
- }
-
- /**
- * get the number of updates received by the replication plugin.
- *
- * @return the number of updates received
- */
- public int getNumRcvdUpdates()
- {
- if (numRcvdUpdates != null)
- return numRcvdUpdates.get();
- else
- return 0;
- }
-
- /**
- * Get the number of updates sent by the replication plugin.
- *
- * @return the number of updates sent
- */
- public int getNumSentUpdates()
- {
- if (numSentUpdates != null)
- return numSentUpdates.get();
- else
- return 0;
}
/**
@@ -1397,27 +854,6 @@
}
/**
- * Increment the number of processed updates.
- */
- public void incProcessedUpdates()
- {
- numProcessedUpdates.incrementAndGet();
- }
-
- /**
- * get the number of updates replayed by the replication.
- *
- * @return The number of updates replayed by the replication
- */
- public int getNumProcessedUpdates()
- {
- if (numProcessedUpdates != null)
- return numProcessedUpdates.get();
- else
- return 0;
- }
-
- /**
* get the number of updates replayed successfully by the replication.
*
* @return The number of updates replayed successfully
@@ -1428,16 +864,6 @@
}
/**
- * get the ServerState.
- *
- * @return the ServerState
- */
- public ServerState getServerState()
- {
- return state;
- }
-
- /**
* Get the debugCount.
*
* @return Returns the debugCount.
@@ -1448,49 +874,6 @@
}
/**
- * Send an Ack message.
- *
- * @param changeNumber The ChangeNumber for which the ack must be sent.
- */
- public void ack(ChangeNumber changeNumber)
- {
- broker.publish(new AckMsg(changeNumber));
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run()
- {
- done = false;
-
- // Create the listener thread
- listenerThread = new ListenerThread(this, updateToReplayQueue);
- listenerThread.start();
-
- while (shutdown == false)
- {
- try
- {
- synchronized (this)
- {
- this.wait(1000);
- if (!disabled && !stateSavingDisabled )
- {
- // save the RUV
- state.save();
- }
- }
- } catch (InterruptedException e)
- { }
- }
- state.save();
-
- done = true;
- }
-
- /**
* Shutdown this ReplicationDomain.
*/
public void shutdown()
@@ -1498,27 +881,19 @@
// stop the flush thread
shutdown = true;
- // Stop the listener thread
- if (listenerThread != null)
+ // stop the thread in charge of flushing the ServerState.
+ if (flushThread != null)
{
- listenerThread.shutdown();
+ synchronized (flushThread)
+ {
+ flushThread.notify();
+ }
}
- synchronized (this)
- {
- this.notify();
- }
-
- DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
-
DirectoryServer.deregisterAlertGenerator(this);
- // stop the ReplicationBroker
- broker.stop();
-
- // Wait for the listener thread to stop
- if (listenerThread != null)
- listenerThread.waitForShutdown();
+ // stop the ReplicationDomain
+ stopDomain();
// wait for completion of the persistentServerState thread.
try
@@ -1534,26 +909,11 @@
}
/**
- * Get the name of the replicationServer to which this domain is currently
- * connected.
- *
- * @return the name of the replicationServer to which this domain
- * is currently connected.
- */
- public String getReplicationServer()
- {
- if (broker != null)
- return broker.getReplicationServer();
- else
- return "Not connected";
- }
-
- /**
* Create and replay a synchronized Operation from an UpdateMsg.
*
* @param msg The UpdateMsg to be replayed.
*/
- public void replay(UpdateMsg msg)
+ public void replay(LDAPUpdateMsg msg)
{
Operation op = null;
boolean done = false;
@@ -1565,6 +925,7 @@
// whose dependency has been replayed until no more left.
do
{
+ String replayErrorMsg = null;
try
{
op = msg.createOperation(conn);
@@ -1572,12 +933,12 @@
while ((!dependency) && (!done) && (retryCount-- > 0))
{
+ // Try replay the operation
op.setInternalOperation(true);
op.setSynchronizationOperation(true);
changeNumber = OperationContext.getChangeNumber(op);
((AbstractOperation) op).run();
- // Try replay the operation
ResultCode result = op.getResultCode();
if (result != ResultCode.SUCCESS)
@@ -1638,7 +999,7 @@
op.getErrorMessage().toString());
logError(message);
numUnresolvedNamingConflicts.incrementAndGet();
-
+ replayErrorMsg = message.toString();
updateError(changeNumber);
}
} catch (ASN1Exception e)
@@ -1646,16 +1007,19 @@
Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
String.valueOf(msg) + stackTraceToSingleLineString(e));
logError(message);
+ replayErrorMsg = message.toString();
} catch (LDAPException e)
{
Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
String.valueOf(msg) + stackTraceToSingleLineString(e));
logError(message);
+ replayErrorMsg = message.toString();
} catch (DataFormatException e)
{
Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
String.valueOf(msg) + stackTraceToSingleLineString(e));
logError(message);
+ replayErrorMsg = message.toString();
} catch (Exception e)
{
if (changeNumber != null)
@@ -1669,21 +1033,20 @@
Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
stackTraceToSingleLineString(e), op.toString());
logError(message);
+ replayErrorMsg = message.toString();
updateError(changeNumber);
} else
{
Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
String.valueOf(msg) + stackTraceToSingleLineString(e));
logError(message);
+ replayErrorMsg = message.toString();
}
} finally
{
if (!dependency)
{
- broker.updateWindowAfterReplay();
- if (msg.isAssured())
- ack(msg.getChangeNumber());
- incProcessedUpdates();
+ processUpdateDone(msg, replayErrorMsg);
}
}
@@ -1913,7 +1276,7 @@
* @return true if the process is completed, false if it must continue..
*/
private boolean solveNamingConflict(DeleteOperation op,
- UpdateMsg msg)
+ LDAPUpdateMsg msg)
{
ResultCode result = op.getResultCode();
DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1983,7 +1346,7 @@
* @throws Exception When the operation is not valid.
*/
private boolean solveNamingConflict(ModifyDNOperation op,
- UpdateMsg msg) throws Exception
+ LDAPUpdateMsg msg) throws Exception
{
ResultCode result = op.getResultCode();
ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
@@ -2391,83 +1754,6 @@
}
/**
- * Check if an operation must be processed as an assured operation.
- *
- * @param op the operation to be checked.
- * @return true if the operations must be processed as an assured operation.
- */
- private boolean isAssured(PostOperationOperation op)
- {
- // TODO : should have a filtering mechanism for checking
- // operation that are assured and operations that are not.
- return false;
- }
-
- /**
- * Get the maximum receive window size.
- *
- * @return The maximum receive window size.
- */
- public int getMaxRcvWindow()
- {
- if (broker != null)
- return broker.getMaxRcvWindow();
- else
- return 0;
- }
-
- /**
- * Get the current receive window size.
- *
- * @return The current receive window size.
- */
- public int getCurrentRcvWindow()
- {
- if (broker != null)
- return broker.getCurrentRcvWindow();
- else
- return 0;
- }
-
- /**
- * Get the maximum send window size.
- *
- * @return The maximum send window size.
- */
- public int getMaxSendWindow()
- {
- if (broker != null)
- return broker.getMaxSendWindow();
- else
- return 0;
- }
-
- /**
- * Get the current send window size.
- *
- * @return The current send window size.
- */
- public int getCurrentSendWindow()
- {
- if (broker != null)
- return broker.getCurrentSendWindow();
- else
- return 0;
- }
-
- /**
- * Get the number of times the replication connection was lost.
- * @return The number of times the replication connection was lost.
- */
- public int getNumLostConnections()
- {
- if (broker != null)
- return broker.getNumLostConnections();
- else
- return 0;
- }
-
- /**
* Get the number of modify conflicts successfully resolved.
* @return The number of modify conflicts successfully resolved.
*/
@@ -2477,7 +1763,7 @@
}
/**
- * Get the number of namign conflicts successfully resolved.
+ * Get the number of naming conflicts successfully resolved.
* @return The number of naming conflicts successfully resolved.
*/
public int getNumResolvedNamingConflicts()
@@ -2495,18 +1781,9 @@
}
/**
- * Get the server ID.
- * @return The server ID.
- */
- public short getServerId()
- {
- return serverId;
- }
-
- /**
* Check if the domain solve conflicts.
*
- * @return a boolean indicating if the domain should sove conflicts.
+ * @return a boolean indicating if the domain should solve conflicts.
*/
public boolean solveConflict()
{
@@ -2526,16 +1803,7 @@
state.save();
state.clearInMemory();
disabled = true;
-
- // Stop the listener thread
- if (listenerThread != null)
- listenerThread.shutdown();
-
- broker.stop(); // This will cut the session and wake up the listener
-
- // Wait for the listener thread to stop
- if (listenerThread != null)
- listenerThread.waitForShutdown();
+ disableService(); // This will cut the session and wake up the listener
}
/**
@@ -2578,16 +1846,7 @@
return;
}
- // After an on-line import, the value of the generationId is new
- // and it is necessary for the broker to send this new value as part
- // of the serverStart message.
- broker.setGenerationId(generationId);
-
- broker.start(replicationServers);
-
- // Create the listener thread
- listenerThread = new ListenerThread(this, updateToReplayQueue);
- listenerThread.start();
+ enableService();
disabled = false;
}
@@ -2600,7 +1859,7 @@
*/
public long computeGenerationId() throws DirectoryException
{
- long genId = exportBackend(true);
+ long genId = exportBackend(null, true);
if (debugEnabled())
TRACER.debugInfo("Computed generationId: generationId=" + genId);
@@ -2609,11 +1868,9 @@
}
/**
- * Returns the generationId set for this domain.
- *
- * @return The generationId.
+ * {@inheritDoc}
*/
- public long getGenerationId()
+ public long getGenerationID()
{
return generationId;
}
@@ -2627,7 +1884,7 @@
/**
* Stores the value of the generationId.
* @param generationId The value of the generationId.
- * @return a ResultCode indicating if the method was successfull.
+ * @return a ResultCode indicating if the method was successful.
*/
public ResultCode saveGenerationId(long generationId)
{
@@ -2792,45 +2049,8 @@
}
/**
- * Reset the generationId of this domain in the whole topology.
- * A message is sent to the Replication Servers for them to reset
- * their change dbs.
- *
- * @param generationIdNewValue The new value of the generation Id.
- * @throws DirectoryException when an error occurs
- */
- public void resetGenerationId(Long generationIdNewValue)
- throws DirectoryException
- {
- if (debugEnabled())
- TRACER.debugInfo(
- this.getName() + "resetGenerationId" + generationIdNewValue);
-
- if (!isConnected())
- {
- ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
- baseDn.toNormalizedString());
- throw new DirectoryException(
- resultCode, message);
- }
-
- ResetGenerationIdMsg genIdMessage = null;
-
- if (generationIdNewValue == null)
- {
- genIdMessage = new ResetGenerationIdMsg(this.generationId);
- }
- else
- {
- genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
- }
- broker.publish(genIdMessage);
- }
-
- /**
* Do whatever is needed when a backup is started.
- * We need to make sure that the serverState is correclty save.
+ * We need to make sure that the serverState is correctly save.
*/
public void backupStart()
{
@@ -2849,103 +2069,7 @@
* Total Update >>
*/
- /**
- * Receives bytes related to an entry in the context of an import to
- * initialize the domain (called by ReplLDIFInputStream).
- *
- * @return The bytes. Null when the Done or Err message has been received
- */
- public byte[] receiveEntryBytes()
- {
- ReplicationMsg msg;
- while (true)
- {
- try
- {
- msg = broker.receive();
- if (debugEnabled())
- TRACER.debugVerbose(
- " sid:" + this.serverId +
- " base DN:" + this.baseDn +
- " Import EntryBytes received " + msg);
- if (msg == null)
- {
- // The server is in the shutdown process
- return null;
- }
-
- if (msg instanceof EntryMsg)
- {
- EntryMsg entryMsg = (EntryMsg)msg;
- byte[] entryBytes = entryMsg.getEntryBytes();
- ieContext.updateCounters();
- return entryBytes;
- }
- else if (msg instanceof DoneMsg)
- {
- // This is the normal termination of the import
- // No error is stored and the import is ended
- // by returning null
- return null;
- }
- else if (msg instanceof ErrorMsg)
- {
- // This is an error termination during the import
- // The error is stored and the import is ended
- // by returning null
- ErrorMsg errorMsg = (ErrorMsg)msg;
- ieContext.exception = new DirectoryException(
- ResultCode.OTHER,
- errorMsg.getDetails());
- return null;
- }
- else
- {
- // Other messages received during an import are trashed
- }
- }
- catch(Exception e)
- {
- // TODO: i18n
- ieContext.exception = new DirectoryException(ResultCode.OTHER,
- Message.raw("received an unexpected message type" +
- e.getLocalizedMessage()));
- }
- }
- }
-
- /**
- * Processes an error message received while an import/export is
- * on going.
- * @param errorMsg The error message received.
- */
- protected void abandonImportExport(ErrorMsg errorMsg)
- {
- // FIXME TBD Treat the case where the error happens while entries
- // are being exported
-
- if (debugEnabled())
- TRACER.debugVerbose(
- " abandonImportExport:" + this.serverId +
- " base DN:" + this.baseDn +
- " Error Msg received " + errorMsg);
-
- if (ieContext != null)
- {
- ieContext.exception = new DirectoryException(ResultCode.OTHER,
- errorMsg.getDetails());
-
- if (ieContext.initializeTask instanceof InitializeTask)
- {
- // Update the task that initiated the import
- ((InitializeTask)ieContext.initializeTask).
- updateTaskCompletionState(ieContext.exception);
-
- releaseIEContext();
- }
- }
- }
/**
* Clears all the entries from the JE backend determined by the
@@ -3000,16 +2124,31 @@
}
/**
+ * This method trigger an export of the replicated data.
+ *
+ * @param output The OutputStream where the export should
+ * be produced.
+ * @throws DirectoryException When needed.
+ */
+ protected void exportBackend(OutputStream output) throws DirectoryException
+ {
+ exportBackend(output, false);
+ }
+
+ /**
* Export the entries from the backend and/or compute the generation ID.
* The ieContext must have been set before calling.
- * @param checksumOutput true is the exportBackend is called to compute
- * the generationID
*
- * @return The computed generationID.
+ * @param output The OutputStream where the export should
+ * be produced.
+ * @param checksumOutput A boolean indicating if this export is
+ * invoked to perform a checksum only
+ *
+ * @return The computed GenerationID.
*
* @throws DirectoryException when an error occurred
*/
- protected long exportBackend(boolean checksumOutput)
+ protected long exportBackend(OutputStream output, boolean checksumOutput)
throws DirectoryException
{
long genID = 0;
@@ -3042,7 +2181,7 @@
}
OutputStream os;
- ReplLDIFOutputStream ros;
+ ReplLDIFOutputStream ros = null;
if (checksumOutput)
{
@@ -3060,8 +2199,7 @@
}
else
{
- ros = new ReplLDIFOutputStream(this, (short)-1);
- os = ros;
+ os = output;
}
LDIFExportConfig exportConfig = new LDIFExportConfig(os);
@@ -3096,7 +2234,7 @@
}
catch (DirectoryException de)
{
- if ((checksumOutput) &&
+ if ((ros != null) &&
(ros.getNumExportedEntries() >= entryCount))
{
// This is the normal end when computing the generationId
@@ -3170,277 +2308,6 @@
}
/**
- * Get the internal broker to perform some operations on it.
- *
- * @return The broker for this domain.
- */
- ReplicationBroker getBroker()
- {
- return broker;
- }
-
- /**
- * Exports an entry in LDIF format.
- *
- * @param lDIFEntry The entry to be exported..
- *
- * @throws IOException when an error occurred.
- */
- public void exportLDIFEntry(String lDIFEntry) throws IOException
- {
- // If an error was raised - like receiving an ErrorMsg
- // we just let down the export.
- if (ieContext.exception != null)
- {
- IOException ioe = new IOException(ieContext.exception.getMessage());
- ieContext = null;
- throw ioe;
- }
-
- EntryMsg entryMessage = new EntryMsg(
- serverId, ieContext.exportTarget, lDIFEntry.getBytes());
- broker.publish(entryMessage);
-
- try
- {
- ieContext.updateCounters();
- }
- catch (DirectoryException de)
- {
- throw new IOException(de.getMessage());
- }
- }
-
- /**
- * Initializes this domain from another source server.
- *
- * @param source The source from which to initialize
- * @param initTask The task that launched the initialization
- * and should be updated of its progress.
- * @throws DirectoryException when an error occurs
- */
- public void initializeFromRemote(short source, Task initTask)
- throws DirectoryException
- {
- if (debugEnabled())
- TRACER.debugInfo("Entering initializeFromRemote");
-
- acquireIEContext();
- ieContext.initializeTask = initTask;
-
- InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
- baseDn, serverId, source);
-
- // Publish Init request msg
- broker.publish(initializeMsg);
-
- // .. we expect to receive entries or err after that
- }
-
- /**
- * Verifies that the given string represents a valid source
- * from which this server can be initialized.
- * @param sourceString The string representing the source
- * @return The source as a short value
- * @throws DirectoryException if the string is not valid
- */
- public short decodeSource(String sourceString)
- throws DirectoryException
- {
- short source = 0;
- Throwable cause = null;
- try
- {
- source = Integer.decode(sourceString).shortValue();
- if ((source >= -1) && (source != serverId))
- {
- // TODO Verifies serverID is in the domain
- // We shold check here that this is a server implied
- // in the current domain.
- return source;
- }
- }
- catch(Exception e)
- {
- cause = e;
- }
-
- ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_INVALID_IMPORT_SOURCE.get();
- if (cause != null)
- {
- throw new DirectoryException(
- resultCode, message, cause);
- }
- else
- {
- throw new DirectoryException(
- resultCode, message);
- }
- }
-
- /**
- * Verifies that the given string represents a valid source
- * from which this server can be initialized.
- * @param targetString The string representing the source
- * @return The source as a short value
- * @throws DirectoryException if the string is not valid
- */
- public short decodeTarget(String targetString)
- throws DirectoryException
- {
- short target = 0;
- Throwable cause;
- if (targetString.equalsIgnoreCase("all"))
- {
- return RoutableMsg.ALL_SERVERS;
- }
-
- // So should be a serverID
- try
- {
- target = Integer.decode(targetString).shortValue();
- if (target >= 0)
- {
- // FIXME Could we check now that it is a know server in the domain ?
- }
- return target;
- }
- catch(Exception e)
- {
- cause = e;
- }
- ResultCode resultCode = ResultCode.OTHER;
- Message message = ERR_INVALID_EXPORT_TARGET.get();
-
- if (cause != null)
- throw new DirectoryException(
- resultCode, message, cause);
- else
- throw new DirectoryException(
- resultCode, message);
-
- }
-
- private synchronized void acquireIEContext()
- throws DirectoryException
- {
- if (ieContext != null)
- {
- // Rejects 2 simultaneous exports
- Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
- throw new DirectoryException(ResultCode.OTHER,
- message);
- }
-
- ieContext = new IEContext();
- }
-
- private synchronized void releaseIEContext()
- {
- ieContext = null;
- }
-
- /**
- * Process the initialization of some other server or servers in the topology
- * specified by the target argument.
- * @param target The target that should be initialized
- * @param initTask The task that triggers this initialization and that should
- * be updated with its progress.
- *
- * @exception DirectoryException When an error occurs.
- */
- public void initializeRemote(short target, Task initTask)
- throws DirectoryException
- {
- initializeRemote(target, serverId, initTask);
- }
-
- /**
- * Process the initialization of some other server or servers in the topology
- * specified by the target argument when this initialization specifying the
- * server that requests the initialization.
- *
- * @param target The target that should be initialized.
- * @param requestorID The server that initiated the export.
- * @param initTask The task that triggers this initialization and that should
- * be updated with its progress.
- *
- * @exception DirectoryException When an error occurs.
- */
- public void initializeRemote(short target, short requestorID, Task initTask)
- throws DirectoryException
- {
- Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
- Short.toString(serverId),
- baseDn.toString(),
- Short.toString(requestorID));
- logError(msg);
-
- boolean contextAcquired=false;
-
- try
- {
- Backend backend = retrievesBackend(this.baseDn);
-
- if (!backend.supportsLDIFExport())
- {
- Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
- backend.getBackendID().toString());
- logError(message);
- throw new DirectoryException(ResultCode.OTHER, message);
- }
-
- acquireIEContext();
- contextAcquired = true;
-
- // The number of entries to be exported is the number of entries under
- // the base DN entry and the base entry itself.
- long entryCount = backend.numSubordinates(baseDn, true) + 1;
- ieContext.exportTarget = target;
- if (initTask != null)
- {
- ieContext.initializeTask = initTask;
- }
- ieContext.setCounters(entryCount, entryCount);
-
- // Send start message to the peer
- InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
- baseDn, serverId, ieContext.exportTarget, requestorID, entryCount);
-
- broker.publish(initializeMessage);
-
- exportBackend(false);
-
- // Notify the peer of the success
- DoneMsg doneMsg = new DoneMsg(serverId,
- initializeMessage.getDestination());
- broker.publish(doneMsg);
-
- releaseIEContext();
- }
- catch(DirectoryException de)
- {
- // Notify the peer of the failure
- ErrorMsg errorMsg =
- new ErrorMsg(target,
- de.getMessageObject());
- broker.publish(errorMsg);
-
- if (contextAcquired)
- releaseIEContext();
-
- throw(de);
- }
-
- msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
- Short.toString(serverId),
- baseDn.toString(),
- Short.toString(requestorID));
- logError(msg);
- }
-
- /**
* Process backend before import.
* @param backend The backend.
* @throws Exception
@@ -3468,51 +2335,16 @@
}
/**
- * Initializes the domain's backend with received entries.
- * @param initializeMessage The message that initiated the import.
- * @exception DirectoryException Thrown when an error occurs.
+ * This method should trigger an import of the replicated data.
+ *
+ * @param input The InputStream from which
+ * @throws DirectoryException When needed.
*/
- protected void initialize(InitializeTargetMsg initializeMessage)
- throws DirectoryException
+ public void importBackend(InputStream input) throws DirectoryException
{
LDIFImportConfig importConfig = null;
DirectoryException de = null;
- Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
- Short.toString(serverId),
- baseDn.toString(),
- Long.toString(initializeMessage.getRequestorID()));
- logError(msg);
-
- // Go into full update status
- // Use synchronized as somebody could ask another status change at the same
- // time
- synchronized (status)
- {
- StatusMachineEvent event = StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT;
- ServerStatus newStatus =
- StatusMachine.computeNewStatus(status, event);
-
- if (newStatus == ServerStatus.INVALID_STATUS)
- {
- msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
- Short.toString(serverId), status.toString(), event.toString());
- logError(msg);
- return;
- }
-
- // Store new status
- status = newStatus;
-
- if (debugEnabled())
- TRACER.debugInfo("Replication domain " + baseDn +
- " new status is: " + status);
-
- // Perform whatever actions are needed to apply properties for being
- // compliant with new status
- updateDomainForNewStatus();
- }
-
Backend backend = retrievesBackend(baseDn);
try
@@ -3526,26 +2358,8 @@
}
else
{
- if (initializeMessage.getRequestorID() == serverId)
- {
- // The import responds to a request we did so the IEContext
- // is already acquired
- }
- else
- {
- acquireIEContext();
- }
-
- ieContext.importSource = initializeMessage.getsenderID();
- ieContext.entryLeftCount = initializeMessage.getEntryCount();
- ieContext.setCounters(initializeMessage.getEntryCount(),
- initializeMessage.getEntryCount());
-
- preBackendImport(backend);
-
- ieContext.ldifImportInputStream = new ReplLDIFInputStream(this);
importConfig =
- new LDIFImportConfig(ieContext.ldifImportInputStream);
+ new LDIFImportConfig(input);
List<DN> includeBranches = new ArrayList<DN>();
includeBranches.add(this.baseDn);
importConfig.setIncludeBranches(includeBranches);
@@ -3553,11 +2367,12 @@
// TODO How to deal with rejected entries during the import
importConfig.writeRejectedEntries(
- getFileForPath("logs" + File.separator +
- "replInitRejectedEntries").getAbsolutePath(),
- ExistingFileBehavior.OVERWRITE);
+ getFileForPath("logs" + File.separator +
+ "replInitRejectedEntries").getAbsolutePath(),
+ ExistingFileBehavior.OVERWRITE);
// Process import
+ preBackendImport(backend);
backend.importLDIF(importConfig);
stateSavingDisabled = false;
@@ -3570,9 +2385,6 @@
}
finally
{
- if ((ieContext != null) && (ieContext.exception != null))
- de = ieContext.exception;
-
// Cleanup
if (importConfig != null)
{
@@ -3592,7 +2404,6 @@
TRACER.debugInfo(
"After import, the replication plugin restarts connections" +
" to all RSs to provide new generation ID=" + generationId);
- broker.setGenerationId(generationId);
}
catch (DirectoryException fe)
{
@@ -3604,29 +2415,12 @@
if (de == null)
de = fe;
}
-
- // Re-exchange generationID and state with RS
- broker.reStart();
-
- // Update the task that initiated the import
- if ((ieContext != null ) && (ieContext.initializeTask != null))
- {
- ((InitializeTask)ieContext.initializeTask).
- updateTaskCompletionState(de);
- }
- releaseIEContext();
}
// Sends up the root error.
if (de != null)
{
throw de;
}
-
- msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
- Short.toString(serverId),
- baseDn.toString(),
- Long.toString(initializeMessage.getRequestorID()));
- logError(msg);
}
/**
@@ -3660,10 +2454,10 @@
* @throws DirectoryException When an error occurred or no domain
* match the provided baseDn.
*/
- public static ReplicationDomain retrievesReplicationDomain(DN baseDn)
+ public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDn)
throws DirectoryException
{
- ReplicationDomain replicationDomain = null;
+ LDAPReplicationDomain replicationDomain = null;
// Retrieves the domain
DirectoryServer.getSynchronizationProviders();
@@ -3678,7 +2472,7 @@
}
// From the domainDN retrieves the replication domain
- ReplicationDomain sdomain =
+ LDAPReplicationDomain sdomain =
MultimasterReplication.findDomain(baseDn, null);
if (sdomain == null)
{
@@ -3714,15 +2508,6 @@
return retrievesBackend(baseDn);
}
- /**
- * Returns a boolean indicating if an import or export is currently
- * processed.
- * @return The status
- */
- public boolean ieRunning()
- {
- return (ieContext != null);
- }
/*
* <<Total Update
*/
@@ -3769,7 +2554,7 @@
{
// Check that there is not already a domain with the same DN
DN dn = configuration.getBaseDN();
- ReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
+ LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
if ((domain != null) && (domain.baseDn.equals(dn)))
{
Message message = ERR_SYNC_INVALID_DN.get();
@@ -3793,37 +2578,12 @@
public ConfigChangeResult applyConfigurationChange(
ReplicationDomainCfg configuration)
{
- // server id and base dn are readonly.
- // isolationPolicy can be set immediately and will apply
- // to the next updates.
- // The other parameters needs to be renegociated with the ReplicationServer
- // so that requires restarting the session with the ReplicationServer.
- Boolean needToRestartSession = false;
- Collection<String> newReplServers = configuration.getReplicationServer();
-
- // A new session is necessary only when information regarding
- // the connection is modified
- if ((!(replicationServers.size() == newReplServers.size()
- && replicationServers.containsAll(newReplServers))) ||
- window != configuration.getWindowSize() ||
- heartbeatInterval != configuration.getHeartbeatInterval())
- needToRestartSession = true;
-
- replicationServers = newReplServers;
- window = configuration.getWindowSize();
- heartbeatInterval = configuration.getHeartbeatInterval();
- broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay,
- maxSendQueue, maxSendDelay, window, heartbeatInterval);
isolationpolicy = configuration.getIsolationPolicy();
- // To be able to stop and restart the broker properly just
- // disable and enable the domain. That way a new session
- // with the new configuration is available.
- if (needToRestartSession)
- {
- this.disable();
- this.enable();
- }
+ changeConfig(
+ configuration.getReplicationServer(),
+ configuration.getWindowSize(),
+ configuration.getHeartbeatInterval());
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
@@ -3867,101 +2627,265 @@
}
/**
- * Check if the domain is connected to a ReplicationServer.
+ * Starts the Replication Domain.
+ */
+ public void start()
+ {
+ // Create the ServerStateFlush thread
+ flushThread = new ServerStateFlush();
+ flushThread.start();
+
+ startListenService();
+ }
+
+
+ /**
+ * {@inheritDoc}
+ */
+ public void sessionInitiated(
+ ServerStatus initStatus,
+ ServerState replicationServerState,
+ ProtocolSession session)
+ {
+ super.sessionInitiated(initStatus, replicationServerState, session);
+ try
+ {
+ /*
+ * We must not publish changes to a replicationServer that has
+ * not seen all our previous changes because this could cause
+ * some other ldap servers to miss those changes.
+ * Check that the ReplicationServer has seen all our previous
+ * changes.
+ */
+ ChangeNumber replServerMaxChangeNumber =
+ replicationServerState.getMaxChangeNumber(serverId);
+
+ if (replServerMaxChangeNumber == null)
+ {
+ replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId);
+ }
+ ChangeNumber ourMaxChangeNumber =
+ state.getMaxChangeNumber(serverId);
+
+ if ((ourMaxChangeNumber != null) &&
+ (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+ {
+
+ // Replication server is missing some of our changes: let's
+ // send them to him.
+
+ Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
+ logError(message);
+
+ /*
+ * Get all the changes that have not been seen by this
+ * replication server and populate the replayOperations
+ * list.
+ */
+ InternalSearchOperation op = searchForChangedEntries(
+ baseDn, replServerMaxChangeNumber, this);
+ if (op.getResultCode() != ResultCode.SUCCESS)
+ {
+ /*
+ * An error happened trying to search for the updates
+ * This server will start accepting again new updates but
+ * some inconsistencies will stay between servers.
+ * Log an error for the repair tool
+ * that will need to re-synchronize the servers.
+ */
+ message = ERR_CANNOT_RECOVER_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ } else
+ {
+ for (FakeOperation replayOp : replayOperations)
+ {
+ ChangeNumber cn = replayOp.getChangeNumber();
+ /*
+ * Because the entry returned by the search operation
+ * can contain old historical information, it is
+ * possible that some of the FakeOperation are
+ * actually older than the
+ * Only send the Operation if it was newer than
+ * the last ChangeNumber known by the Replication Server.
+ */
+ if (cn.newer(replServerMaxChangeNumber))
+ {
+ message =
+ DEBUG_SENDING_CHANGE.get(
+ replayOp.getChangeNumber().toString());
+ logError(message);
+ session.publish(replayOp.generateMessage());
+ }
+ }
+ message = DEBUG_CHANGES_SENT.get();
+ logError(message);
+ }
+ replayOperations.clear();
+ }
+ } catch (Exception e)
+ {
+ Message message = ERR_PUBLISHING_FAKE_OPS.get(
+ baseDn.toNormalizedString(),
+ e.getLocalizedMessage() + stackTraceToSingleLineString(e));
+ logError(message);
+ }
+ }
+
+ /**
+ * Search for the changes that happened since fromChangeNumber
+ * based on the historical attribute. The only changes that will
+ * be send will be the one generated on the serverId provided in
+ * fromChangeNumber.
+ * @param baseDn the base DN
+ * @param fromChangeNumber The change number from which we want the changes
+ * @param resultListener that will process the entries returned.
+ * @return the internal search operation
+ * @throws Exception when raised.
+ */
+ public static InternalSearchOperation searchForChangedEntries(
+ DN baseDn,
+ ChangeNumber fromChangeNumber,
+ InternalSearchListener resultListener)
+ throws Exception
+ {
+ InternalClientConnection conn =
+ InternalClientConnection.getRootConnection();
+ Short serverId = fromChangeNumber.getServerId();
+
+ String maxValueForId = "ffffffffffffffff" +
+ String.format("%04x", serverId) + "ffffffff";
+
+ LDAPFilter filter = LDAPFilter.decode(
+ "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
+ + fromChangeNumber + ")(" + Historical.HISTORICALATTRIBUTENAME +
+ "<=dummy:" + maxValueForId + "))");
+
+ LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+ attrs.add(Historical.HISTORICALATTRIBUTENAME);
+ attrs.add(Historical.ENTRYUIDNAME);
+ attrs.add("*");
+ return conn.processSearch(
+ new ASN1OctetString(baseDn.toString()),
+ SearchScope.WHOLE_SUBTREE,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false, filter,
+ attrs,
+ resultListener);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void handleInternalSearchEntry(
+ InternalSearchOperation searchOperation,
+ SearchResultEntry searchEntry)
+ {
+ /*
+ * This call back is called at session establishment phase
+ * for each entry that has been changed by this server and the changes
+ * have not been sent to any Replication Server.
+ * The role of this method is to build equivalent operation from
+ * the historical information and add them in the replayOperations
+ * table.
+ */
+ Iterable<FakeOperation> updates =
+ Historical.generateFakeOperations(searchEntry);
+ for (FakeOperation op : updates)
+ {
+ replayOperations.add(op);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void handleInternalSearchReference(
+ InternalSearchOperation searchOperation,
+ SearchResultReference searchReference)
+ {
+ // TODO to be implemented
+ }
+
+
+ /**
+ * This method should return the total number of objects in the
+ * replicated domain.
+ * This count will be used for reporting.
*
- * @return true if the server is connected, false if not.
+ * @throws DirectoryException when needed.
+ *
+ * @return The number of objects in the replication domain.
*/
- public boolean isConnected()
+ public long countEntries() throws DirectoryException
{
- if (broker != null)
- return broker.isConnected();
- else
+ Backend backend = retrievesBackend(baseDn);
+
+ if (!backend.supportsLDIFExport())
+ {
+ Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
+ backend.getBackendID().toString());
+ logError(message);
+ throw new DirectoryException(ResultCode.OTHER, message);
+ }
+
+ return backend.numSubordinates(baseDn, true) + 1;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean processUpdate(UpdateMsg updateMsg)
+ {
+ if (updateMsg instanceof LDAPUpdateMsg)
+ {
+ LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg;
+
+ // put the UpdateMsg in the RemotePendingChanges list.
+ remotePendingChanges.putRemoteUpdate(msg);
+
+ // Put update message into the replay queue
+ // (block until some place in the queue is available)
+ UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
+ updateToReplayQueue.offer(updateToReplay);
+
return false;
+ }
+
+ // unknown message type, this should not happen, just ignore it.
+ return true;
}
/**
- * Determine whether the connection to the replication server is encrypted.
- * @return true if the connection is encrypted, false otherwise.
+ * Monitoring information for the LDAPReplicationDomain.
+ *
+ * @return Monitoring attributes specific to the LDAPReplicationDomain.
*/
- public boolean isSessionEncrypted()
+ public Collection<Attribute> getAdditionalMonitoring()
{
- if (broker != null)
- return broker.isSessionEncrypted();
- else
- return false;
- }
+ ArrayList<Attribute> attributes = new ArrayList<Attribute>();
- /**
- * Gets the info for DSs in the topology (except us).
- * @return The info for DSs in the topology (except us)
- */
- public List<DSInfo> getDsList()
- {
- return dsList;
- }
+ /* get number of changes in the pending list */
+ ReplicationMonitor.addMonitorData(
+ attributes, "pending-updates", getPendingUpdatesCount());
- /**
- * Gets the info for RSs in the topology (except the one we are connected
- * to).
- * @return The info for RSs in the topology (except the one we are connected
- * to)
- */
- public List<RSInfo> getRsList()
- {
- return rsList;
- }
+ /* get number of changes successfully */
+ ReplicationMonitor.addMonitorData(attributes, "replayed-updates-ok",
+ getNumReplayedPostOpCalled());
- /**
- * Tells if assured replication is enabled for this domain.
- * @return True if assured replication is enabled for this domain.
- */
- public boolean isAssured()
- {
- return assured;
- }
+ /* get number of modify conflicts */
+ ReplicationMonitor.addMonitorData(attributes, "resolved-modify-conflicts",
+ getNumResolvedModifyConflicts());
- /**
- * Gives the mode for the assured replication of the domain.
- * @return The mode for the assured replication of the domain.
- */
- public AssuredMode getAssuredMode()
- {
- return assuredMode;
- }
+ /* get number of naming conflicts */
+ ReplicationMonitor.addMonitorData(attributes, "resolved-naming-conflicts",
+ getNumResolvedNamingConflicts());
- /**
- * Gives the assured level of the replication of the domain.
- * @return The assured level of the replication of the domain.
- */
- public byte getAssuredSdLevel()
- {
- return assuredSdLevel;
- }
+ /* get number of unresolved naming conflicts */
+ ReplicationMonitor.addMonitorData(attributes, "unresolved-naming-conflicts",
+ getNumUnresolvedNamingConflicts());
- /**
- * Gets the group id for this domain.
- * @return The group id for this domain.
- */
- public byte getGroupId()
- {
- return groupId;
- }
-
- /**
- * Gets the referrals URLs this domain publishes.
- * @return The referrals URLs this domain publishes.
- */
- public List<String> getRefUrls()
- {
- return refUrls;
- }
-
- /**
- * Gets the status for this domain.
- * @return The status for this domain.
- */
- public ServerStatus getStatus()
- {
- return status;
+ return attributes;
}
}
--
Gitblit v1.10.0