From f948474a8031c24160da4b31f0b97354456b40ad Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 03 Jul 2014 15:19:12 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3938) Replica offline messages should be synced with updates
---
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 125 ++++++++++++++++++++++++-----------------
1 files changed, 73 insertions(+), 52 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index f78bf74..e22aa5b 100644
--- a/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -41,13 +41,13 @@
import org.opends.server.api.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.types.operation.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -67,9 +67,10 @@
BackupTaskListener, RestoreTaskListener, ImportTaskListener,
ExportTaskListener
{
- private ReplicationServerListener replicationServerListener = null;
+ private ReplicationServerListener replicationServerListener;
private static final Map<DN, LDAPReplicationDomain> domains =
- new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
+ new ConcurrentHashMap<DN, LDAPReplicationDomain>(4);
+ private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync();
/**
* The queue of received update messages, to be treated by the ReplayThread
@@ -113,8 +114,7 @@
* Can be null is the request has no associated operation.
* @return The domain for this DN.
*/
- public static LDAPReplicationDomain findDomain(
- DN dn, PluginOperation pluginOp)
+ public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp)
{
/*
* Don't run the special replication code on Operation that are
@@ -124,7 +124,9 @@
{
final Operation op = (Operation) pluginOp;
if (op.dontSynchronize())
+ {
return null;
+ }
/*
* Check if the provided operation is a repair operation and set the
@@ -181,8 +183,8 @@
{
try
{
- LDAPReplicationDomain domain =
- new LDAPReplicationDomain(configuration, updateToReplayQueue);
+ final LDAPReplicationDomain domain = new LDAPReplicationDomain(
+ configuration, updateToReplayQueue, dsrsShutdownSync);
if (domains.size() == 0)
{
// Create the threads that will process incoming update messages
@@ -218,9 +220,8 @@
BlockingQueue<UpdateToReplay> queue)
throws ConfigException
{
- LDAPReplicationDomain domain =
- new LDAPReplicationDomain(configuration, queue);
-
+ final LDAPReplicationDomain domain =
+ new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync);
domains.put(domain.getBaseDN(), domain);
return domain;
}
@@ -246,35 +247,30 @@
/** {@inheritDoc} */
@Override
public void initializeSynchronizationProvider(
- ReplicationSynchronizationProviderCfg configuration)
- throws ConfigException
+ ReplicationSynchronizationProviderCfg cfg) throws ConfigException
{
domains.clear();
- replicationServerListener = new ReplicationServerListener(configuration);
+ replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync);
// Register as an add and delete listener with the root configuration so we
// can be notified if Multimaster domain entries are added or removed.
- configuration.addReplicationDomainAddListener(this);
- configuration.addReplicationDomainDeleteListener(this);
+ cfg.addReplicationDomainAddListener(this);
+ cfg.addReplicationDomainDeleteListener(this);
// Register as a root configuration listener so that we can be notified if
// number of replay threads is changed and apply changes.
- configuration.addReplicationChangeListener(this);
+ cfg.addReplicationChangeListener(this);
- replayThreadNumber = configuration.getNumUpdateReplayThreads();
- connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(),
- Integer.MAX_VALUE);
+ replayThreadNumber = cfg.getNumUpdateReplayThreads();
+ connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE);
// Create the list of domains that are already defined.
- for (String name : configuration.listReplicationDomains())
+ for (String name : cfg.listReplicationDomains())
{
- createNewDomain(configuration.getReplicationDomain(name));
+ createNewDomain(cfg.getReplicationDomain(name));
}
- /*
- * If any schema changes were made with the server offline, then handle them
- * now.
- */
+ // If any schema changes were made with the server offline, then handle them now.
List<Modification> offlineSchemaChanges =
DirectoryServer.getOfflineSchemaChanges();
if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty())
@@ -402,12 +398,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationModifyOperation modifyOperation)
{
- LDAPReplicationDomain domain =
- findDomain(modifyOperation.getEntryDN(), modifyOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(modifyOperation);
+ LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(modifyOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -415,12 +411,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationAddOperation addOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(addOperation.getEntryDN(), addOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(addOperation);
+ LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(addOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -428,12 +424,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationDeleteOperation deleteOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(deleteOperation.getEntryDN(), deleteOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(deleteOperation);
+ LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(deleteOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -441,12 +437,12 @@
public SynchronizationProviderResult handleConflictResolution(
PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException
{
- LDAPReplicationDomain domain =
- findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
- if (domain == null)
- return new SynchronizationProviderResult.ContinueProcessing();
-
- return domain.handleConflictResolution(modifyDNOperation);
+ LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation);
+ if (domain != null)
+ {
+ return domain.handleConflictResolution(modifyDNOperation);
+ }
+ return new SynchronizationProviderResult.ContinueProcessing();
}
/** {@inheritDoc} */
@@ -505,7 +501,9 @@
LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation);
if (domain == null || !domain.solveConflict())
+ {
return new SynchronizationProviderResult.ContinueProcessing();
+ }
// The historical object is retrieved from the attachment created
// in the HandleConflictResolution phase.
@@ -537,11 +535,15 @@
LDAPReplicationDomain domain =
findDomain(addOperation.getEntryDN(), addOperation);
if (domain == null)
+ {
return new SynchronizationProviderResult.ContinueProcessing();
+ }
// For LOCAL op only, generate CSN and attach Context
if (!addOperation.isSynchronizationOperation())
+ {
domain.doPreOperation(addOperation);
+ }
// Add to the operation the historical attribute : "dn:changeNumber:add"
EntryHistorical.setHistoricalAttrToOperation(addOperation);
@@ -564,7 +566,9 @@
stopReplayThreads();
if (replicationServerListener != null)
+ {
replicationServerListener.shutdown();
+ }
DirectoryServer.deregisterBackupTaskListener(this);
DirectoryServer.deregisterRestoreTaskListener(this);
@@ -585,10 +589,11 @@
@Override
public void processSchemaChange(List<Modification> modifications)
{
- LDAPReplicationDomain domain =
- findDomain(DirectoryServer.getSchemaDN(), null);
+ LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null);
if (domain != null)
+ {
domain.synchronizeModifications(modifications);
+ }
}
/** {@inheritDoc} */
@@ -599,7 +604,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupStart();
+ }
}
}
@@ -612,7 +619,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupEnd();
+ }
}
}
@@ -624,7 +633,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.disable();
+ }
}
}
@@ -637,7 +648,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.enable();
+ }
}
}
@@ -649,7 +662,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.disable();
+ }
}
}
@@ -662,7 +677,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.enable();
+ }
}
}
@@ -674,7 +691,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupStart();
+ }
}
}
@@ -687,7 +706,9 @@
{
LDAPReplicationDomain domain = findDomain(dn, null);
if (domain != null)
+ {
domain.backupEnd();
+ }
}
}
--
Gitblit v1.10.0