From cb1bb5d131addd27e2927ec90cc572a8c4d40f80 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 09 Jan 2014 09:52:23 +0000
Subject: [PATCH] Front-port of r10098.
---
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java | 126 ++----
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 256 +++++-------
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java | 26
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java | 27
opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 100 ----
opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java | 52 +-
opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 503 ++++++++++++------------
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java | 22
opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java | 21
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java | 14
10 files changed, 470 insertions(+), 677 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java b/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
index fcc036d..fb42a43 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/common/DSInfo.java
@@ -22,12 +22,11 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.common;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
/**
* This class holds information about a DS connected to the topology. This
@@ -35,8 +34,7 @@
* messages, to keep every member (RS or DS) of the topology aware of the DS
* topology.
* <p>
- * This class is almost immutable, because it does not copy the List and Set
- * passed into the ctor.
+ * @Immutable
*/
public final class DSInfo
{
@@ -68,7 +66,6 @@
private final Set<String> eclIncludesForDeletes;
-
/**
* Creates a new instance of DSInfo with every given info.
*
@@ -101,8 +98,8 @@
public DSInfo(int dsId, String dsUrl, int rsId, long generationId,
ServerStatus status, boolean assuredFlag,
AssuredMode assuredMode, byte safeDataLevel, byte groupId,
- List<String> refUrls, Set<String> eclIncludes,
- Set<String> eclIncludesForDeletes, short protocolVersion)
+ Collection<String> refUrls, Collection<String> eclIncludes,
+ Collection<String> eclIncludesForDeletes, short protocolVersion)
{
this.dsId = dsId;
this.dsUrl = dsUrl;
@@ -113,9 +110,11 @@
this.assuredMode = assuredMode;
this.safeDataLevel = safeDataLevel;
this.groupId = groupId;
- this.refUrls = refUrls;
- this.eclIncludes = eclIncludes;
- this.eclIncludesForDeletes = eclIncludesForDeletes;
+ this.refUrls = Collections.unmodifiableList(new ArrayList<String>(refUrls));
+ this.eclIncludes =
+ Collections.unmodifiableSet(new HashSet<String>(eclIncludes));
+ this.eclIncludesForDeletes =
+ Collections.unmodifiableSet(new HashSet<String>(eclIncludesForDeletes));
this.protocolVersion = protocolVersion;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 596b373..8db4b9c 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -77,7 +77,6 @@
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.common.AssuredMode.*;
import static org.opends.server.replication.plugin.EntryHistorical.*;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.replication.service.ReplicationMonitor.*;
@@ -186,7 +185,6 @@
private final PersistentServerState state;
private int numReplayedPostOpCalled = 0;
- private volatile long generationId = -1;
private volatile boolean generationIdSavedStatus = false;
private final CSNGenerator generator;
@@ -227,7 +225,6 @@
private final SortedMap<CSN, FakeOperation> replayOperations =
new TreeMap<CSN, FakeOperation>();
- private ReplicationDomainCfg config;
private ExternalChangelogDomain eclDomain;
/**
@@ -471,11 +468,8 @@
public LDAPReplicationDomain(ReplicationDomainCfg configuration,
BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException
{
- super(configuration.getBaseDN(),
- configuration.getServerId(),
- configuration.getInitializationWindowSize());
+ super(configuration, -1);
- this.config = configuration;
this.updateToReplayQueue = updateToReplayQueue;
// Get assured configuration
@@ -484,12 +478,7 @@
// Get fractional configuration
fractionalConfig = new FractionalConfig(getBaseDN());
readFractionalConfig(configuration, false);
-
- setGroupId((byte)configuration.getGroupId());
- setURLs(configuration.getReferralsUrl());
-
storeECLConfiguration(configuration);
-
solveConflictFlag = isSolveConflict(configuration);
Backend backend = retrievesBackend(getBaseDN());
@@ -551,76 +540,6 @@
}
/**
- * Gets and stores the assured replication configuration parameters. Returns
- * a boolean indicating if the passed configuration has changed compared to
- * previous values and the changes require a reconnection.
- * @param configuration The configuration object
- * @param allowReconnection Tells if one must reconnect if significant changes
- * occurred
- */
- private void readAssuredConfig(ReplicationDomainCfg configuration,
- boolean allowReconnection)
- {
- final boolean needReconnection = needReconnection(configuration);
-
- // Disconnect if required: changing configuration values before
- // disconnection would make assured replication used immediately and
- // disconnection could cause some timeouts error.
- if (needReconnection && allowReconnection)
- disableService();
-
- switch (configuration.getAssuredType())
- {
- case NOT_ASSURED:
- setAssured(false);
- break;
- case SAFE_DATA:
- setAssured(true);
- setAssuredMode(AssuredMode.SAFE_DATA_MODE);
- break;
- case SAFE_READ:
- setAssured(true);
- setAssuredMode(AssuredMode.SAFE_READ_MODE);
- break;
- }
- setAssuredSdLevel((byte) configuration.getAssuredSdLevel());
- setAssuredTimeout(configuration.getAssuredTimeout());
-
- // Reconnect if required
- if (needReconnection && allowReconnection)
- enableService();
- }
-
- private boolean needReconnection(ReplicationDomainCfg cfg)
- {
- switch (cfg.getAssuredType())
- {
- case NOT_ASSURED:
- if (isAssured())
- {
- return true;
- }
- break;
- case SAFE_DATA:
- if (!isAssured() || getAssuredMode() == SAFE_READ_MODE)
- {
- return true;
- }
- break;
- case SAFE_READ:
- if (!isAssured() || getAssuredMode() == SAFE_DATA_MODE)
- {
- return true;
- }
- break;
- }
-
- return isAssured()
- && getAssuredMode() == SAFE_DATA_MODE
- && cfg.getAssuredSdLevel() != getAssuredSdLevel();
- }
-
- /**
* Sets the error message id to be used when online import is stopped with
* error by the fractional replication ldif import plugin.
* @param importErrorMessageId The message to use.
@@ -686,7 +605,8 @@
}
// Disable service if configuration changed
- if (needReconnection && allowReconnection)
+ final boolean needRestart = needReconnection && allowReconnection;
+ if (needRestart)
{
disableService();
}
@@ -713,7 +633,7 @@
}
// Reconnect if required
- if (needReconnection && allowReconnection)
+ if (needRestart)
enableService();
}
@@ -1622,7 +1542,7 @@
// FIXME should the next call use the initWindow parameter rather than the
// instance variable?
- super.initializeRemote(target, requestorID, initTask, this.initWindow);
+ super.initializeRemote(target, requestorID, initTask, getInitWindow());
}
/**
@@ -2377,7 +2297,7 @@
DirectoryServer.deregisterAlertGenerator(this);
// stop the ReplicationDomain
- stopDomain();
+ disableService();
}
// wait for completion of the persistentServerState thread.
@@ -3412,14 +3332,6 @@
return genId;
}
- /** {@inheritDoc} */
- @Override
- public long getGenerationID()
- {
- return generationId;
- }
-
-
/**
* Run a modify operation to update the entry whose DN is given as
* a parameter with the generationID information.
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
index 9b83896..87913ce 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -22,17 +22,14 @@
*
*
* Copyright 2008-2009 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1;
@@ -43,6 +40,7 @@
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.util.StaticUtils;
/**
* This message is used by DS to confirm a RS he wants to connect to him (open
@@ -60,19 +58,18 @@
*/
public class StartSessionMsg extends ReplicationMsg
{
- // The list of referrals URLs to the sending DS
- private List<String> referralsURLs = new ArrayList<String>();
- // The initial status the DS starts with
+ /** The list of referrals URLs to the sending DS. */
+ private final List<String> referralsURLs = new ArrayList<String>();
+ /** The initial status the DS starts with. */
private ServerStatus status = ServerStatus.INVALID_STATUS;
- // Assured replication enabled on DS or not
- private boolean assuredFlag = false;
- // DS assured mode (relevant if assured replication enabled)
+ /** Assured replication enabled on DS or not. */
+ private boolean assuredFlag;
+ /** DS assured mode (relevant if assured replication enabled). */
private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
- // DS safe data level (relevant if assured mode is safe data)
- private byte safeDataLevel = (byte) 1;
+ /** DS safe data level (relevant if assured mode is safe data). */
+ private byte safeDataLevel = 1;
private Set<String> eclIncludes = new HashSet<String>();
-
private Set<String> eclIncludesForDeletes = new HashSet<String>();
/**
@@ -103,10 +100,10 @@
* @param assuredMode Assured type
* @param safeDataLevel Assured mode safe data level
*/
- public StartSessionMsg(ServerStatus status, List<String> referralsURLs,
+ public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs,
boolean assuredFlag, AssuredMode assuredMode, byte safeDataLevel)
{
- this.referralsURLs = referralsURLs;
+ this.referralsURLs.addAll(referralsURLs);
this.status = status;
this.assuredFlag = assuredFlag;
this.assuredMode = assuredMode;
@@ -119,9 +116,9 @@
* @param status Status we are starting with
* @param referralsURLs Referrals URLs to be used by peer DSs
*/
- public StartSessionMsg(ServerStatus status, List<String> referralsURLs)
+ public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs)
{
- this.referralsURLs = referralsURLs;
+ this.referralsURLs.addAll(referralsURLs);
this.status = status;
this.assuredFlag = false;
}
@@ -130,9 +127,7 @@
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public byte[] getBytes(short reqProtocolVersion)
throws UnsupportedEncodingException
@@ -334,7 +329,6 @@
/* Read the referrals URLs */
int pos = 5;
- referralsURLs = new ArrayList<String>();
while (pos < in.length)
{
/*
@@ -373,24 +367,18 @@
return status;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
- String urls = "";
- for (String s : referralsURLs)
- {
- urls += s + " | ";
- }
- return ("StartSessionMsg content:\nstatus: " + status +
+ String urls = StaticUtils.collectionToString(referralsURLs, " | ");
+ return "StartSessionMsg content:\nstatus: " + status +
"\nassuredFlag: " + assuredFlag +
"\nassuredMode: " + assuredMode +
"\nsafeDataLevel: " + safeDataLevel +
"\nreferralsURLs: " + urls +
"\nEclIncludes " + eclIncludes +
- "\nEclIncludeForDeletes: " + eclIncludesForDeletes);
+ "\nEclIncludeForDeletes: " + eclIncludesForDeletes;
}
/**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index b01feaa..7f41ec9 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -39,6 +39,7 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.task.Task;
@@ -56,6 +57,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.AssuredMode.*;
import static org.opends.server.replication.common.StatusMachine.*;
/**
@@ -100,7 +102,7 @@
* implementation using methods {@link #initializeRemote(int)}
* or {@link #initializeFromRemote(int)}.
* <p>
- * At shutdown time, the {@link #stopDomain()} method should be called to
+ * At shutdown time, the {@link #disableService()} method should be called to
* cleanly stop the replication service.
*/
public abstract class ReplicationDomain
@@ -115,25 +117,21 @@
*/
private static final DebugTracer TRACER = getTracer();
+ /** The configuration of the replication domain. */
+ protected volatile ReplicationDomainCfg config;
/**
- * The baseDN for the Replication Service.
- * All Replication Domain using this baseDN will be connected
- * through the Replication Service.
+ * The assured configuration of the replication domain. It is a duplicate of
+ * {@link #config} because of its update model.
+ *
+ * @see #readAssuredConfig(ReplicationDomainCfg, boolean)
*/
- private final DN baseDN;
-
- /**
- * The identifier of this Replication Domain inside the
- * Replication Service.
- * Each Domain must use a unique ServerID.
- */
- private final int serverID;
+ private volatile ReplicationDomainCfg assuredConfig;
/**
* The ReplicationBroker that is used by this ReplicationDomain to
* connect to the ReplicationService.
*/
- protected ReplicationBroker broker = null;
+ protected ReplicationBroker broker;
/**
* This Map is used to store all outgoing assured messages in order
@@ -158,33 +156,6 @@
private volatile DirectoryThread listenerThread = null;
/**
- * A Map used to store all the ReplicationDomains created on this server.
- */
- private static Map<DN, ReplicationDomain> domains =
- new HashMap<DN, ReplicationDomain>();
-
- /*
- * Assured mode properties
- */
- /** Whether assured mode is enabled for this domain. */
- private boolean assured = false;
- /** Assured sub mode (used when assured is true). */
- private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
- /** Safe Data level (used when assuredMode is SAFE_DATA). */
- private byte assuredSdLevel = 1;
- /** The timeout in ms that should be used, when waiting for assured acks. */
- private long assuredTimeout = 2000;
-
- /** Group id. */
- private byte groupId = 1;
- /**
- * Referrals urls to be published to other servers of the topology.
- * <p>
- * TODO: fill that with all currently opened urls if no urls configured
- */
- private final List<String> refUrls = new ArrayList<String>();
-
- /**
* A set of counters used for Monitoring.
*/
private AtomicInteger numProcessedUpdates = new AtomicInteger(0);
@@ -265,15 +236,6 @@
private final Map<Integer, Integer> assuredSdServerTimeoutUpdates =
new HashMap<Integer,Integer>();
- /**
- * Window size used during initialization .. between
- * - the initializer/exporter DS that listens/waits acknowledges and that
- * slows down data msg publishing based on the slowest server
- * - and each initialized/importer DS that publishes acknowledges each
- * WINDOW/2 data msg received.
- */
- protected final int initWindow;
-
/* Status related monitoring fields */
/**
@@ -311,6 +273,12 @@
private final Object sessionLock = new Object();
/**
+ * The generationId for this replication domain. It is made of a hash of the
+ * 1000 first entries for this domain.
+ */
+ protected volatile long generationId;
+
+ /**
* Returns the {@link CSNGenerator} that will be used to
* generate {@link CSN} for this domain.
*
@@ -325,46 +293,39 @@
/**
* Creates a ReplicationDomain with the provided parameters.
*
- * @param baseDN The identifier of the Replication Domain to which
- * this object is participating.
- * @param serverID The identifier of the server that is participating
- * to the Replication Domain.
- * This identifier should be different for each server that
- * is participating to a given Replication Domain.
- * @param initWindow Window used during initialization.
+ * @param config
+ * The configuration object for this ReplicationDomain
+ * @param generationId
+ * the generation of this ReplicationDomain
*/
- public ReplicationDomain(DN baseDN, int serverID, int initWindow)
+ public ReplicationDomain(ReplicationDomainCfg config, long generationId)
{
- this.baseDN = baseDN;
- this.serverID = serverID;
- this.initWindow = initWindow;
+ this.config = config;
+ this.assuredConfig = config;
+ this.generationId = generationId;
this.state = new ServerState();
- this.generator = new CSNGenerator(serverID, state);
-
- domains.put(baseDN, this);
+ this.generator = new CSNGenerator(getServerId(), state);
}
/**
- * Creates a ReplicationDomain with the provided parameters.
- * (for unit test purpose only)
+ * Creates a ReplicationDomain with the provided parameters. (for unit test
+ * purpose only)
*
- * @param baseDN The identifier of the Replication Domain to which
- * this object is participating.
- * @param serverID The identifier of the server that is participating
- * to the Replication Domain.
- * This identifier should be different for each server that
- * is participating to a given Replication Domain.
- * @param serverState The serverState to use
+ * @param config
+ * The configuration object for this ReplicationDomain
+ * @param generationId
+ * the generation of this ReplicationDomain
+ * @param serverState
+ * The serverState to use
*/
- public ReplicationDomain(DN baseDN, int serverID, ServerState serverState)
+ public ReplicationDomain(ReplicationDomainCfg config, long generationId,
+ ServerState serverState)
{
- this.baseDN = baseDN;
- this.serverID = serverID;
- this.initWindow = 100;
+ this.config = config;
+ this.assuredConfig = config;
+ this.generationId = generationId;
this.state = serverState;
- this.generator = new CSNGenerator(serverID, state);
-
- domains.put(baseDN, this);
+ this.generator = new CSNGenerator(getServerId(), state);
}
/**
@@ -387,7 +348,7 @@
if (!isValidInitialStatus(initStatus))
{
logError(ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
- getBaseDNString(), Integer.toString(serverID)));
+ getBaseDNString(), Integer.toString(getServerId())));
}
else
{
@@ -406,7 +367,7 @@
private void receiveChangeStatus(ChangeStatusMsg csMsg)
{
if (debugEnabled())
- TRACER.debugInfo("Replication domain " + baseDN +
+ TRACER.debugInfo("Replication domain " + getBaseDN() +
" received change status message:\n" + csMsg);
ServerStatus reqStatus = csMsg.getRequestedStatus();
@@ -416,7 +377,7 @@
if (event == StatusMachineEvent.INVALID_EVENT)
{
logError(ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
- getBaseDNString(), Integer.toString(serverID)));
+ getBaseDNString(), Integer.toString(getServerId())));
return;
}
@@ -468,13 +429,24 @@
}
/**
- * Returns the base DN of this ReplicationDomain.
+ * Returns the current config of this ReplicationDomain.
+ *
+ * @return the config
+ */
+ protected ReplicationDomainCfg getConfig()
+ {
+ return config;
+ }
+
+ /**
+ * Returns the base DN of this ReplicationDomain. All Replication Domain using
+ * this baseDN will be connected through the Replication Service.
*
* @return The base DN of this ReplicationDomain
*/
public DN getBaseDN()
{
- return baseDN;
+ return config.getBaseDN();
}
/**
@@ -484,16 +456,32 @@
*/
public String getBaseDNString()
{
- return baseDN.toNormalizedString();
+ return getBaseDN().toNormalizedString();
}
/**
- * Get the server ID.
+ * Get the server ID. The identifier of this Replication Domain inside the
+ * Replication Service. Each Domain must use a unique ServerID.
+ *
* @return The server ID.
*/
public int getServerId()
{
- return serverID;
+ return config.getServerId();
+ }
+
+ /**
+ * Window size used during initialization .. between - the
+ * initializer/exporter DS that listens/waits acknowledges and that slows down
+ * data msg publishing based on the slowest server - and each
+ * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg
+ * received.
+ *
+ * @return the initWindow
+ */
+ protected int getInitWindow()
+ {
+ return config.getInitializationWindowSize();
}
/**
@@ -502,25 +490,38 @@
*/
public boolean isAssured()
{
- return assured;
+ return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType())
+ || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType());
}
/**
- * Gives the mode for the assured replication of the domain.
+ * Gives the mode for the assured replication of the domain. Only used when
+ * assured is true).
+ *
* @return The mode for the assured replication of the domain.
*/
public AssuredMode getAssuredMode()
{
- return assuredMode;
+ switch (assuredConfig.getAssuredType())
+ {
+ case SAFE_DATA:
+ case NOT_ASSURED: // The assured mode will be ignored in that case anyway
+ return AssuredMode.SAFE_DATA_MODE;
+ case SAFE_READ:
+ return AssuredMode.SAFE_READ_MODE;
+ }
+ return null; // should never happen
}
/**
- * Gives the assured level of the replication of the domain.
+ * Gives the assured Safe Data level of the replication of the domain. (used
+ * when assuredMode is SAFE_DATA).
+ *
* @return The assured level of the replication of the domain.
*/
public byte getAssuredSdLevel()
{
- return assuredSdLevel;
+ return (byte) assuredConfig.getAssuredSdLevel();
}
/**
@@ -529,7 +530,7 @@
*/
public long getAssuredTimeout()
{
- return assuredTimeout;
+ return assuredConfig.getAssuredTimeout();
}
/**
@@ -538,16 +539,20 @@
*/
public byte getGroupId()
{
- return groupId;
+ return (byte) config.getGroupId();
}
/**
- * Gets the referrals URLs this domain publishes.
+ * Gets the referrals URLs this domain publishes. Referrals urls to be
+ * published to other servers of the topology.
+ * <p>
+ * TODO: fill that with all currently opened urls if no urls configured
+ *
* @return The referrals URLs this domain publishes.
*/
- public List<String> getRefUrls()
+ public Set<String> getRefUrls()
{
- return refUrls;
+ return config.getReferralsUrl();
}
/**
@@ -673,67 +678,6 @@
}
/**
- * Set the list of Referrals that should be returned when an
- * operation needs to be redirected to this server.
- *
- * @param referralsUrl The list of referrals.
- */
- public void setURLs(Set<String> referralsUrl)
- {
- this.refUrls.addAll(referralsUrl);
- }
-
- /**
- * Set the timeout of the assured replication.
- *
- * @param assuredTimeout the timeout of the assured replication.
- */
- public void setAssuredTimeout(long assuredTimeout)
- {
- this.assuredTimeout = assuredTimeout;
- }
-
- /**
- * Sets the groupID.
- *
- * @param groupId The groupID.
- */
- public void setGroupId(byte groupId)
- {
- this.groupId = groupId;
- }
-
- /**
- * Sets the level of assured replication.
- *
- * @param assuredSdLevel The level of assured replication.
- */
- public void setAssuredSdLevel(byte assuredSdLevel)
- {
- this.assuredSdLevel = assuredSdLevel;
- }
-
- /**
- * Sets the assured replication mode.
- *
- * @param dataMode The assured replication mode.
- */
- public void setAssuredMode(AssuredMode dataMode)
- {
- this.assuredMode = dataMode;
- }
-
- /**
- * Sets assured replication.
- *
- * @param assured A boolean indicating if assured replication should be used.
- */
- public void setAssured(boolean assured)
- {
- this.assured = assured;
- }
-
- /**
* Receives an update message from the replicationServer.
* The other types of messages are processed in an opaque way for the caller.
* Also responsible for updating the list of pending changes
@@ -802,8 +746,8 @@
*/
if (debugEnabled())
TRACER.debugInfo(
- "[IE] processErrorMsg:" + this.serverID +
- " baseDN: " + this.baseDN +
+ "[IE] processErrorMsg:" + getServerId() +
+ " baseDN: " + getBaseDN() +
" Error Msg received: " + errorMsg);
if (errorMsg.getCreationTime() > ieContext.startTime)
@@ -873,10 +817,9 @@
}
numRcvdUpdates.incrementAndGet();
- byte rsGroupId = broker.getRsGroupId();
if (update.isAssured()
- && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE
- && rsGroupId == groupId)
+ && broker.getRsGroupId() == getGroupId()
+ && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
{
assuredSrReceivedUpdates.incrementAndGet();
}
@@ -949,7 +892,7 @@
requested servers. Log problem
*/
logError(NOTE_DS_RECEIVED_ACK_ERROR.get(
- getBaseDNString(), Integer.toString(serverID),
+ getBaseDNString(), Integer.toString(getServerId()),
update.toString(), ack.errorsToString()));
List<Integer> failedServers = ack.getFailedServers();
@@ -1048,7 +991,7 @@
*/
public ExportThread(int serverIdToInitialize, int initWindow)
{
- super("Export thread from serverId=" + serverID + " to serverId="
+ super("Export thread from serverId=" + getServerId() + " to serverId="
+ serverIdToInitialize);
this.serverIdToInitialize = serverIdToInitialize;
this.initWindow = initWindow;
@@ -1379,7 +1322,7 @@
public void initializeRemote(int target, Task initTask)
throws DirectoryException
{
- initializeRemote(target, this.serverID, initTask, this.initWindow);
+ initializeRemote(target, getServerId(), initTask, getInitWindow());
}
/**
@@ -1418,7 +1361,7 @@
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get(
- countEntries(), getBaseDNString(), serverID));
+ countEntries(), getBaseDNString(), getServerId()));
for (DSInfo dsi : getReplicasList())
{
@@ -1436,8 +1379,8 @@
}
else
{
- logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
- countEntries(), getBaseDNString(), serverID, serverToInitialize));
+ logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(),
+ getBaseDNString(), getServerId(), serverToInitialize));
ieContext.startList.add(serverToInitialize);
@@ -1471,7 +1414,7 @@
// Send start message to the peer
InitializeTargetMsg initTargetMsg = new InitializeTargetMsg(
- getBaseDN(), serverID, serverToInitialize,
+ getBaseDN(), getServerId(), serverToInitialize,
serverRunningTheTask, ieContext.entryCount, initWindow);
broker.publish(initTargetMsg);
@@ -1492,8 +1435,8 @@
exportBackend(new BufferedOutputStream(new ReplOutputStream(this)));
// Notify the peer of the success
- DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination());
- broker.publish(doneMsg);
+ broker.publish(
+ new DoneMsg(getServerId(), initTargetMsg.getDestination()));
}
catch(DirectoryException exportException)
{
@@ -1595,12 +1538,12 @@
if (serverToInitialize == RoutableMsg.ALL_SERVERS)
{
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
- getBaseDNString(), serverID, cause));
+ getBaseDNString(), getServerId(), cause));
}
else
{
logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
- getBaseDNString(), serverID, serverToInitialize, cause));
+ getBaseDNString(), getServerId(), serverToInitialize, cause));
}
@@ -1894,10 +1837,8 @@
// send the ack of flow control mgmt
if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0)
{
- InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
- this.serverID,
- entryMsg.getSenderID(),
- ieContext.msgCnt);
+ final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg(
+ getServerId(), entryMsg.getSenderID(), ieContext.msgCnt);
broker.publish(amsg, false);
if (debugEnabled())
{
@@ -1945,7 +1886,7 @@
Message.raw(Category.SYNC, Severity.NOTICE,
ERR_INIT_EXPORTER_DISCONNECTION.get(
getBaseDNString(),
- Integer.toString(this.serverID),
+ Integer.toString(getServerId()),
Integer.toString(ieContext.importSource)));
ieContext.setExceptionIfNoneSet(new DirectoryException(
ResultCode.OTHER, errMsg));
@@ -2017,7 +1958,7 @@
// build the message
EntryMsg entryMessage = new EntryMsg(
- serverID,ieContext.getExportTarget(), lDIFEntry, pos, length,
+ getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length,
++ieContext.msgCnt);
// Waiting the slowest loop
@@ -2219,7 +2160,7 @@
ieContext.initializeTask = initTask;
ieContext.attemptCnt = 0;
ieContext.initReqMsgSent = new InitializeRequestMsg(
- getBaseDN(), serverID, source, this.initWindow);
+ getBaseDN(), getServerId(), source, getInitWindow());
// Publish Init request msg
broker.publish(ieContext.initReqMsgSent);
@@ -2281,14 +2222,14 @@
try
{
// Log starting
- logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
- getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID));
+ logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(getBaseDNString(),
+ initTargetMsgReceived.getSenderID(), getServerId()));
// Go into full update status
setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT);
// Acquire an import context if no already done (and initialize).
- if (initTargetMsgReceived.getInitiatorID() != this.serverID)
+ if (initTargetMsgReceived.getInitiatorID() != getServerId())
{
/*
The initTargetMsgReceived is for an import initiated by the remote
@@ -2418,7 +2359,8 @@
finally
{
Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
- getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
+ getBaseDNString(), initTargetMsgReceived.getSenderID(),
+ getServerId(),
(ieContext.getException() == null ? ""
: ieContext.getException().getLocalizedMessage()));
logError(msg);
@@ -2458,7 +2400,7 @@
if (newStatus == ServerStatus.INVALID_STATUS)
{
logError(ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(),
- Integer.toString(serverID), status.toString(), event.toString()));
+ String.valueOf(getServerId()), status.toString(), event.toString()));
return;
}
@@ -2472,13 +2414,11 @@
resetMonitoringCounters();
}
- // Store new status
status = newStatus;
-
if (debugEnabled())
{
- TRACER.debugInfo("Replication domain " + baseDN + " new status is: "
- + status);
+ TRACER.debugInfo("Replication domain " + getBaseDN()
+ + " new status is: " + status);
}
// Perform whatever actions are needed to apply properties for being
@@ -2560,10 +2500,8 @@
// check that at least one ReplicationServer did change its generation-id
checkGenerationID(-1);
- // Reconnect to the Replication Server so that it adopt our
- // GenerationID.
- disableService();
- enableService();
+ // Reconnect to the Replication Server so that it adopts our GenerationID.
+ restartService();
// wait for the domain to reconnect.
int count = 0;
@@ -2597,8 +2535,8 @@
{
if (debugEnabled())
{
- TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN
- + " resetGenerationId " + generationIdNewValue);
+ TRACER.debugInfo("Server id " + getServerId() + " and domain "
+ + getBaseDN() + " resetGenerationId " + generationIdNewValue);
}
ResetGenerationIdMsg genIdMessage =
@@ -2607,7 +2545,7 @@
if (!isConnected())
{
Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(),
- Integer.toString(serverID),
+ Integer.toString(getServerId()),
Long.toString(genIdMessage.getGenerationId()));
throw new DirectoryException(ResultCode.OTHER, message);
}
@@ -3110,33 +3048,19 @@
}
/**
- * Definitively stops the Replication Service.
- */
- public void stopDomain()
- {
- disableService();
- domains.remove(baseDN);
- }
-
- /**
* Change some ReplicationDomain parameters.
*
* @param config
* The new configuration that this domain should now use.
*/
- public void changeConfig(ReplicationDomainCfg config)
+ protected void changeConfig(ReplicationDomainCfg config)
{
- this.groupId = (byte) config.getGroupId();
-
if (broker != null && broker.changeConfig(config))
{
- disableService();
- enableService();
+ restartService();
}
}
-
-
/**
* Applies a configuration change to the attributes which should be be
* included in the ECL.
@@ -3149,15 +3073,19 @@
public void changeConfig(Set<String> includeAttributes,
Set<String> includeAttributesForDeletes)
{
- if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes)
- && broker != null)
+ final boolean attrsModified = setEclIncludes(
+ getServerId(), includeAttributes, includeAttributesForDeletes);
+ if (attrsModified && broker != null)
{
- disableService();
- enableService();
+ restartService();
}
}
-
+ private void restartService()
+ {
+ disableService();
+ enableService();
+ }
/**
* This method should trigger an export of the replicated data.
@@ -3236,15 +3164,13 @@
Send an ack if it was requested and the group id is the same of the RS
one. Only Safe Read mode makes sense in DS for returning an ack.
*/
- byte rsGroupId = broker.getRsGroupId();
// Assured feature is supported starting from replication protocol V2
if (msg.isAssured()
&& broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
{
- AssuredMode msgAssuredMode = msg.getAssuredMode();
- if (msgAssuredMode == AssuredMode.SAFE_READ_MODE)
+ if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
{
- if (rsGroupId == groupId)
+ if (broker.getRsGroupId() == getGroupId())
{
// Send the ack
AckMsg ackMsg = new AckMsg(msg.getCSN());
@@ -3255,7 +3181,7 @@
ackMsg.setHasReplayError(true);
// -> replay error occurred in our server
List<Integer> idList = new ArrayList<Integer>();
- idList.add(serverID);
+ idList.add(getServerId());
ackMsg.setFailedServers(idList);
}
broker.publish(ackMsg);
@@ -3269,10 +3195,11 @@
}
}
}
- else if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+ else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE)
{
- logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID),
- msgAssuredMode.toString(), getBaseDNString(), msg.toString()));
+ logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(String.valueOf(getServerId()),
+ msg.getAssuredMode().toString(), getBaseDNString(),
+ msg.toString()));
}
// Nothing to do in Assured safe data mode, only RS ack updates.
}
@@ -3303,23 +3230,22 @@
*/
protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg)
{
- byte rsGroupId = broker.getRsGroupId();
/*
* If assured configured, set message accordingly to request an ack in the
* right assured mode.
- * No ack requested for a RS with a different group id. Assured
- * replication supported for the same locality, i.e: a topology working in
- * the same
- * geographical location). If we are connected to a RS which is not in our
- * locality, no need to ask for an ack.
+ * No ack requested for a RS with a different group id.
+ * Assured replication supported for the same locality,
+ * i.e: a topology working in the same geographical location).
+ * If we are connected to a RS which is not in our locality,
+ * no need to ask for an ack.
*/
- if (assured && rsGroupId == groupId)
+ if (needsAck())
{
msg.setAssured(true);
- msg.setAssuredMode(assuredMode);
- if (assuredMode == AssuredMode.SAFE_DATA_MODE)
+ msg.setAssuredMode(getAssuredMode());
+ if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE)
{
- msg.setSafeDataLevel(assuredSdLevel);
+ msg.setSafeDataLevel(getAssuredSdLevel());
}
// Add the assured message to the list of update that are waiting for acks
@@ -3327,6 +3253,11 @@
}
}
+ private boolean needsAck()
+ {
+ return isAssured() && broker.getRsGroupId() == getGroupId();
+ }
+
/**
* Wait for the processing of an assured message after it has been sent, if
* assured replication is configured, otherwise, do nothing.
@@ -3340,14 +3271,10 @@
protected void waitForAckIfAssuredEnabled(UpdateMsg msg)
throws TimeoutException
{
- byte rsGroupId = broker.getRsGroupId();
-
- // If assured mode configured, wait for acknowledgment for the just sent
- // message
- if (assured && rsGroupId == groupId)
+ if (needsAck())
{
// Increment assured replication monitoring counters
- switch (assuredMode)
+ switch (getAssuredMode())
{
case SAFE_READ_MODE:
assuredSrSentUpdates.incrementAndGet();
@@ -3383,12 +3310,12 @@
if (debugEnabled())
{
TRACER.debugInfo("waitForAck method interrupted for replication " +
- "baseDN: " + baseDN);
+ "baseDN: " + getBaseDN());
}
break;
}
// Timeout ?
- if ( (System.currentTimeMillis() - startTime) >= assuredTimeout )
+ if ((System.currentTimeMillis() - startTime) >= getAssuredTimeout())
{
/*
Timeout occurred, be sure that ack is not being received and if so,
@@ -3424,8 +3351,8 @@
}
throw new TimeoutException("No ack received for message csn: " + csn
- + " and replication domain: " + baseDN + " after "
- + assuredTimeout + " ms.");
+ + " and replication domain: " + getBaseDN() + " after "
+ + getAssuredTimeout() + " ms.");
}
}
}
@@ -3481,7 +3408,7 @@
{
// This exception may only be raised if assured replication is enabled
logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(),
- Long.toString(assuredTimeout), update.toString()));
+ Long.toString(getAssuredTimeout()), update.toString()));
}
}
@@ -3493,11 +3420,25 @@
*
* @return The GenerationID.
*/
- public abstract long getGenerationID();
+ public long getGenerationID()
+ {
+ return generationId;
+ }
/**
- * Subclasses should use this method to add additional monitoring
- * information in the ReplicationDomain.
+ * Sets the generationId for this replication domain.
+ *
+ * @param generationId
+ * the generationId to set
+ */
+ public void setGenerationID(long generationId)
+ {
+ this.generationId = generationId;
+ }
+
+ /**
+ * Subclasses should use this method to add additional monitoring information
+ * in the ReplicationDomain.
*
* @return Additional monitoring attributes that will be added in the
* ReplicationDomain monitoring entry.
@@ -3711,13 +3652,69 @@
*/
public CSN getLastLocalChange()
{
- return state.getCSN(serverID);
+ return state.getCSN(getServerId());
+ }
+
+ /**
+ * Gets and stores the assured replication configuration parameters. Returns a
+ * boolean indicating if the passed configuration has changed compared to
+ * previous values and the changes require a reconnection.
+ *
+ * @param config
+ * The configuration object
+ * @param allowReconnection
+ * Tells if one must reconnect if significant changes occurred
+ */
+ protected void readAssuredConfig(ReplicationDomainCfg config,
+ boolean allowReconnection)
+ {
+ // Disconnect if required: changing configuration values before
+ // disconnection would make assured replication used immediately and
+ // disconnection could cause some timeouts error.
+ if (needReconnection(config) && allowReconnection)
+ {
+ disableService();
+
+ assuredConfig = config;
+
+ enableService();
+ }
+ }
+
+ private boolean needReconnection(ReplicationDomainCfg cfg)
+ {
+ final AssuredMode assuredMode = getAssuredMode();
+ switch (cfg.getAssuredType())
+ {
+ case NOT_ASSURED:
+ if (isAssured())
+ {
+ return true;
+ }
+ break;
+ case SAFE_DATA:
+ if (!isAssured() || assuredMode == SAFE_READ_MODE)
+ {
+ return true;
+ }
+ break;
+ case SAFE_READ:
+ if (!isAssured() || assuredMode == SAFE_DATA_MODE)
+ {
+ return true;
+ }
+ break;
+ }
+
+ return isAssured()
+ && assuredMode == SAFE_DATA_MODE
+ && cfg.getAssuredSdLevel() != getAssuredSdLevel();
}
/** {@inheritDoc} */
@Override
public String toString()
{
- return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID;
+ return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId();
}
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
index 2a9b6c1..cae6348 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DummyReplicationDomain.java
@@ -20,13 +20,14 @@
*
* CDDL HEADER END
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.plugin;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Set;
+import java.util.TreeSet;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
@@ -39,12 +40,9 @@
public class DummyReplicationDomain extends ReplicationDomain
{
- private final long generationId;
-
public DummyReplicationDomain(long generationId)
{
- super(null, -1, 0);
- this.generationId = generationId;
+ super(new DomainFakeCfg(null, -1, new TreeSet<String>()), generationId);
}
@Override
@@ -92,10 +90,4 @@
return false;
}
- @Override
- public long getGenerationID()
- {
- return this.generationId;
- }
-
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
index bc13f0f..6028476 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/FractionalReplicationTest.java
@@ -538,6 +538,16 @@
replicationServer = new ReplicationServer(conf);
}
+ private static DomainFakeCfg newConfig(DN baseDN, int serverID,
+ SortedSet<String> replicationServers, long heartbeatInterval)
+ {
+ DomainFakeCfg fakeCfg =
+ new DomainFakeCfg(baseDN, serverID, replicationServers);
+ fakeCfg.setHeartbeatInterval(heartbeatInterval);
+ fakeCfg.setChangetimeHeartbeatInterval(500);
+ return fakeCfg;
+ }
+
/**
* This class is the minimum implementation of a Concrete ReplicationDomain
* used to be able to connect to the RS with a known genid. Also to be able
@@ -561,18 +571,14 @@
*/
private StringBuilder importString;
private int exportedEntryCount;
- private long generationID = -1;
public FakeReplicationDomain(DN baseDN, int serverID,
SortedSet<String> replicationServers, long heartbeatInterval,
long generationId) throws ConfigException
{
- super(baseDN, serverID, 100);
- generationID = generationId;
- DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers);
- fakeCfg.setHeartbeatInterval(heartbeatInterval);
- fakeCfg.setChangetimeHeartbeatInterval(500);
- startPublishService(fakeCfg);
+ super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval),
+ generationId);
+ startPublishService(getConfig());
startListenService();
}
@@ -604,12 +610,6 @@
}
@Override
- public long getGenerationID()
- {
- return generationID;
- }
-
- @Override
protected void importBackend(InputStream input) throws DirectoryException
{
byte[] buffer = new byte[1000];
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
index 90ba002..38cba02 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -61,7 +61,7 @@
*/
public class TopologyViewTest extends ReplicationTestCase
{
- // Server id definitions
+ /** Server id definitions */
private static final int DS1_ID = 1;
private static final int DS2_ID = 2;
private static final int DS3_ID = 3;
@@ -72,7 +72,7 @@
private static final int RS2_ID = 52;
private static final int RS3_ID = 53;
- // Group id definitions
+ /** Group id definitions */
private static final int DS1_GID = 1;
private static final int DS2_GID = 1;
private static final int DS3_GID = 2;
@@ -83,7 +83,7 @@
private static final int RS2_GID = 2;
private static final int RS3_GID = 3;
- // Assured conf definitions
+ /** Assured conf definitions */
private static final AssuredType DS1_AT = AssuredType.NOT_ASSURED;
private static final int DS1_SDL = -1;
private static SortedSet<String> DS1_RU = new TreeSet<String>();
@@ -140,7 +140,7 @@
private ReplicationServer rs2 = null;
private ReplicationServer rs3 = null;
- // The tracer object for the debug logger
+ /** The tracer object for the debug logger */
private static final DebugTracer TRACER = getTracer();
private void debugInfo(String s)
@@ -460,7 +460,7 @@
return replicationDomain;
}
- // Definitions of steps for the test case
+ /** Definitions of steps for the test case */
private static final int STEP_1 = 1;
private static final int STEP_2 = 2;
private static final int STEP_3 = 3;
@@ -824,17 +824,13 @@
}
// Perform necessary conversions
- boolean assuredFlag = (assuredType != AssuredType.NOT_ASSURED);
- AssuredMode assMode = ( (assuredType == AssuredType.SAFE_READ) ?
- AssuredMode.SAFE_READ_MODE : AssuredMode.SAFE_DATA_MODE);
- List<String> urls = new ArrayList<String>();
- for(String str : refUrls)
- {
- urls.add(str);
- }
+ boolean assuredFlag = assuredType != AssuredType.NOT_ASSURED;
+ AssuredMode assMode = assuredType == AssuredType.SAFE_READ
+ ? AssuredMode.SAFE_READ_MODE
+ : AssuredMode.SAFE_DATA_MODE;
return new DSInfo(dsId, "dummy:1234", rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assMode,
- (byte)assuredSdLevel, groupId, urls, eclIncludes, eclIncludes, protocolVersion);
+ (byte)assuredSdLevel, groupId, refUrls, eclIncludes, eclIncludes, protocolVersion);
}
/**
@@ -1018,36 +1014,20 @@
/**
* Get the topo view of the current analyzed DS
*/
- List<DSInfo> internalDsList = rd.getReplicasList();
// Add info for DS itself:
// we need to clone the list as we don't want to modify the list kept
// inside the DS.
- List<DSInfo> dsList = new ArrayList<DSInfo>();
- for (DSInfo aDsInfo : internalDsList)
- {
- dsList.add(aDsInfo);
- }
- int dsId = rd.getServerId();
- int rsId = rd.getRsServerId();
- ServerStatus status = rd.getStatus();
- boolean assuredFlag = rd.isAssured();
- AssuredMode assuredMode = rd.getAssuredMode();
- byte safeDataLevel = rd.getAssuredSdLevel();
- byte groupId = rd.getGroupId();
- List<String> refUrls = rd.getRefUrls();
- Set<String> eclInclude = rd.getEclIncludes();
- Set<String> eclIncludeForDeletes = rd.getEclIncludesForDeletes();
- short protocolVersion = ProtocolVersion.getCurrentVersion();
- DSInfo dsInfo = new DSInfo(dsId, "dummy:1234", rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assuredMode,
- safeDataLevel, groupId, refUrls, eclInclude, eclIncludeForDeletes, protocolVersion);
- dsList.add(dsInfo);
+ final DSInfo dsInfo = new DSInfo(rd.getServerId(), "dummy:1234", rd.getRsServerId(),
+ TEST_DN_WITH_ROOT_ENTRY_GENID,
+ rd.getStatus(),
+ rd.isAssured(), rd.getAssuredMode(), rd.getAssuredSdLevel(),
+ rd.getGroupId(), rd.getRefUrls(),
+ rd.getEclIncludes(), rd.getEclIncludesForDeletes(),
+ ProtocolVersion.getCurrentVersion());
+ final List<DSInfo> dsList = new ArrayList<DSInfo>(rd.getReplicasList());
+ dsList.add(dsInfo);
TopoView dsTopoView = new TopoView(dsList, rd.getRsList());
-
- /**
- * Compare to what is the expected view
- */
-
assertEquals(dsTopoView, theoricalTopoView, " in DSid=" + currentDsId);
}
}
@@ -1058,8 +1038,8 @@
*/
private class TopoView
{
- private List<DSInfo> dsList = null;
- private List<RSInfo> rsList = null;
+ private List<DSInfo> dsList;
+ private List<RSInfo> rsList;
public TopoView(List<DSInfo> dsList, List<RSInfo> rsList)
{
@@ -1073,20 +1053,23 @@
@Override
public boolean equals(Object obj)
{
- assertNotNull(obj);
- assertFalse(obj.getClass() != this.getClass());
-
- TopoView topoView = (TopoView) obj;
-
- // Check dsList
- if (topoView.dsList.size() != dsList.size())
+ if (obj == null || getClass() != obj.getClass())
return false;
- for (DSInfo dsInfo : topoView.dsList)
+ TopoView other = (TopoView) obj;
+ return checkLists(dsList, other.dsList)
+ && checkLists(rsList, other.rsList);
+ }
+
+ private boolean checkLists(List<?> list, List<?> otherList)
+ {
+ if (otherList.size() != list.size())
+ return false;
+ for (Object otherObj : otherList)
{
int found = 0;
- for (DSInfo thisDsInfo : dsList)
+ for (Object thisObj : list)
{
- if (thisDsInfo.equals(dsInfo))
+ if (thisObj.equals(otherObj))
found++;
}
// Not found
@@ -1096,52 +1079,33 @@
assertFalse(found > 1);
// Ok, found exactly once in the list, examine next structure
}
-
- // Check rsList
- if (topoView.rsList.size() != rsList.size())
- return false;
- for (RSInfo rsInfo : topoView.rsList)
- {
- int found = 0;
- for (RSInfo thisRsInfo : rsList)
- {
- if (thisRsInfo.equals(rsInfo))
- found++;
- }
- // Not found
- if (found == 0)
- return false;
- // Should never see twice as rsInfo structure in a dsList
- assertFalse(found > 1);
- // Ok, found exactly once in the list, examine next structure
- }
-
return true;
}
@Override
public String toString()
{
- String dsStr = "";
+ final StringBuilder sb = new StringBuilder("TopoView:");
+ sb.append("\n----------------------------\n");
+ sb.append("CONNECTED DS SERVERS:\n");
for (DSInfo dsInfo : dsList)
{
- dsStr += dsInfo.toString() + "\n----------------------------\n";
+ sb.append(dsInfo).append("\n----------------------------\n");
}
-
- String rsStr = "";
+ sb.append("CONNECTED RS SERVERS:\n");
for (RSInfo rsInfo : rsList)
{
- rsStr += rsInfo.toString() + "\n----------------------------\n";
+ sb.append(rsInfo).append("\n----------------------------\n");
}
+ return sb.toString();
+ }
- return ("TopoView:" +
- "\n----------------------------\n" + "CONNECTED DS SERVERS:\n" + dsStr +
- "CONNECTED RS SERVERS:\n" + rsStr);
+ private TopologyViewTest getOuterType()
+ {
+ return TopologyViewTest.this;
}
}
-
-
private String getHostPort(int port)
{
return LOCAL_HOST_NAME + ":" + port;
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 5c140f6..f778fb4 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -38,6 +38,7 @@
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
@@ -56,6 +57,7 @@
import org.testng.annotations.Test;
import static java.util.Arrays.*;
+
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -239,14 +241,12 @@
* (no server state constructor version)
*/
private FakeReplicationDomain createFakeReplicationDomain(int serverId,
- int groupId, int rsId, long generationId, boolean assured,
- AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
- int scenario)
- throws Exception
+ int groupId, int rsId, long generationId, AssuredMode assuredMode,
+ int safeDataLevel, long assuredTimeout, int scenario) throws Exception
{
- ReplicationDomainCfg config = newFakeCfg(serverId, getRsPort(rsId), groupId);
- return createFakeReplicationDomain(config, groupId, rsId, generationId, assured,
- assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState(), true);
+ return createFakeReplicationDomain(serverId, groupId, rsId, generationId,
+ assuredMode, safeDataLevel, assuredTimeout, scenario,
+ new ServerState(), true);
}
private int getRsPort(int rsId)
@@ -284,17 +284,23 @@
* @throws Exception
*/
private FakeReplicationDomain createFakeReplicationDomain(
- ReplicationDomainCfg config, int groupId, int rsId, long generationId,
- boolean assured, AssuredMode assuredMode, int safeDataLevel,
+ int serverId, int groupId, int rsId, long generationId,
+ AssuredMode assuredMode, int safeDataLevel,
long assuredTimeout, int scenario, ServerState serverState,
boolean startListen) throws Exception
{
- // Set port to right real RS according to its id
- int rsPort = getRsPort(rsId);
+ final DomainFakeCfg config = newDomainConfig(serverId, groupId, rsId,
+ assuredMode, safeDataLevel, assuredTimeout);
+ return createFakeReplicationDomain(config, rsId, generationId, scenario,
+ serverState, startListen);
+ }
- FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
- config.getBaseDN(), config.getServerId(), generationId, (byte) groupId,
- assured, assuredMode, (byte) safeDataLevel, assuredTimeout, scenario, serverState);
+ private FakeReplicationDomain createFakeReplicationDomain(
+ ReplicationDomainCfg config, int rsId, long generationId, int scenario,
+ ServerState serverState, boolean startListen) throws Exception
+ {
+ FakeReplicationDomain fakeReplicationDomain =
+ new FakeReplicationDomain(config, generationId, scenario, serverState);
fakeReplicationDomain.startPublishService(config);
if (startListen)
@@ -304,20 +310,42 @@
assertTrue(fakeReplicationDomain.isConnected());
// Check connected server port
HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer());
- assertEquals(rd.getPort(), rsPort);
+ assertEquals(rd.getPort(), getRsPort(rsId));
return fakeReplicationDomain;
}
- private DomainFakeCfg newFakeCfg(int serverId, int rsPort, int groupId) throws Exception
+ private DomainFakeCfg newDomainConfig(int serverId, int groupId, int rsId,
+ AssuredMode assuredMode, int safeDataLevel, long assuredTimeout)
+ throws DirectoryException
{
- DomainFakeCfg fakeCfg = new DomainFakeCfg(
- DN.valueOf(TEST_ROOT_DN_STRING), serverId, newSortedSet("localhost:" + rsPort), groupId);
+ final int rsPort = getRsPort(rsId);
+ final DomainFakeCfg fakeCfg = new DomainFakeCfg(
+ DN.valueOf(TEST_ROOT_DN_STRING), serverId, newSortedSet("localhost:" + rsPort),
+ getAssuredType(assuredMode),
+ safeDataLevel, groupId, assuredTimeout, new TreeSet<String>());
fakeCfg.setHeartbeatInterval(1000);
fakeCfg.setChangetimeHeartbeatInterval(500);
return fakeCfg;
}
+ private AssuredType getAssuredType(AssuredMode assuredMode)
+ {
+ if (assuredMode == null)
+ {
+ return AssuredType.NOT_ASSURED;
+ }
+
+ switch (assuredMode)
+ {
+ case SAFE_READ_MODE:
+ return AssuredType.SAFE_READ;
+ case SAFE_DATA_MODE:
+ return AssuredType.SAFE_DATA;
+ }
+ throw new RuntimeException("Not implemented for " + assuredMode);
+ }
+
/**
* Creates and connects a new fake replication server, using the passed scenario.
*/
@@ -412,9 +440,8 @@
{
/** The scenario this DS is expecting */
private int scenario = -1;
- private long generationId = -1;
- private CSNGenerator gen = null;
+ private CSNGenerator gen;
/** False if a received update had assured parameters not as expected */
private boolean everyUpdatesAreOk = true;
@@ -437,28 +464,14 @@
* behavior upon reception of updates)
* @throws org.opends.server.config.ConfigException
*/
- public FakeReplicationDomain(
- DN baseDN,
- int serverID,
- long generationId,
- byte groupId,
- boolean assured,
- AssuredMode assuredMode,
- byte safeDataLevel,
- long assuredTimeout,
- int scenario,
- ServerState serverState) throws ConfigException
+ public FakeReplicationDomain(ReplicationDomainCfg config,
+ long generationId, int scenario, ServerState serverState)
+ throws ConfigException
{
- super(baseDN, serverID, serverState);
- this.generationId = generationId;
- setGroupId(groupId);
- setAssured(assured);
- setAssuredMode(assuredMode);
- setAssuredSdLevel(safeDataLevel);
- setAssuredTimeout(assuredTimeout);
+ super(config, generationId, serverState);
this.scenario = scenario;
- gen = new CSNGenerator(serverID, 0L);
+ gen = new CSNGenerator(config.getServerId(), 0L);
}
public boolean receivedUpdatesOk()
@@ -490,12 +503,6 @@
}
@Override
- public long getGenerationID()
- {
- return generationId;
- }
-
- @Override
protected void importBackend(InputStream input) throws DirectoryException
{
// Not needed for this test
@@ -549,8 +556,8 @@
debugInfo("Fake DS " + getServerId() + " received update assured flag is wrong: " + updateMsg);
ok = false;
}
- if (updateMsg.getAssuredMode() != getAssuredMode())
- {
+ if (isAssured() && updateMsg.getAssuredMode() != getAssuredMode())
+ { // it is meaningless to have different assured mode when UpdateMsg is not assured
debugInfo("Fake DS " + getServerId() + " received update assured mode is wrong: " + updateMsg);
ok = false;
}
@@ -645,7 +652,7 @@
* @param assuredMode the expected assured mode of the incoming updates (also used for outgoing updates)
* @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
* @param groupId our group id
- * @param baseDN the basedn we connect with, to the real RS
+ * @param baseDN the baseDN we connect with, to the real RS
* @param generationId the generation id we use at connection to real RS
*/
public FakeReplicationServer(int port, int serverId, boolean assured,
@@ -1022,9 +1029,7 @@
*/
// Create real RS 1
- rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
- testCase, 0);
- assertNotNull(rs1);
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
/*
* Start main DS (the one which sends updates)
@@ -1033,8 +1038,7 @@
// Create and connect fake domain 1 to RS 1
// Assured mode: SD, level 1
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, mainDsGid, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
/*
* Start one other fake DS
@@ -1049,8 +1053,7 @@
// this would timeout. If main DS group id is not the same as the real RS one,
// the update will even not come to real RS as assured
fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
- DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
}
/*
@@ -1363,9 +1366,7 @@
*/
// Create real RS 1
- rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
- testCase, 0);
- assertNotNull(rs1);
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
/*
* Start main DS (the one which sends updates)
@@ -1373,8 +1374,7 @@
// Create and connect fake domain 1 to RS 1
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
/*
* Start one other fake DS
@@ -1384,8 +1384,7 @@
if (otherFakeDS)
{
fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
- otherFakeDsGenId, false, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ otherFakeDsGenId, null, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
}
/*
@@ -1873,9 +1872,7 @@
*/
// Create real RS 1
- rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
- testCase, 0);
- assertNotNull(rs1);
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
/*
* Start fake RS to make the RS have the default generation id
@@ -1976,20 +1973,13 @@
*/
int numberOfRealRSs = 3;
- // Create real RS 1
+ // Create real RS 1, 2, 3
rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs1);
-
- // Create real RS 2
rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs2);
-
- // Create real RS 3
rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs3);
/*
* Start DS that will send updates
@@ -1998,8 +1988,7 @@
// Wait for RSs to connect together
// Create and connect fake domain 1 to RS 1
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// Wait for RSs connections to be finished
// DS must see expected numbers of RSs
@@ -2049,9 +2038,7 @@
*/
// Create real RS 1
- rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
- testCase, 0);
- assertNotNull(rs1);
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
/*******************
* Start main DS 1 (the one which sends updates)
@@ -2060,8 +2047,7 @@
// Create and connect DS 1 to RS 1
// Assured mode: SR
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
/*
* Send a first assured safe read update
@@ -2089,10 +2075,9 @@
// Create and connect DS 2 to RS 1
// Assured mode: SR
ServerState serverState = fakeRd1.getServerState();
- ReplicationDomainCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID), DEFAULT_GID);
- fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO, serverState, true);
+ fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+ REPLY_OK_DS_SCENARIO, serverState, true);
// Wait for connections to be established
waitForStableTopo(fakeRd1, 1, 1);
@@ -2306,31 +2291,26 @@
*/
// Create real RS 1
- rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
- testCase, 0);
- assertNotNull(rs1);
+ rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0);
/*
* Start main DS 1 (the one which sends updates)
*/
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
/*
* Start another fake DS 2 connected to RS
*/
fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
/*
* Start another fake DS 3 connected to RS
*/
fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
- otherFakeDsGenId, (otherFakeDsGid == DEFAULT_GID),
- AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- otherFakeDsScen);
+ otherFakeDsGenId, otherFakeDsGid == DEFAULT_GID ? AssuredMode.SAFE_READ_MODE : null,
+ 1, LONG_TIMEOUT, otherFakeDsScen);
/*
* Start fake RS (RS 1) connected to RS
@@ -2617,25 +2597,17 @@
*/
int numberOfRealRSs = 4;
- // Create real RS 1
+ // Create real RS 1, 2, 3
rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs1);
-
- // Create real RS 2
rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs2);
-
- // Create real RS 3
rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs3);
// Create real RS 4 (different GID 2)
rs4 = createReplicationServer(RS4_ID, OTHER_GID_BIS, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs4);
/*
* Start DS 1 that will send assured updates
@@ -2644,8 +2616,7 @@
// Wait for RSs to connect together
// Create and connect fake domain 1 to RS 1
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// Wait for connections to be finished
// DS must see expected numbers of DSs/RSs
@@ -2674,23 +2645,19 @@
// DS 2 connected to RS 1
fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
// DS 3 connected to RS 2
fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, DEFAULT_GID, RS2_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
// DS 4 connected to RS 3
fakeRDs[4] = createFakeReplicationDomain(FDS4_ID, DEFAULT_GID, RS3_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
// DS 5 connected to RS 3
fakeRDs[5] = createFakeReplicationDomain(FDS5_ID, DEFAULT_GID, RS3_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
/*
* Start DSs that will not receive updates from DS 1 as assured because
@@ -2699,23 +2666,19 @@
// DS 6 connected to RS 1
fakeRDs[6] = createFakeReplicationDomain(FDS6_ID, OTHER_GID, RS1_ID,
- DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// DS 7 connected to RS 2
fakeRDs[7] = createFakeReplicationDomain(FDS7_ID, OTHER_GID, RS2_ID,
- DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// DS 8 connected to RS 3
fakeRDs[8] = createFakeReplicationDomain(FDS8_ID, OTHER_GID, RS3_ID,
- DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// DS 9 (GID 2) connected to RS 4
fakeRDs[9] = createFakeReplicationDomain(FDS9_ID, OTHER_GID_BIS, RS4_ID,
- DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
/*
* Start DSs that will not receive updates from DS 1 because
@@ -2724,18 +2687,15 @@
// DS 10 connected to RS 1
fakeRDs[10] = createFakeReplicationDomain(FDS10_ID, DEFAULT_GID, RS1_ID,
- OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// DS 11 connected to RS 2
fakeRDs[11] = createFakeReplicationDomain(FDS11_ID, DEFAULT_GID, RS2_ID,
- OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// DS 12 connected to RS 3
fakeRDs[12] = createFakeReplicationDomain(FDS12_ID, DEFAULT_GID, RS3_ID,
- OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// Wait for connections to be finished
// DS must see expected numbers of DSs/RSs
@@ -2897,15 +2857,11 @@
*/
int numberOfRealRSs = 2;
- // Create real RS 1
+ // Create real RS 1, 2
rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs1);
-
- // Create real RS 2
rs2 = createReplicationServer(RS2_ID, OTHER_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs2);
/*
* Start DSs with GID=DEFAULT_GID, connected to RS1
@@ -2913,13 +2869,11 @@
// DS 1 connected to RS 1
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// DS 2 connected to RS 1
fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
/*
* Start DSs with GID=OTHER_GID, connected to RS2
@@ -2927,13 +2881,11 @@
// DS 3 connected to RS 2
fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, OTHER_GID, RS2_ID,
- DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO);
+ DEFAULT_GENID, null, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
// DS 4 connected to RS 3
fakeRDs[4] = createFakeReplicationDomain(FDS4_ID, OTHER_GID, RS2_ID,
- DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO);
+ DEFAULT_GENID, null, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO);
// Wait for connections to be finished
// DS must see expected numbers of DSs/RSs
@@ -3010,15 +2962,11 @@
*/
int numberOfRealRSs = 2;
- // Create real RS 1
+ // Create real RS 1, 2
rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT + 1000, // Be sure DS2 timeout is seen from DS1
testCase, numberOfRealRSs);
- assertNotNull(rs1);
-
- // Create real RS 2
rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
testCase, numberOfRealRSs);
- assertNotNull(rs2);
/*
* Start 2 fake DSs
@@ -3026,13 +2974,12 @@
// DS 1 connected to RS 1
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// DS 2 connected to RS 2
fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, fakeDsGid, RS2_ID,
- fakeDsGenId, (fakeDsGid == DEFAULT_GID),
- AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, fakeDsScen);
+ fakeDsGenId, fakeDsGid == DEFAULT_GID ? AssuredMode.SAFE_READ_MODE : null,
+ 1, LONG_TIMEOUT, fakeDsScen);
// Wait for connections to be finished
// DS must see expected numbers of DSs/RSs
@@ -3167,15 +3114,14 @@
// DS 1 connected to RS 1
fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- TIMEOUT_DS_SCENARIO);
+ DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO);
// DS 2 connected to RS 1 with low window to easily put it in DEGRADED status
- DomainFakeCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID), DEFAULT_GID);
+ final DomainFakeCfg config = newDomainConfig(FDS2_ID, DEFAULT_GID, RS1_ID,
+ AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT);
config.setWindowSize(2);
- fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID,
- DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
- REPLY_OK_DS_SCENARIO, new ServerState(), false);
+ fakeRDs[2] = createFakeReplicationDomain(config, RS1_ID, DEFAULT_GENID,
+ REPLY_OK_DS_SCENARIO, new ServerState(), false);
// Wait for connections to be finished
// DS must see expected numbers of DSs/RSs
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
index 601a39b..2108816 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -65,19 +65,24 @@
private int exportedEntryCount;
- private long generationID = 1;
-
private FakeReplicationDomain(DN baseDN, int serverID,
SortedSet<String> replicationServers, int window, long heartbeatInterval)
throws ConfigException
{
- super(baseDN, serverID, 100);
+ super(newConfig(baseDN, serverID, replicationServers, window,
+ heartbeatInterval), 1);
+ startPublishService(getConfig());
+ startListenService();
+ }
+
+ private static DomainFakeCfg newConfig(DN baseDN, int serverID,
+ SortedSet<String> replicationServers, int window, long heartbeatInterval)
+ {
DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers);
fakeCfg.setHeartbeatInterval(heartbeatInterval);
fakeCfg.setChangetimeHeartbeatInterval(500);
fakeCfg.setWindowSize(window);
- startPublishService(fakeCfg);
- startListenService();
+ return fakeCfg;
}
public FakeReplicationDomain(DN baseDN, int serverID,
@@ -122,12 +127,6 @@
}
@Override
- public long getGenerationID()
- {
- return generationID;
- }
-
- @Override
protected void importBackend(InputStream input) throws DirectoryException
{
byte[] buffer = new byte[1000];
@@ -157,8 +156,4 @@
return true;
}
- public void setGenerationID(long newGenerationID)
- {
- generationID = newGenerationID;
- }
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
index b0a49c0..d6b5ca1 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/service/FakeStressReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013 ForgeRock AS
+ * Portions Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.service;
@@ -58,14 +58,20 @@
SortedSet<String> replicationServers, long heartbeatInterval,
BlockingQueue<UpdateMsg> queue) throws ConfigException
{
- super(baseDN, serverID, 100);
+ super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval), 1);
+ startPublishService(getConfig());
+ startListenService();
+ this.queue = queue;
+ }
+
+ private static DomainFakeCfg newConfig(DN baseDN, int serverID,
+ SortedSet<String> replicationServers, long heartbeatInterval)
+ {
final DomainFakeCfg fakeCfg =
new DomainFakeCfg(baseDN, serverID, replicationServers);
fakeCfg.setHeartbeatInterval(heartbeatInterval);
fakeCfg.setChangetimeHeartbeatInterval(500);
- startPublishService(fakeCfg);
- startListenService();
- this.queue = queue;
+ return fakeCfg;
}
private static final int IMPORT_SIZE = 100000000;
@@ -98,12 +104,6 @@
}
@Override
- public long getGenerationID()
- {
- return 1;
- }
-
- @Override
protected void importBackend(InputStream input) throws DirectoryException
{
long startDate = System.currentTimeMillis();
--
Gitblit v1.10.0