opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -67,6 +67,7 @@ import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.common.StatusMachineEvent; import org.opends.server.replication.protocol.*; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.tasks.PurgeConflictsHistoricalTask; @@ -182,6 +183,7 @@ */ private static final DebugTracer TRACER = getTracer(); private final DSRSShutdownSync dsrsShutdownSync; /** * The update to replay message queue where the listener thread is going to * push incoming update messages. @@ -452,14 +454,17 @@ * * @param configuration The configuration of this ReplicationDomain. * @param updateToReplayQueue The queue for update messages to replay. * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. * @throws ConfigException In case of invalid configuration. */ LDAPReplicationDomain(ReplicationDomainCfg configuration, BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException BlockingQueue<UpdateToReplay> updateToReplayQueue, DSRSShutdownSync dsrsShutdownSync) throws ConfigException { super(configuration, -1); this.updateToReplayQueue = updateToReplayQueue; this.dsrsShutdownSync = dsrsShutdownSync; // Get assured configuration readAssuredConfig(configuration, false); @@ -2017,6 +2022,7 @@ public void publishReplicaOfflineMsg() { pendingChanges.putReplicaOfflineMsg(); dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN()); } /** opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -41,13 +41,13 @@ import org.opends.server.api.*; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.types.*; import org.opends.server.types.operation.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.replication.plugin. ReplicationRepairRequestControl.*; import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*; import static org.opends.server.util.StaticUtils.*; /** @@ -67,9 +67,10 @@ BackupTaskListener, RestoreTaskListener, ImportTaskListener, ExportTaskListener { private ReplicationServerListener replicationServerListener = null; private ReplicationServerListener replicationServerListener; private static final Map<DN, LDAPReplicationDomain> domains = new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ; new ConcurrentHashMap<DN, LDAPReplicationDomain>(4); private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync(); /** * The queue of received update messages, to be treated by the ReplayThread @@ -113,8 +114,7 @@ * Can be null is the request has no associated operation. * @return The domain for this DN. */ public static LDAPReplicationDomain findDomain( DN dn, PluginOperation pluginOp) public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp) { /* * Don't run the special replication code on Operation that are @@ -124,7 +124,9 @@ { final Operation op = (Operation) pluginOp; if (op.dontSynchronize()) { return null; } /* * Check if the provided operation is a repair operation and set the @@ -181,8 +183,8 @@ { try { LDAPReplicationDomain domain = new LDAPReplicationDomain(configuration, updateToReplayQueue); final LDAPReplicationDomain domain = new LDAPReplicationDomain( configuration, updateToReplayQueue, dsrsShutdownSync); if (domains.size() == 0) { // Create the threads that will process incoming update messages @@ -218,9 +220,8 @@ BlockingQueue<UpdateToReplay> queue) throws ConfigException { LDAPReplicationDomain domain = new LDAPReplicationDomain(configuration, queue); final LDAPReplicationDomain domain = new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync); domains.put(domain.getBaseDN(), domain); return domain; } @@ -246,35 +247,30 @@ /** {@inheritDoc} */ @Override public void initializeSynchronizationProvider( ReplicationSynchronizationProviderCfg configuration) throws ConfigException ReplicationSynchronizationProviderCfg cfg) throws ConfigException { domains.clear(); replicationServerListener = new ReplicationServerListener(configuration); replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync); // Register as an add and delete listener with the root configuration so we // can be notified if Multimaster domain entries are added or removed. configuration.addReplicationDomainAddListener(this); configuration.addReplicationDomainDeleteListener(this); cfg.addReplicationDomainAddListener(this); cfg.addReplicationDomainDeleteListener(this); // Register as a root configuration listener so that we can be notified if // number of replay threads is changed and apply changes. configuration.addReplicationChangeListener(this); cfg.addReplicationChangeListener(this); replayThreadNumber = configuration.getNumUpdateReplayThreads(); connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(), Integer.MAX_VALUE); replayThreadNumber = cfg.getNumUpdateReplayThreads(); connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE); // Create the list of domains that are already defined. for (String name : configuration.listReplicationDomains()) for (String name : cfg.listReplicationDomains()) { createNewDomain(configuration.getReplicationDomain(name)); createNewDomain(cfg.getReplicationDomain(name)); } /* * If any schema changes were made with the server offline, then handle them * now. */ // If any schema changes were made with the server offline, then handle them now. List<Modification> offlineSchemaChanges = DirectoryServer.getOfflineSchemaChanges(); if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty()) @@ -402,12 +398,12 @@ public SynchronizationProviderResult handleConflictResolution( PreOperationModifyOperation modifyOperation) { LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation); if (domain == null) return new SynchronizationProviderResult.ContinueProcessing(); return domain.handleConflictResolution(modifyOperation); LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation); if (domain != null) { return domain.handleConflictResolution(modifyOperation); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @@ -415,12 +411,12 @@ public SynchronizationProviderResult handleConflictResolution( PreOperationAddOperation addOperation) throws DirectoryException { LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation); if (domain == null) return new SynchronizationProviderResult.ContinueProcessing(); return domain.handleConflictResolution(addOperation); LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation); if (domain != null) { return domain.handleConflictResolution(addOperation); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @@ -428,12 +424,12 @@ public SynchronizationProviderResult handleConflictResolution( PreOperationDeleteOperation deleteOperation) throws DirectoryException { LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation); if (domain == null) return new SynchronizationProviderResult.ContinueProcessing(); return domain.handleConflictResolution(deleteOperation); LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation); if (domain != null) { return domain.handleConflictResolution(deleteOperation); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @@ -441,12 +437,12 @@ public SynchronizationProviderResult handleConflictResolution( PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException { LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation); if (domain == null) return new SynchronizationProviderResult.ContinueProcessing(); return domain.handleConflictResolution(modifyDNOperation); LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation); if (domain != null) { return domain.handleConflictResolution(modifyDNOperation); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @@ -505,7 +501,9 @@ LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation); if (domain == null || !domain.solveConflict()) { return new SynchronizationProviderResult.ContinueProcessing(); } // The historical object is retrieved from the attachment created // in the HandleConflictResolution phase. @@ -537,11 +535,15 @@ LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation); if (domain == null) { return new SynchronizationProviderResult.ContinueProcessing(); } // For LOCAL op only, generate CSN and attach Context if (!addOperation.isSynchronizationOperation()) { domain.doPreOperation(addOperation); } // Add to the operation the historical attribute : "dn:changeNumber:add" EntryHistorical.setHistoricalAttrToOperation(addOperation); @@ -564,7 +566,9 @@ stopReplayThreads(); if (replicationServerListener != null) { replicationServerListener.shutdown(); } DirectoryServer.deregisterBackupTaskListener(this); DirectoryServer.deregisterRestoreTaskListener(this); @@ -585,10 +589,11 @@ @Override public void processSchemaChange(List<Modification> modifications) { LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null); LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null); if (domain != null) { domain.synchronizeModifications(modifications); } } /** {@inheritDoc} */ @@ -599,7 +604,9 @@ { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.backupStart(); } } } @@ -612,7 +619,9 @@ { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.backupEnd(); } } } @@ -624,7 +633,9 @@ { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.disable(); } } } @@ -637,7 +648,9 @@ { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.enable(); } } } @@ -649,7 +662,9 @@ { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.disable(); } } } @@ -662,7 +677,9 @@ { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.enable(); } } } @@ -674,7 +691,9 @@ { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.backupStart(); } } } @@ -687,7 +706,9 @@ { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.backupEnd(); } } } opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -126,7 +126,10 @@ while (firstChange != null && firstChange.isCommitted()) { state.update(firstCSN); if (firstChange.getMsg().contributesToDomainState()) { state.update(firstCSN); } pendingChanges.remove(firstCSN); if (pendingChanges.isEmpty()) opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -22,34 +22,34 @@ * * * Copyright 2008 Sun Microsystems, Inc. * Portions Copyright 2014 ForgeRock AS */ package org.opends.server.replication.plugin; import org.opends.messages.Message; import java.util.List; import org.opends.messages.Message; import org.opends.server.admin.server.ConfigurationAddListener; import org.opends.server.admin.server.ConfigurationDeleteListener; import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; import org.opends.server.config.ConfigException; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.types.ConfigChangeResult; import org.opends.server.types.ResultCode; /** * This class is used to create and object that can * register in the admin framework as a listener for changes, add and delete * on the ReplicationServer configuration objects. * */ public class ReplicationServerListener implements ConfigurationAddListener<ReplicationServerCfg>, ConfigurationDeleteListener<ReplicationServerCfg> { ReplicationServer replicationServer = null; private final DSRSShutdownSync dsrsShutdownSync; private ReplicationServer replicationServer; /** * Build a ReplicationServer Listener from the given Multimaster @@ -57,36 +57,36 @@ * * @param configuration The configuration that will be used to listen * for replicationServer configuration changes. * * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. * @throws ConfigException if the ReplicationServerListener can't register for * listening to changes on the provided configuration * object. */ public ReplicationServerListener( ReplicationSynchronizationProviderCfg configuration) throws ConfigException ReplicationSynchronizationProviderCfg configuration, DSRSShutdownSync dsrsShutdownSync) throws ConfigException { configuration.addReplicationServerAddListener(this); configuration.addReplicationServerDeleteListener(this); this.dsrsShutdownSync = dsrsShutdownSync; if (configuration.hasReplicationServer()) { ReplicationServerCfg server = configuration.getReplicationServer(); replicationServer = new ReplicationServer(server); final ReplicationServerCfg cfg = configuration.getReplicationServer(); replicationServer = new ReplicationServer(cfg, dsrsShutdownSync); } } /** * {@inheritDoc} */ public ConfigChangeResult applyConfigurationAdd( ReplicationServerCfg configuration) /** {@inheritDoc} */ @Override public ConfigChangeResult applyConfigurationAdd(ReplicationServerCfg cfg) { try { replicationServer = new ReplicationServer(configuration); replicationServer = new ReplicationServer(cfg, dsrsShutdownSync); return new ConfigChangeResult(ResultCode.SUCCESS, false); } catch (ConfigException e) } catch (ConfigException e) { // we should never get to this point because the configEntry has // already been validated in configAddisAcceptable @@ -94,14 +94,12 @@ } } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public boolean isConfigurationAddAcceptable( ReplicationServerCfg configuration, List<Message> unacceptableReasons) ReplicationServerCfg cfg, List<Message> unacceptableReasons) { return ReplicationServer.isConfigurationAcceptable( configuration, unacceptableReasons); return ReplicationServer.isConfigurationAcceptable(cfg, unacceptableReasons); } /** @@ -110,14 +108,14 @@ public void shutdown() { if (replicationServer != null) { replicationServer.shutdown(); } } /** * {@inheritDoc} */ public ConfigChangeResult applyConfigurationDelete( ReplicationServerCfg configuration) /** {@inheritDoc} */ @Override public ConfigChangeResult applyConfigurationDelete(ReplicationServerCfg cfg) { // There can be only one replicationServer, just shutdown the // replicationServer currently configured. @@ -128,11 +126,10 @@ return new ConfigChangeResult(ResultCode.SUCCESS, false); } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public boolean isConfigurationDeleteAcceptable( ReplicationServerCfg configuration, List<Message> unacceptableReasons) ReplicationServerCfg cfg, List<Message> unacceptableReasons) { return true; } opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
@@ -106,6 +106,13 @@ /** {@inheritDoc} */ @Override public boolean contributesToDomainState() { return false; // replica offline msg MUST NOT update the ds-sync-state } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " offlineCSN=" + csn; opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -294,4 +294,14 @@ { return payload; } /** * Whether the current message can update the "ds-sync-state" attribute. * * @return true if current message can update the "ds-sync-state" attribute, false otherwise. */ public boolean contributesToDomainState() { return true; } } opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -35,6 +35,7 @@ import org.opends.server.replication.protocol.DoneMsg; import org.opends.server.replication.protocol.ECLUpdateMsg; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; @@ -50,7 +51,7 @@ * This class defines a server writer, which is used to send changes to a * directory server. */ public class ECLServerWriter extends ServerWriter class ECLServerWriter extends ServerWriter { /** * The tracer object for the debug logger. @@ -62,7 +63,7 @@ private final ReplicationServerDomain replicationServerDomain; private boolean suspended; private volatile boolean shutdown; private PersistentSearch mypsearch; private final PersistentSearch mypsearch; /** * Create a ServerWriter. @@ -72,10 +73,10 @@ * @param replicationServerDomain the ReplicationServerDomain of this * ServerWriter. */ public ECLServerWriter(Session session, ECLServerHandler handler, ECLServerWriter(Session session, ECLServerHandler handler, ReplicationServerDomain replicationServerDomain) { super(session, handler, replicationServerDomain); super(session, handler, replicationServerDomain, new DSRSShutdownSync()); setName("Replication ECL Writer Thread for operation " + handler.getOperationId()); @@ -85,21 +86,26 @@ this.replicationServerDomain = replicationServerDomain; this.suspended = false; this.shutdown = false; this.mypsearch = findPersistentSearch(handler); } // Look for the psearch object related to this operation, the one that // will be notified with new entries to be returned. ECLWorkflowElement wfe = (ECLWorkflowElement) DirectoryServer .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); /** * Look for the persistent search object related to this operation, the one * that will be notified with new entries to be returned. */ private PersistentSearch findPersistentSearch(ECLServerHandler handler) { ECLWorkflowElement wfe = (ECLWorkflowElement) DirectoryServer.getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT); for (PersistentSearch psearch : wfe.getPersistentSearches()) { if (psearch.getSearchOperation().toString().equals( handler.getOperationId())) { mypsearch = psearch; break; return psearch; } } return null; } /** @@ -107,7 +113,7 @@ * waiting for the startCLSessionMsg. Then it may be * suspended between 2 jobs, each job being a separate search. */ public synchronized void suspendWriter() private synchronized void suspendWriter() { suspended = true; } @@ -115,7 +121,7 @@ /** * Resume the writer. */ public synchronized void resumeWriter() synchronized void resumeWriter() { suspended = false; notify(); @@ -187,7 +193,7 @@ * @throws IOException when raised (connection closure) * @throws InterruptedException when raised */ public void doIt() throws IOException, InterruptedException private void doIt() throws IOException, InterruptedException { while (true) { @@ -237,7 +243,7 @@ /** * Shutdown the writer. */ public synchronized void shutdownWriter() synchronized void shutdownWriter() { shutdown = true; notify(); opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -45,6 +45,7 @@ import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.types.Attributes.*; import static org.opends.server.util.StaticUtils.*; /** @@ -231,13 +232,10 @@ public List<Attribute> getMonitorData() { List<Attribute> attributes = new ArrayList<Attribute>(); attributes.add(Attributes.create("handler", getMonitorInstanceName())); attributes.add( Attributes.create("queue-size", String.valueOf(msgQueue.count()))); attributes.add( Attributes.create( "queue-size-bytes", String.valueOf(msgQueue.bytesCount()))); attributes.add(Attributes.create("following", String.valueOf(following))); attributes.add(create("handler", getMonitorInstanceName())); attributes.add(create("queue-size", String.valueOf(msgQueue.count()))); attributes.add(create("queue-size-bytes", String.valueOf(msgQueue.bytesCount()))); attributes.add(create("following", String.valueOf(following))); return attributes; } @@ -422,21 +420,20 @@ */ public CSN getOlderUpdateCSN() { CSN result = null; synchronized (msgQueue) { if (following) { if (!msgQueue.isEmpty()) { result = msgQueue.first().getCSN(); return msgQueue.first().getCSN(); } } else { if (!lateQueue.isEmpty()) { result = lateQueue.first().getCSN(); return lateQueue.first().getCSN(); } else { @@ -447,11 +444,11 @@ the lateQueue when it will send the next update but we are not yet there. So let's take the last change not sent directly from the db. */ result = findOldestCSNFromReplicaDBs(); return findOldestCSNFromReplicaDBs(); } } } return result; return null; } private CSN findOldestCSNFromReplicaDBs() @@ -460,10 +457,13 @@ try { cursor = replicationServerDomain.getCursorFrom(serverState); cursor.next(); if (cursor.getRecord() != null) while (cursor.next()) { return cursor.getRecord().getCSN(); final UpdateMsg record = cursor.getRecord(); if (record.contributesToDomainState()) { return record.getCSN(); } } return null; } opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -37,8 +37,8 @@ import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.*; import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*; import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.ConflictBehavior; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; import org.opends.server.api.VirtualAttributeProvider; @@ -51,9 +51,13 @@ import org.opends.server.replication.common.*; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.*; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.file.FileChangelogDB; import org.opends.server.replication.server.changelog.je.JEChangelogDB; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.types.*; import org.opends.server.util.ServerConstants; import org.opends.server.util.StaticUtils; @@ -82,6 +86,7 @@ /** The current configuration of this replication server. */ private ReplicationServerCfg config; private final DSRSShutdownSync dsrsShutdownSync; /** * This table is used to store the list of dn for which we are currently @@ -126,34 +131,39 @@ /** * Creates a new Replication server using the provided configuration entry. * * @param configuration The configuration of this replication server. * @param cfg The configuration of this replication server. * @throws ConfigException When Configuration is invalid. */ public ReplicationServer(ReplicationServerCfg configuration) throws ConfigException public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException { this.config = configuration; ReplicationDBImplementation dbImpl = configuration.getReplicationDBImplementation(); if (dbImpl == ReplicationDBImplementation.JE) this(cfg, new DSRSShutdownSync()); } /** * Creates a new Replication server using the provided configuration entry. * * @param cfg The configuration of this replication server. * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. * @throws ConfigException When Configuration is invalid. */ public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException { this.config = cfg; this.dsrsShutdownSync = dsrsShutdownSync; ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation(); if (DebugLogger.debugEnabled()) { if (DebugLogger.debugEnabled()) { TRACER.debugMessage(DebugLogLevel.INFO, "Using JE as DB implementation for changelog DB"); } this.changelogDB = new JEChangelogDB(this, configuration); TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB"); } else { if (DebugLogger.debugEnabled()) { TRACER.debugMessage(DebugLogLevel.INFO, "Using LOG FILE as DB implementation for changelog DB"); } this.changelogDB = new FileChangelogDB(this, configuration); } this.changelogDB = dbImpl == ReplicationDBImplementation.JE ? new JEChangelogDB(this, cfg) : new FileChangelogDB(this, cfg); replSessionSecurity = new ReplSessionSecurity(); initialize(); configuration.addChangeListener(this); cfg.addChangeListener(this); localPorts.add(getReplicationPort()); @@ -1227,6 +1237,16 @@ return this.changelogDB; } /** * Returns the synchronization object for shutdown of combined DS/RS instances. * * @return the synchronization object for shutdown of combined DS/RS instances. */ DSRSShutdownSync getDSRSShutdownSync() { return dsrsShutdownSync; } /** {@inheritDoc} */ @Override public String toString() opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -68,7 +68,7 @@ /** * The session opened with the remote server. */ protected Session session; protected final Session session; /** * The serverURL of the remote server. @@ -77,40 +77,39 @@ /** * Number of updates received from the server in assured safe read mode. */ protected int assuredSrReceivedUpdates = 0; private int assuredSrReceivedUpdates = 0; /** * Number of updates received from the server in assured safe read mode that * timed out. */ protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger(); private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger(); /** * Number of updates sent to the server in assured safe read mode. */ protected int assuredSrSentUpdates = 0; private int assuredSrSentUpdates = 0; /** * Number of updates sent to the server in assured safe read mode that timed * out. */ protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger(); private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger(); /** * Number of updates received from the server in assured safe data mode. */ protected int assuredSdReceivedUpdates = 0; private int assuredSdReceivedUpdates = 0; /** * Number of updates received from the server in assured safe data mode that * timed out. */ protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger(); private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger(); /** * Number of updates sent to the server in assured safe data mode. */ protected int assuredSdSentUpdates = 0; private int assuredSdSentUpdates = 0; /** * Number of updates sent to the server in assured safe data mode that timed * out. * Number of updates sent to the server in assured safe data mode that timed out. */ protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger(); private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger(); /** * The associated ServerWriter that sends messages to the remote server. @@ -301,7 +300,8 @@ // sendWindow MUST be created before starting the writer sendWindow = new Semaphore(sendWindowSize); writer = new ServerWriter(session, this, replicationServerDomain); writer = new ServerWriter(session, this, replicationServerDomain, replicationServer.getDSRSShutdownSync()); reader = new ServerReader(session, this); session.setName("Replication server RS(" + getReplicationServerId() @@ -626,7 +626,7 @@ * Increment the number of updates sent to the server in assured safe data * mode. */ public void incrementAssuredSdSentUpdates() private void incrementAssuredSdSentUpdates() { assuredSdSentUpdates++; } @@ -662,7 +662,7 @@ * Increment the number of updates sent to the server in assured safe read * mode. */ public void incrementAssuredSrSentUpdates() private void incrementAssuredSrSentUpdates() { assuredSrSentUpdates++; } opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -32,8 +32,10 @@ import org.opends.server.api.DirectoryThread; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.ServerStatus; import org.opends.server.replication.protocol.ReplicaOfflineMsg; import org.opends.server.replication.protocol.Session; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.service.DSRSShutdownSync; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; @@ -55,8 +57,7 @@ private final Session session; private final ServerHandler handler; private final ReplicationServerDomain replicationServerDomain; private final DSRSShutdownSync dsrsShutdownSync; /** * Create a ServerWriter. Then ServerWriter then waits on the ServerHandler @@ -68,9 +69,11 @@ * handler for which the ServerWriter is created. * @param replicationServerDomain * The ReplicationServerDomain of this ServerWriter. * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. */ public ServerWriter(Session session, ServerHandler handler, ReplicationServerDomain replicationServerDomain) ReplicationServerDomain replicationServerDomain, DSRSShutdownSync dsrsShutdownSync) { // Session may be null for ECLServerWriter. super("Replication server RS(" + handler.getReplicationServerId() @@ -80,6 +83,7 @@ this.session = session; this.handler = handler; this.replicationServerDomain = replicationServerDomain; this.dsrsShutdownSync = dsrsShutdownSync; } /** @@ -98,7 +102,9 @@ Message errMessage = null; try { while (true) boolean shutdown = false; while (!shutdown || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN())) { final UpdateMsg updateMsg = replicationServerDomain.take(this.handler); if (updateMsg == null) @@ -106,12 +112,16 @@ // this connection is closing errMessage = Message.raw( "Connection closure: null update returned by domain."); return; shutdown = true; } else if (!isUpdateMsgFiltered(updateMsg)) { // Publish the update to the remote server using a protocol version it supports session.publish(updateMsg); if (updateMsg instanceof ReplicaOfflineMsg) { dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN()); } } } } opends/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
New file @@ -0,0 +1,85 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2014 ForgeRock AS */ package org.opends.server.replication.service; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicLong; import org.opends.server.types.DN; /** * Class useful for the case where DS/RS instances are collocated inside the * same JVM. It synchronizes the shutdown of the DS and RS sides. * <p> * More specifically, it ensures a ReplicaOfflineMsg sent by the DS is * relayed/forwarded by the collocated RS to the other RSs in the topology * before the whole process shuts down. * * @since OPENDJ-1453 */ public class DSRSShutdownSync { private static final ConcurrentSkipListSet<DN> replicaOfflineMsgs = new ConcurrentSkipListSet<DN>(); private static AtomicLong stopInstanceTimestamp = new AtomicLong(); /** * Message has been sent. * * @param baseDN * the domain for which the message has been sent */ public void replicaOfflineMsgSent(DN baseDN) { stopInstanceTimestamp.compareAndSet(0, System.currentTimeMillis()); replicaOfflineMsgs.add(baseDN); } /** * Message has been forwarded. * * @param baseDN * the domain for which the message has been sent */ public void replicaOfflineMsgForwarded(DN baseDN) { replicaOfflineMsgs.remove(baseDN); } /** * Whether a ReplicationServer ServerReader or ServerWriter can proceed with * shutdown. * * @param baseDN * the baseDN of the ServerReader or ServerWriter . * @return true if the caller can shutdown, false otherwise */ public boolean canShutdown(DN baseDN) { return !replicaOfflineMsgs.contains(baseDN) || System.currentTimeMillis() - stopInstanceTimestamp.get() > 5000; } } opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -3005,7 +3005,8 @@ // The server is shutting down. listenerThread.initiateShutdown(); } else if (processUpdate(updateMsg)) else if (processUpdate(updateMsg) && updateMsg.contributesToDomainState()) { /* * Warning: in synchronous mode, no way to tell the replay of an @@ -3426,9 +3427,11 @@ */ public void publish(UpdateMsg msg) { // Publish the update broker.publish(msg); state.update(msg.getCSN()); if (msg.contributesToDomainState()) { state.update(msg.getCSN()); } numSentUpdates.incrementAndGet(); }