From b48ce50fdf4d73e8be3799e3a7c6c2bf9d1b2965 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Sun, 02 Sep 2007 17:58:07 +0000
Subject: [PATCH] fix for #1733 & #845 - Initialization of replication
---
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 673 ++++++++++++++++++++++++++++++++++++++++++-------------
1 files changed, 514 insertions(+), 159 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 788f72d..0724851 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -27,8 +27,6 @@
package org.opends.server.replication.plugin;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
-
-import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
@@ -38,12 +36,14 @@
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import org.opends.server.protocols.asn1.ASN1OctetString;
import static org.opends.server.util.ServerConstants.*;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
@@ -52,12 +52,14 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
+import java.util.zip.Adler32;
+import java.io.OutputStream;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.MultimasterDomainCfgDefn.*;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
-import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.api.AlertGenerator;
import org.opends.server.api.Backend;
import org.opends.server.api.DirectoryThread;
@@ -77,6 +79,9 @@
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPAttribute;
+import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
@@ -89,6 +94,7 @@
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ConfigChangeResult;
+import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
@@ -100,6 +106,7 @@
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.RDN;
+import org.opends.server.types.RawModification;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
@@ -172,6 +179,9 @@
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
private int maxSendDelay = 0;
+ private long generationId = -1;
+ private long rejectedGenerationId = -1;
+ private boolean requestedResetSinceLastStart = false;
/**
* This object is used to store the list of update currently being
@@ -224,7 +234,7 @@
private int window = 100;
/**
- * The isoalation policy that this domain is going to use.
+ * The isolation policy that this domain is going to use.
* This field describes the behavior of the domain when an update is
* attempted and the domain could not connect to any Replication Server.
* Possible values are accept-updates or deny-updates, but other values
@@ -254,17 +264,20 @@
// The total entry count expected to be processed
long entryCount = 0;
- // The count for the entry left to be processed
+ // The count for the entry not yet processed
long entryLeftCount = 0;
+ boolean checksumOutput = false;
+
// The exception raised when any
DirectoryException exception = null;
+ long checksumOutputValue = (long)0;
/**
- * Initializes the counters of the task with the provider value.
+ * Initializes the import/export counters with the provider value.
* @param count The value with which to initialize the counters.
*/
- public void initTaskCounters(long count)
+ public void initImportExportCounters(long count)
{
entryCount = count;
entryLeftCount = count;
@@ -288,7 +301,7 @@
* Update the counters of the task for each entry processed during
* an import or export.
*/
- public void updateTaskCounters()
+ public void updateCounters()
{
entryLeftCount--;
@@ -342,12 +355,12 @@
configDn = configuration.dn();
/*
- * Modify conflicts are solved for all suffixes but the schema suffix
- * because we don't want to store extra information in the schema
- * ldif files.
- * This has no negative impact because the changes on schema should
- * not produce conflicts.
- */
+ * Modify conflicts are solved for all suffixes but the schema suffix
+ * because we don't want to store extra information in the schema
+ * ldif files.
+ * This has no negative impact because the changes on schema should
+ * not produce conflicts.
+ */
if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
{
solveConflictFlag = false;
@@ -370,27 +383,33 @@
monitor = new ReplicationMonitor(this);
DirectoryServer.registerMonitorProvider(monitor);
+ backend = retrievesBackend(baseDN);
+ if (backend == null)
+ {
+ throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
+ baseDN.toNormalizedString()));
+ }
+
+ try
+ {
+ generationId = loadGenerationId();
+ }
+ catch (DirectoryException e)
+ {
+ logError(ERR_LOADING_GENERATION_ID.get(
+ baseDN.toNormalizedString(), e.getLocalizedMessage()));
+ }
+
/*
* create the broker object used to publish and receive changes
*/
broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
maxReceiveDelay, maxSendQueue, maxSendDelay, window,
- heartbeatInterval, new ReplSessionSecurity(configuration));
+ heartbeatInterval, generationId,
+ new ReplSessionSecurity(configuration));
broker.start(replicationServers);
- // Retrieves the related backend and its config entry
- try
- {
- retrievesBackendInfos(baseDN);
- } catch (DirectoryException e)
- {
- // The backend associated to this suffix is not able to
- // perform export and import.
- // The replication can continue but this replicationDomain
- // won't be able to use total update.
- }
-
/*
* ChangeNumberGenerator is used to create new unique ChangeNumbers
* for each operation done on the replication domain.
@@ -558,7 +577,7 @@
* If not set the ResultCode and the response message,
* interrupt the operation, and return false
*
- * @param op The Operation that needs to be checked.
+ * @param Operation The Operation that needs to be checked.
*
* @return true when it OK to process the Operation, false otherwise.
* When false is returned the resultCode and the reponse message
@@ -798,7 +817,11 @@
// The server is in the shutdown process
return null;
}
- log(Message.raw("Broker received message :" + msg));
+
+ if (debugEnabled())
+ if (!(msg instanceof HeartbeatMessage))
+ TRACER.debugInfo("Message received <" + msg + ">");
+
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
@@ -808,7 +831,7 @@
{
// Another server requests us to provide entries
// for a total update
- initMsg = (InitializeRequestMessage) msg;
+ initMsg = (InitializeRequestMessage)msg;
}
else if (msg instanceof InitializeTargetMessage)
{
@@ -822,20 +845,19 @@
// bunch of entries from the remote server and we
// want the import thread to catch them and
// not the ListenerThread.
- importBackend(importMsg);
+ initialize(importMsg);
}
catch(DirectoryException de)
{
+ // Returns an error message to notify the sender
+ ErrorMessage errorMsg =
+ new ErrorMessage(importMsg.getsenderID(),
+ de.getMessageObject());
MessageBuilder mb = new MessageBuilder();
mb.append(de.getMessageObject());
mb.append("Backend ID: ");
mb.append(backend.getBackendID());
- log(mb.toMessage());
-
- // Return an error message to notify the sender
- ErrorMessage errorMsg =
- new ErrorMessage(importMsg.getsenderID(),
- de.getMessageObject());
+ TRACER.debugInfo(Message.toString(mb.toMessage()));
broker.publish(errorMsg);
}
}
@@ -850,6 +872,39 @@
// replicationServer did not find any import source.
abandonImportExport((ErrorMessage)msg);
}
+ else
+ {
+ /* We can receive an error message from the replication server
+ * in the following cases :
+ * - we connected with an incorrect generation id
+ */
+ ErrorMessage errorMsg = (ErrorMessage)msg;
+ logError(ERR_ERROR_MSG_RECEIVED.get(
+ errorMsg.getDetails()));
+
+ if (errorMsg.getMsgID() == NOTE_RESET_GENERATION_ID.getId())
+ {
+ TRACER.debugInfo("requestedResetSinceLastStart=" +
+ requestedResetSinceLastStart +
+ "rejectedGenerationId=" + rejectedGenerationId);
+
+ if (requestedResetSinceLastStart && (rejectedGenerationId>0))
+ {
+ // When the last generation presented was refused and we are
+ // the 'reseter' server then restart automatically to become
+ // the 'master'
+ state.clear();
+ rejectedGenerationId = -1;
+ requestedResetSinceLastStart = false;
+ broker.stop();
+ broker.start(replicationServers);
+ }
+ }
+ if (errorMsg.getMsgID() == NOTE_BAD_GENERATION_ID.getId())
+ {
+ rejectedGenerationId = generationId;
+ }
+ }
}
else if (msg instanceof UpdateMessage)
{
@@ -875,7 +930,7 @@
{
try
{
- initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+ initializeRemote(initMsg.getsenderID(), initMsg.getsenderID(),
null);
}
catch(DirectoryException de)
@@ -2100,7 +2155,7 @@
public void disable()
{
state.save();
- state.clear();
+ state.clearInMemory();
disabled = true;
// stop the listener threads
for (ListenerThread thread : synchroThreads)
@@ -2126,20 +2181,252 @@
* The domain will connect back to a replication Server and
* will recreate threads to listen for messages from the Sycnhronization
* server.
+ * The generationId will be retrieved or computed if necessary.
* The ServerState will also be read again from the local database.
*/
public void enable()
{
- state.clear();
+ state.clearInMemory();
state.loadState();
disabled = false;
+
+ try
+ {
+ generationId = loadGenerationId();
+ }
+ catch (Exception e)
+ {
+ /* TODO should mark that replicationServer service is
+ * not available, log an error and retry upon timeout
+ * should we stop the modifications ?
+ */
+ logError(ERR_LOADING_GENERATION_ID.get(
+ baseDN.toNormalizedString(), e.getLocalizedMessage()));
+ return;
+ }
+
+ // After an on-line import, the value of the generationId is new
+ // and it is necessary for the broker to send this new value as part
+ // of the serverStart message.
+ broker.setGenerationId(generationId);
+
broker.start(replicationServers);
createListeners();
}
/**
+ * Compute the data generationId associated with the current data present
+ * in the backend for this domain.
+ * @return The computed generationId.
+ * @throws DirectoryException When an error occurs.
+ */
+ public long computeGenerationId() throws DirectoryException
+ {
+ long bec = backend.getEntryCount();
+ if (bec<0)
+ backend = this.retrievesBackend(baseDN);
+ bec = backend.getEntryCount();
+ this.acquireIEContext();
+ ieContext.checksumOutput = true;
+ ieContext.entryCount = (bec<1000?bec:1000);
+ ieContext.entryLeftCount = ieContext.entryCount;
+ exportBackend();
+ long genId = ieContext.checksumOutputValue;
+
+ if (debugEnabled())
+ TRACER.debugInfo("Computed generationId: #entries=" + bec +
+ " generationId=" + ieContext.checksumOutputValue);
+ ieContext.checksumOutput = false;
+ this.releaseIEContext();
+ return genId;
+ }
+
+ /**
+ * Returns the generationId set for this domain.
+ *
+ * @return The generationId.
+ */
+ public long getGenerationId()
+ {
+ return generationId;
+ }
+
+ /**
+ * The attribute name used to store the state in the backend.
+ */
+ protected static final String REPLICATION_GENERATION_ID =
+ "ds-sync-generation-id";
+
+ /**
+ * Stores the value of the generationId.
+ * @param generationId The value of the generationId.
+ * @return a ResultCode indicating if the method was successfull.
+ */
+ public ResultCode saveGenerationId(long generationId)
+ {
+ // The generationId is stored in the root entry of the domain.
+ ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
+
+ ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
+ ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
+ values.add(value);
+
+ LDAPAttribute attr =
+ new LDAPAttribute(REPLICATION_GENERATION_ID, values);
+ LDAPModification mod = new LDAPModification(ModificationType.REPLACE, attr);
+ ArrayList<RawModification> mods = new ArrayList<RawModification>(1);
+ mods.add(mod);
+
+ ModifyOperationBasis op =
+ new ModifyOperationBasis(conn, InternalClientConnection.nextOperationID(),
+ InternalClientConnection.nextMessageID(),
+ new ArrayList<Control>(0), asn1BaseDn,
+ mods);
+ op.setInternalOperation(true);
+ op.setSynchronizationOperation(true);
+ op.setDontSynchronize(true);
+
+ op.run();
+
+ ResultCode result = op.getResultCode();
+ if (result != ResultCode.SUCCESS)
+ {
+ Message message = ERR_UPDATING_GENERATION_ID.get(
+ op.getResultCode().getResultCodeName() + " " +
+ op.getErrorMessage(),
+ baseDN.toString());
+ logError(message);
+ }
+ return result;
+ }
+
+
+ /**
+ * Load the GenerationId from the root entry of the domain
+ * from the REPLICATION_GENERATION_ID attribute in database
+ * to memory, or compute it if not found.
+ *
+ * @return generationId The retrieved value of generationId
+ * @throws DirectoryException When an error occurs.
+ */
+ public long loadGenerationId()
+ throws DirectoryException
+ {
+ long generationId=-1;
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "Attempt to read generation ID from DB " + baseDN.toString());
+
+ ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
+ boolean found = false;
+ LDAPFilter filter;
+ try
+ {
+ filter = LDAPFilter.decode("objectclass=*");
+ }
+ catch (LDAPException e)
+ {
+ // can not happen
+ return -1;
+ }
+
+ /*
+ * Search the database entry that is used to periodically
+ * save the ServerState
+ */
+ InternalSearchOperation search = null;
+ LinkedHashSet<String> attributes = new LinkedHashSet<String>(1);
+ attributes.add(REPLICATION_GENERATION_ID);
+ search = conn.processSearch(asn1BaseDn,
+ SearchScope.BASE_OBJECT,
+ DereferencePolicy.DEREF_ALWAYS, 0, 0, false,
+ filter,attributes);
+ if (((search.getResultCode() != ResultCode.SUCCESS)) &&
+ ((search.getResultCode() != ResultCode.NO_SUCH_OBJECT)))
+ {
+ Message message = ERR_SEARCHING_GENERATION_ID.get(
+ search.getResultCode().getResultCodeName() + " " +
+ search.getErrorMessage(),
+ baseDN.toString());
+ logError(message);
+ }
+
+ SearchResultEntry resultEntry = null;
+ if (search.getResultCode() == ResultCode.SUCCESS)
+ {
+ LinkedList<SearchResultEntry> result = search.getSearchEntries();
+ resultEntry = result.getFirst();
+ if (resultEntry != null)
+ {
+ AttributeType synchronizationGenIDType =
+ DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
+ List<Attribute> attrs =
+ resultEntry.getAttribute(synchronizationGenIDType);
+ if (attrs != null)
+ {
+ Attribute attr = attrs.get(0);
+ LinkedHashSet<AttributeValue> values = attr.getValues();
+ if (values.size()!=1)
+ {
+ Message message = ERR_LOADING_GENERATION_ID.get(
+ baseDN.toString(), "#Values != 1");
+ logError(message);
+ }
+ else
+ {
+ found=true;
+ try
+ {
+ generationId = Long.decode(values.iterator().next().
+ getStringValue());
+ }
+ catch(Exception e)
+ {
+ Message message = ERR_LOADING_GENERATION_ID.get(
+ baseDN.toString(), e.getLocalizedMessage());
+ logError(message);
+ }
+ }
+ }
+ }
+ }
+
+ if (!found)
+ {
+ generationId = computeGenerationId();
+ saveGenerationId(generationId);
+
+ if (debugEnabled())
+ TRACER.debugInfo("Generation ID created for domain base DN=" +
+ baseDN.toString() +
+ " generationId=" + generationId);
+ }
+ else
+ {
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "Generation ID successfully read from domain base DN=" + baseDN +
+ " generationId=" + generationId);
+ }
+ return generationId;
+ }
+
+ /**
+ * Reset the generationId of this domain in the whole topology.
+ * A message is sent to the Replication Servers for them to reset
+ * their change dbs.
+ */
+ public void resetGenerationId()
+ {
+ requestedResetSinceLastStart = true;
+ ResetGenerationId genIdMessage = new ResetGenerationId();
+ broker.publish(genIdMessage);
+ }
+
+ /**
* Do whatever is needed when a backup is started.
* We need to make sure that the serverState is correclty save.
*/
@@ -2175,18 +2462,20 @@
{
msg = broker.receive();
+ if (debugEnabled())
+ TRACER.debugInfo("Import: EntryBytes received " + msg);
if (msg == null)
{
// The server is in the shutdown process
return null;
}
- log(Message.raw("receiveEntryBytes: received " + msg));
+
if (msg instanceof EntryMessage)
{
// FIXME
EntryMessage entryMsg = (EntryMessage)msg;
byte[] entryBytes = entryMsg.getEntryBytes().clone();
- ieContext.updateTaskCounters();
+ ieContext.updateCounters();
return entryBytes;
}
else if (msg instanceof DoneMessage)
@@ -2202,8 +2491,9 @@
// The error is stored and the import is ended
// by returning null
ErrorMessage errorMsg = (ErrorMessage)msg;
- ieContext.exception = new DirectoryException(ResultCode.OTHER,
- errorMsg.getDetails());
+ ieContext.exception = new DirectoryException(
+ ResultCode.OTHER,
+ errorMsg.getDetails());
return null;
}
else
@@ -2213,9 +2503,10 @@
}
catch(Exception e)
{
+ // TODO: i18n
ieContext.exception = new DirectoryException(ResultCode.OTHER,
- Message.raw("received an unexpected message type"), e);
- return null;
+ Message.raw("received an unexpected message type" +
+ e.getLocalizedMessage()));
}
}
}
@@ -2299,26 +2590,17 @@
}
/**
- * Log debug message.
- * @param message The message to log.
+ * Export the entries from the backend.
+ * The ieContext must have been set before calling.
+ *
+ * @throws DirectoryException when an error occured
*/
- private void log(Message message)
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("DebugInfo" + message);
- }
- }
-
- /**
- * Export the entries.
- * @throws DirectoryException when an error occurred
- */
- protected void exportBackend() throws DirectoryException
+ protected void exportBackend()
+ throws DirectoryException
{
// FIXME Temporary workaround - will probably be fixed when implementing
// dynamic config
- retrievesBackendInfos(this.baseDN);
+ backend = retrievesBackend(this.baseDN);
// Acquire a shared lock for the backend.
try
@@ -2344,13 +2626,53 @@
ResultCode.OTHER, message, null);
}
- ReplLDIFOutputStream os = new ReplLDIFOutputStream(this);
+ OutputStream os;
+ ReplLDIFOutputStream ros;
+ if (ieContext.checksumOutput)
+ {
+ ros = new ReplLDIFOutputStream(this, ieContext.entryCount);
+ os = new CheckedOutputStream(ros, new Adler32());
+ try
+ {
+ os.write((Long.toString(ieContext.entryCount)).getBytes());
+ }
+ catch(Exception e)
+ {
+ // Should never happen
+ }
+ }
+ else
+ {
+ ros = new ReplLDIFOutputStream(this, (short)-1);
+ os = ros;
+ }
LDIFExportConfig exportConfig = new LDIFExportConfig(os);
+
+ // baseDN branch is the only one included in the export
List<DN> includeBranches = new ArrayList<DN>(1);
includeBranches.add(this.baseDN);
exportConfig.setIncludeBranches(includeBranches);
+ // For the checksum computing mode, only consider the 'stable' attributes
+ if (ieContext.checksumOutput)
+ {
+ String includeAttributeStrings[] =
+ {"objectclass", "sn", "cn", "entryuuid"};
+ HashSet<AttributeType> includeAttributes;
+ includeAttributes = new HashSet<AttributeType>();
+ for (String attrName : includeAttributeStrings)
+ {
+ AttributeType attrType = DirectoryServer.getAttributeType(attrName);
+ if (attrType == null)
+ {
+ attrType = DirectoryServer.getDefaultAttributeType(attrName);
+ }
+ includeAttributes.add(attrType);
+ }
+ exportConfig.setIncludeAttributes(includeAttributes);
+ }
+
// Launch the export.
try
{
@@ -2374,8 +2696,19 @@
}
finally
{
- // Clean up after the export by closing the export config.
- exportConfig.close();
+
+ if ((ieContext != null) && (ieContext.checksumOutput))
+ {
+ ieContext.checksumOutputValue =
+ ((CheckedOutputStream)os).getChecksum().getValue();
+ }
+ else
+ {
+ // Clean up after the export by closing the export config.
+ // Will also flush the export and export the remaining entries.
+ // This is a real export where writer has been initialized.
+ exportConfig.close();
+ }
// Release the shared lock on the backend.
try
@@ -2403,54 +2736,26 @@
}
/**
- * Retrieves the backend object related to the domain and the backend's
- * config entry. They will be used for import and export.
- * TODO This should be in a shared package rather than here.
+ * Retrieves the backend related to the domain.
*
+ * @return The backend of that domain.
* @param baseDN The baseDN to retrieve the backend
- * @throws DirectoryException when an error occired
*/
- protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
+ protected Backend retrievesBackend(DN baseDN)
{
// Retrieves the backend related to this domain
- Backend domainBackend = DirectoryServer.getBackend(baseDN);
- if (domainBackend == null)
- {
- Message message = ERR_CANNOT_DECODE_BASE_DN.get(DN_BACKEND_BASE, "");
- throw new DirectoryException(
- ResultCode.OTHER, message, null);
- }
-
- // Retrieves its configuration
- BackendCfg backendCfg = TaskUtils.getConfigEntry(domainBackend);
- if (backendCfg == null)
- {
- Message message =
- ERR_LDIFIMPORT_NO_BACKENDS_FOR_ID.get();
- logError(message);
- throw new DirectoryException(
- ResultCode.OTHER, message, null);
- }
-
- this.backend = domainBackend;
- if (! domainBackend.supportsLDIFImport())
- {
- Message message = ERR_LDIFIMPORT_CANNOT_IMPORT.get(
- String.valueOf(baseDN));
- logError(message);
- throw new DirectoryException(
- ResultCode.OTHER, message, null);
- }
+ return DirectoryServer.getBackend(baseDN);
}
/**
- * Sends lDIFEntry entry lines to the export target currently set.
+ * Exports an entry in LDIF format.
*
- * @param lDIFEntry The lines for the LDIF entry.
+ * @param lDIFEntry The entry to be exported..
+ *
* @throws IOException when an error occurred.
*/
- public void sendEntryLines(String lDIFEntry) throws IOException
+ public void exportLDIFEntry(String lDIFEntry) throws IOException
{
// If an error was raised - like receiving an ErrorMessage
// we just let down the export.
@@ -2461,12 +2766,14 @@
throw ioe;
}
- // new entry then send the current one
- EntryMessage entryMessage = new EntryMessage(
+ if (ieContext.checksumOutput == false)
+ {
+ // Actually send the entry
+ EntryMessage entryMessage = new EntryMessage(
serverId, ieContext.exportTarget, lDIFEntry.getBytes());
- broker.publish(entryMessage);
-
- ieContext.updateTaskCounters();
+ broker.publish(entryMessage);
+ }
+ ieContext.updateCounters();
}
/**
@@ -2477,9 +2784,11 @@
* and should be updated of its progress.
* @throws DirectoryException when an error occurs
*/
- public void initialize(short source, Task initTask)
+ public void initializeFromRemote(short source, Task initTask)
throws DirectoryException
{
+ // TRACER.debugInfo("Entering initializeFromRemote");
+
acquireIEContext();
ieContext.initializeTask = initTask;
@@ -2495,13 +2804,14 @@
/**
* Verifies that the given string represents a valid source
* from which this server can be initialized.
- * @param sourceString The string representaing the source
+ * @param sourceString The string representing the source
* @return The source as a short value
* @throws DirectoryException if the string is not valid
*/
public short decodeSource(String sourceString)
throws DirectoryException
{
+ TRACER.debugInfo("Entering decodeSource");
short source = 0;
Throwable cause = null;
try
@@ -2512,8 +2822,6 @@
// TODO Verifies serverID is in the domain
// We shold check here that this is a server implied
// in the current domain.
-
- log(Message.raw("Source decoded for import:" + source));
return source;
}
}
@@ -2525,11 +2833,15 @@
ResultCode resultCode = ResultCode.OTHER;
Message message = ERR_INVALID_IMPORT_SOURCE.get();
if (cause != null)
+ {
throw new DirectoryException(
resultCode, message, cause);
+ }
else
+ {
throw new DirectoryException(
resultCode, message);
+ }
}
/**
@@ -2600,57 +2912,65 @@
* @param target The target that should be initialized
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
+ *
* @exception DirectoryException When an error occurs.
*/
- public void initializeTarget(short target, Task initTask)
+ public void initializeRemote(short target, Task initTask)
throws DirectoryException
{
- initializeTarget(target, serverId, initTask);
+ initializeRemote(target, serverId, initTask);
}
/**
* Process the initialization of some other server or servers in the topology
- * specified by the target argument when this initialization has been
- * initiated by another server than this one.
+ * specified by the target argument when this initialization specifying the
+ * server that requests the initialization.
+ *
* @param target The target that should be initialized.
* @param requestorID The server that initiated the export.
* @param initTask The task that triggers this initialization and that should
* be updated with its progress.
+ *
* @exception DirectoryException When an error occurs.
*/
- public void initializeTarget(short target, short requestorID, Task initTask)
+ public void initializeRemote(short target, short requestorID, Task initTask)
throws DirectoryException
{
- // FIXME Temporary workaround - will probably be fixed when implementing
- // dynamic config
- retrievesBackendInfos(this.baseDN);
-
- acquireIEContext();
-
- ieContext.exportTarget = target;
- if (initTask != null)
- {
- ieContext.initializeTask = initTask;
- ieContext.initTaskCounters(backend.getEntryCount());
- }
-
- // Send start message to the peer
- InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
- baseDN, serverId, ieContext.exportTarget, requestorID,
- backend.getEntryCount());
-
- log(Message.raw("SD : publishes " + initializeMessage +
- " for #entries=" + backend.getEntryCount() + ieContext.entryLeftCount));
-
- broker.publish(initializeMessage);
-
try
{
+ // FIXME Temporary workaround - will probably be fixed when implementing
+ // dynamic config
+ backend = retrievesBackend(this.baseDN);
+
+ if (!backend.supportsLDIFExport())
+ {
+ Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
+ backend.getBackendID().toString());
+ logError(message);
+ throw new DirectoryException(ResultCode.OTHER, message);
+ }
+
+ acquireIEContext();
+
+ ieContext.exportTarget = target;
+ if (initTask != null)
+ {
+ ieContext.initializeTask = initTask;
+ ieContext.initImportExportCounters(backend.getEntryCount());
+ }
+
+ // Send start message to the peer
+ InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
+ baseDN, serverId, ieContext.exportTarget, requestorID,
+ backend.getEntryCount());
+
+ broker.publish(initializeMessage);
+
exportBackend();
// Notify the peer of the success
DoneMessage doneMsg = new DoneMessage(serverId,
- initializeMessage.getDestination());
+ initializeMessage.getDestination());
broker.publish(doneMsg);
releaseIEContext();
@@ -2658,7 +2978,9 @@
catch(DirectoryException de)
{
// Notify the peer of the failure
- ErrorMessage errorMsg = new ErrorMessage(target, de.getMessageObject());
+ ErrorMessage errorMsg =
+ new ErrorMessage(target,
+ de.getMessageObject());
broker.publish(errorMsg);
releaseIEContext();
@@ -2686,8 +3008,9 @@
StringBuilder failureReason = new StringBuilder();
if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
{
- Message message = ERR_LDIFIMPORT_CANNOT_LOCK_BACKEND.get(
- backend.getBackendID(), String.valueOf(failureReason));
+ Message message = ERR_INIT_CANNOT_LOCK_BACKEND.get(
+ backend.getBackendID(),
+ String.valueOf(failureReason));
logError(message);
throw new DirectoryException(ResultCode.OTHER, message);
}
@@ -2698,14 +3021,22 @@
* @param initializeMessage The message that initiated the import.
* @exception DirectoryException Thrown when an error occurs.
*/
- protected void importBackend(InitializeTargetMessage initializeMessage)
+ protected void initialize(InitializeTargetMessage initializeMessage)
throws DirectoryException
{
LDIFImportConfig importConfig = null;
+ DirectoryException de = null;
+
+ if (!backend.supportsLDIFImport())
+ {
+ Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get(
+ backend.getBackendID().toString());
+ logError(message);
+ throw new DirectoryException(ResultCode.OTHER, message);
+ }
+
try
{
- log(Message.raw("startImport"));
-
if (initializeMessage.getRequestorID() == serverId)
{
// The import responds to a request we did so the IEContext
@@ -2718,7 +3049,7 @@
ieContext.importSource = initializeMessage.getsenderID();
ieContext.entryLeftCount = initializeMessage.getEntryCount();
- ieContext.initTaskCounters(initializeMessage.getEntryCount());
+ ieContext.initImportExportCounters(initializeMessage.getEntryCount());
preBackendImport(this.backend);
@@ -2737,20 +3068,14 @@
// Process import
this.backend.importLDIF(importConfig);
+ TRACER.debugInfo("The import has ended successfully.");
stateSavingDisabled = false;
- // Re-exchange state with SS
- broker.stop();
- broker.start(replicationServers);
-
}
catch(Exception e)
{
- DirectoryException de =
- new DirectoryException(
- ResultCode.OTHER, Message.raw(e.getLocalizedMessage()));
- ieContext.exception = de;
- throw (de);
+ de = new DirectoryException(ResultCode.OTHER,
+ Message.raw(e.getLocalizedMessage()));
}
finally
{
@@ -2766,12 +3091,33 @@
((InitializeTask)ieContext.initializeTask).
setState(ieContext.updateTaskCompletionState(),ieContext.exception);
}
-
releaseIEContext();
- log(Message.raw("End importBackend"));
+ // Retrieves the generation ID associated with the data imported
+ try
+ {
+ generationId = loadGenerationId();
+ }
+ catch (DirectoryException e)
+ {
+ logError(ERR_LOADING_GENERATION_ID.get(
+ baseDN.toNormalizedString(),
+ e.getLocalizedMessage()));
+ }
+ rejectedGenerationId = -1;
+
+ if (debugEnabled())
+ TRACER.debugInfo(
+ "After import, the replication plugin restarts connections" +
+ " to all RSs to provide new generation ID=" + generationId);
+ broker.setGenerationId(generationId);
+
+ // Re-exchange generationID and state with RS
+ broker.reStart();
}
- // Success
+ // Sends up the root error.
+ if (de != null)
+ throw de;
}
/**
@@ -2794,7 +3140,6 @@
throw new DirectoryException(ResultCode.OTHER, message);
}
- // FIXME setBackendEnabled should be part taskUtils ?
TaskUtils.enableBackend(backend.getBackendID());
}
@@ -2988,6 +3333,16 @@
}
/**
+ * Check if the domain is connected to a ReplicationServer.
+ *
+ * @return true if the server is connected, false if not.
+ */
+ public boolean isConnected()
+ {
+ return broker.isConnected();
+ }
+
+ /**
* Determine whether the connection to the replication server is encrypted.
* @return true if the connection is encrypted, false otherwise.
*/
--
Gitblit v1.10.0