From 40cef7d36084fbe86d34cfa497628d8972c4c9e7 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 29 Mar 2007 17:53:41 +0000
Subject: [PATCH]
---
opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java | 1194 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 1,173 insertions(+), 21 deletions(-)
diff --git a/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java b/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
index 7b2e078..11dea9d 100644
--- a/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
+++ b/opends/src/server/org/opends/server/synchronization/plugin/SynchronizationDomain.java
@@ -26,47 +26,59 @@
*/
package org.opends.server.synchronization.plugin;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.opends.server.util.ServerConstants.
- TIME_UNIT_MILLISECONDS_ABBR;
-import static org.opends.server.util.ServerConstants.
- TIME_UNIT_MILLISECONDS_FULL;
-import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_ABBR;
-import static org.opends.server.util.ServerConstants.TIME_UNIT_SECONDS_FULL;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_BASE_DN;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_CLASS;
+import static org.opends.server.config.ConfigConstants.ATTR_BACKEND_ID;
+import static org.opends.server.config.ConfigConstants.DN_BACKEND_BASE;
+import static org.opends.server.loggers.Error.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.debugInfo;
+import static org.opends.server.messages.ConfigMessages.*;
+import static org.opends.server.messages.MessageHandler.getMessage;
+import static org.opends.server.messages.ToolMessages.*;
import static org.opends.server.synchronization.common.LogMessages.*;
-import static org.opends.server.synchronization.plugin.Historical.*;
+import static org.opends.server.synchronization.plugin.Historical.ENTRYUIDNAME;
import static org.opends.server.synchronization.protocol.OperationContext.*;
-import static org.opends.server.loggers.Error.*;
-import static org.opends.server.messages.MessageHandler.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.opends.server.util.StaticUtils.createEntry;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;
+import org.opends.server.api.Backend;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
+import org.opends.server.api.SynchronizationProvider;
+import org.opends.server.backends.jeb.BackendImpl;
+import org.opends.server.backends.task.Task;
+import org.opends.server.backends.task.TaskState;
import org.opends.server.config.BooleanConfigAttribute;
import org.opends.server.config.ConfigAttribute;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.config.DNConfigAttribute;
import org.opends.server.config.IntegerConfigAttribute;
-import org.opends.server.config.StringConfigAttribute;
import org.opends.server.config.IntegerWithUnitConfigAttribute;
+import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
+import org.opends.server.core.LockFileManager;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
+import org.opends.server.synchronization.common.LogMessages;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
@@ -77,19 +89,30 @@
import org.opends.server.synchronization.protocol.AckMessage;
import org.opends.server.synchronization.protocol.AddContext;
import org.opends.server.synchronization.protocol.DeleteContext;
+import org.opends.server.synchronization.protocol.DoneMessage;
+import org.opends.server.synchronization.protocol.EntryMessage;
+import org.opends.server.synchronization.protocol.ErrorMessage;
+import org.opends.server.synchronization.protocol.InitializeRequestMessage;
+import org.opends.server.synchronization.protocol.InitializeTargetMessage;
import org.opends.server.synchronization.protocol.ModifyContext;
import org.opends.server.synchronization.protocol.ModifyDNMsg;
import org.opends.server.synchronization.protocol.ModifyDnContext;
import org.opends.server.synchronization.protocol.OperationContext;
+import org.opends.server.synchronization.protocol.RoutableMessage;
import org.opends.server.synchronization.protocol.SynchronizationMessage;
import org.opends.server.synchronization.protocol.UpdateMessage;
+import org.opends.server.tasks.InitializeTargetTask;
+import org.opends.server.tasks.InitializeTask;
+import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
+import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.LDIFExportConfig;
+import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
@@ -137,8 +160,96 @@
* server. Zero means heartbeats are off.
*/
private long heartbeatInterval = 0;
+ short serverId;
- private short serverId;
+ /**
+ * This class contain the context related to an import or export
+ * launched on the domain.
+ */
+ private class IEContext
+ {
+ // The task that initiated the operation.
+ Task initializeTask;
+ // The input stream for the import
+ SynchroLDIFInputStream ldifImportInputStream = null;
+ // The target in the case of an export
+ short exportTarget = RoutableMessage.UNKNOWN_SERVER;
+ // The source in the case of an import
+ short importSource = RoutableMessage.UNKNOWN_SERVER;
+
+ // The total entry count expected to be processed
+ long entryCount = 0;
+ // The count for the entry left to be processed
+ long entryLeftCount = 0;
+
+ // The exception raised when any
+ DirectoryException exception = null;
+
+ /**
+ * Initializes the counters of the task with the provider value.
+ * @param count The value with which to initialize the counters.
+ */
+ public void initTaskCounters(long count)
+ {
+ entryCount = count;
+ entryLeftCount = count;
+
+ if (initializeTask != null)
+ {
+ if (initializeTask instanceof InitializeTask)
+ {
+ ((InitializeTask)initializeTask).setTotal(entryCount);
+ ((InitializeTask)initializeTask).setLeft(entryCount);
+ }
+ else if (initializeTask instanceof InitializeTargetTask)
+ {
+ ((InitializeTargetTask)initializeTask).setTotal(entryCount);
+ ((InitializeTargetTask)initializeTask).setLeft(entryCount);
+ }
+ }
+ }
+
+ /**
+ * Update the counters of the task for each entry processed during
+ * an import or export.
+ */
+ public void updateTaskCounters()
+ {
+ entryLeftCount--;
+
+ if (initializeTask != null)
+ {
+ if (initializeTask instanceof InitializeTask)
+ {
+ ((InitializeTask)initializeTask).setLeft(entryLeftCount);
+ }
+ else if (initializeTask instanceof InitializeTargetTask)
+ {
+ ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
+ }
+ }
+ }
+
+ /**
+ * Update the state of the task.
+ */
+ protected TaskState updateTaskCompletionState()
+ {
+ if (exception == null)
+ return TaskState.COMPLETED_SUCCESSFULLY;
+ else
+ return TaskState.STOPPED_BY_ERROR;
+ }
+ }
+
+ // The context related to an import or export being processed
+ // Null when none is being processed.
+ private IEContext ieContext = null;
+
+ // The backend informations necessary to make an import or export.
+ private Backend backend;
+ private ConfigEntry backendConfigEntry;
+ private List<DN> branches = new ArrayList<DN>(0);
private int listenerThreadNumber = 10;
private boolean receiveStatus = true;
@@ -160,6 +271,7 @@
private boolean solveConflictFlag = true;
private boolean disabled = false;
+ private boolean stateSavingDisabled = false;
static final String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static final String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
@@ -205,8 +317,6 @@
timeUnits.put(TIME_UNIT_SECONDS_FULL, 1000D);
}
-
-
/**
* Creates a new SynchronizationDomain using configuration from configEntry.
*
@@ -217,6 +327,7 @@
public SynchronizationDomain(ConfigEntry configEntry) throws ConfigException
{
super("Synchronization flush");
+
/*
* read the centralized changelog server configuration
* this is a multivalued attribute
@@ -397,6 +508,10 @@
if (!receiveStatus)
broker.suspendReceive();
}
+
+ // Retrieves the related backend and its config entry
+ retrievesBackendInfos(baseDN);
+
} catch (Exception e)
{
/* TODO should mark that changelog service is
@@ -803,9 +918,9 @@
}
/**
- * Receive an update message from the changelog.
+ * Receives an update message from the changelog.
* also responsible for updating the list of pending changes
- * @return the received message
+ * @return the received message - null if none
*/
public UpdateMessage receive()
{
@@ -823,7 +938,7 @@
// The server is in the shutdown process
return null;
}
-
+ log("Broker received message :" + msg);
if (msg instanceof AckMessage)
{
AckMessage ack = (AckMessage) msg;
@@ -834,6 +949,56 @@
update = (UpdateMessage) msg;
receiveUpdate(update);
}
+ else if (msg instanceof InitializeRequestMessage)
+ {
+ // Another server requests us to provide entries
+ // for a total update
+ InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
+ try
+ {
+ initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+ null);
+ }
+ catch(DirectoryException de)
+ {
+ // Returns an error message to notify the sender
+ int msgID = de.getErrorMessageID();
+ ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
+ msgID, de.getMessage());
+ broker.publish(errorMsg);
+ }
+ }
+ else if (msg instanceof InitializeTargetMessage)
+ {
+ // Another server is exporting its entries to us
+ InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
+
+ try
+ {
+ importBackend(initMsg);
+ }
+ catch(DirectoryException de)
+ {
+ // Return an error message to notify the sender
+ int msgID = de.getErrorMessageID();
+ ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
+ msgID, de.getMessage());
+ log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
+ broker.publish(errorMsg);
+ }
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ if (ieContext != null)
+ {
+ // This is an error termination for the 2 following cases :
+ // - either during an export
+ // - or before an import really started
+ // For example, when we publish a request and the
+ // changelog did not find any import source.
+ abandonImportExport((ErrorMessage)msg);
+ }
+ }
} catch (SocketTimeoutException e)
{
// just retry
@@ -876,7 +1041,9 @@
public void receiveAck(AckMessage ack)
{
UpdateMessage update;
- ChangeNumber changeNumber = ack.getChangeNumber();
+ ChangeNumber changeNumber;
+
+ changeNumber = ack.getChangeNumber();
synchronized (pendingChanges)
{
@@ -1105,7 +1272,7 @@
synchronized (this)
{
this.wait(1000);
- if (!disabled )
+ if (!disabled && !stateSavingDisabled )
{
// save the RUV
state.save();
@@ -1151,6 +1318,8 @@
this.notify();
}
+ DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
+
// stop the ChangelogBroker
broker.stop();
}
@@ -1857,6 +2026,7 @@
return broker.getNumLostConnections();
}
+
/**
* Check if the domain solve conflicts.
*
@@ -1933,6 +2103,988 @@
// Nothing is needed at the moment
}
+ /*
+ * Total Update >>
+ */
+
+ /**
+ * Receives bytes related to an entry in the context of an import to
+ * initialize the domain (called by SynchronizationDomainLDIFInputStream).
+ *
+ * @return The bytes. Null when the Done or Err message has been received
+ */
+ public byte[] receiveEntryBytes()
+ {
+ SynchronizationMessage msg;
+ while (true)
+ {
+ try
+ {
+ msg = broker.receive();
+
+ if (msg == null)
+ {
+ // The server is in the shutdown process
+ return null;
+ }
+ log("receiveEntryBytes: received " + msg);
+ if (msg instanceof EntryMessage)
+ {
+ // FIXME
+ EntryMessage entryMsg = (EntryMessage)msg;
+ byte[] entryBytes = entryMsg.getEntryBytes().clone();
+ ieContext.updateTaskCounters();
+ return entryBytes;
+ }
+ else if (msg instanceof DoneMessage)
+ {
+ // This is the normal termination of the import
+ // No error is stored and the import is ended
+ // by returning null
+ return null;
+ }
+ else if (msg instanceof ErrorMessage)
+ {
+ // This is an error termination during the import
+ // The error is stored and the import is ended
+ // by returning null
+ ErrorMessage errorMsg = (ErrorMessage)msg;
+ ieContext.exception = new DirectoryException(ResultCode.OTHER,
+ errorMsg.getDetails() , errorMsg.getMsgID());
+ return null;
+ }
+ else
+ {
+ // Other messages received during an import are trashed
+ }
+ }
+ catch(Exception e)
+ {
+ ieContext.exception = new DirectoryException(ResultCode.OTHER,
+ "received an unexpected message type" , 1, e);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Processes an error message received while an import/export is
+ * on going.
+ * @param errorMsg The error message received.
+ */
+ protected void abandonImportExport(ErrorMessage errorMsg)
+ {
+ // FIXME TBD Treat the case where the error happens while entries
+ // are being exported
+
+ if (ieContext != null)
+ {
+ ieContext.exception = new DirectoryException(ResultCode.OTHER,
+ errorMsg.getDetails() , errorMsg.getMsgID());
+
+ if (ieContext.initializeTask instanceof InitializeTask)
+ {
+ // Update the task that initiated the import
+ ((InitializeTask)ieContext.initializeTask).
+ setState(ieContext.updateTaskCompletionState(),ieContext.exception);
+
+ ieContext = null;
+ }
+ }
+ }
+
+ /**
+ * Clears all the entries from the JE backend determined by the
+ * be id passed into the method.
+ *
+ * @param createBaseEntry Indicate whether to automatically create the base
+ * entry and add it to the backend.
+ * @param beID The be id to clear.
+ * @param dn The suffix of the backend to create if the the createBaseEntry
+ * boolean is true.
+ * @throws Exception If an unexpected problem occurs.
+ */
+ public static void clearJEBackend(boolean createBaseEntry, String beID,
+ String dn) throws Exception
+ {
+ BackendImpl backend = (BackendImpl)DirectoryServer.getBackend(beID);
+ DN[] baseDNs = backend.getBaseDNs();
+
+ // FIXME Should getConfigEntry be part of TaskUtils ?
+ ConfigEntry configEntry = TaskUtils.getConfigEntry(backend);
+
+ // FIXME Should setBackendEnabled be part of TaskUtils ?
+ TaskUtils.setBackendEnabled(configEntry, false);
+
+ try
+ {
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+
+ if (!LockFileManager.acquireExclusiveLock(lockFile, failureReason))
+ {
+ throw new RuntimeException(failureReason.toString());
+ }
+
+ try
+ {
+ backend.clearBackend(configEntry, baseDNs);
+ }
+ finally
+ {
+ LockFileManager.releaseLock(lockFile, failureReason);
+ }
+ }
+ finally
+ {
+ TaskUtils.setBackendEnabled(configEntry, true);
+ }
+
+ if (createBaseEntry)
+ {
+ DN baseDN = DN.decode(dn);
+ Entry e = createEntry(baseDN);
+ backend = (BackendImpl)DirectoryServer.getBackend(beID);
+ backend.addEntry(e, null);
+ }
+ }
+
+ /**
+ * Log debug message.
+ * @param message The message to log.
+ */
+ private void log(String message)
+ {
+ if (debugEnabled())
+ {
+ debugInfo("DebugInfo" + message);
+ int msgID = MSGID_UNKNOWN_TYPE;
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.NOTICE,
+ "SynchronizationDomain/ " + message, msgID);
+ }
+ }
+
+ /**
+ * Export the entries.
+ * @throws DirectoryException when an error occured
+ */
+ protected void exportBackend() throws DirectoryException
+ {
+ // FIXME Temporary workaround - will probably be fixed when implementing
+ // dynamic config
+ retrievesBackendInfos(this.baseDN);
+
+ // Acquire a shared lock for the backend.
+ try
+ {
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+ if (! LockFileManager.acquireSharedLock(lockFile, failureReason))
+ {
+ int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ String.valueOf(failureReason));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFEXPORT_CANNOT_LOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message + " " + stackTraceToSingleLineString(e), msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ SynchroLDIFOutputStream os = new SynchroLDIFOutputStream(this);
+
+ LDIFExportConfig exportConfig = new LDIFExportConfig(os);
+
+ // Launch the export.
+ try
+ {
+ DN[] baseDNs = {this.baseDN};
+ backend.exportLDIF(backendConfigEntry, baseDNs, exportConfig);
+ }
+ catch (DirectoryException de)
+ {
+ int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
+ String message = getMessage(msgID, de.getErrorMessage());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
+ msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFEXPORT_ERROR_DURING_EXPORT;
+ String message = getMessage(msgID, stackTraceToSingleLineString(e));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, message,
+ msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ finally
+ {
+ // Clean up after the export by closing the export config.
+ exportConfig.close();
+
+ // Release the shared lock on the backend.
+ try
+ {
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+ if (! LockFileManager.releaseLock(lockFile, failureReason))
+ {
+ int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ String.valueOf(failureReason));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
+ message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFEXPORT_CANNOT_UNLOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ stackTraceToSingleLineString(e));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_WARNING,
+ message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ }
+ }
+
+ /**
+ * Retrieves the backend object related to the domain and the backend's
+ * config entry. They will be used for import and export.
+ * TODO This should be in a shared package rather than here.
+ *
+ * @param baseDN The baseDN to retrieve the backend
+ * @throws DirectoryException when an error occired
+ */
+ protected void retrievesBackendInfos(DN baseDN) throws DirectoryException
+ {
+ ArrayList<Backend> backendList = new ArrayList<Backend>();
+ ArrayList<ConfigEntry> entryList = new ArrayList<ConfigEntry>();
+ ArrayList<List<DN>> dnList = new ArrayList<List<DN>>();
+
+ Backend backend = null;
+ ConfigEntry backendConfigEntry = null;
+ List<DN> branches = new ArrayList<DN>(0);
+
+ // Retrieves the backend related to this domain
+ Backend domainBackend = DirectoryServer.getBackend(baseDN);
+ if (domainBackend == null)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
+ String message = getMessage(msgID, DN_BACKEND_BASE);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ // Retrieves its config entry and its DNs
+ int code = getBackends(backendList, entryList, dnList);
+ if (code != 0)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
+ String message = getMessage(msgID, DN_BACKEND_BASE);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ int numBackends = backendList.size();
+ for (int i=0; i < numBackends; i++)
+ {
+ Backend b = backendList.get(i);
+
+ if (domainBackend.getBackendID() != b.getBackendID())
+ {
+ continue;
+ }
+
+ if (backend == null)
+ {
+ backend = domainBackend;
+ backendConfigEntry = entryList.get(i).duplicate();
+ branches = dnList.get(i);
+ }
+ else
+ {
+ int msgID = MSGID_LDIFIMPORT_MULTIPLE_BACKENDS_FOR_ID;
+ String message = getMessage(msgID, domainBackend.getBackendID());
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ }
+
+ if (backend == null)
+ {
+ int msgID = MSGID_LDIFIMPORT_NO_BACKENDS_FOR_ID;
+ String message = getMessage(msgID, domainBackend.getBackendID());
+ logError(ErrorLogCategory.BACKEND,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ else if (! backend.supportsLDIFExport())
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_IMPORT;
+ String message = getMessage(msgID, 0); // FIXME
+ logError(ErrorLogCategory.BACKEND,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+
+ this.backend = backend;
+ this.backendConfigEntry = backendConfigEntry;
+ this.branches = branches;
+ }
+
+
+ /**
+ * Sends lDIFEntry entry lines to the export target currently set.
+ *
+ * @param lDIFEntry The lines for the LDIF entry.
+ * @throws IOException when an error occured.
+ */
+ public void sendEntryLines(String lDIFEntry) throws IOException
+ {
+ // If an error was raised - like receiving an ErrorMessage
+ // we just let down the export.
+ if (ieContext.exception != null)
+ {
+ IOException ioe = new IOException(ieContext.exception.getMessage());
+ ieContext = null;
+ throw ioe;
+ }
+
+ // new entry then send the current one
+ EntryMessage entryMessage = new EntryMessage(
+ serverId, ieContext.exportTarget, lDIFEntry.getBytes());
+ broker.publish(entryMessage);
+
+ ieContext.updateTaskCounters();
+ }
+
+ /**
+ * Retrieves information about the backends defined in the Directory Server
+ * configuration.
+ *
+ * @param backendList A list into which instantiated (but not initialized)
+ * backend instances will be placed.
+ * @param entryList A list into which the config entries associated with
+ * the backends will be placed.
+ * @param dnList A list into which the set of base DNs for each backend
+ * will be placed.
+ */
+ private static int getBackends(ArrayList<Backend> backendList,
+ ArrayList<ConfigEntry> entryList,
+ ArrayList<List<DN>> dnList)
+ throws DirectoryException
+ {
+ // Get the base entry for all backend configuration.
+ DN backendBaseDN = null;
+ try
+ {
+ backendBaseDN = DN.decode(DN_BACKEND_BASE);
+ }
+ catch (DirectoryException de)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
+ String message = getMessage(msgID, DN_BACKEND_BASE, de.getErrorMessage());
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DECODE_BACKEND_BASE_DN;
+ String message = getMessage(msgID, DN_BACKEND_BASE,
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ ConfigEntry baseEntry = null;
+ try
+ {
+ baseEntry = DirectoryServer.getConfigEntry(backendBaseDN);
+ }
+ catch (ConfigException ce)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
+ String message = getMessage(msgID, DN_BACKEND_BASE, ce.getMessage());
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_RETRIEVE_BACKEND_BASE_ENTRY;
+ String message = getMessage(msgID, DN_BACKEND_BASE,
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+
+ // Iterate through the immediate children, attempting to parse them as
+ // backends.
+ for (ConfigEntry configEntry : baseEntry.getChildren().values())
+ {
+ // Get the backend ID attribute from the entry. If there isn't one, then
+ // skip the entry.
+ String backendID = null;
+ try
+ {
+ int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BACKEND_ID;
+ StringConfigAttribute idStub =
+ new StringConfigAttribute(ATTR_BACKEND_ID, getMessage(msgID),
+ true, false, true);
+ StringConfigAttribute idAttr =
+ (StringConfigAttribute) configEntry.getConfigAttribute(idStub);
+ if (idAttr == null)
+ {
+ continue;
+ }
+ else
+ {
+ backendID = idAttr.activeValue();
+ }
+ }
+ catch (ConfigException ce)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ ce.getMessage());
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_ID;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+
+ // Get the backend class name attribute from the entry. If there isn't
+ // one, then just skip the entry.
+ String backendClassName = null;
+ try
+ {
+ int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_CLASS;
+ StringConfigAttribute classStub =
+ new StringConfigAttribute(ATTR_BACKEND_CLASS, getMessage(msgID),
+ true, false, false);
+ StringConfigAttribute classAttr =
+ (StringConfigAttribute) configEntry.getConfigAttribute(classStub);
+ if (classAttr == null)
+ {
+ continue;
+ }
+ else
+ {
+ backendClassName = classAttr.activeValue();
+ }
+ }
+ catch (ConfigException ce)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ ce.getMessage());
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BACKEND_CLASS;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ Class backendClass = null;
+ try
+ {
+ backendClass = Class.forName(backendClassName);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_LOAD_BACKEND_CLASS;
+ String message = getMessage(msgID, backendClassName,
+ String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null);
+ }
+
+ Backend backend = null;
+ try
+ {
+ backend = (Backend) backendClass.newInstance();
+ backend.setBackendID(backendID);
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_INSTANTIATE_BACKEND_CLASS;
+ String message = getMessage(msgID, backendClassName,
+ String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null); }
+
+
+ // Get the base DN attribute from the entry. If there isn't one, then
+ // just skip this entry.
+ List<DN> baseDNs = null;
+ try
+ {
+ int msgID = MSGID_CONFIG_BACKEND_ATTR_DESCRIPTION_BASE_DNS;
+ DNConfigAttribute baseDNStub =
+ new DNConfigAttribute(ATTR_BACKEND_BASE_DN, getMessage(msgID),
+ true, true, true);
+ DNConfigAttribute baseDNAttr =
+ (DNConfigAttribute) configEntry.getConfigAttribute(baseDNStub);
+ if (baseDNAttr == null)
+ {
+ msgID = MSGID_LDIFIMPORT_NO_BASES_FOR_BACKEND;
+ String message = getMessage(msgID,
+ String.valueOf(configEntry.getDN()));
+ throw new DirectoryException(
+ DirectoryServer.getServerErrorResultCode(), message,msgID, null);
+ }
+ else
+ {
+ baseDNs = baseDNAttr.activeValues();
+ }
+ }
+ catch (Exception e)
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_DETERMINE_BASES_FOR_BACKEND;
+ String message = getMessage(msgID, String.valueOf(configEntry.getDN()),
+ stackTraceToSingleLineString(e));
+ throw new DirectoryException(
+ ResultCode.OTHER, message, msgID, null); }
+
+
+ backendList.add(backend);
+ entryList.add(configEntry);
+ dnList.add(baseDNs);
+ }
+ return 0;
+ }
+
+ /**
+ * Initializes this domain from another source server.
+ *
+ * @param source The source from which to initialize
+ * @param initTask The task that launched the initialization
+ * and should be updated of its progress.
+ * @throws DirectoryException when an error occurs
+ */
+ public void initialize(short source, Task initTask)
+ throws DirectoryException
+ {
+ acquireIEContext();
+ ieContext.initializeTask = initTask;
+
+ InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
+ baseDN, serverId, source);
+
+ // Publish Init request msg
+ broker.publish(initializeMsg);
+
+ // .. we expect to receive entries or err after that
+ }
+
+ /**
+ * Verifies that the given string represents a valid source
+ * from which this server can be initialized.
+ * @param sourceString The string representaing the source
+ * @return The source as a short value
+ * @throws DirectoryException if the string is not valid
+ */
+ public short decodeSource(String sourceString)
+ throws DirectoryException
+ {
+ short source = 0;
+ Throwable cause = null;
+ try
+ {
+ source = Integer.decode(sourceString).shortValue();
+ if (source >= -1)
+ {
+ // TODO Verifies serverID is in the domain
+ // We shold check here that this is a server implied
+ // in the current domain.
+
+ log("Source decoded for import:" + source);
+ return source;
+ }
+ }
+ catch(Exception e)
+ {
+ cause = e;
+ }
+
+ ResultCode resultCode = ResultCode.OTHER;
+ int errorMessageID = MSGID_INVALID_IMPORT_SOURCE;
+ String message = getMessage(errorMessageID);
+
+ if (cause != null)
+ throw new DirectoryException(
+ resultCode, message, errorMessageID, cause);
+ else
+ throw new DirectoryException(
+ resultCode, message, errorMessageID);
+ }
+
+ /**
+ * Verifies that the given string represents a valid source
+ * from which this server can be initialized.
+ * @param targetString The string representing the source
+ * @return The source as a short value
+ * @throws DirectoryException if the string is not valid
+ */
+ public short decodeTarget(String targetString)
+ throws DirectoryException
+ {
+ short target = 0;
+ Throwable cause;
+ if (targetString.equalsIgnoreCase("all"))
+ {
+ return RoutableMessage.ALL_SERVERS;
+ }
+
+ // So should be a serverID
+ try
+ {
+ target = Integer.decode(targetString).shortValue();
+ if (target >= 0)
+ {
+ // FIXME Could we check now that it is a know server in the domain ?
+ }
+ return target;
+ }
+ catch(Exception e)
+ {
+ cause = e;
+ }
+ ResultCode resultCode = ResultCode.OTHER;
+ int errorMessageID = MSGID_INVALID_EXPORT_TARGET;
+ String message = getMessage(errorMessageID);
+
+ if (cause != null)
+ throw new DirectoryException(
+ resultCode, message, errorMessageID, cause);
+ else
+ throw new DirectoryException(
+ resultCode, message, errorMessageID);
+
+ }
+
+ private synchronized void acquireIEContext()
+ throws DirectoryException
+ {
+ if (ieContext != null)
+ {
+ // Rejects 2 simultaneous exports
+ int msgID = MSGID_SIMULTANEOUS_IMPORT_EXPORT_REJECTED;
+ String message = getMessage(msgID);
+ throw new DirectoryException(ResultCode.OTHER,
+ message, msgID);
+ }
+
+ ieContext = new IEContext();
+ }
+
+ private synchronized void releaseIEContext()
+ {
+ ieContext = null;
+ }
+
+ /**
+ * Process the initialization of some other server or servers in the topology
+ * specified by the target argument.
+ * @param target The target that should be initialized
+ * @param initTask The task that triggers this initialization and that should
+ * be updated with its progress.
+ * @exception DirectoryException When an error occurs.
+ */
+ public void initializeTarget(short target, Task initTask)
+ throws DirectoryException
+ {
+ initializeTarget(target, serverId, initTask);
+ }
+
+ /**
+ * Process the initialization of some other server or servers in the topology
+ * specified by the target argument when this initialization has been
+ * initiated by another server than this one.
+ * @param target The target that should be initialized.
+ * @param requestorID The server that initiated the export.
+ * @param initTask The task that triggers this initialization and that should
+ * be updated with its progress.
+ * @exception DirectoryException When an error occurs.
+ */
+ public void initializeTarget(short target, short requestorID, Task initTask)
+ throws DirectoryException
+ {
+ acquireIEContext();
+
+ ieContext.exportTarget = target;
+ ieContext.initializeTask = initTask;
+ ieContext.initTaskCounters(backend.getEntryCount());
+
+ // Send start message
+ InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
+ baseDN, serverId, ieContext.exportTarget, requestorID,
+ ieContext.entryLeftCount);
+
+ log("SD : publishes " + initializeMessage +
+ " for #entries=" + ieContext.entryCount);
+
+ broker.publish(initializeMessage);
+
+ // make an export and send entries
+ exportBackend();
+
+ // Successfull termnation
+ DoneMessage doneMsg = new DoneMessage(serverId,
+ initializeMessage.getDestination());
+ broker.publish(doneMsg);
+
+ if (ieContext != null)
+ {
+ ieContext.updateTaskCompletionState();
+ ieContext = null;
+ }
+ }
+
+ /**
+ * Process backend before import.
+ * @param backend The backend.
+ * @param backendConfigEntry The config entry of the backend.
+ * @throws Exception
+ */
+ private void preBackendImport(Backend backend,
+ ConfigEntry backendConfigEntry)
+ throws Exception
+ {
+ // Stop saving state
+ stateSavingDisabled = true;
+
+ // Clear the backend
+ clearJEBackend(false,backend.getBackendID(),null);
+
+ // FIXME setBackendEnabled should be part of TaskUtils ?
+ TaskUtils.setBackendEnabled(backendConfigEntry, false);
+
+ // Acquire an exclusive lock for the backend.
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+ if (! LockFileManager.acquireExclusiveLock(lockFile, failureReason))
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_LOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ String.valueOf(failureReason));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ throw new DirectoryException(ResultCode.OTHER, message, msgID);
+ }
+ }
+
+ /**
+ * Initializes the domain's backend with received entries.
+ * @param initializeMessage The message that initiated the import.
+ * @exception DirectoryException Thrown when an error occurs.
+ */
+ protected void importBackend(InitializeTargetMessage initializeMessage)
+ throws DirectoryException
+ {
+ LDIFImportConfig importConfig = null;
+ try
+ {
+ log("startImport");
+
+ if (initializeMessage.getRequestorID() == serverId)
+ {
+ // The import responds to a request we did so the IEContext
+ // is already acquired
+ }
+ else
+ {
+ acquireIEContext();
+ }
+
+ ieContext.importSource = initializeMessage.getsenderID();
+ ieContext.entryLeftCount = initializeMessage.getEntryCount();
+ ieContext.initTaskCounters(initializeMessage.getEntryCount());
+
+ preBackendImport(this.backend, this.backendConfigEntry);
+
+ DN[] baseDNs = {baseDN};
+ ieContext.ldifImportInputStream = new SynchroLDIFInputStream(this);
+ importConfig =
+ new LDIFImportConfig(ieContext.ldifImportInputStream);
+ importConfig.setIncludeBranches(this.branches);
+
+ // TODO How to deal with rejected entries during the import
+ // importConfig.writeRejectedEntries("rejectedImport",
+ // ExistingFileBehavior.OVERWRITE);
+
+ // Process import
+ this.backend.importLDIF(this.backendConfigEntry, baseDNs, importConfig);
+
+ stateSavingDisabled = false;
+
+ // Re-exchange state with SS
+ broker.stop();
+ broker.start(changelogServers);
+
+ }
+ catch(Exception e)
+ {
+ throw new DirectoryException(ResultCode.OTHER, e.getLocalizedMessage(),
+ 2);// FIXME
+ }
+ finally
+ {
+ // Cleanup
+ importConfig.close();
+
+ // Re-enable backend
+ closeBackendImport(this.backend, this.backendConfigEntry);
+
+ // Update the task that initiated the import
+ if ((ieContext != null ) && (ieContext.initializeTask != null))
+ {
+ ((InitializeTask)ieContext.initializeTask).
+ setState(ieContext.updateTaskCompletionState(),ieContext.exception);
+ }
+
+ releaseIEContext();
+
+ log("End importBackend");
+ }
+ // Success
+ }
+
+ /**
+ * Make post import operations.
+ * @param backend The backend implied in the import.
+ * @param backendConfigEntry The config entry of the backend.
+ * @exception DirectoryException Thrown when an error occurs.
+ */
+ protected void closeBackendImport(Backend backend,
+ ConfigEntry backendConfigEntry)
+ throws DirectoryException
+ {
+ String lockFile = LockFileManager.getBackendLockFileName(backend);
+ StringBuilder failureReason = new StringBuilder();
+
+ // Release lock
+ if (!LockFileManager.releaseLock(lockFile, failureReason))
+ {
+ int msgID = MSGID_LDIFIMPORT_CANNOT_UNLOCK_BACKEND;
+ String message = getMessage(msgID, backend.getBackendID(),
+ String.valueOf(failureReason));
+ logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ new DirectoryException(ResultCode.OTHER, message, msgID);
+ }
+
+ // FIXME setBackendEnabled should be part taskUtils ?
+ TaskUtils.setBackendEnabled(backendConfigEntry, true);
+ }
+
+ /**
+ * Retrieves a synchronization domain based on the baseDN.
+ *
+ * @param baseDN The baseDN of the domain to retrieve
+ * @return The domain retrieved
+ * @throws DirectoryException When an error occured.
+ */
+ public static SynchronizationDomain retrievesSynchronizationDomain(DN baseDN)
+ throws DirectoryException
+ {
+ SynchronizationDomain synchronizationDomain = null;
+
+ // Retrieves the domain
+ DirectoryServer.getSynchronizationProviders();
+ for (SynchronizationProvider provider :
+ DirectoryServer.getSynchronizationProviders())
+ {
+ if (!( provider instanceof MultimasterSynchronization))
+ {
+ int msgID = LogMessages.MSGID_INVALID_PROVIDER;
+ String message = getMessage(msgID);
+ throw new DirectoryException(ResultCode.OTHER,
+ message, msgID);
+ }
+
+ // From the domainDN retrieves the synchronization domain
+ SynchronizationDomain sdomain =
+ MultimasterSynchronization.findDomain(baseDN, null);
+ if (sdomain == null)
+ {
+ int msgID = LogMessages.MSGID_NO_MATCHING_DOMAIN;
+ String message = getMessage(msgID) + " " + baseDN;
+ throw new DirectoryException(ResultCode.OTHER,
+ message, msgID);
+ }
+
+ if (synchronizationDomain != null)
+ {
+ // Should never happen
+ int msgID = LogMessages.MSGID_MULTIPLE_MATCHING_DOMAIN;
+ String message = getMessage(msgID);
+ throw new DirectoryException(ResultCode.OTHER,
+ message, msgID);
+ }
+ synchronizationDomain = sdomain;
+ }
+ return synchronizationDomain;
+ }
+
+ /**
+ * Returns the backend associated to this domain.
+ * @return The associated backend.
+ */
+ public Backend getBackend()
+ {
+ return backend;
+ }
+
+ /**
+ * Returns a boolean indiciating if an import or export is currently
+ * processed.
+ * @return The status
+ */
+ public boolean ieRunning()
+ {
+ return (ieContext != null);
+ }
+ /*
+ * <<Total Update
+ */
+
+
/**
* Push the modifications contain the in given parameter has
* a modification that would happen on a local server.
--
Gitblit v1.10.0