opends/src/messages/messages/replication.properties
@@ -80,7 +80,7 @@ changes that this server has already processed on suffix %s NOTICE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER_19=More than one replication \ server should be configured NOTICE_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \ SEVERE_ERR_EXCEPTION_STARTING_SESSION_20=Caught Exception during initial \ communication on domain %s with replication server %s : %s MILD_ERR_CANNOT_RECOVER_CHANGES_21=Error when searching old changes from the \ database for base DN %s @@ -224,4 +224,9 @@ SEVERE_ERR_ERROR_CLEARING_DB_87=While clearing the database %s , the following \ error happened: %s NOTICE_ERR_ROUTING_TO_SERVER_88=Protocol error : a replication server is not expected \ to be the destination of a message of type %s to be the destination of a message of type %s SEVERE_ERR_CHECK_CREATE_REPL_BACKEND_FAILED_89=An unexpected error occurred when \ testing existence or creating the replication backend : %s SEVERE_ERR_DELETE_REPL_BACKEND_FAILED_90=An unexpected error occurred when \ deleting the replication backend : %s opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -26,10 +26,14 @@ */ package org.opends.server.replication.plugin; import static org.opends.server.replication.plugin. ReplicationRepairRequestControl.*; import java.util.HashMap; import java.util.List; import java.util.Map; import org.opends.messages.Message; import org.opends.server.admin.server.ConfigurationAddListener; import org.opends.server.admin.server.ConfigurationDeleteListener; import org.opends.server.admin.std.server.MultimasterDomainCfg; @@ -42,7 +46,6 @@ import org.opends.server.api.SynchronizationProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.types.BackupConfig; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.Control; @@ -66,10 +69,6 @@ import org.opends.server.types.operation.PreOperationDeleteOperation; import org.opends.server.types.operation.PreOperationModifyDNOperation; import org.opends.server.types.operation.PreOperationModifyOperation; import org.opends.messages.Message; import static org.opends.server.replication.plugin. ReplicationRepairRequestControl.*; /** * This class is used to load the Replication code inside the JVM @@ -86,7 +85,7 @@ BackupTaskListener, RestoreTaskListener, ImportTaskListener, ExportTaskListener { private ReplicationServerListener replicationServer = null; private ReplicationServerListener replicationServerListener = null; private static Map<DN, ReplicationDomain> domains = new HashMap<DN, ReplicationDomain>() ; @@ -193,7 +192,7 @@ MultimasterSynchronizationProviderCfg configuration) throws ConfigException { replicationServer = new ReplicationServerListener(configuration); replicationServerListener = new ReplicationServerListener(configuration); // Register as an add and delete listener with the root configuration so we // can be notified if Multimaster domain entries are added or removed. 
@@ -438,8 +437,8 @@ } // shutdown the ReplicationServer Service if necessary if (replicationServer != null) replicationServer.shutdown(); if (replicationServerListener != null) replicationServerListener.shutdown(); DirectoryServer.deregisterBackupTaskListener(this); DirectoryServer.deregisterRestoreTaskListener(this); opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -392,7 +392,7 @@ } catch (Exception e) { Message message = NOTE_EXCEPTION_STARTING_SESSION.get( Message message = ERR_EXCEPTION_STARTING_SESSION.get( baseDn.toNormalizedString(), server, e.getLocalizedMessage() + stackTraceToSingleLineString(e)); logError(message); @@ -733,7 +733,7 @@ if (debugEnabled()) { debugInfo("ReplicationBroker is stopping. and will" + "close the connection"); " close the connection"); } if (session != null) opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -212,9 +212,6 @@ // Null when none is being processed. private IEContext ieContext = null; // The backend information necessary to make an import or export. private Backend backend; private int listenerThreadNumber = 10; private Collection<String> replicationServers; @@ -383,7 +380,7 @@ monitor = new ReplicationMonitor(this); DirectoryServer.registerMonitorProvider(monitor); backend = retrievesBackend(baseDN); Backend backend = retrievesBackend(baseDN); if (backend == null) { throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get( @@ -855,8 +852,6 @@ de.getMessageObject()); MessageBuilder mb = new MessageBuilder(); mb.append(de.getMessageObject()); mb.append("Backend ID: "); mb.append(backend.getBackendID()); TRACER.debugInfo(Message.toString(mb.toMessage())); broker.publish(errorMsg); } @@ -2224,10 +2219,8 @@ */ public long computeGenerationId() throws DirectoryException { Backend backend = this.retrievesBackend(baseDN); long bec = backend.getEntryCount(); if (bec<0) backend = this.retrievesBackend(baseDN); bec = backend.getEntryCount(); this.acquireIEContext(); ieContext.checksumOutput = true; ieContext.entryCount = (bec<1000?bec:1000); @@ -2598,9 +2591,7 @@ protected void exportBackend() throws DirectoryException { // FIXME Temporary workaround - will probably be fixed when implementing // dynamic config backend = retrievesBackend(this.baseDN); Backend backend = retrievesBackend(this.baseDN); // Acquire a shared lock for the backend. 
try @@ -2938,9 +2929,7 @@ { try { // FIXME Temporary workaround - will probably be fixed when implementing // dynamic config backend = retrievesBackend(this.baseDN); Backend backend = retrievesBackend(this.baseDN); if (!backend.supportsLDIFExport()) { @@ -3027,6 +3016,8 @@ LDIFImportConfig importConfig = null; DirectoryException de = null; Backend backend = this.retrievesBackend(baseDN); if (!backend.supportsLDIFImport()) { Message message = ERR_INIT_IMPORT_NOT_SUPPORTED.get( @@ -3051,7 +3042,7 @@ ieContext.entryLeftCount = initializeMessage.getEntryCount(); ieContext.initImportExportCounters(initializeMessage.getEntryCount()); preBackendImport(this.backend); preBackendImport(backend); ieContext.ldifImportInputStream = new ReplLDIFInputStream(this); importConfig = @@ -3066,7 +3057,7 @@ // ExistingFileBehavior.OVERWRITE); // Process import this.backend.importLDIF(importConfig); backend.importLDIF(importConfig); TRACER.debugInfo("The import has ended successfully."); stateSavingDisabled = false; @@ -3083,7 +3074,7 @@ importConfig.close(); // Re-enable backend closeBackendImport(this.backend); closeBackendImport(backend); // Update the task that initiated the import if ((ieContext != null ) && (ieContext.initializeTask != null)) @@ -3202,7 +3193,7 @@ */ public Backend getBackend() { return backend; return retrievesBackend(baseDN); } /** opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -25,6 +25,7 @@ * Portions Copyright 2007 Sun Microsystems, Inc. */ package org.opends.server.replication.plugin; import org.opends.messages.Message; import java.util.List; @@ -105,7 +106,7 @@ } /** * Shutdown the Replication servers. * Shutdown the replication server. */ public void shutdown() { @@ -123,7 +124,7 @@ // replicationServer currently configured. if (replicationServer != null) { replicationServer.shutdown(); replicationServer.remove(); } return new ConfigChangeResult(ResultCode.SUCCESS, false); } opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
New file @@ -0,0 +1,493 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;

import static org.opends.messages.BackendMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.getExceptionMessage;

import java.util.HashMap;
import java.util.HashSet;

import org.opends.messages.Message;
import org.opends.server.admin.Configuration;
import org.opends.server.admin.std.server.BackendCfg;
import org.opends.server.admin.std.server.JEBackendCfg;
import org.opends.server.api.Backend;
import org.opends.server.backends.jeb.BackupManager;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.ConditionResult;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.Validator;

/**
 * This class defines a backend that stores its information in an
 * associated replication server object.
 * This is primarily intended to take advantage of the backup/restore/
 * import/export of the backend API, and to provide an LDAP access
 * to the replication server database.
 * <BR><BR>
 * Entries stored in this backend are held in the DB associated with
 * the replication server.
 * <BR><BR>
 * Currently only the create-backup and restore-backup features are
 * implemented; every entry-level operation is rejected and LDIF
 * import/export are reported as unsupported.
 */
public class ReplicationBackend
       extends Backend
{
  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();

  // The base DNs for this backend.
  private DN[] baseDNs;

  // The mapping between parent DNs and their immediate children.
  private HashMap<DN,HashSet<DN>> childDNs;

  // The base DNs for this backend, in a hash set.
  private HashSet<DN> baseDNSet;

  // The set of supported controls for this backend.
  private HashSet<String> supportedControls;

  // The set of supported features for this backend.
  private HashSet<String> supportedFeatures;

  // The directory associated with this backend.
  private BackupDirectory backendDirectory;

  // The replication server associated with this backend, as provided
  // through setServer(); null until that method has been called.
  ReplicationServer server;

  /**
   * The configuration of this backend.
   */
  private JEBackendCfg cfg;

  /**
   * Creates a new backend with the provided information. All backend
   * implementations must implement a default constructor that use
   * <CODE>super()</CODE> to invoke this constructor.
   */
  public ReplicationBackend()
  {
    super();

    // Perform all initialization in initializeBackend.
  }

  /**
   * Set the base DNs for this backend. This is used by the unit tests
   * to set the base DNs without having to provide a configuration
   * object when initializing the backend.
   * @param baseDNs The set of base DNs to be served by this memory backend.
   */
  public void setBaseDNs(DN[] baseDNs)
  {
    this.baseDNs = baseDNs;
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * A {@code null} configuration is accepted and leaves this backend
   * unchanged; base DNs must then be supplied through
   * {@link #setBaseDNs(DN[])}.
   */
  public void configureBackend(Configuration config) throws ConfigException
  {
    if (config != null)
    {
      // The configuration is cast to JEBackendCfg right below, so the
      // validation must check that exact type and not the more general
      // BackendCfg.
      Validator.ensureTrue(config instanceof JEBackendCfg);
      cfg = (JEBackendCfg)config;

      DN[] baseDNs = new DN[cfg.getBackendBaseDN().size()];
      cfg.getBackendBaseDN().toArray(baseDNs);
      setBaseDNs(baseDNs);

      backendDirectory = new BackupDirectory(
          cfg.getBackendDirectory(), null);
    }
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Exactly one base DN must have been set; it is registered with the
   * directory server.
   */
  public synchronized void initializeBackend()
         throws ConfigException, InitializationException
  {
    if ((baseDNs == null) || (baseDNs.length != 1))
    {
      Message message = ERR_MEMORYBACKEND_REQUIRE_EXACTLY_ONE_BASE.get();
      throw new ConfigException(message);
    }

    baseDNSet = new HashSet<DN>();
    for (DN dn : baseDNs)
    {
      baseDNSet.add(dn);
    }

    childDNs = new HashMap<DN,HashSet<DN>>();

    supportedControls = new HashSet<String>();
    supportedFeatures = new HashSet<String>();

    for (DN dn : baseDNs)
    {
      try
      {
        DirectoryServer.registerBaseDN(dn, this, false, false);
      }
      catch (Exception e)
      {
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }

        Message message = ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(
            dn.toString(), getExceptionMessage(e));
        throw new InitializationException(message, e);
      }
    }
  }

  /**
   * Removes any data that may have been stored in this backend.
   * Does nothing when the backend has not been initialized yet.
   */
  public synchronized void clearMemoryBackend()
  {
    // childDNs is only created by initializeBackend().
    if (childDNs != null)
    {
      childDNs.clear();
    }
  }

  /**
   * {@inheritDoc}
   */
  public synchronized void finalizeBackend()
  {
    // baseDNs stays null when the backend was neither configured nor given
    // base DNs explicitly; there is nothing to deregister in that case.
    if (baseDNs == null)
    {
      return;
    }

    for (DN dn : baseDNs)
    {
      try
      {
        DirectoryServer.deregisterBaseDN(dn, false);
      }
      catch (Exception e)
      {
        // Best effort: keep deregistering the remaining base DNs.
        if (debugEnabled())
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public DN[] getBaseDNs()
  {
    return baseDNs;
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * This implementation always returns -1.
   */
  public synchronized long getEntryCount()
  {
    return -1;
  }

  /**
   * {@inheritDoc}
   */
  public boolean isLocal()
  {
    return true;
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * This implementation always returns {@code null}.
   */
  public synchronized Entry getEntry(DN entryDN)
  {
    return null;
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * This implementation always returns {@code false}.
   */
  public synchronized boolean entryExists(DN entryDN)
  {
    return false;
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Not supported: always throws with UNWILLING_TO_PERFORM.
   */
  public synchronized void addEntry(Entry entry, AddOperation addOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_ADD_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Not supported: always throws with UNWILLING_TO_PERFORM.
   */
  public synchronized void deleteEntry(DN entryDN,
                                       DeleteOperation deleteOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_DELETE_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Not supported: always throws with UNWILLING_TO_PERFORM.
   */
  public synchronized void replaceEntry(Entry entry,
                                        ModifyOperation modifyOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_MODIFY_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Not supported: always throws with UNWILLING_TO_PERFORM.
   */
  public synchronized void renameEntry(DN currentDN, Entry entry,
                                       ModifyDNOperation modifyDNOperation)
         throws DirectoryException
  {
    Message message = ERR_BACKUP_MODIFY_DN_NOT_SUPPORTED.get();
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Search is not implemented yet: every request fails with
   * NO_SUCH_OBJECT, reporting the backend base DN as matched DN.
   */
  public synchronized void search(SearchOperation searchOperation)
         throws DirectoryException
  {
    DN matchedDN = baseDNs[0];
    DN baseDN = searchOperation.getBaseDN();

    // FIXME Remove this error message or replace when implementing
    //       the search.
    Message message =
      ERR_MEMORYBACKEND_ENTRY_DOESNT_EXIST.get(String.valueOf(baseDN));
    throw new DirectoryException(
          ResultCode.NO_SUCH_OBJECT, message, matchedDN, null);
  }

  /**
   * {@inheritDoc}
   */
  public HashSet<String> getSupportedControls()
  {
    return supportedControls;
  }

  /**
   * {@inheritDoc}
   */
  public HashSet<String> getSupportedFeatures()
  {
    return supportedFeatures;
  }

  /**
   * {@inheritDoc}
   */
  public boolean supportsLDIFExport()
  {
    return false;
  }

  /**
   * {@inheritDoc}
   */
  public synchronized void exportLDIF(LDIFExportConfig exportConfig)
         throws DirectoryException
  {
    // TODO
  }

  /**
   * {@inheritDoc}
   */
  public boolean supportsLDIFImport()
  {
    return false;
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Nothing is imported; the returned result carries only zero counters.
   */
  public synchronized LDIFImportResult importLDIF(LDIFImportConfig importConfig)
         throws DirectoryException
  {
    return new LDIFImportResult(0, 0, 0);
  }

  /**
   * {@inheritDoc}
   */
  public boolean supportsBackup()
  {
    // Backup and restore are the features this backend does provide.
    return true;
  }

  /**
   * {@inheritDoc}
   */
  public boolean supportsBackup(BackupConfig backupConfig,
                                StringBuilder unsupportedReason)
  {
    return true;
  }

  /**
   * {@inheritDoc}
   */
  public void createBackup(BackupConfig backupConfig)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.createBackup(cfg, backupConfig);
  }

  /**
   * {@inheritDoc}
   */
  public void removeBackup(BackupDirectory backupDirectory,
                           String backupID)
         throws DirectoryException
  {
    // NOTE(review): the backupDirectory argument is ignored in favor of the
    // directory derived from the backend configuration - confirm that this
    // is intended.
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.removeBackup(this.backendDirectory, backupID);
  }

  /**
   * {@inheritDoc}
   */
  public boolean supportsRestore()
  {
    return true;
  }

  /**
   * {@inheritDoc}
   */
  public void restoreBackup(RestoreConfig restoreConfig)
         throws DirectoryException
  {
    BackupManager backupManager =
      new BackupManager(getBackendID());
    backupManager.restoreBackup(cfg, restoreConfig);
  }

  /**
   * Retrieves the number of subordinates for the requested entry.
   * This implementation never returns a value: it always throws.
   *
   * @param entryDN The distinguished name of the entry.
   *
   * @return The number of subordinate entries for the requested entry
   *         or -1 if it can not be determined.
   *
   * @throws DirectoryException Always, with UNWILLING_TO_PERFORM.
   */
  public long numSubordinates(DN entryDN)
      throws DirectoryException
  {
    Message message = WARN_ROOTDSE_GET_ENTRY_NONROOT.
      get(entryDN.toNormalizedString());
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }

  /**
   * Indicates whether the requested entry has any subordinates.
   * This implementation never returns a value: it always throws.
   *
   * @param entryDN The distinguished name of the entry.
   *
   * @return {@code ConditionResult.TRUE} if the entry has one or more
   *         subordinates or {@code ConditionResult.FALSE} otherwise
   *         or {@code ConditionResult.UNDEFINED} if it can not be
   *         determined.
   *
   * @throws DirectoryException Always, with UNWILLING_TO_PERFORM.
   */
  public ConditionResult hasSubordinates(DN entryDN)
        throws DirectoryException
  {
    Message message = WARN_ROOTDSE_GET_ENTRY_NONROOT.
      get(entryDN.toNormalizedString());
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, message);
  }

  /**
   * Set the replication server associated with this backend.
   * @param server The replication server.
   */
  public void setServer(ReplicationServer server)
  {
    this.server = server;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -25,16 +25,16 @@ * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.replication.server; import org.opends.messages.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.messages.ReplicationMessages.*; import org.opends.messages.MessageBuilder; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.ServerConstants.EOL; import static org.opends.server.util.StaticUtils.getFileForPath; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import java.io.File; import java.io.IOException; import java.io.StringReader; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -47,22 +47,35 @@ import java.util.List; import java.util.Set; import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.api.Backend; import org.opends.server.api.BackupTaskListener; import org.opends.server.api.ExportTaskListener; import org.opends.server.api.ImportTaskListener; import org.opends.server.api.MonitorProvider; import org.opends.server.api.RestoreTaskListener; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.loggers.LogLevel; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.protocol.ProtocolSession; import org.opends.server.replication.protocol.ReplSessionSecurity; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeType; import 
org.opends.server.types.AttributeValue; import org.opends.server.types.BackupConfig; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.DN; import org.opends.server.types.Entry; import org.opends.server.types.LDIFExportConfig; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.RestoreConfig; import org.opends.server.types.ResultCode; import static org.opends.server.loggers.debug.DebugLogger.*; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.util.LDIFReader; import com.sleepycat.je.DatabaseException; @@ -77,7 +90,9 @@ * It is responsible for creating the replication server cache and managing it */ public class ReplicationServer extends MonitorProvider<MonitorProviderCfg> implements Runnable, ConfigurationChangeListener<ReplicationServerCfg> implements Runnable, ConfigurationChangeListener<ReplicationServerCfg>, BackupTaskListener, RestoreTaskListener, ImportTaskListener, ExportTaskListener { private short serverId; private String serverURL; @@ -108,6 +123,12 @@ private boolean stopListen = false; private ReplSessionSecurity replSessionSecurity; // For the backend associated to this replication server, // DN of the config entry of the backend private DN backendConfigEntryDN; // ID of the backend private static final String backendId = "replicationChanges"; /** * The tracer object for the debug logger. */ @@ -120,11 +141,10 @@ * @throws ConfigException When Configuration is invalid. 
*/ public ReplicationServer(ReplicationServerCfg configuration) throws ConfigException throws ConfigException { super("Replication Server" + configuration.getReplicationPort()); shutdown = false; replicationPort = configuration.getReplicationPort(); replicationServerId = (short) configuration.getReplicationServerId(); replicationServers = configuration.getReplicationServer(); @@ -162,6 +182,21 @@ initialize(replicationServerId, replicationPort); configuration.addChangeListener(this); DirectoryServer.registerMonitorProvider(this); try { backendConfigEntryDN = DN.decode( "ds-cfg-backend-id=" + backendId + ",cn=Backends,cn=config"); } catch (Exception e) {} // Creates the backend associated to this ReplicationServer // if it does not exist. createBackend(); DirectoryServer.registerBackupTaskListener(this); DirectoryServer.registerRestoreTaskListener(this); DirectoryServer.registerExportTaskListener(this); DirectoryServer.registerImportTaskListener(this); } @@ -315,6 +350,8 @@ */ private void initialize(short changelogId, int changelogPort) { shutdown = false; try { /* @@ -458,7 +495,7 @@ dbEnv.shutdown(); } DirectoryServer.deregisterMonitorProvider(getMonitorInstanceName()); } } /** @@ -492,10 +529,7 @@ } catch(Exception e) { TRACER.debugInfo( "In RS <" + getMonitorInstanceName() + " Exception in clearGenerationId" + stackTraceToSingleLineString(e) + e.getLocalizedMessage()); TRACER.debugCaught(LogLevel.ALL, e); } } @@ -730,4 +764,180 @@ { return serverId; } /** * Creates the backend associated to this replication server. 
* @throws ConfigException If the backend configuration entry cannot be
   *         built, looked up or added to the server configuration.
   */
  private void createBackend()
  throws ConfigException
  {
    try
    {
      String ldif = makeLdif(
          "dn: ds-cfg-backend-id="+backendId+",cn=Backends,cn=config",
          "objectClass: top",
          "objectClass: ds-cfg-backend",
          "objectClass: ds-cfg-je-backend",
          "ds-cfg-backend-base-dn: dc="+backendId,
          "ds-cfg-backend-enabled: true",
          "ds-cfg-backend-writability-mode: enabled",
          "ds-cfg-backend-class: " +
            "org.opends.server.replication.server.ReplicationBackend",
          "ds-cfg-backend-id: " + backendId,
          "ds-cfg-backend-import-temp-directory: importTmp",
          "ds-cfg-backend-directory: " + getFileForPath(dbDirname));

      LDIFImportConfig ldifImportConfig = new LDIFImportConfig(
          new StringReader(ldif));
      LDIFReader reader = new LDIFReader(ldifImportConfig);
      Entry backendConfigEntry = reader.readEntry();
      if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
      {
        // Add the replication backend
        DirectoryServer.getConfigHandler().addEntry(backendConfigEntry, null);
      }
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      Message msg = ERR_CHECK_CREATE_REPL_BACKEND_FAILED.get(mb.toString());
      throw new ConfigException(msg, e);
    }
  }

  /**
   * Builds an LDIF fragment from the provided lines: every line is
   * terminated with EOL and one extra EOL is appended at the end.
   *
   * @param lines The lines of the LDIF fragment, in order.
   * @return The concatenated LDIF text.
   */
  private static String makeLdif(String... lines)
  {
    StringBuilder buffer = new StringBuilder();
    for (String line : lines)
    {
      buffer.append(line).append(EOL);
    }
    // Append an extra line so we can append LDIF Strings.
    buffer.append(EOL);
    return buffer.toString();
  }

  /**
   * Do what needed when the config object related to this replication server
   * is deleted from the server configuration: shuts this server down,
   * removes its associated backend and deregisters it from the
   * backup/restore/export/import task listeners.
   */
  public void remove()
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " starts removing");

    shutdown();
    removeBackend();

    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
    DirectoryServer.deregisterExportTaskListener(this);
    DirectoryServer.deregisterImportTaskListener(this);
  }

  /**
   * Removes the backend associated to this Replication Server that has been
   * created when this replication server was created. Failures are logged
   * and not propagated.
   */
  protected void removeBackend()
  {
    try
    {
      // Only an existing entry can be deleted. The previous code tested the
      // negated condition and therefore never removed the backend entry.
      if (DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
      {
        // Delete the replication backend
        DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN,
            null);
      }
    }
    catch(Exception e)
    {
      MessageBuilder mb = new MessageBuilder();
      mb.append(e.getLocalizedMessage());
      Message msg = ERR_DELETE_REPL_BACKEND_FAILED.get(mb.toString());
      logError(msg);
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processBackupBegin(Backend backend, BackupConfig config)
  {
    // Nothing is needed at the moment
  }

  /**
   * {@inheritDoc}
   */
  public void processBackupEnd(Backend backend, BackupConfig config,
                               boolean successful)
  {
    // Nothing is needed at the moment
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Shuts this replication server down when the restore targets its
   * associated backend.
   */
  public void processRestoreBegin(Backend backend, RestoreConfig config)
  {
    // Compare from the constant so a backend without ID cannot cause a
    // NullPointerException.
    if (backendId.equals(backend.getBackendID()))
      shutdown();
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * Re-initializes this replication server when the restore targeted its
   * associated backend.
   */
  public void processRestoreEnd(Backend backend, RestoreConfig config,
                                boolean successful)
  {
    if (backendId.equals(backend.getBackendID()))
      initialize(this.replicationServerId, this.replicationPort);
  }

  /**
   * {@inheritDoc}
   */
  public void processImportBegin(Backend backend, LDIFImportConfig config)
  {
    // Nothing is needed at the moment
  }

  /**
   * {@inheritDoc}
   */
  public void processImportEnd(Backend backend, LDIFImportConfig config,
                               boolean successful)
  {
    // Nothing is needed at the moment
  }

  /**
   * {@inheritDoc}
   * <BR><BR>
   * When the export targets the backend associated to this replication
   * server, that backend is given a reference to this server.
   */
  public void processExportBegin(Backend backend, LDIFExportConfig config)
  {
    if (debugEnabled())
      TRACER.debugInfo("RS " +getMonitorInstanceName()+
          " Export starts");
    if (backendId.equals(backend.getBackendID()))
    {
      // Retrieves the backend registered under the replication backend ID.
      // The lookup may return null or a backend of another class, so the
      // type is checked before the cast.
      Backend registered = DirectoryServer.getBackend(backendId);
      if (registered instanceof ReplicationBackend)
      {
        ((ReplicationBackend) registered).setServer(this);
      }
    }
  }

  /**
   * {@inheritDoc}
   */
  public void processExportEnd(Backend backend, LDIFExportConfig config,
                               boolean successful)
  {
    // Nothing is needed at the moment
  }
}
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -100,7 +100,7 @@ if (debugEnabled()) { TRACER.debugInfo( "In RS <" + replicationCache.getReplicationServer(). "In RS " + replicationCache.getReplicationServer(). getMonitorInstanceName() + (handler.isReplicationServer()?" RS ":" LS")+ " reader starting for serverId=" + serverId); @@ -117,22 +117,11 @@ if (debugEnabled()) { if (handler.isReplicationServer()) { TRACER.debugInfo( "In RS <" + replicationCache.getReplicationServer(). getMonitorInstanceName() + "> from RS server with serverId=" + serverId + " receives " + msg); } else { TRACER.debugInfo( "In RS <" + replicationCache.getReplicationServer(). getMonitorInstanceName() + "> from LDAP server with serverId=" + serverId + " receives " + msg); } TRACER.debugInfo( "In RS " + replicationCache.getReplicationServer(). getMonitorInstanceName() + (handler.isReplicationServer()?" From RS ":" From LS")+ " with serverId=" + serverId + " receives " + msg); } if (msg instanceof AckMessage) { @@ -271,11 +260,10 @@ */ if (debugEnabled()) TRACER.debugInfo( "In RS <" + replicationCache.getReplicationServer(). "In RS " + replicationCache.getReplicationServer(). getMonitorInstanceName() + " reader IO EXCEPTION serverID=" + serverId + stackTraceToSingleLineString(e) + e.getLocalizedMessage() + e.getCause()); " reader IO EXCEPTION for serverID=" + serverId + stackTraceToSingleLineString(e) + " " + e.getLocalizedMessage()); Message message = NOTE_SERVER_DISCONNECT.get(handler.toString()); logError(message); } catch (ClassNotFoundException e) @@ -316,10 +304,10 @@ */ if (debugEnabled()) TRACER.debugInfo( "In RS <" + replicationCache.getReplicationServer(). "In RS " + replicationCache.getReplicationServer(). getMonitorInstanceName() + " reader CLOSE serverID=" + serverId + stackTraceToSingleLineString(new Exception())); " server reader for serverID=" + serverId + " is closing the session"); try { session.close(); @@ -331,10 +319,9 @@ } if (debugEnabled()) TRACER.debugInfo( "In RS <" + replicationCache.getReplicationServer(). 
"In RS " + replicationCache.getReplicationServer(). getMonitorInstanceName() + (handler.isReplicationServer()?"RS":"LDAP") + " server reader stopped for serverID=" + serverId + stackTraceToSingleLineString(new Exception())); (handler.isReplicationServer()?" RS":" LDAP") + " server reader stopped for serverID=" + serverId); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -300,186 +300,6 @@ return found; } /** * Add a task to the configuration of the current running DS. * @param taskEntry The task to add. * @param expectedResult The expected result code for the ADD. * @param errorMessageID The expected error messageID when the expected * result code is not SUCCESS */ private void addTask(Entry taskEntry, ResultCode expectedResult, Message errorMessage) { try { debugInfo("AddTask/" + taskEntry); // Change config of DS to launch the total update task InternalClientConnection connection = InternalClientConnection.getRootConnection(); // Add the task. AddOperation addOperation = connection.processAdd(taskEntry.getDN(), taskEntry.getObjectClasses(), taskEntry.getUserAttributes(), taskEntry.getOperationalAttributes()); assertEquals(addOperation.getResultCode(), expectedResult, "Result of ADD operation of the task is: " + addOperation.getResultCode() + " Expected:" + expectedResult + " Details:" + addOperation.getErrorMessage() + addOperation.getAdditionalLogMessage()); if (expectedResult != ResultCode.SUCCESS) { assertTrue(addOperation.getErrorMessage().toString(). 
startsWith(errorMessage.toString()), "Error MsgID of the task <" + addOperation.getErrorMessage() + "> equals <" + errorMessage + ">"); debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId() + addOperation.getErrorMessage() + ">"); } else { waitTaskState(taskEntry, TaskState.RUNNING, null); } // Entry will be removed at the end of the test entryList.addLast(taskEntry.getDN()); debugInfo("AddedTask/" + taskEntry.getDN()); } catch(Exception e) { fail("Exception when adding task:"+ e.getMessage()); } } private void waitTaskState(Entry taskEntry, TaskState expectedTaskState, Message expectedMessage) { TaskState taskState = null; try { SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)"); Entry resultEntry = null; do { InternalSearchOperation searchOperation = connection.processSearch(taskEntry.getDN(), SearchScope.BASE_OBJECT, filter); try { resultEntry = searchOperation.getSearchEntries().getFirst(); } catch (Exception e) { fail("Task entry was not returned from the search."); continue; } try { // Check that the task state is as expected. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase()); String stateString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); taskState = TaskState.fromString(stateString); } catch(Exception e) { fail("Exception"+ e.getMessage()+e.getStackTrace()); } Thread.sleep(500); } while ((taskState != expectedTaskState) && (taskState != TaskState.STOPPED_BY_ERROR)); // Check that the task contains some log messages. 
AttributeType logMessagesType = DirectoryServer.getAttributeType( ATTR_TASK_LOG_MESSAGES.toLowerCase()); ArrayList<String> logMessages = new ArrayList<String>(); resultEntry.getAttributeValues(logMessagesType, DirectoryStringSyntax.DECODER, logMessages); if ((taskState != TaskState.COMPLETED_SUCCESSFULLY) && (taskState != TaskState.RUNNING)) { if (logMessages.size() == 0) { fail("No log messages were written to the task entry on a failed task"); } else { if (expectedMessage != null) { debugInfo(logMessages.get(0)); debugInfo(expectedMessage.toString()); assertTrue(logMessages.get(0).indexOf( expectedMessage.toString())>0); } } } assertEquals(taskState, expectedTaskState, "Task State:" + taskState + " Expected task state:" + expectedTaskState); } catch(Exception e) { fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /** * Add to the current DB the entries necessary to the test */ private void addTestEntriesToDB(String[] ldifEntries) { try { // Change config of DS to launch the total update task InternalClientConnection connection = InternalClientConnection.getRootConnection(); for (String ldifEntry : ldifEntries) { Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry); AddOperationBasis addOp = new AddOperationBasis( connection, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, entry.getDN(), entry.getObjectClasses(), entry.getUserAttributes(), entry.getOperationalAttributes()); addOp.setInternalOperation(true); addOp.run(); if (addOp.getResultCode() != ResultCode.SUCCESS) { debugInfo("addEntry: Failed" + addOp.getResultCode()); } // They will be removed at the end of the test entryList.addLast(entry.getDN()); } } catch(Exception e) { fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /* * Creates entries necessary to the test. 
*/ @@ -1373,11 +1193,11 @@ broker3 = null; if (replServer1 != null) replServer1.shutdown(); replServer1.remove(); if (replServer2 != null) replServer2.shutdown(); if (replServer2 != null) replServer2.shutdown(); replServer2.remove(); if (replServer3 != null) replServer3.remove(); replServer1 = null; replServer2 = null; replServer3 = null; opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -278,67 +278,6 @@ } /** * Add a task to the configuration of the current running DS. * @param taskEntry The task to add. * @param expectedResult The expected result code for the ADD. * @param errorMessage The expected error messageID when the expected * result code is not SUCCESS */ private void addTask(Entry taskEntry, ResultCode expectedResult, Message errorMessage) { try { log("AddTask/" + taskEntry); // Change config of DS to launch the total update task InternalClientConnection connection = InternalClientConnection.getRootConnection(); // Add the task. AddOperation addOperation = connection.processAdd(taskEntry.getDN(), taskEntry.getObjectClasses(), taskEntry.getUserAttributes(), taskEntry.getOperationalAttributes()); assertEquals(addOperation.getResultCode(), expectedResult, "Result of ADD operation of the task is: " + addOperation.getResultCode() + " Expected:" + expectedResult + " Details:" + addOperation.getErrorMessage() + addOperation.getAdditionalLogMessage()); if (expectedResult != ResultCode.SUCCESS) { assertTrue(addOperation.getErrorMessage().toString(). startsWith(errorMessage.toString()), "Error MsgID of the task <" + addOperation.getErrorMessage() + "> equals <" + errorMessage + ">"); log("Create config task: <"+ errorMessage.getDescriptor().getId() + addOperation.getErrorMessage() + ">"); } else { waitTaskState(taskEntry, TaskState.RUNNING, null); } // Entry will be removed at the end of the test entryList.addLast(taskEntry.getDN()); log("AddedTask/" + taskEntry.getDN()); } catch(Exception e) { fail("Exception when adding task:"+ e.getMessage()); } } /** * Wait a task to be completed and check the expected state and expected * stats. * @param taskEntry The task to process. 
@@ -454,107 +393,6 @@ } } private void waitTaskState(Entry taskEntry, TaskState expectedTaskState, Message expectedMessage) { TaskState taskState = null; try { SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)"); Entry resultEntry = null; do { InternalSearchOperation searchOperation = connection.processSearch(taskEntry.getDN(), SearchScope.BASE_OBJECT, filter); try { resultEntry = searchOperation.getSearchEntries().getFirst(); } catch (Exception e) { // FIXME How is this possible? Must be issue 858. fail("Task entry was not returned from the search."); continue; } try { // Check that the task state is as expected. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase()); String stateString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); taskState = TaskState.fromString(stateString); } catch(Exception e) { fail("Exception"+ e.getMessage()+e.getStackTrace()); } try { // Check that the left counter. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true); resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); // Check that the total counter. taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true); resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); } catch(Exception e) { fail("Exception"+ e.getMessage()+e.getStackTrace()); } Thread.sleep(2000); } while ((taskState != expectedTaskState) && (taskState != TaskState.STOPPED_BY_ERROR)); // Check that the task contains some log messages. 
AttributeType logMessagesType = DirectoryServer.getAttributeType( ATTR_TASK_LOG_MESSAGES.toLowerCase()); ArrayList<String> logMessages = new ArrayList<String>(); resultEntry.getAttributeValues(logMessagesType, DirectoryStringSyntax.DECODER, logMessages); if ((taskState != TaskState.COMPLETED_SUCCESSFULLY) && (taskState != TaskState.RUNNING)) { if (logMessages.size() == 0) { fail("No log messages were written to the task entry on a failed task"); } else { if (expectedMessage != null) { log(logMessages.get(0)); log(expectedMessage.toString()); assertTrue(logMessages.get(0).indexOf( expectedMessage.toString())>=0); } } } assertEquals(taskState, expectedTaskState, "Task State:" + taskState + " Expected task state:" + expectedTaskState); } catch(Exception e) { fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /** * Add to the current DB the entries necessary to the test */ opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -26,9 +26,10 @@ */ package org.opends.server.replication; import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME; import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE; import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -51,8 +52,10 @@ import org.opends.server.backends.task.TaskState; import org.opends.server.config.ConfigException; import org.opends.server.core.AddOperation; import org.opends.server.core.AddOperationBasis; import org.opends.server.core.DeleteOperationBasis; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.protocols.ldap.LDAPFilter; @@ -90,6 +93,9 @@ public abstract class ReplicationTestCase extends DirectoryServerTestCase { // The tracer object for the debug logger private static final DebugTracer TRACER = getTracer(); /** * The internal connection used for operation */ @@ -726,4 +732,187 @@ return new ReplSessionSecurity(null, null, null, true); } /** * Add a task to the configuration of the current running DS. * @param taskEntry The task to add. * @param expectedResult The expected result code for the ADD. 
* @param errorMessageID The expected error messageID when the expected * result code is not SUCCESS */ protected void addTask(Entry taskEntry, ResultCode expectedResult, Message errorMessage) { try { TRACER.debugInfo("AddTask/" + taskEntry); // Change config of DS to launch the total update task InternalClientConnection connection = InternalClientConnection.getRootConnection(); // Add the task. AddOperation addOperation = connection.processAdd(taskEntry.getDN(), taskEntry.getObjectClasses(), taskEntry.getUserAttributes(), taskEntry.getOperationalAttributes()); assertEquals(addOperation.getResultCode(), expectedResult, "Result of ADD operation of the task is: " + addOperation.getResultCode() + " Expected:" + expectedResult + " Details:" + addOperation.getErrorMessage() + addOperation.getAdditionalLogMessage()); if (expectedResult != ResultCode.SUCCESS) { assertTrue(addOperation.getErrorMessage().toString(). startsWith(errorMessage.toString()), "Error MsgID of the task <" + addOperation.getErrorMessage() + "> equals <" + errorMessage + ">"); TRACER.debugInfo("Create config task: <"+ errorMessage.getDescriptor().getId() + addOperation.getErrorMessage() + ">"); } else { waitTaskState(taskEntry, TaskState.RUNNING, null); } // Entry will be removed at the end of the test entryList.addLast(taskEntry.getDN()); TRACER.debugInfo("AddedTask/" + taskEntry.getDN()); } catch(Exception e) { fail("Exception when adding task:"+ e.getMessage()); } } protected void waitTaskState(Entry taskEntry, TaskState expectedTaskState, Message expectedMessage) { TaskState taskState = null; int cpt=10; try { SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)"); Entry resultEntry = null; do { InternalSearchOperation searchOperation = connection.processSearch(taskEntry.getDN(), SearchScope.BASE_OBJECT, filter); try { resultEntry = searchOperation.getSearchEntries().getFirst(); } catch (Exception e) { fail("Task entry was not returned from the search."); continue; } try { // 
Check that the task state is as expected. AttributeType taskStateType = DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase()); String stateString = resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER); taskState = TaskState.fromString(stateString); } catch(Exception e) { fail("Exception"+ e.getMessage()+e.getStackTrace()); } Thread.sleep(500); cpt--; } while ((taskState != expectedTaskState) && (taskState != TaskState.STOPPED_BY_ERROR) && (taskState != TaskState.COMPLETED_SUCCESSFULLY) && (cpt > 0)); // Check that the task contains some log messages. AttributeType logMessagesType = DirectoryServer.getAttributeType( ATTR_TASK_LOG_MESSAGES.toLowerCase()); ArrayList<String> logMessages = new ArrayList<String>(); resultEntry.getAttributeValues(logMessagesType, DirectoryStringSyntax.DECODER, logMessages); if ((taskState != TaskState.COMPLETED_SUCCESSFULLY) && (taskState != TaskState.RUNNING)) { if (logMessages.size() == 0) { fail("No log messages were written to the task entry on a failed task"); } else { TRACER.debugInfo(logMessages.get(0)); if (expectedMessage != null) { TRACER.debugInfo(expectedMessage.toString()); assertTrue(logMessages.get(0).indexOf( expectedMessage.toString())>0); } } } assertEquals(taskState, expectedTaskState, "Task State:" + taskState + " Expected task state:" + expectedTaskState); } catch(Exception e) { fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } /** * Add to the current DB the entries necessary to the test */ protected void addTestEntriesToDB(String[] ldifEntries) { try { // Change config of DS to launch the total update task InternalClientConnection connection = InternalClientConnection.getRootConnection(); for (String ldifEntry : ldifEntries) { Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry); AddOperationBasis addOp = new AddOperationBasis( connection, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(), null, 
entry.getDN(), entry.getObjectClasses(), entry.getUserAttributes(), entry.getOperationalAttributes()); addOp.setInternalOperation(true); addOp.run(); if (addOp.getResultCode() != ResultCode.SUCCESS) { TRACER.debugInfo("Failed to add entry " + entry.getDN() + "Result code = : " + addOp.getResultCode()); } // They will be removed at the end of the test entryList.addLast(entry.getDN()); } } catch(Exception e) { fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e)); } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -34,6 +34,7 @@ import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.replication.protocol.OperationContext.*; import java.io.File; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.ServerSocket; @@ -42,11 +43,10 @@ import java.util.List; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.TestCaseUtils; import org.opends.server.backends.task.TaskState; import org.opends.server.core.ModifyDNOperationBasis; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.ReplicationTestCase; @@ -62,6 +62,7 @@ import org.opends.server.types.ModificationType; import org.opends.server.types.RDN; import org.opends.server.types.DirectoryConfig; import org.opends.server.types.ResultCode; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation; import org.testng.annotations.AfterClass; @@ -413,7 +414,7 @@ @Test(enabled=true, dependsOnMethods = { "changelogBasic" }) public void stopChangelog() throws Exception { replicationServer.shutdown(); replicationServer.remove(); configure(); newClient(); newClientWithFirstChanges(); @@ -628,7 +629,7 @@ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0, changelogIds[i], 0, 100, servers); replicationServer = new ReplicationServer(conf); changelogs[i] = new ReplicationServer(conf); } ReplicationBroker broker1 = null; @@ -763,9 +764,9 @@ finally { if (changelogs[0] != null) changelogs[0].shutdown(); changelogs[0].remove(); if (changelogs[1] != null) changelogs[1].shutdown(); changelogs[1].remove(); if (broker1 != null) broker1.stop(); if (broker2 != null) @@ -972,4 +973,53 @@ } } } /* * Test backup and restore of the Replication server backend */ 
@Test(enabled=true)
  public void backupRestore() throws Exception
  {
    debugInfo("Starting backupRestore");

    // Both task entries are built before anything is scheduled.
    final Entry backup = createBackupTask();
    final Entry restore = createRestoreTask();

    // Schedule the backup and wait until it reports successful completion.
    addTask(backup, ResultCode.SUCCESS, null);
    waitTaskState(backup, TaskState.COMPLETED_SUCCESSFULLY, null);

    // Then schedule the restore, which reads the same backup directory.
    addTask(restore, ResultCode.SUCCESS, null);
    waitTaskState(restore, TaskState.COMPLETED_SUCCESSFULLY, null);

    debugInfo("Ending backupRestore");
  }

  /**
   * Builds a task entry, with a random task id, that backs up the
   * "replicationChanges" backend into the "bak/replicationChanges" directory.
   *
   * @return the backup task entry.
   * @throws Exception if the entry cannot be built.
   */
  private Entry createBackupTask() throws Exception
  {
    final String taskDn =
      "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks";
    final String backupDir =
      "ds-backup-directory-path: bak" + File.separator + "replicationChanges";

    return TestCaseUtils.makeEntry(
      taskDn,
      "objectclass: top",
      "objectclass: ds-task",
      "objectclass: ds-task-backup",
      "ds-task-class-name: org.opends.server.tasks.BackupTask",
      backupDir,
      "ds-task-backup-backend-id: replicationChanges");
  }

  /**
   * Builds a task entry, with a random task id, that restores from the
   * "bak/replicationChanges" directory.
   *
   * @return the restore task entry.
   * @throws Exception if the entry cannot be built.
   */
  private Entry createRestoreTask() throws Exception
  {
    final String taskDn =
      "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks";
    final String backupDir =
      "ds-backup-directory-path: bak" + File.separator + "replicationChanges";

    return TestCaseUtils.makeEntry(
      taskDn,
      "objectclass: top",
      "objectclass: ds-task",
      "objectclass: ds-task-restore",
      "ds-task-class-name: org.opends.server.tasks.RestoreTask",
      backupDir);
  }
}