From aea0892feca2fd3d56c9c810debed6d22389454e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 11 Jul 2014 09:54:43 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3938) Replica offline messages should be synced with updates
---
opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java | 85 ++++++++++
opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 8
opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java | 65 +++----
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 9
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java | 37 +++
opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java | 36 ++-
opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java | 30 +-
opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java | 20 +
opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 125 +++++++++------
opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java | 5
opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java | 28 +-
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java | 7
opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java | 10 +
13 files changed, 319 insertions(+), 146 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index d99d8c1..cc581d0 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -70,6 +70,7 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
@@ -185,6 +186,7 @@
public static final String DS_SYNC_CONFLICT = "ds-sync-conflict";
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ private final DSRSShutdownSync dsrsShutdownSync;
/**
* The update to replay message queue where the listener thread is going to
* push incoming update messages.
@@ -455,14 +457,17 @@
*
* @param configuration The configuration of this ReplicationDomain.
* @param updateToReplayQueue The queue for update messages to replay.
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
* @throws ConfigException In case of invalid configuration.
*/
LDAPReplicationDomain(ReplicationDomainCfg configuration,
- BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
+ BlockingQueue<UpdateToReplay> updateToReplayQueue,
+ DSRSShutdownSync dsrsShutdownSync) throws ConfigException
{
super(configuration, -1);
this.updateToReplayQueue = updateToReplayQueue;
+ this.dsrsShutdownSync = dsrsShutdownSync;
// Get assured configuration
readAssuredConfig(configuration, false);
@@ -2011,6 +2016,7 @@
public void publishReplicaOfflineMsg()
{
pendingChanges.putReplicaOfflineMsg();
+ dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN());
}
/**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index 0eec310..d3eb610 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -43,14 +43,14 @@
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.api.*;
import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.types.operation.*;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -73,9 +73,10 @@
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- private ReplicationServerListener replicationServerListener = null;
+ private ReplicationServerListener replicationServerListener;
private static final Map<DN, LDAPReplicationDomain> domains =
- new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
+ new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
+ private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
/**
* The queue of received update messages, to be treated by the ReplayThread
@@ -119,8 +120,7 @@
* Can be null is the request has no associated operation.
* @return The domain for this DN.
*/
- public static LDAPReplicationDomain findDomain(
- DN dn, PluginOperation pluginOp)
+ public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
{
/*
* Don't run the special replication code on Operation that are
@@ -130,7 +130,9 @@
{
final Operation op = (Operation) pluginOp;
if (op.dontSynchronize())
+ {
return null;
+ }
/*
* Check if the provided operation is a repair operation and set the
@@ -187,8 +189,8 @@
{
try
{
- LDAPReplicationDomain domain =
- new LDAPReplicationDomain(configuration, updateToReplayQueue);
+ final LDAPReplicationDomain domain = new LDAPReplicationDomain(
+ configuration, updateToReplayQueue, dsrsShutdownSync);
if (domains.size() == 0)
{
// Create the threads that will process incoming update messages
@@ -223,9 +225,8 @@
BlockingQueue<UpdateToReplay> queue)
throws ConfigException
{
- LDAPReplicationDomain domain =
- new LDAPReplicationDomain(configuration, queue);
-
+ final LDAPReplicationDomain domain =
+ new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
domains.put(domain.getBaseDN(), domain);
return domain;
}
@@ -251,35 +252,30 @@
/** {@inheritDoc} */
@Override
public void initializeSynchronizationProvider(
- ReplicationSynchronizationProviderCfg configuration)
- throws org.forgerock.opendj.config.server.ConfigException
+ ReplicationSynchronizationProviderCfg cfg) throws ConfigException
{
domains.clear();
- replicationServerListener = new ReplicationServerListener(configuration);
+ replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
// Register as an add and delete listener with the root configuration so we
// can be notified if Multimaster domain entries are added or removed.
- configuration.addReplicationDomainAddListener(this);
- configuration.addReplicationDomainDeleteListener(this);
+ cfg.addReplicationDomainAddListener(this);
+ cfg.addReplicationDomainDeleteListener(this);
// Register as a root configuration listener so that we can be notified if
// number of replay threads is changed and apply changes.
- configuration.addReplicationChangeListener(this);
+ cfg.addReplicationChangeListener(this);
- replayThreadNumber = configuration.getNumUpdateReplayThreads();
- connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
- Integer.MAX_VALUE);
+ replayThreadNumber = cfg.getNumUpdateReplayThreads();
+ connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
// Create the list of domains that are already defined.
- for (String name : configuration.listReplicationDomains())
+ for (String name : cfg.listReplicationDomains())
{
- createNewDomain(configuration.getReplicationDomain(name));
+ createNewDomain(cfg.getReplicationDomain(name));
}
- /*
- * If any schema changes were made with the server offline, then handle them
- * now.
- */
+ // If any schema changes were made with the server offline, then handle them now.
List<Modification> offlineSchemaChanges =
DirectoryServer.getOfflineSchemaChanges();
if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -407,12 +403,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationModifyOperation modifyOperation)
{
- LDAPReplicationDomain domain =
- findDomain(modifyOperation.getEntryDN(), modifyOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(modifyOperation);
+ LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(modifyOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -420,12 +416,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationAddOperation addOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(addOperation.getEntryDN(), addOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(addOperation);
+ LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(addOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -433,12 +429,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationDeleteOperation deleteOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(deleteOperation.getEntryDN(), deleteOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(deleteOperation);
+ LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(deleteOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -446,12 +442,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(modifyDNOperation);
+ LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(modifyDNOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -510,7 +506,9 @@
LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
if (domain == null || !domain.solveConflict())
+ {
return new SynchronizationProviderResult.ContinueProcessing();
+ }
// The historical object is retrieved from the attachment created
// in the HandleConflictResolution phase.
@@ -542,11 +540,15 @@
LDAPReplicationDomain domain =
findDomain(addOperation.getEntryDN(), addOperation);
if (domain == null)
+ {
return new SynchronizationProviderResult.ContinueProcessing();
+ }
// For LOCAL op only, generate CSN and attach Context
if (!addOperation.isSynchronizationOperation())
+ {
domain.doPreOperation(addOperation);
+ }
// Add to the operation the historical attribute : "dn:changeNumber:add"
EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -569,7 +571,9 @@
stopReplayThreads();
if (replicationServerListener != null)
+ {
replicationServerListener.shutdown();
+ }
DirectoryServer.deregisterBackupTaskListener(this);
DirectoryServer.deregisterRestoreTaskListener(this);
@@ -590,10 +594,11 @@
@Override
public void processSchemaChange(List<Modification> modifications)
{
- LDAPReplicationDomain domain =
- findDomain(DirectoryServer.getSchemaDN(), null);
+ LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
if (domain != null)
+ {
domain.synchronizeModifications(modifications);
+ }
}
/** {@inheritDoc} */
@@ -604,7 +609,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupStart();
+ }
}
}
@@ -617,7 +624,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupEnd();
+ }
}
}
@@ -629,7 +638,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.disable();
+ }
}
}
@@ -642,7 +653,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.enable();
+ }
}
}
@@ -654,7 +667,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.disable();
+ }
}
}
@@ -667,7 +682,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.enable();
+ }
}
}
@@ -679,7 +696,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupStart();
+ }
}
}
@@ -692,7 +711,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupEnd();
+ }
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index beb6d19..8252018 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -126,7 +126,10 @@
while (firstChange != null && firstChange.isCommitted())
{
- state.update(firstCSN);
+ if (firstChange.getMsg().contributesToDomainState())
+ {
+ state.update(firstCSN);
+ }
pendingChanges.remove(firstCSN);
if (pendingChanges.isEmpty())
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
index 96de92f..939bfa0 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -26,30 +26,30 @@
*/
package org.opends.server.replication.plugin;
-import org.forgerock.i18n.LocalizableMessage;
-
import java.util.List;
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ResultCode;
+
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
-import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.admin.std.server.ReplicationServerCfg;
-import org.forgerock.opendj.config.server.ConfigException;
+import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.ConfigChangeResult;
-import org.forgerock.opendj.ldap.ResultCode;
-
/**
* This class is used to create and object that can
* register in the admin framework as a listener for changes, add and delete
* on the ReplicationServer configuration objects.
- *
*/
public class ReplicationServerListener
implements ConfigurationAddListener<ReplicationServerCfg>,
ConfigurationDeleteListener<ReplicationServerCfg>
{
+ private final DSRSShutdownSync dsrsShutdownSync;
private ReplicationServer replicationServer;
/**
@@ -58,36 +58,36 @@
*
* @param configuration The configuration that will be used to listen
* for replicationServer configuration changes.
- *
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
* @throws ConfigException if the ReplicationServerListener can't register for
* listening to changes on the provided configuration
* object.
*/
public ReplicationServerListener(
- ReplicationSynchronizationProviderCfg configuration)
- throws ConfigException
+ ReplicationSynchronizationProviderCfg configuration,
+ DSRSShutdownSync dsrsShutdownSync) throws ConfigException
{
configuration.addReplicationServerAddListener(this);
configuration.addReplicationServerDeleteListener(this);
+ this.dsrsShutdownSync = dsrsShutdownSync;
if (configuration.hasReplicationServer())
{
- ReplicationServerCfg server = configuration.getReplicationServer();
- replicationServer = new ReplicationServer(server);
+ final ReplicationServerCfg cfg = configuration.getReplicationServer();
+ replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
}
}
- /**
- * {@inheritDoc}
- */
- public ConfigChangeResult applyConfigurationAdd(
- ReplicationServerCfg configuration)
+ /** {@inheritDoc} */
+ @Override
+ public ConfigChangeResult applyConfigurationAdd(ReplicationServerCfg cfg)
{
try
{
- replicationServer = new ReplicationServer(configuration);
+ replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
return new ConfigChangeResult(ResultCode.SUCCESS, false);
- } catch (ConfigException e)
+ }
+ catch (ConfigException e)
{
// we should never get to this point because the configEntry has
// already been validated in configAddisAcceptable
@@ -95,14 +95,12 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
+ @Override
public boolean isConfigurationAddAcceptable(
- ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
+ ReplicationServerCfg cfg, List<LocalizableMessage> unacceptableReasons)
{
- return ReplicationServer.isConfigurationAcceptable(
- configuration, unacceptableReasons);
+ return ReplicationServer.isConfigurationAcceptable(cfg, unacceptableReasons);
}
/**
@@ -111,14 +109,14 @@
public void shutdown()
{
if (replicationServer != null)
+ {
replicationServer.shutdown();
+ }
}
- /**
- * {@inheritDoc}
- */
- public ConfigChangeResult applyConfigurationDelete(
- ReplicationServerCfg configuration)
+ /** {@inheritDoc} */
+ @Override
+ public ConfigChangeResult applyConfigurationDelete(ReplicationServerCfg cfg)
{
// There can be only one replicationServer, just shutdown the
// replicationServer currently configured.
@@ -129,11 +127,10 @@
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
+ @Override
public boolean isConfigurationDeleteAcceptable(
- ReplicationServerCfg configuration, List<LocalizableMessage> unacceptableReasons)
+ ReplicationServerCfg cfg, List<LocalizableMessage> unacceptableReasons)
{
return true;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
index a9ac7c7..a1409cd 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
@@ -106,6 +106,13 @@
/** {@inheritDoc} */
@Override
+ public boolean contributesToDomainState()
+ {
+ return false; // replica offline msg MUST NOT update the ds-sync-state
+ }
+
+ /** {@inheritDoc} */
+ @Override
public String toString()
{
return getClass().getSimpleName() + " offlineCSN=" + csn;
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index 3cf7e78..f27e597 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -294,4 +294,14 @@
{
return payload;
}
+
+ /**
+ * Whether the current message can update the "ds-sync-state" attribute.
+ *
+ * @return true if current message can update the "ds-sync-state" attribute, false otherwise.
+ */
+ public boolean contributesToDomainState()
+ {
+ return true;
+ }
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 880308c..0005e50 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -35,6 +35,7 @@
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.ECLUpdateMsg;
import org.opends.server.replication.protocol.Session;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
@@ -47,7 +48,7 @@
* This class defines a server writer, which is used to send changes to a
* directory server.
*/
-public class ECLServerWriter extends ServerWriter
+class ECLServerWriter extends ServerWriter
{
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
@@ -56,7 +57,7 @@
private final ReplicationServerDomain replicationServerDomain;
private boolean suspended;
private volatile boolean shutdown;
- private PersistentSearch mypsearch;
+ private final PersistentSearch mypsearch;
/**
* Create a ServerWriter.
@@ -66,10 +67,10 @@
* @param replicationServerDomain the ReplicationServerDomain of this
* ServerWriter.
*/
- public ECLServerWriter(Session session, ECLServerHandler handler,
+ ECLServerWriter(Session session, ECLServerHandler handler,
ReplicationServerDomain replicationServerDomain)
{
- super(session, handler, replicationServerDomain);
+ super(session, handler, replicationServerDomain, new DSRSShutdownSync());
setName("Replication ECL Writer Thread for operation " +
handler.getOperationId());
@@ -79,21 +80,26 @@
this.replicationServerDomain = replicationServerDomain;
this.suspended = false;
this.shutdown = false;
+ this.mypsearch = findPersistentSearch(handler);
+ }
- // Look for the psearch object related to this operation, the one that
- // will be notified with new entries to be returned.
- ECLWorkflowElement wfe =
- (ECLWorkflowElement) DirectoryServer
- .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
+ /**
+ * Look for the persistent search object related to this operation, the one
+ * that will be notified with new entries to be returned.
+ */
+ private PersistentSearch findPersistentSearch(ECLServerHandler handler)
+ {
+ ECLWorkflowElement wfe = (ECLWorkflowElement)
+ DirectoryServer.getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
for (PersistentSearch psearch : wfe.getPersistentSearches())
{
if (psearch.getSearchOperation().toString().equals(
handler.getOperationId()))
{
- mypsearch = psearch;
- break;
+ return psearch;
}
}
+ return null;
}
/**
@@ -101,7 +107,7 @@
* waiting for the startCLSessionMsg. Then it may be
* suspended between 2 jobs, each job being a separate search.
*/
- public synchronized void suspendWriter()
+ private synchronized void suspendWriter()
{
suspended = true;
}
@@ -109,7 +115,7 @@
/**
* Resume the writer.
*/
- public synchronized void resumeWriter()
+ synchronized void resumeWriter()
{
suspended = false;
notify();
@@ -180,7 +186,7 @@
* @throws IOException when raised (connection closure)
* @throws InterruptedException when raised
*/
- public void doIt() throws IOException, InterruptedException
+ private void doIt() throws IOException, InterruptedException
{
while (true)
{
@@ -230,7 +236,7 @@
/**
* Shutdown the writer.
*/
- public synchronized void shutdownWriter()
+ synchronized void shutdownWriter()
{
shutdown = true;
notify();
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
index 9171191..e3684be 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -44,6 +44,7 @@
import org.opends.server.types.*;
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.types.Attributes.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -228,13 +229,10 @@
public List<Attribute> getMonitorData()
{
List<Attribute> attributes = new ArrayList<Attribute>();
- attributes.add(Attributes.create("handler", getMonitorInstanceName()));
- attributes.add(
- Attributes.create("queue-size", String.valueOf(msgQueue.count())));
- attributes.add(
- Attributes.create(
- "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
- attributes.add(Attributes.create("following", String.valueOf(following)));
+ attributes.add(create("handler", getMonitorInstanceName()));
+ attributes.add(create("queue-size", String.valueOf(msgQueue.count())));
+ attributes.add(create("queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
+ attributes.add(create("following", String.valueOf(following)));
return attributes;
}
@@ -419,21 +417,20 @@
*/
public CSN getOlderUpdateCSN()
{
- CSN result = null;
synchronized (msgQueue)
{
if (following)
{
if (!msgQueue.isEmpty())
{
- result = msgQueue.first().getCSN();
+ return msgQueue.first().getCSN();
}
}
else
{
if (!lateQueue.isEmpty())
{
- result = lateQueue.first().getCSN();
+ return lateQueue.first().getCSN();
}
else
{
@@ -444,11 +441,11 @@
the lateQueue when it will send the next update but we are not yet
there. So let's take the last change not sent directly from the db.
*/
- result = findOldestCSNFromReplicaDBs();
+ return findOldestCSNFromReplicaDBs();
}
}
}
- return result;
+ return null;
}
private CSN findOldestCSNFromReplicaDBs()
@@ -457,10 +454,13 @@
try
{
cursor = replicationServerDomain.getCursorFrom(serverState);
- cursor.next();
- if (cursor.getRecord() != null)
+ while (cursor.next())
{
- return cursor.getRecord().getCSN();
+ final UpdateMsg record = cursor.getRecord();
+ if (record.contributesToDomainState())
+ {
+ return record.getCSN();
+ }
}
return null;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
index 2c6d9a8..61a993c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -54,6 +54,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -79,6 +80,7 @@
/** The current configuration of this replication server. */
private ReplicationServerCfg config;
+ private final DSRSShutdownSync dsrsShutdownSync;
/**
* This table is used to store the list of dn for which we are currently
@@ -122,18 +124,31 @@
/**
* Creates a new Replication server using the provided configuration entry.
*
- * @param configuration The configuration of this replication server.
+ * @param cfg The configuration of this replication server.
* @throws ConfigException When Configuration is invalid.
*/
- public ReplicationServer(ReplicationServerCfg configuration)
- throws ConfigException
+ public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
{
- this.config = configuration;
- this.changelogDB = new JEChangelogDB(this, configuration);
+ this(cfg, new DSRSShutdownSync());
+ }
+
+ /**
+ * Creates a new Replication server using the provided configuration entry.
+ *
+ * @param cfg The configuration of this replication server.
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
+ * @throws ConfigException When Configuration is invalid.
+ */
+ public ReplicationServer(ReplicationServerCfg cfg,
+ DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+ {
+ this.config = cfg;
+ this.changelogDB = new JEChangelogDB(this, cfg);
+ this.dsrsShutdownSync = dsrsShutdownSync;
replSessionSecurity = new ReplSessionSecurity();
initialize();
- configuration.addChangeListener(this);
+ cfg.addChangeListener(this);
localPorts.add(getReplicationPort());
@@ -1183,6 +1198,16 @@
return this.changelogDB;
}
+ /**
+ * Returns the synchronization object for shutdown of combined DS/RS instances.
+ *
+ * @return the synchronization object for shutdown of combined DS/RS instances.
+ */
+ DSRSShutdownSync getDSRSShutdownSync()
+ {
+ return dsrsShutdownSync;
+ }
+
/** {@inheritDoc} */
@Override
public String toString()
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
index d83ba85..3c664be 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -72,7 +72,7 @@
/**
* The session opened with the remote server.
*/
- protected Session session;
+ protected final Session session;
/**
* The serverURL of the remote server.
@@ -81,40 +81,39 @@
/**
* Number of updates received from the server in assured safe read mode.
*/
- protected int assuredSrReceivedUpdates = 0;
+ private int assuredSrReceivedUpdates = 0;
/**
* Number of updates received from the server in assured safe read mode that
* timed out.
*/
- protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
+ private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
/**
* Number of updates sent to the server in assured safe read mode.
*/
- protected int assuredSrSentUpdates = 0;
+ private int assuredSrSentUpdates = 0;
/**
* Number of updates sent to the server in assured safe read mode that timed
* out.
*/
- protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
+ private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
/**
* Number of updates received from the server in assured safe data mode.
*/
- protected int assuredSdReceivedUpdates = 0;
+ private int assuredSdReceivedUpdates = 0;
/**
* Number of updates received from the server in assured safe data mode that
* timed out.
*/
- protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
+ private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
/**
* Number of updates sent to the server in assured safe data mode.
*/
- protected int assuredSdSentUpdates = 0;
+ private int assuredSdSentUpdates = 0;
/**
- * Number of updates sent to the server in assured safe data mode that timed
- * out.
+ * Number of updates sent to the server in assured safe data mode that timed out.
*/
- protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
+ private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
/**
* The associated ServerWriter that sends messages to the remote server.
@@ -305,7 +304,8 @@
// sendWindow MUST be created before starting the writer
sendWindow = new Semaphore(sendWindowSize);
- writer = new ServerWriter(session, this, replicationServerDomain);
+ writer = new ServerWriter(session, this, replicationServerDomain,
+ replicationServer.getDSRSShutdownSync());
reader = new ServerReader(session, this);
session.setName("Replication server RS(" + getReplicationServerId()
@@ -630,7 +630,7 @@
* Increment the number of updates sent to the server in assured safe data
* mode.
*/
- public void incrementAssuredSdSentUpdates()
+ private void incrementAssuredSdSentUpdates()
{
assuredSdSentUpdates++;
}
@@ -666,7 +666,7 @@
* Increment the number of updates sent to the server in assured safe read
* mode.
*/
- public void incrementAssuredSrSentUpdates()
+ private void incrementAssuredSrSentUpdates()
{
assuredSrSentUpdates++;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java
index b071f99..343e2d5 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -32,8 +32,10 @@
import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.service.DSRSShutdownSync;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.common.ServerStatus.*;
@@ -50,8 +52,7 @@
private final Session session;
private final ServerHandler handler;
private final ReplicationServerDomain replicationServerDomain;
-
-
+ private final DSRSShutdownSync dsrsShutdownSync;
/**
* Create a ServerWriter. Then ServerWriter then waits on the ServerHandler
@@ -63,9 +64,11 @@
* handler for which the ServerWriter is created.
* @param replicationServerDomain
* The ReplicationServerDomain of this ServerWriter.
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
*/
public ServerWriter(Session session, ServerHandler handler,
- ReplicationServerDomain replicationServerDomain)
+ ReplicationServerDomain replicationServerDomain,
+ DSRSShutdownSync dsrsShutdownSync)
{
// Session may be null for ECLServerWriter.
super("Replication server RS(" + handler.getReplicationServerId()
@@ -75,6 +78,7 @@
this.session = session;
this.handler = handler;
this.replicationServerDomain = replicationServerDomain;
+ this.dsrsShutdownSync = dsrsShutdownSync;
}
/**
@@ -93,7 +97,9 @@
LocalizableMessage errMessage = null;
try
{
- while (true)
+ boolean shutdown = false;
+ while (!shutdown
+ || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
{
final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
if (updateMsg == null)
@@ -101,12 +107,16 @@
// this connection is closing
errMessage = LocalizableMessage.raw(
"Connection closure: null update returned by domain.");
- return;
+ shutdown = true;
}
else if (!isUpdateMsgFiltered(updateMsg))
{
// Publish the update to the remote server using a protocol version it supports
session.publish(updateMsg);
+ if (updateMsg instanceof ReplicaOfflineMsg)
+ {
+ dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN());
+ }
}
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
new file mode 100644
index 0000000..5ef595e
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
@@ -0,0 +1,85 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.service;
+
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opends.server.types.DN;
+
+/**
+ * Class useful for the case where DS/RS instances are collocated inside the
+ * same JVM. It synchronizes the shutdown of the DS and RS sides.
+ * <p>
+ * More specifically, it ensures a ReplicaOfflineMsg sent by the DS is
+ * relayed/forwarded by the collocated RS to the other RSs in the topology
+ * before the whole process shuts down.
+ *
+ * @since OPENDJ-1453
+ */
+public class DSRSShutdownSync
+{
+
+ private static final ConcurrentSkipListSet<DN> replicaOfflineMsgs =
+ new ConcurrentSkipListSet<DN>();
+ private static AtomicLong stopInstanceTimestamp = new AtomicLong();
+
+ /**
+ * Message has been sent.
+ *
+ * @param baseDN
+ * the domain for which the message has been sent
+ */
+ public void replicaOfflineMsgSent(DN baseDN)
+ {
+ stopInstanceTimestamp.compareAndSet(0, System.currentTimeMillis());
+ replicaOfflineMsgs.add(baseDN);
+ }
+
+ /**
+ * Message has been forwarded.
+ *
+ * @param baseDN
+ * the domain for which the message has been sent
+ */
+ public void replicaOfflineMsgForwarded(DN baseDN)
+ {
+ replicaOfflineMsgs.remove(baseDN);
+ }
+
+ /**
+ * Whether a ReplicationServer ServerReader or ServerWriter can proceed with
+ * shutdown.
+ *
+ * @param baseDN
+ * the baseDN of the ServerReader or ServerWriter .
+ * @return true if the caller can shutdown, false otherwise
+ */
+ public boolean canShutdown(DN baseDN)
+ {
+ return !replicaOfflineMsgs.contains(baseDN)
+ || System.currentTimeMillis() - stopInstanceTimestamp.get() > 5000;
+ }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 667cc2a..2faa52d 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -2974,7 +2974,8 @@
// The server is shutting down.
listenerThread.initiateShutdown();
}
- else if (processUpdate(updateMsg))
+ else if (processUpdate(updateMsg)
+ && updateMsg.contributesToDomainState())
{
/*
* Warning: in synchronous mode, no way to tell the replay of an
@@ -3393,9 +3394,11 @@
*/
public void publish(UpdateMsg msg)
{
- // Publish the update
broker.publish(msg);
- state.update(msg.getCSN());
+ if (msg.contributesToDomainState())
+ {
+ state.update(msg.getCSN());
+ }
numSentUpdates.incrementAndGet();
}
--
Gitblit v1.10.0