From 8c8415177610baee0fc1c615926f21da621f0836 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 24 Jun 2013 15:20:37 +0000
Subject: [PATCH] OPENDJ-885 (CR-1909) Replication replay may lose changes if it can't acquire a writeLock
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java | 87 ++++++++++++++++++-------------------------
1 files changed, 36 insertions(+), 51 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
index c92c682..aab9397 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -27,57 +27,29 @@
*/
package org.opends.server.replication.plugin;
-import java.util.ArrayList;
-import static org.opends.server.replication.plugin.
-ReplicationRepairRequestControl.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*;
+import static org.opends.server.util.StaticUtils.*;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
-import org.opends.server.api.Backend;
-import org.opends.server.api.BackupTaskListener;
-import org.opends.server.api.ExportTaskListener;
-import org.opends.server.api.ImportTaskListener;
-import org.opends.server.api.RestoreTaskListener;
-import org.opends.server.api.SynchronizationProvider;
+import org.opends.server.api.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.types.BackupConfig;
-import org.opends.server.types.ConfigChangeResult;
-import org.opends.server.types.Control;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.LDIFExportConfig;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.Modification;
-import org.opends.server.types.Operation;
-import org.opends.server.types.RestoreConfig;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SynchronizationProviderResult;
-import org.opends.server.types.operation.PluginOperation;
-import org.opends.server.types.operation.PostOperationAddOperation;
-import org.opends.server.types.operation.PostOperationDeleteOperation;
-import org.opends.server.types.operation.PostOperationModifyDNOperation;
-import org.opends.server.types.operation.PostOperationModifyOperation;
-import org.opends.server.types.operation.PostOperationOperation;
-import org.opends.server.types.operation.PreOperationAddOperation;
-import org.opends.server.types.operation.PreOperationDeleteOperation;
-import org.opends.server.types.operation.PreOperationModifyDNOperation;
-import org.opends.server.types.operation.PreOperationModifyOperation;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.logError;
+import org.opends.server.types.*;
+import org.opends.server.types.operation.*;
/**
* This class is used to load the Replication code inside the JVM
@@ -104,8 +76,8 @@
* The queue of received update messages, to be treated by the ReplayThread
* threads.
*/
- private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
- new LinkedBlockingQueue<UpdateToReplay>();
+ private static final BlockingQueue<UpdateToReplay> updateToReplayQueue =
+ new LinkedBlockingQueue<UpdateToReplay>(10000);
/**
* The list of ReplayThread threads.
@@ -228,9 +200,9 @@
}
/**
- * Creates a new domain from its configEntry, do the
- * necessary initialization and starts it so that it is
- * fully operational when this method returns.
+ * Creates a new domain from its configEntry, do the necessary initialization
+ * and starts it so that it is fully operational when this method returns. It
+ * is only used for tests so far.
*
* @param configuration The entry with the configuration of this domain.
* @param queue The BlockingQueue that this domain will use.
@@ -239,13 +211,13 @@
*
* @throws ConfigException When the configuration is not valid.
*/
- public static LDAPReplicationDomain createNewDomain(
+ static LDAPReplicationDomain createNewDomain(
ReplicationDomainCfg configuration,
BlockingQueue<UpdateToReplay> queue)
throws ConfigException
{
- LDAPReplicationDomain domain;
- domain = new LDAPReplicationDomain(configuration, queue);
+ LDAPReplicationDomain domain =
+ new LDAPReplicationDomain(configuration, queue);
domains.put(domain.getBaseDN(), domain);
return domain;
@@ -362,6 +334,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isConfigurationAddAcceptable(
ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
{
@@ -372,6 +345,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public ConfigChangeResult applyConfigurationAdd(
ReplicationDomainCfg configuration)
{
@@ -652,6 +626,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processBackupBegin(Backend backend, BackupConfig config)
{
for (DN dn : backend.getBaseDNs())
@@ -665,6 +640,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processBackupEnd(Backend backend, BackupConfig config,
boolean successful)
{
@@ -679,6 +655,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processRestoreBegin(Backend backend, RestoreConfig config)
{
for (DN dn : backend.getBaseDNs())
@@ -692,6 +669,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processRestoreEnd(Backend backend, RestoreConfig config,
boolean successful)
{
@@ -706,6 +684,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processImportBegin(Backend backend, LDIFImportConfig config)
{
for (DN dn : backend.getBaseDNs())
@@ -719,6 +698,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processImportEnd(Backend backend, LDIFImportConfig config,
boolean successful)
{
@@ -733,6 +713,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processExportBegin(Backend backend, LDIFExportConfig config)
{
for (DN dn : backend.getBaseDNs())
@@ -746,6 +727,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void processExportEnd(Backend backend, LDIFExportConfig config,
boolean successful)
{
@@ -760,6 +742,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public ConfigChangeResult applyConfigurationDelete(
ReplicationDomainCfg configuration)
{
@@ -771,6 +754,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public boolean isConfigurationDeleteAcceptable(
ReplicationDomainCfg configuration, List<Message> unacceptableReasons)
{
@@ -804,10 +788,10 @@
/**
* {@inheritDoc}
*/
- public boolean
- isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
- configuration,
- List<Message> unacceptableReasons)
+ @Override
+ public boolean isConfigurationChangeAcceptable(
+ ReplicationSynchronizationProviderCfg configuration,
+ List<Message> unacceptableReasons)
{
return true;
}
@@ -815,9 +799,9 @@
/**
* {@inheritDoc}
*/
- public ConfigChangeResult
- applyConfigurationChange
- (ReplicationSynchronizationProviderCfg configuration)
+ @Override
+ public ConfigChangeResult applyConfigurationChange(
+ ReplicationSynchronizationProviderCfg configuration)
{
int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
@@ -838,6 +822,7 @@
/**
* {@inheritDoc}
*/
+ @Override
public void completeSynchronizationProvider()
{
isRegistered = true;
--
Gitblit v1.10.0