From f948474a8031c24160da4b31f0b97354456b40ad Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 03 Jul 2014 15:19:12 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3938) Replica offline messages should be synced with updates
---
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 30 +-
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 125 ++++++++-----
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java | 5
opends/src/server/org/opends/server/replication/service/DSRSShutdownSync.java | 85 +++++++++
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java | 9
opends/src/server/org/opends/server/replication/server/ServerWriter.java | 20 +
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 66 ++++--
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java | 10 +
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java | 36 ++-
opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java | 63 +++---
opends/src/server/org/opends/server/replication/server/ServerHandler.java | 28 +-
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 8
opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java | 7
13 files changed, 330 insertions(+), 162 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index e6eee2b..9744a22 100644
--- a/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -67,6 +67,7 @@
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
@@ -182,6 +183,7 @@
*/
private static final DebugTracer TRACER = getTracer();
+ private final DSRSShutdownSync dsrsShutdownSync;
/**
* The update to replay message queue where the listener thread is going to
* push incoming update messages.
@@ -452,14 +454,17 @@
*
* @param configuration The configuration of this ReplicationDomain.
* @param updateToReplayQueue The queue for update messages to replay.
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
* @throws ConfigException In case of invalid configuration.
*/
LDAPReplicationDomain(ReplicationDomainCfg configuration,
- BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
+ BlockingQueue<UpdateToReplay> updateToReplayQueue,
+ DSRSShutdownSync dsrsShutdownSync) throws ConfigException
{
super(configuration, -1);
this.updateToReplayQueue = updateToReplayQueue;
+ this.dsrsShutdownSync = dsrsShutdownSync;
// Get assured configuration
readAssuredConfig(configuration, false);
@@ -2017,6 +2022,7 @@
public void publishReplicaOfflineMsg()
{
pendingChanges.putReplicaOfflineMsg();
+ dsrsShutdownSync.replicaOfflineMsgSent(getBaseDN());
}
/**
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index f78bf74..e22aa5b 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -41,13 +41,13 @@
import org.opends.server.api.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.types.operation.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -67,9 +67,10 @@
BackupTaskListener, RestoreTaskListener, ImportTaskListener,
ExportTaskListener
{
- private ReplicationServerListener replicationServerListener = null;
+ private ReplicationServerListener replicationServerListener;
private static final Map<DN, LDAPReplicationDomain> domains =
- new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
+ new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
+ private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
/**
* The queue of received update messages, to be treated by the ReplayThread
@@ -113,8 +114,7 @@
* Can be null is the request has no associated operation.
* @return The domain for this DN.
*/
- public static LDAPReplicationDomain findDomain(
- DN dn, PluginOperation pluginOp)
+ public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
{
/*
* Don't run the special replication code on Operation that are
@@ -124,7 +124,9 @@
{
final Operation op = (Operation) pluginOp;
if (op.dontSynchronize())
+ {
return null;
+ }
/*
* Check if the provided operation is a repair operation and set the
@@ -181,8 +183,8 @@
{
try
{
- LDAPReplicationDomain domain =
- new LDAPReplicationDomain(configuration, updateToReplayQueue);
+ final LDAPReplicationDomain domain = new LDAPReplicationDomain(
+ configuration, updateToReplayQueue, dsrsShutdownSync);
if (domains.size() == 0)
{
// Create the threads that will process incoming update messages
@@ -218,9 +220,8 @@
BlockingQueue<UpdateToReplay> queue)
throws ConfigException
{
- LDAPReplicationDomain domain =
- new LDAPReplicationDomain(configuration, queue);
-
+ final LDAPReplicationDomain domain =
+ new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
domains.put(domain.getBaseDN(), domain);
return domain;
}
@@ -246,35 +247,30 @@
/** {@inheritDoc} */
@Override
public void initializeSynchronizationProvider(
- ReplicationSynchronizationProviderCfg configuration)
- throws ConfigException
+ ReplicationSynchronizationProviderCfg cfg) throws ConfigException
{
domains.clear();
- replicationServerListener = new ReplicationServerListener(configuration);
+ replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
// Register as an add and delete listener with the root configuration so we
// can be notified if Multimaster domain entries are added or removed.
- configuration.addReplicationDomainAddListener(this);
- configuration.addReplicationDomainDeleteListener(this);
+ cfg.addReplicationDomainAddListener(this);
+ cfg.addReplicationDomainDeleteListener(this);
// Register as a root configuration listener so that we can be notified if
// number of replay threads is changed and apply changes.
- configuration.addReplicationChangeListener(this);
+ cfg.addReplicationChangeListener(this);
- replayThreadNumber = configuration.getNumUpdateReplayThreads();
- connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
- Integer.MAX_VALUE);
+ replayThreadNumber = cfg.getNumUpdateReplayThreads();
+ connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
// Create the list of domains that are already defined.
- for (String name : configuration.listReplicationDomains())
+ for (String name : cfg.listReplicationDomains())
{
- createNewDomain(configuration.getReplicationDomain(name));
+ createNewDomain(cfg.getReplicationDomain(name));
}
- /*
- * If any schema changes were made with the server offline, then handle them
- * now.
- */
+ // If any schema changes were made with the server offline, then handle them now.
List<Modification> offlineSchemaChanges =
DirectoryServer.getOfflineSchemaChanges();
if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -402,12 +398,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationModifyOperation modifyOperation)
{
- LDAPReplicationDomain domain =
- findDomain(modifyOperation.getEntryDN(), modifyOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(modifyOperation);
+ LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(modifyOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -415,12 +411,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationAddOperation addOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(addOperation.getEntryDN(), addOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(addOperation);
+ LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(addOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -428,12 +424,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationDeleteOperation deleteOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(deleteOperation.getEntryDN(), deleteOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(deleteOperation);
+ LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(deleteOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -441,12 +437,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(modifyDNOperation);
+ LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(modifyDNOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -505,7 +501,9 @@
LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
if (domain == null || !domain.solveConflict())
+ {
return new SynchronizationProviderResult.ContinueProcessing();
+ }
// The historical object is retrieved from the attachment created
// in the HandleConflictResolution phase.
@@ -537,11 +535,15 @@
LDAPReplicationDomain domain =
findDomain(addOperation.getEntryDN(), addOperation);
if (domain == null)
+ {
return new SynchronizationProviderResult.ContinueProcessing();
+ }
// For LOCAL op only, generate CSN and attach Context
if (!addOperation.isSynchronizationOperation())
+ {
domain.doPreOperation(addOperation);
+ }
// Add to the operation the historical attribute : "dn:changeNumber:add"
EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -564,7 +566,9 @@
stopReplayThreads();
if (replicationServerListener != null)
+ {
replicationServerListener.shutdown();
+ }
DirectoryServer.deregisterBackupTaskListener(this);
DirectoryServer.deregisterRestoreTaskListener(this);
@@ -585,10 +589,11 @@
@Override
public void processSchemaChange(List<Modification> modifications)
{
- LDAPReplicationDomain domain =
- findDomain(DirectoryServer.getSchemaDN(), null);
+ LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
if (domain != null)
+ {
domain.synchronizeModifications(modifications);
+ }
}
/** {@inheritDoc} */
@@ -599,7 +604,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupStart();
+ }
}
}
@@ -612,7 +619,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupEnd();
+ }
}
}
@@ -624,7 +633,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.disable();
+ }
}
}
@@ -637,7 +648,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.enable();
+ }
}
}
@@ -649,7 +662,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.disable();
+ }
}
}
@@ -662,7 +677,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.enable();
+ }
}
}
@@ -674,7 +691,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupStart();
+ }
}
}
@@ -687,7 +706,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupEnd();
+ }
}
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index eb40c79..f4e6545 100644
--- a/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -126,7 +126,10 @@
while (firstChange != null && firstChange.isCommitted())
{
- state.update(firstCSN);
+ if (firstChange.getMsg().contributesToDomainState())
+ {
+ state.update(firstCSN);
+ }
pendingChanges.remove(firstCSN);
if (pendingChanges.isEmpty())
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
index c91ee47..92ed8c5 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
@@ -22,34 +22,34 @@
*
*
* Copyright 2008 Sun Microsystems, Inc.
+ * Portions Copyright 2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
-import org.opends.messages.Message;
-
import java.util.List;
+import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
-import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.ResultCode;
-
/**
* This class is used to create and object that can
* register in the admin framework as a listener for changes, add and delete
* on the ReplicationServer configuration objects.
- *
*/
public class ReplicationServerListener
implements ConfigurationAddListener<ReplicationServerCfg>,
ConfigurationDeleteListener<ReplicationServerCfg>
{
- ReplicationServer replicationServer = null;
+ private final DSRSShutdownSync dsrsShutdownSync;
+ private ReplicationServer replicationServer;
/**
* Build a ReplicationServer Listener from the given Multimaster
@@ -57,36 +57,36 @@
*
* @param configuration The configuration that will be used to listen
* for replicationServer configuration changes.
- *
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
* @throws ConfigException if the ReplicationServerListener can't register for
* listening to changes on the provided configuration
* object.
*/
public ReplicationServerListener(
- ReplicationSynchronizationProviderCfg configuration)
- throws ConfigException
+ ReplicationSynchronizationProviderCfg configuration,
+ DSRSShutdownSync dsrsShutdownSync) throws ConfigException
{
configuration.addReplicationServerAddListener(this);
configuration.addReplicationServerDeleteListener(this);
+ this.dsrsShutdownSync = dsrsShutdownSync;
if (configuration.hasReplicationServer())
{
- ReplicationServerCfg server = configuration.getReplicationServer();
- replicationServer = new ReplicationServer(server);
+ final ReplicationServerCfg cfg = configuration.getReplicationServer();
+ replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
}
}
- /**
- * {@inheritDoc}
- */
- public ConfigChangeResult applyConfigurationAdd(
- ReplicationServerCfg configuration)
+ /** {@inheritDoc} */
+ @Override
+ public ConfigChangeResult applyConfigurationAdd(ReplicationServerCfg cfg)
{
try
{
- replicationServer = new ReplicationServer(configuration);
+ replicationServer = new ReplicationServer(cfg, dsrsShutdownSync);
return new ConfigChangeResult(ResultCode.SUCCESS, false);
- } catch (ConfigException e)
+ }
+ catch (ConfigException e)
{
// we should never get to this point because the configEntry has
// already been validated in configAddisAcceptable
@@ -94,14 +94,12 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
+ @Override
public boolean isConfigurationAddAcceptable(
- ReplicationServerCfg configuration, List<Message> unacceptableReasons)
+ ReplicationServerCfg cfg, List<Message> unacceptableReasons)
{
- return ReplicationServer.isConfigurationAcceptable(
- configuration, unacceptableReasons);
+ return ReplicationServer.isConfigurationAcceptable(cfg, unacceptableReasons);
}
/**
@@ -110,14 +108,14 @@
public void shutdown()
{
if (replicationServer != null)
+ {
replicationServer.shutdown();
+ }
}
- /**
- * {@inheritDoc}
- */
- public ConfigChangeResult applyConfigurationDelete(
- ReplicationServerCfg configuration)
+ /** {@inheritDoc} */
+ @Override
+ public ConfigChangeResult applyConfigurationDelete(ReplicationServerCfg cfg)
{
// There can be only one replicationServer, just shutdown the
// replicationServer currently configured.
@@ -128,11 +126,10 @@
return new ConfigChangeResult(ResultCode.SUCCESS, false);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
+ @Override
public boolean isConfigurationDeleteAcceptable(
- ReplicationServerCfg configuration, List<Message> unacceptableReasons)
+ ReplicationServerCfg cfg, List<Message> unacceptableReasons)
{
return true;
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
index a9ac7c7..a1409cd 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
@@ -106,6 +106,13 @@
/** {@inheritDoc} */
@Override
+ public boolean contributesToDomainState()
+ {
+ return false; // replica offline msg MUST NOT update the ds-sync-state
+ }
+
+ /** {@inheritDoc} */
+ @Override
public String toString()
{
return getClass().getSimpleName() + " offlineCSN=" + csn;
diff --git a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
index 04ac885..98b4111 100644
--- a/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -294,4 +294,14 @@
{
return payload;
}
+
+ /**
+ * Whether the current message can update the "ds-sync-state" attribute.
+ *
+ * @return true if current message can update the "ds-sync-state" attribute, false otherwise.
+ */
+ public boolean contributesToDomainState()
+ {
+ return true;
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 97974ca..4d9bc65 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -35,6 +35,7 @@
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.ECLUpdateMsg;
import org.opends.server.replication.protocol.Session;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
@@ -50,7 +51,7 @@
* This class defines a server writer, which is used to send changes to a
* directory server.
*/
-public class ECLServerWriter extends ServerWriter
+class ECLServerWriter extends ServerWriter
{
/**
* The tracer object for the debug logger.
@@ -62,7 +63,7 @@
private final ReplicationServerDomain replicationServerDomain;
private boolean suspended;
private volatile boolean shutdown;
- private PersistentSearch mypsearch;
+ private final PersistentSearch mypsearch;
/**
* Create a ServerWriter.
@@ -72,10 +73,10 @@
* @param replicationServerDomain the ReplicationServerDomain of this
* ServerWriter.
*/
- public ECLServerWriter(Session session, ECLServerHandler handler,
+ ECLServerWriter(Session session, ECLServerHandler handler,
ReplicationServerDomain replicationServerDomain)
{
- super(session, handler, replicationServerDomain);
+ super(session, handler, replicationServerDomain, new DSRSShutdownSync());
setName("Replication ECL Writer Thread for operation " +
handler.getOperationId());
@@ -85,21 +86,26 @@
this.replicationServerDomain = replicationServerDomain;
this.suspended = false;
this.shutdown = false;
+ this.mypsearch = findPersistentSearch(handler);
+ }
- // Look for the psearch object related to this operation, the one that
- // will be notified with new entries to be returned.
- ECLWorkflowElement wfe =
- (ECLWorkflowElement) DirectoryServer
- .getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
+ /**
+ * Look for the persistent search object related to this operation, the one
+ * that will be notified with new entries to be returned.
+ */
+ private PersistentSearch findPersistentSearch(ECLServerHandler handler)
+ {
+ ECLWorkflowElement wfe = (ECLWorkflowElement)
+ DirectoryServer.getWorkflowElement(ECLWorkflowElement.ECL_WORKFLOW_ELEMENT);
for (PersistentSearch psearch : wfe.getPersistentSearches())
{
if (psearch.getSearchOperation().toString().equals(
handler.getOperationId()))
{
- mypsearch = psearch;
- break;
+ return psearch;
}
}
+ return null;
}
/**
@@ -107,7 +113,7 @@
* waiting for the startCLSessionMsg. Then it may be
* suspended between 2 jobs, each job being a separate search.
*/
- public synchronized void suspendWriter()
+ private synchronized void suspendWriter()
{
suspended = true;
}
@@ -115,7 +121,7 @@
/**
* Resume the writer.
*/
- public synchronized void resumeWriter()
+ synchronized void resumeWriter()
{
suspended = false;
notify();
@@ -187,7 +193,7 @@
* @throws IOException when raised (connection closure)
* @throws InterruptedException when raised
*/
- public void doIt() throws IOException, InterruptedException
+ private void doIt() throws IOException, InterruptedException
{
while (true)
{
@@ -237,7 +243,7 @@
/**
* Shutdown the writer.
*/
- public synchronized void shutdownWriter()
+ synchronized void shutdownWriter()
{
shutdown = true;
notify();
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index 72060f9..f3366b2 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -45,6 +45,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.types.Attributes.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -231,13 +232,10 @@
public List<Attribute> getMonitorData()
{
List<Attribute> attributes = new ArrayList<Attribute>();
- attributes.add(Attributes.create("handler", getMonitorInstanceName()));
- attributes.add(
- Attributes.create("queue-size", String.valueOf(msgQueue.count())));
- attributes.add(
- Attributes.create(
- "queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
- attributes.add(Attributes.create("following", String.valueOf(following)));
+ attributes.add(create("handler", getMonitorInstanceName()));
+ attributes.add(create("queue-size", String.valueOf(msgQueue.count())));
+ attributes.add(create("queue-size-bytes", String.valueOf(msgQueue.bytesCount())));
+ attributes.add(create("following", String.valueOf(following)));
return attributes;
}
@@ -422,21 +420,20 @@
*/
public CSN getOlderUpdateCSN()
{
- CSN result = null;
synchronized (msgQueue)
{
if (following)
{
if (!msgQueue.isEmpty())
{
- result = msgQueue.first().getCSN();
+ return msgQueue.first().getCSN();
}
}
else
{
if (!lateQueue.isEmpty())
{
- result = lateQueue.first().getCSN();
+ return lateQueue.first().getCSN();
}
else
{
@@ -447,11 +444,11 @@
the lateQueue when it will send the next update but we are not yet
there. So let's take the last change not sent directly from the db.
*/
- result = findOldestCSNFromReplicaDBs();
+ return findOldestCSNFromReplicaDBs();
}
}
}
- return result;
+ return null;
}
private CSN findOldestCSNFromReplicaDBs()
@@ -460,10 +457,13 @@
try
{
cursor = replicationServerDomain.getCursorFrom(serverState);
- cursor.next();
- if (cursor.getRecord() != null)
+ while (cursor.next())
{
- return cursor.getRecord().getCSN();
+ final UpdateMsg record = cursor.getRecord();
+ if (record.contributesToDomainState())
+ {
+ return record.getCSN();
+ }
}
return null;
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 96cf81f..9f013a8 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -37,8 +37,8 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
-import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.*;
-import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.*;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
+import org.opends.server.admin.std.meta.VirtualAttributeCfgDefn.ConflictBehavior;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
import org.opends.server.api.VirtualAttributeProvider;
@@ -51,9 +51,13 @@
import org.opends.server.replication.common.*;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.util.StaticUtils;
@@ -82,6 +86,7 @@
/** The current configuration of this replication server. */
private ReplicationServerCfg config;
+ private final DSRSShutdownSync dsrsShutdownSync;
/**
* This table is used to store the list of dn for which we are currently
@@ -126,34 +131,39 @@
/**
* Creates a new Replication server using the provided configuration entry.
*
- * @param configuration The configuration of this replication server.
+ * @param cfg The configuration of this replication server.
* @throws ConfigException When Configuration is invalid.
*/
- public ReplicationServer(ReplicationServerCfg configuration)
- throws ConfigException
+ public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
{
- this.config = configuration;
- ReplicationDBImplementation dbImpl = configuration.getReplicationDBImplementation();
- if (dbImpl == ReplicationDBImplementation.JE)
+ this(cfg, new DSRSShutdownSync());
+ }
+
+ /**
+ * Creates a new Replication server using the provided configuration entry.
+ *
+ * @param cfg The configuration of this replication server.
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
+ * @throws ConfigException When Configuration is invalid.
+ */
+ public ReplicationServer(ReplicationServerCfg cfg,
+ DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+ {
+ this.config = cfg;
+ this.dsrsShutdownSync = dsrsShutdownSync;
+ ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
+ if (DebugLogger.debugEnabled())
{
- if (DebugLogger.debugEnabled())
- {
- TRACER.debugMessage(DebugLogLevel.INFO, "Using JE as DB implementation for changelog DB");
- }
- this.changelogDB = new JEChangelogDB(this, configuration);
+ TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl
+ + " as DB implementation for changelog DB");
}
- else
- {
- if (DebugLogger.debugEnabled())
- {
- TRACER.debugMessage(DebugLogLevel.INFO, "Using LOG FILE as DB implementation for changelog DB");
- }
- this.changelogDB = new FileChangelogDB(this, configuration);
- }
+ this.changelogDB = dbImpl == ReplicationDBImplementation.JE
+ ? new JEChangelogDB(this, cfg)
+ : new FileChangelogDB(this, cfg);
replSessionSecurity = new ReplSessionSecurity();
initialize();
- configuration.addChangeListener(this);
+ cfg.addChangeListener(this);
localPorts.add(getReplicationPort());
@@ -1227,6 +1237,16 @@
return this.changelogDB;
}
+ /**
+ * Returns the synchronization object for shutdown of combined DS/RS instances.
+ *
+ * @return the synchronization object for shutdown of combined DS/RS instances.
+ */
+ DSRSShutdownSync getDSRSShutdownSync()
+ {
+ return dsrsShutdownSync;
+ }
+
/** {@inheritDoc} */
@Override
public String toString()
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 291dcdb..63718bf 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -68,7 +68,7 @@
/**
* The session opened with the remote server.
*/
- protected Session session;
+ protected final Session session;
/**
* The serverURL of the remote server.
@@ -77,40 +77,39 @@
/**
* Number of updates received from the server in assured safe read mode.
*/
- protected int assuredSrReceivedUpdates = 0;
+ private int assuredSrReceivedUpdates = 0;
/**
* Number of updates received from the server in assured safe read mode that
* timed out.
*/
- protected AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
+ private final AtomicInteger assuredSrReceivedUpdatesTimeout = new AtomicInteger();
/**
* Number of updates sent to the server in assured safe read mode.
*/
- protected int assuredSrSentUpdates = 0;
+ private int assuredSrSentUpdates = 0;
/**
* Number of updates sent to the server in assured safe read mode that timed
* out.
*/
- protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
+ private final AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
/**
* Number of updates received from the server in assured safe data mode.
*/
- protected int assuredSdReceivedUpdates = 0;
+ private int assuredSdReceivedUpdates = 0;
/**
* Number of updates received from the server in assured safe data mode that
* timed out.
*/
- protected AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
+ private final AtomicInteger assuredSdReceivedUpdatesTimeout = new AtomicInteger();
/**
* Number of updates sent to the server in assured safe data mode.
*/
- protected int assuredSdSentUpdates = 0;
+ private int assuredSdSentUpdates = 0;
/**
- * Number of updates sent to the server in assured safe data mode that timed
- * out.
+ * Number of updates sent to the server in assured safe data mode that timed out.
*/
- protected AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
+ private final AtomicInteger assuredSdSentUpdatesTimeout = new AtomicInteger();
/**
* The associated ServerWriter that sends messages to the remote server.
@@ -301,7 +300,8 @@
// sendWindow MUST be created before starting the writer
sendWindow = new Semaphore(sendWindowSize);
- writer = new ServerWriter(session, this, replicationServerDomain);
+ writer = new ServerWriter(session, this, replicationServerDomain,
+ replicationServer.getDSRSShutdownSync());
reader = new ServerReader(session, this);
session.setName("Replication server RS(" + getReplicationServerId()
@@ -626,7 +626,7 @@
* Increment the number of updates sent to the server in assured safe data
* mode.
*/
- public void incrementAssuredSdSentUpdates()
+ private void incrementAssuredSdSentUpdates()
{
assuredSdSentUpdates++;
}
@@ -662,7 +662,7 @@
* Increment the number of updates sent to the server in assured safe read
* mode.
*/
- public void incrementAssuredSrSentUpdates()
+ private void incrementAssuredSrSentUpdates()
{
assuredSrSentUpdates++;
}
diff --git a/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 1de2e40..e807306 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -32,8 +32,10 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.service.DSRSShutdownSync;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -55,8 +57,7 @@
private final Session session;
private final ServerHandler handler;
private final ReplicationServerDomain replicationServerDomain;
-
-
+ private final DSRSShutdownSync dsrsShutdownSync;
/**
* Create a ServerWriter. Then ServerWriter then waits on the ServerHandler
@@ -68,9 +69,11 @@
* handler for which the ServerWriter is created.
* @param replicationServerDomain
* The ReplicationServerDomain of this ServerWriter.
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
*/
public ServerWriter(Session session, ServerHandler handler,
- ReplicationServerDomain replicationServerDomain)
+ ReplicationServerDomain replicationServerDomain,
+ DSRSShutdownSync dsrsShutdownSync)
{
// Session may be null for ECLServerWriter.
super("Replication server RS(" + handler.getReplicationServerId()
@@ -80,6 +83,7 @@
this.session = session;
this.handler = handler;
this.replicationServerDomain = replicationServerDomain;
+ this.dsrsShutdownSync = dsrsShutdownSync;
}
/**
@@ -98,7 +102,9 @@
Message errMessage = null;
try
{
- while (true)
+ boolean shutdown = false;
+ while (!shutdown
+ || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
{
final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
if (updateMsg == null)
@@ -106,12 +112,16 @@
// this connection is closing
errMessage = Message.raw(
"Connection closure: null update returned by domain.");
- return;
+ shutdown = true;
}
else if (!isUpdateMsgFiltered(updateMsg))
{
// Publish the update to the remote server using a protocol version it supports
session.publish(updateMsg);
+ if (updateMsg instanceof ReplicaOfflineMsg)
+ {
+ dsrsShutdownSync.replicaOfflineMsgForwarded(replicationServerDomain.getBaseDN());
+ }
}
}
}
diff --git a/opends/src/server/org/opends/server/replication/service/DSRSShutdownSync.java b/opends/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
new file mode 100644
index 0000000..5ef595e
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/service/DSRSShutdownSync.java
@@ -0,0 +1,85 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.service;
+
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opends.server.types.DN;
+
+/**
+ * Class useful for the case where DS/RS instances are collocated inside the
+ * same JVM. It synchronizes the shutdown of the DS and RS sides.
+ * <p>
+ * More specifically, it ensures a ReplicaOfflineMsg sent by the DS is
+ * relayed/forwarded by the collocated RS to the other RSs in the topology
+ * before the whole process shuts down.
+ *
+ * @since OPENDJ-1453
+ */
+public class DSRSShutdownSync
+{
+
+ private static final ConcurrentSkipListSet<DN> replicaOfflineMsgs =
+ new ConcurrentSkipListSet<DN>();
+ private static AtomicLong stopInstanceTimestamp = new AtomicLong();
+
+ /**
+ * Message has been sent.
+ *
+ * @param baseDN
+ * the domain for which the message has been sent
+ */
+ public void replicaOfflineMsgSent(DN baseDN)
+ {
+ stopInstanceTimestamp.compareAndSet(0, System.currentTimeMillis());
+ replicaOfflineMsgs.add(baseDN);
+ }
+
+ /**
+ * Message has been forwarded.
+ *
+ * @param baseDN
+ * the domain for which the message has been sent
+ */
+ public void replicaOfflineMsgForwarded(DN baseDN)
+ {
+ replicaOfflineMsgs.remove(baseDN);
+ }
+
+ /**
+ * Whether a ReplicationServer ServerReader or ServerWriter can proceed with
+ * shutdown.
+ *
+ * @param baseDN
+ * the baseDN of the ServerReader or ServerWriter .
+ * @return true if the caller can shutdown, false otherwise
+ */
+ public boolean canShutdown(DN baseDN)
+ {
+ return !replicaOfflineMsgs.contains(baseDN)
+ || System.currentTimeMillis() - stopInstanceTimestamp.get() > 5000;
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 06376f1..ab7908a 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -3005,7 +3005,8 @@
// The server is shutting down.
listenerThread.initiateShutdown();
}
- else if (processUpdate(updateMsg))
+ else if (processUpdate(updateMsg)
+ && updateMsg.contributesToDomainState())
{
/*
* Warning: in synchronous mode, no way to tell the replay of an
@@ -3426,9 +3427,11 @@
*/
public void publish(UpdateMsg msg)
{
- // Publish the update
broker.publish(msg);
- state.update(msg.getCSN());
+ if (msg.contributesToDomainState())
+ {
+ state.update(msg.getCSN());
+ }
numSentUpdates.incrementAndGet();
}
--
Gitblit v1.10.0