Front-port of r10098.
Replaced/removed numerous fields and methods of ReplicationDomain by directly storing the config.
Changed a lot of subclasses due to this change.
ReplicationDomain.java:
Pulled up generationId, config fields, getGenerationID(), readAssuredConfig() and needReconnection() methods from LDAPReplicationDomain.
Replaced baseDN, serverID, groupId, refUrls, initWindow fields with new config field + encapsulated fields, updated getters, removed setters.
Replaced assured, assuredMode, assuredSdLevel, assuredTimeout fields with new assuredConfig field + encapsulated fields, updated getters, removed setters.
Removed domains field, never used.
Inlined stopDomain().
Added generationId to the ctor + implemented getGenerationID() + added setGenerationID().
Extracted method restartService(), needsAck().
LDAPReplicationDomain.java:
Pulled up generationId, config fields, getGenerationID(), readAssuredConfig() and needReconnection() methods to ReplicationDomain.
In ctor, called the new ReplicationDomain(config, generationId) super constructor, as a consequence of the change to ReplicationDomain.
DSInfo.java:
Made it immutable.
StartSessionMsg.java:
Code cleanup.
Javadocs.
DummyReplicationDomain.java, FractionalReplicationTest.java, FakeReplicationDomain.java, FakeStressReplicationDomain.java:
Consequence of the changes to ReplicationDomain.
Pulled up generationId, getGenerationID(), setGenerationID() to ReplicationDomain.
Called ReplicationDomain.getConfig().
TopologyViewTest.java:
Consequence of the change to ReplicationDomain.getRefUrls().
Extracted method checkLists().
Code cleanup.
AssuredReplicationServerTest.java:
Streamlined and simplified the createFakeReplicationDomain() methods + removed the boolean assured parameter.
Added getAssuredType(AssuredMode).
In FakeReplicationDomain, consequence of the changes to ReplicationDomain.
In checkUpdateAssuredParameters(), changed the assert a bit to match removal of the boolean assured parameter.
Removed useless calls to assertNotNull() after calling createReplicationServer() (result is never null).
Replaced newFakeCfg() by newDomainConfig().
| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | * Portions copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.*; |
| | | |
| | | /** |
| | | * This class holds information about a DS connected to the topology. This |
| | |
| | | * messages, to keep every member (RS or DS) of the topology aware of the DS |
| | | * topology. |
| | | * <p> |
| | | * This class is almost immutable, because it does not copy the List and Set |
| | | * passed into the ctor. |
| | | * @Immutable |
| | | */ |
| | | public final class DSInfo |
| | | { |
| | |
| | | private final Set<String> eclIncludesForDeletes; |
| | | |
| | | |
| | | |
| | | /** |
| | | * Creates a new instance of DSInfo with every given info. |
| | | * |
| | |
| | | public DSInfo(int dsId, String dsUrl, int rsId, long generationId, |
| | | ServerStatus status, boolean assuredFlag, |
| | | AssuredMode assuredMode, byte safeDataLevel, byte groupId, |
| | | List<String> refUrls, Set<String> eclIncludes, |
| | | Set<String> eclIncludesForDeletes, short protocolVersion) |
| | | Collection<String> refUrls, Collection<String> eclIncludes, |
| | | Collection<String> eclIncludesForDeletes, short protocolVersion) |
| | | { |
| | | this.dsId = dsId; |
| | | this.dsUrl = dsUrl; |
| | |
| | | this.assuredMode = assuredMode; |
| | | this.safeDataLevel = safeDataLevel; |
| | | this.groupId = groupId; |
| | | this.refUrls = refUrls; |
| | | this.eclIncludes = eclIncludes; |
| | | this.eclIncludesForDeletes = eclIncludesForDeletes; |
| | | this.refUrls = Collections.unmodifiableList(new ArrayList<String>(refUrls)); |
| | | this.eclIncludes = |
| | | Collections.unmodifiableSet(new HashSet<String>(eclIncludes)); |
| | | this.eclIncludesForDeletes = |
| | | Collections.unmodifiableSet(new HashSet<String>(eclIncludesForDeletes)); |
| | | this.protocolVersion = protocolVersion; |
| | | } |
| | | |
| | |
| | | import static org.opends.messages.ToolMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.AssuredMode.*; |
| | | import static org.opends.server.replication.plugin.EntryHistorical.*; |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.replication.service.ReplicationMonitor.*; |
| | |
| | | private final PersistentServerState state; |
| | | private int numReplayedPostOpCalled = 0; |
| | | |
| | | private volatile long generationId = -1; |
| | | private volatile boolean generationIdSavedStatus = false; |
| | | |
| | | private final CSNGenerator generator; |
| | |
| | | private final SortedMap<CSN, FakeOperation> replayOperations = |
| | | new TreeMap<CSN, FakeOperation>(); |
| | | |
| | | private ReplicationDomainCfg config; |
| | | private ExternalChangelogDomain eclDomain; |
| | | |
| | | /** |
| | |
| | | public LDAPReplicationDomain(ReplicationDomainCfg configuration, |
| | | BlockingQueue<UpdateToReplay> updateToReplayQueue) throws ConfigException |
| | | { |
| | | super(configuration.getBaseDN(), |
| | | configuration.getServerId(), |
| | | configuration.getInitializationWindowSize()); |
| | | super(configuration, -1); |
| | | |
| | | this.config = configuration; |
| | | this.updateToReplayQueue = updateToReplayQueue; |
| | | |
| | | // Get assured configuration |
| | |
| | | // Get fractional configuration |
| | | fractionalConfig = new FractionalConfig(getBaseDN()); |
| | | readFractionalConfig(configuration, false); |
| | | |
| | | setGroupId((byte)configuration.getGroupId()); |
| | | setURLs(configuration.getReferralsUrl()); |
| | | |
| | | storeECLConfiguration(configuration); |
| | | |
| | | solveConflictFlag = isSolveConflict(configuration); |
| | | |
| | | Backend backend = retrievesBackend(getBaseDN()); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets and stores the assured replication configuration parameters. Returns |
| | | * a boolean indicating if the passed configuration has changed compared to |
| | | * previous values and the changes require a reconnection. |
| | | * @param configuration The configuration object |
| | | * @param allowReconnection Tells if one must reconnect if significant changes |
| | | * occurred |
| | | */ |
| | | private void readAssuredConfig(ReplicationDomainCfg configuration, |
| | | boolean allowReconnection) |
| | | { |
| | | final boolean needReconnection = needReconnection(configuration); |
| | | |
| | | // Disconnect if required: changing configuration values before |
| | | // disconnection would make assured replication used immediately and |
| | | // disconnection could cause some timeouts error. |
| | | if (needReconnection && allowReconnection) |
| | | disableService(); |
| | | |
| | | switch (configuration.getAssuredType()) |
| | | { |
| | | case NOT_ASSURED: |
| | | setAssured(false); |
| | | break; |
| | | case SAFE_DATA: |
| | | setAssured(true); |
| | | setAssuredMode(AssuredMode.SAFE_DATA_MODE); |
| | | break; |
| | | case SAFE_READ: |
| | | setAssured(true); |
| | | setAssuredMode(AssuredMode.SAFE_READ_MODE); |
| | | break; |
| | | } |
| | | setAssuredSdLevel((byte) configuration.getAssuredSdLevel()); |
| | | setAssuredTimeout(configuration.getAssuredTimeout()); |
| | | |
| | | // Reconnect if required |
| | | if (needReconnection && allowReconnection) |
| | | enableService(); |
| | | } |
| | | |
| | | private boolean needReconnection(ReplicationDomainCfg cfg) |
| | | { |
| | | switch (cfg.getAssuredType()) |
| | | { |
| | | case NOT_ASSURED: |
| | | if (isAssured()) |
| | | { |
| | | return true; |
| | | } |
| | | break; |
| | | case SAFE_DATA: |
| | | if (!isAssured() || getAssuredMode() == SAFE_READ_MODE) |
| | | { |
| | | return true; |
| | | } |
| | | break; |
| | | case SAFE_READ: |
| | | if (!isAssured() || getAssuredMode() == SAFE_DATA_MODE) |
| | | { |
| | | return true; |
| | | } |
| | | break; |
| | | } |
| | | |
| | | return isAssured() |
| | | && getAssuredMode() == SAFE_DATA_MODE |
| | | && cfg.getAssuredSdLevel() != getAssuredSdLevel(); |
| | | } |
| | | |
| | | /** |
| | | * Sets the error message id to be used when online import is stopped with |
| | | * error by the fractional replication ldif import plugin. |
| | | * @param importErrorMessageId The message to use. |
| | |
| | | } |
| | | |
| | | // Disable service if configuration changed |
| | | if (needReconnection && allowReconnection) |
| | | final boolean needRestart = needReconnection && allowReconnection; |
| | | if (needRestart) |
| | | { |
| | | disableService(); |
| | | } |
| | |
| | | } |
| | | |
| | | // Reconnect if required |
| | | if (needReconnection && allowReconnection) |
| | | if (needRestart) |
| | | enableService(); |
| | | } |
| | | |
| | |
| | | |
| | | // FIXME should the next call use the initWindow parameter rather than the |
| | | // instance variable? |
| | | super.initializeRemote(target, requestorID, initTask, this.initWindow); |
| | | super.initializeRemote(target, requestorID, initTask, getInitWindow()); |
| | | } |
| | | |
| | | /** |
| | |
| | | DirectoryServer.deregisterAlertGenerator(this); |
| | | |
| | | // stop the ReplicationDomain |
| | | stopDomain(); |
| | | disableService(); |
| | | } |
| | | |
| | | // wait for completion of the persistentServerState thread. |
| | |
| | | return genId; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getGenerationID() |
| | | { |
| | | return generationId; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Run a modify operation to update the entry whose DN is given as |
| | | * a parameter with the generationID information. |
| | |
| | | * |
| | | * |
| | | * Copyright 2008-2009 Sun Microsystems, Inc. |
| | | * Portions copyright 2011-2013 ForgeRock AS |
| | | * Portions copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.protocol; |
| | | |
| | | import java.io.ByteArrayOutputStream; |
| | | import java.io.IOException; |
| | | import java.io.UnsupportedEncodingException; |
| | | import java.util.ArrayList; |
| | | import java.util.HashSet; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.*; |
| | | import java.util.zip.DataFormatException; |
| | | |
| | | import org.opends.server.protocols.asn1.ASN1; |
| | |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | /** |
| | | * This message is used by DS to confirm a RS he wants to connect to him (open |
| | |
| | | */ |
| | | public class StartSessionMsg extends ReplicationMsg |
| | | { |
| | | // The list of referrals URLs to the sending DS |
| | | private List<String> referralsURLs = new ArrayList<String>(); |
| | | // The initial status the DS starts with |
| | | /** The list of referrals URLs to the sending DS. */ |
| | | private final List<String> referralsURLs = new ArrayList<String>(); |
| | | /** The initial status the DS starts with. */ |
| | | private ServerStatus status = ServerStatus.INVALID_STATUS; |
| | | // Assured replication enabled on DS or not |
| | | private boolean assuredFlag = false; |
| | | // DS assured mode (relevant if assured replication enabled) |
| | | /** Assured replication enabled on DS or not. */ |
| | | private boolean assuredFlag; |
| | | /** DS assured mode (relevant if assured replication enabled). */ |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | // DS safe data level (relevant if assured mode is safe data) |
| | | private byte safeDataLevel = (byte) 1; |
| | | /** DS safe data level (relevant if assured mode is safe data). */ |
| | | private byte safeDataLevel = 1; |
| | | |
| | | private Set<String> eclIncludes = new HashSet<String>(); |
| | | |
| | | private Set<String> eclIncludesForDeletes = new HashSet<String>(); |
| | | |
| | | /** |
| | |
| | | * @param assuredMode Assured type |
| | | * @param safeDataLevel Assured mode safe data level |
| | | */ |
| | | public StartSessionMsg(ServerStatus status, List<String> referralsURLs, |
| | | public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs, |
| | | boolean assuredFlag, AssuredMode assuredMode, byte safeDataLevel) |
| | | { |
| | | this.referralsURLs = referralsURLs; |
| | | this.referralsURLs.addAll(referralsURLs); |
| | | this.status = status; |
| | | this.assuredFlag = assuredFlag; |
| | | this.assuredMode = assuredMode; |
| | |
| | | * @param status Status we are starting with |
| | | * @param referralsURLs Referrals URLs to be used by peer DSs |
| | | */ |
| | | public StartSessionMsg(ServerStatus status, List<String> referralsURLs) |
| | | public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs) |
| | | { |
| | | this.referralsURLs = referralsURLs; |
| | | this.referralsURLs.addAll(referralsURLs); |
| | | this.status = status; |
| | | this.assuredFlag = false; |
| | | } |
| | |
| | | // Msg encoding |
| | | // ============ |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public byte[] getBytes(short reqProtocolVersion) |
| | | throws UnsupportedEncodingException |
| | |
| | | |
| | | /* Read the referrals URLs */ |
| | | int pos = 5; |
| | | referralsURLs = new ArrayList<String>(); |
| | | while (pos < in.length) |
| | | { |
| | | /* |
| | |
| | | return status; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String urls = ""; |
| | | for (String s : referralsURLs) |
| | | { |
| | | urls += s + " | "; |
| | | } |
| | | return ("StartSessionMsg content:\nstatus: " + status + |
| | | String urls = StaticUtils.collectionToString(referralsURLs, " | "); |
| | | return "StartSessionMsg content:\nstatus: " + status + |
| | | "\nassuredFlag: " + assuredFlag + |
| | | "\nassuredMode: " + assuredMode + |
| | | "\nsafeDataLevel: " + safeDataLevel + |
| | | "\nreferralsURLs: " + urls + |
| | | "\nEclIncludes " + eclIncludes + |
| | | "\nEclIncludeForDeletes: " + eclIncludesForDeletes); |
| | | "\nEclIncludeForDeletes: " + eclIncludesForDeletes; |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.backends.task.Task; |
| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.common.AssuredMode.*; |
| | | import static org.opends.server.replication.common.StatusMachine.*; |
| | | |
| | | /** |
| | |
| | | * implementation using methods {@link #initializeRemote(int)} |
| | | * or {@link #initializeFromRemote(int)}. |
| | | * <p> |
| | | * At shutdown time, the {@link #stopDomain()} method should be called to |
| | | * At shutdown time, the {@link #disableService()} method should be called to |
| | | * cleanly stop the replication service. |
| | | */ |
| | | public abstract class ReplicationDomain |
| | |
| | | */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** The configuration of the replication domain. */ |
| | | protected volatile ReplicationDomainCfg config; |
| | | /** |
| | | * The baseDN for the Replication Service. |
| | | * All Replication Domain using this baseDN will be connected |
| | | * through the Replication Service. |
| | | * The assured configuration of the replication domain. It is a duplicate of |
| | | * {@link #config} because of its update model. |
| | | * |
| | | * @see #readAssuredConfig(ReplicationDomainCfg, boolean) |
| | | */ |
| | | private final DN baseDN; |
| | | |
| | | /** |
| | | * The identifier of this Replication Domain inside the |
| | | * Replication Service. |
| | | * Each Domain must use a unique ServerID. |
| | | */ |
| | | private final int serverID; |
| | | private volatile ReplicationDomainCfg assuredConfig; |
| | | |
| | | /** |
| | | * The ReplicationBroker that is used by this ReplicationDomain to |
| | | * connect to the ReplicationService. |
| | | */ |
| | | protected ReplicationBroker broker = null; |
| | | protected ReplicationBroker broker; |
| | | |
| | | /** |
| | | * This Map is used to store all outgoing assured messages in order |
| | |
| | | private volatile DirectoryThread listenerThread = null; |
| | | |
| | | /** |
| | | * A Map used to store all the ReplicationDomains created on this server. |
| | | */ |
| | | private static Map<DN, ReplicationDomain> domains = |
| | | new HashMap<DN, ReplicationDomain>(); |
| | | |
| | | /* |
| | | * Assured mode properties |
| | | */ |
| | | /** Whether assured mode is enabled for this domain. */ |
| | | private boolean assured = false; |
| | | /** Assured sub mode (used when assured is true). */ |
| | | private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; |
| | | /** Safe Data level (used when assuredMode is SAFE_DATA). */ |
| | | private byte assuredSdLevel = 1; |
| | | /** The timeout in ms that should be used, when waiting for assured acks. */ |
| | | private long assuredTimeout = 2000; |
| | | |
| | | /** Group id. */ |
| | | private byte groupId = 1; |
| | | /** |
| | | * Referrals urls to be published to other servers of the topology. |
| | | * <p> |
| | | * TODO: fill that with all currently opened urls if no urls configured |
| | | */ |
| | | private final List<String> refUrls = new ArrayList<String>(); |
| | | |
| | | /** |
| | | * A set of counters used for Monitoring. |
| | | */ |
| | | private AtomicInteger numProcessedUpdates = new AtomicInteger(0); |
| | |
| | | private final Map<Integer, Integer> assuredSdServerTimeoutUpdates = |
| | | new HashMap<Integer,Integer>(); |
| | | |
| | | /** |
| | | * Window size used during initialization .. between |
| | | * - the initializer/exporter DS that listens/waits acknowledges and that |
| | | * slows down data msg publishing based on the slowest server |
| | | * - and each initialized/importer DS that publishes acknowledges each |
| | | * WINDOW/2 data msg received. |
| | | */ |
| | | protected final int initWindow; |
| | | |
| | | /* Status related monitoring fields */ |
| | | |
| | | /** |
| | |
| | | private final Object sessionLock = new Object(); |
| | | |
| | | /** |
| | | * The generationId for this replication domain. It is made of a hash of the |
| | | * 1000 first entries for this domain. |
| | | */ |
| | | protected volatile long generationId; |
| | | |
| | | /** |
| | | * Returns the {@link CSNGenerator} that will be used to |
| | | * generate {@link CSN} for this domain. |
| | | * |
| | |
| | | /** |
| | | * Creates a ReplicationDomain with the provided parameters. |
| | | * |
| | | * @param baseDN The identifier of the Replication Domain to which |
| | | * this object is participating. |
| | | * @param serverID The identifier of the server that is participating |
| | | * to the Replication Domain. |
| | | * This identifier should be different for each server that |
| | | * is participating to a given Replication Domain. |
| | | * @param initWindow Window used during initialization. |
| | | * @param config |
| | | * The configuration object for this ReplicationDomain |
| | | * @param generationId |
| | | * the generation of this ReplicationDomain |
| | | */ |
| | | public ReplicationDomain(DN baseDN, int serverID, int initWindow) |
| | | public ReplicationDomain(ReplicationDomainCfg config, long generationId) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.initWindow = initWindow; |
| | | this.config = config; |
| | | this.assuredConfig = config; |
| | | this.generationId = generationId; |
| | | this.state = new ServerState(); |
| | | this.generator = new CSNGenerator(serverID, state); |
| | | |
| | | domains.put(baseDN, this); |
| | | this.generator = new CSNGenerator(getServerId(), state); |
| | | } |
| | | |
| | | /** |
| | | * Creates a ReplicationDomain with the provided parameters. |
| | | * (for unit test purpose only) |
| | | * Creates a ReplicationDomain with the provided parameters. (for unit test |
| | | * purpose only) |
| | | * |
| | | * @param baseDN The identifier of the Replication Domain to which |
| | | * this object is participating. |
| | | * @param serverID The identifier of the server that is participating |
| | | * to the Replication Domain. |
| | | * This identifier should be different for each server that |
| | | * is participating to a given Replication Domain. |
| | | * @param serverState The serverState to use |
| | | * @param config |
| | | * The configuration object for this ReplicationDomain |
| | | * @param generationId |
| | | * the generation of this ReplicationDomain |
| | | * @param serverState |
| | | * The serverState to use |
| | | */ |
| | | public ReplicationDomain(DN baseDN, int serverID, ServerState serverState) |
| | | public ReplicationDomain(ReplicationDomainCfg config, long generationId, |
| | | ServerState serverState) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.serverID = serverID; |
| | | this.initWindow = 100; |
| | | this.config = config; |
| | | this.assuredConfig = config; |
| | | this.generationId = generationId; |
| | | this.state = serverState; |
| | | this.generator = new CSNGenerator(serverID, state); |
| | | |
| | | domains.put(baseDN, this); |
| | | this.generator = new CSNGenerator(getServerId(), state); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (!isValidInitialStatus(initStatus)) |
| | | { |
| | | logError(ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(), |
| | | getBaseDNString(), Integer.toString(serverID))); |
| | | getBaseDNString(), Integer.toString(getServerId()))); |
| | | } |
| | | else |
| | | { |
| | |
| | | private void receiveChangeStatus(ChangeStatusMsg csMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("Replication domain " + baseDN + |
| | | TRACER.debugInfo("Replication domain " + getBaseDN() + |
| | | " received change status message:\n" + csMsg); |
| | | |
| | | ServerStatus reqStatus = csMsg.getRequestedStatus(); |
| | |
| | | if (event == StatusMachineEvent.INVALID_EVENT) |
| | | { |
| | | logError(ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(), |
| | | getBaseDNString(), Integer.toString(serverID))); |
| | | getBaseDNString(), Integer.toString(getServerId()))); |
| | | return; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. |
| | | * Returns the current config of this ReplicationDomain. |
| | | * |
| | | * @return the config |
| | | */ |
| | | protected ReplicationDomainCfg getConfig() |
| | | { |
| | | return config; |
| | | } |
| | | |
| | | /** |
| | | * Returns the base DN of this ReplicationDomain. All Replication Domain using |
| | | * this baseDN will be connected through the Replication Service. |
| | | * |
| | | * @return The base DN of this ReplicationDomain |
| | | */ |
| | | public DN getBaseDN() |
| | | { |
| | | return baseDN; |
| | | return config.getBaseDN(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public String getBaseDNString() |
| | | { |
| | | return baseDN.toNormalizedString(); |
| | | return getBaseDN().toNormalizedString(); |
| | | } |
| | | |
| | | /** |
| | | * Get the server ID. |
| | | * Get the server ID. The identifier of this Replication Domain inside the |
| | | * Replication Service. Each Domain must use a unique ServerID. |
| | | * |
| | | * @return The server ID. |
| | | */ |
| | | public int getServerId() |
| | | { |
| | | return serverID; |
| | | return config.getServerId(); |
| | | } |
| | | |
| | | /** |
| | | * Window size used during initialization .. between - the |
| | | * initializer/exporter DS that listens/waits acknowledges and that slows down |
| | | * data msg publishing based on the slowest server - and each |
| | | * initialized/importer DS that publishes acknowledges each WINDOW/2 data msg |
| | | * received. |
| | | * |
| | | * @return the initWindow |
| | | */ |
| | | protected int getInitWindow() |
| | | { |
| | | return config.getInitializationWindowSize(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public boolean isAssured() |
| | | { |
| | | return assured; |
| | | return AssuredType.SAFE_DATA.equals(assuredConfig.getAssuredType()) |
| | | || AssuredType.SAFE_READ.equals(assuredConfig.getAssuredType()); |
| | | } |
| | | |
| | | /** |
| | | * Gives the mode for the assured replication of the domain. |
| | | * Gives the mode for the assured replication of the domain. Only used when |
| | | * assured is true. |
| | | * |
| | | * @return The mode for the assured replication of the domain. |
| | | */ |
| | | public AssuredMode getAssuredMode() |
| | | { |
| | | return assuredMode; |
| | | switch (assuredConfig.getAssuredType()) |
| | | { |
| | | case SAFE_DATA: |
| | | case NOT_ASSURED: // The assured mode will be ignored in that case anyway |
| | | return AssuredMode.SAFE_DATA_MODE; |
| | | case SAFE_READ: |
| | | return AssuredMode.SAFE_READ_MODE; |
| | | } |
| | | return null; // should never happen |
| | | } |
| | | |
| | | /** |
| | | * Gives the assured level of the replication of the domain. |
| | | * Gives the assured Safe Data level of the replication of the domain. (used |
| | | * when assuredMode is SAFE_DATA). |
| | | * |
| | | * @return The assured level of the replication of the domain. |
| | | */ |
| | | public byte getAssuredSdLevel() |
| | | { |
| | | return assuredSdLevel; |
| | | return (byte) assuredConfig.getAssuredSdLevel(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public long getAssuredTimeout() |
| | | { |
| | | return assuredTimeout; |
| | | return assuredConfig.getAssuredTimeout(); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public byte getGroupId() |
| | | { |
| | | return groupId; |
| | | return (byte) config.getGroupId(); |
| | | } |
| | | |
| | | /** |
| | | * Gets the referrals URLs this domain publishes. |
| | | * Gets the referrals URLs this domain publishes. Referrals urls to be |
| | | * published to other servers of the topology. |
| | | * <p> |
| | | * TODO: fill that with all currently opened urls if no urls configured |
| | | * |
| | | * @return The referrals URLs this domain publishes. |
| | | */ |
| | | public List<String> getRefUrls() |
| | | public Set<String> getRefUrls() |
| | | { |
| | | return refUrls; |
| | | return config.getReferralsUrl(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set the list of Referrals that should be returned when an |
| | | * operation needs to be redirected to this server. |
| | | * |
| | | * @param referralsUrl The list of referrals. |
| | | */ |
| | | public void setURLs(Set<String> referralsUrl) |
| | | { |
| | | this.refUrls.addAll(referralsUrl); |
| | | } |
| | | |
| | | /** |
| | | * Set the timeout of the assured replication. |
| | | * |
| | | * @param assuredTimeout the timeout of the assured replication. |
| | | */ |
| | | public void setAssuredTimeout(long assuredTimeout) |
| | | { |
| | | this.assuredTimeout = assuredTimeout; |
| | | } |
| | | |
| | | /** |
| | | * Sets the groupID. |
| | | * |
| | | * @param groupId The groupID. |
| | | */ |
| | | public void setGroupId(byte groupId) |
| | | { |
| | | this.groupId = groupId; |
| | | } |
| | | |
| | | /** |
| | | * Sets the level of assured replication. |
| | | * |
| | | * @param assuredSdLevel The level of assured replication. |
| | | */ |
| | | public void setAssuredSdLevel(byte assuredSdLevel) |
| | | { |
| | | this.assuredSdLevel = assuredSdLevel; |
| | | } |
| | | |
| | | /** |
| | | * Sets the assured replication mode. |
| | | * |
| | | * @param dataMode The assured replication mode. |
| | | */ |
| | | public void setAssuredMode(AssuredMode dataMode) |
| | | { |
| | | this.assuredMode = dataMode; |
| | | } |
| | | |
| | | /** |
| | | * Sets assured replication. |
| | | * |
| | | * @param assured A boolean indicating if assured replication should be used. |
| | | */ |
| | | public void setAssured(boolean assured) |
| | | { |
| | | this.assured = assured; |
| | | } |
| | | |
| | | /** |
| | | * Receives an update message from the replicationServer. |
| | | * The other types of messages are processed in an opaque way for the caller. |
| | | * Also responsible for updating the list of pending changes |
| | |
| | | */ |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo( |
| | | "[IE] processErrorMsg:" + this.serverID + |
| | | " baseDN: " + this.baseDN + |
| | | "[IE] processErrorMsg:" + getServerId() + |
| | | " baseDN: " + getBaseDN() + |
| | | " Error Msg received: " + errorMsg); |
| | | |
| | | if (errorMsg.getCreationTime() > ieContext.startTime) |
| | |
| | | } |
| | | |
| | | numRcvdUpdates.incrementAndGet(); |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | if (update.isAssured() |
| | | && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE |
| | | && rsGroupId == groupId) |
| | | && broker.getRsGroupId() == getGroupId() |
| | | && update.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | assuredSrReceivedUpdates.incrementAndGet(); |
| | | } |
| | |
| | | requested servers. Log problem |
| | | */ |
| | | logError(NOTE_DS_RECEIVED_ACK_ERROR.get( |
| | | getBaseDNString(), Integer.toString(serverID), |
| | | getBaseDNString(), Integer.toString(getServerId()), |
| | | update.toString(), ack.errorsToString())); |
| | | |
| | | List<Integer> failedServers = ack.getFailedServers(); |
| | |
| | | */ |
| | | public ExportThread(int serverIdToInitialize, int initWindow) |
| | | { |
| | | super("Export thread from serverId=" + serverID + " to serverId=" |
| | | super("Export thread from serverId=" + getServerId() + " to serverId=" |
| | | + serverIdToInitialize); |
| | | this.serverIdToInitialize = serverIdToInitialize; |
| | | this.initWindow = initWindow; |
| | |
| | | public void initializeRemote(int target, Task initTask) |
| | | throws DirectoryException |
| | | { |
| | | initializeRemote(target, this.serverID, initTask, this.initWindow); |
| | | initializeRemote(target, getServerId(), initTask, getInitWindow()); |
| | | } |
| | | |
| | | /** |
| | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START_ALL.get( |
| | | countEntries(), getBaseDNString(), serverID)); |
| | | countEntries(), getBaseDNString(), getServerId())); |
| | | |
| | | for (DSInfo dsi : getReplicasList()) |
| | | { |
| | |
| | | } |
| | | else |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get( |
| | | countEntries(), getBaseDNString(), serverID, serverToInitialize)); |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(countEntries(), |
| | | getBaseDNString(), getServerId(), serverToInitialize)); |
| | | |
| | | ieContext.startList.add(serverToInitialize); |
| | | |
| | |
| | | |
| | | // Send start message to the peer |
| | | InitializeTargetMsg initTargetMsg = new InitializeTargetMsg( |
| | | getBaseDN(), serverID, serverToInitialize, |
| | | getBaseDN(), getServerId(), serverToInitialize, |
| | | serverRunningTheTask, ieContext.entryCount, initWindow); |
| | | |
| | | broker.publish(initTargetMsg); |
| | |
| | | exportBackend(new BufferedOutputStream(new ReplOutputStream(this))); |
| | | |
| | | // Notify the peer of the success |
| | | DoneMsg doneMsg = new DoneMsg(serverID, initTargetMsg.getDestination()); |
| | | broker.publish(doneMsg); |
| | | broker.publish( |
| | | new DoneMsg(getServerId(), initTargetMsg.getDestination())); |
| | | } |
| | | catch(DirectoryException exportException) |
| | | { |
| | |
| | | if (serverToInitialize == RoutableMsg.ALL_SERVERS) |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get( |
| | | getBaseDNString(), serverID, cause)); |
| | | getBaseDNString(), getServerId(), cause)); |
| | | } |
| | | else |
| | | { |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get( |
| | | getBaseDNString(), serverID, serverToInitialize, cause)); |
| | | getBaseDNString(), getServerId(), serverToInitialize, cause)); |
| | | } |
| | | |
| | | |
| | |
| | | // send the ack of flow control mgmt |
| | | if ((ieContext.msgCnt % (ieContext.initWindow/2)) == 0) |
| | | { |
| | | InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( |
| | | this.serverID, |
| | | entryMsg.getSenderID(), |
| | | ieContext.msgCnt); |
| | | final InitializeRcvAckMsg amsg = new InitializeRcvAckMsg( |
| | | getServerId(), entryMsg.getSenderID(), ieContext.msgCnt); |
| | | broker.publish(amsg, false); |
| | | if (debugEnabled()) |
| | | { |
| | |
| | | Message.raw(Category.SYNC, Severity.NOTICE, |
| | | ERR_INIT_EXPORTER_DISCONNECTION.get( |
| | | getBaseDNString(), |
| | | Integer.toString(this.serverID), |
| | | Integer.toString(getServerId()), |
| | | Integer.toString(ieContext.importSource))); |
| | | ieContext.setExceptionIfNoneSet(new DirectoryException( |
| | | ResultCode.OTHER, errMsg)); |
| | |
| | | |
| | | // build the message |
| | | EntryMsg entryMessage = new EntryMsg( |
| | | serverID,ieContext.getExportTarget(), lDIFEntry, pos, length, |
| | | getServerId(),ieContext.getExportTarget(), lDIFEntry, pos, length, |
| | | ++ieContext.msgCnt); |
| | | |
| | | // Waiting the slowest loop |
| | |
| | | ieContext.initializeTask = initTask; |
| | | ieContext.attemptCnt = 0; |
| | | ieContext.initReqMsgSent = new InitializeRequestMsg( |
| | | getBaseDN(), serverID, source, this.initWindow); |
| | | getBaseDN(), getServerId(), source, getInitWindow()); |
| | | |
| | | // Publish Init request msg |
| | | broker.publish(ieContext.initReqMsgSent); |
| | |
| | | try |
| | | { |
| | | // Log starting |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get( |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID)); |
| | | logError(NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(getBaseDNString(), |
| | | initTargetMsgReceived.getSenderID(), getServerId())); |
| | | |
| | | // Go into full update status |
| | | setNewStatus(StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT); |
| | | |
| | | // Acquire an import context if not already done (and initialize). |
| | | if (initTargetMsgReceived.getInitiatorID() != this.serverID) |
| | | if (initTargetMsgReceived.getInitiatorID() != getServerId()) |
| | | { |
| | | /* |
| | | The initTargetMsgReceived is for an import initiated by the remote |
| | |
| | | finally |
| | | { |
| | | Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get( |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID, |
| | | getBaseDNString(), initTargetMsgReceived.getSenderID(), |
| | | getServerId(), |
| | | (ieContext.getException() == null ? "" |
| | | : ieContext.getException().getLocalizedMessage())); |
| | | logError(msg); |
| | |
| | | if (newStatus == ServerStatus.INVALID_STATUS) |
| | | { |
| | | logError(ERR_DS_CANNOT_CHANGE_STATUS.get(getBaseDNString(), |
| | | Integer.toString(serverID), status.toString(), event.toString())); |
| | | String.valueOf(getServerId()), status.toString(), event.toString())); |
| | | return; |
| | | } |
| | | |
| | |
| | | resetMonitoringCounters(); |
| | | } |
| | | |
| | | // Store new status |
| | | status = newStatus; |
| | | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Replication domain " + baseDN + " new status is: " |
| | | + status); |
| | | TRACER.debugInfo("Replication domain " + getBaseDN() |
| | | + " new status is: " + status); |
| | | } |
| | | |
| | | // Perform whatever actions are needed to apply properties for being |
| | |
| | | // check that at least one ReplicationServer did change its generation-id |
| | | checkGenerationID(-1); |
| | | |
| | | // Reconnect to the Replication Server so that it adopt our |
| | | // GenerationID. |
| | | disableService(); |
| | | enableService(); |
| | | // Reconnect to the Replication Server so that it adopts our GenerationID. |
| | | restartService(); |
| | | |
| | | // wait for the domain to reconnect. |
| | | int count = 0; |
| | |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("Server id " + serverID + " and domain " + baseDN |
| | | + " resetGenerationId " + generationIdNewValue); |
| | | TRACER.debugInfo("Server id " + getServerId() + " and domain " |
| | | + getBaseDN() + " resetGenerationId " + generationIdNewValue); |
| | | } |
| | | |
| | | ResetGenerationIdMsg genIdMessage = |
| | |
| | | if (!isConnected()) |
| | | { |
| | | Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(getBaseDNString(), |
| | | Integer.toString(serverID), |
| | | Integer.toString(getServerId()), |
| | | Long.toString(genIdMessage.getGenerationId())); |
| | | throw new DirectoryException(ResultCode.OTHER, message); |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Definitively stops the Replication Service. |
| | | */ |
| | | public void stopDomain() |
| | | { |
| | | disableService(); |
| | | domains.remove(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Change some ReplicationDomain parameters. |
| | | * |
| | | * @param config |
| | | * The new configuration that this domain should now use. |
| | | */ |
| | | public void changeConfig(ReplicationDomainCfg config) |
| | | protected void changeConfig(ReplicationDomainCfg config) |
| | | { |
| | | this.groupId = (byte) config.getGroupId(); |
| | | |
| | | if (broker != null && broker.changeConfig(config)) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | restartService(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Applies a configuration change to the attributes which should be |
| | | * included in the ECL. |
| | |
| | | public void changeConfig(Set<String> includeAttributes, |
| | | Set<String> includeAttributesForDeletes) |
| | | { |
| | | if (setEclIncludes(serverID, includeAttributes, includeAttributesForDeletes) |
| | | && broker != null) |
| | | final boolean attrsModified = setEclIncludes( |
| | | getServerId(), includeAttributes, includeAttributesForDeletes); |
| | | if (attrsModified && broker != null) |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | restartService(); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Restarts the replication service by disabling it and then enabling it |
| | | * again. |
| | | */ |
| | | private void restartService() |
| | | { |
| | | disableService(); |
| | | enableService(); |
| | | } |
| | | |
| | | /** |
| | | * This method should trigger an export of the replicated data. |
| | |
| | | Send an ack if it was requested and the group id is the same as the RS |
| | | one. Only Safe Read mode makes sense in DS for returning an ack. |
| | | */ |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | // Assured feature is supported starting from replication protocol V2 |
| | | if (msg.isAssured() |
| | | && broker.getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V2) |
| | | { |
| | | AssuredMode msgAssuredMode = msg.getAssuredMode(); |
| | | if (msgAssuredMode == AssuredMode.SAFE_READ_MODE) |
| | | if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | if (rsGroupId == groupId) |
| | | if (broker.getRsGroupId() == getGroupId()) |
| | | { |
| | | // Send the ack |
| | | AckMsg ackMsg = new AckMsg(msg.getCSN()); |
| | |
| | | ackMsg.setHasReplayError(true); |
| | | // -> replay error occurred in our server |
| | | List<Integer> idList = new ArrayList<Integer>(); |
| | | idList.add(serverID); |
| | | idList.add(getServerId()); |
| | | ackMsg.setFailedServers(idList); |
| | | } |
| | | broker.publish(ackMsg); |
| | |
| | | } |
| | | } |
| | | } |
| | | else if (assuredMode != AssuredMode.SAFE_DATA_MODE) |
| | | else if (getAssuredMode() != AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(Integer.toString(serverID), |
| | | msgAssuredMode.toString(), getBaseDNString(), msg.toString())); |
| | | logError(ERR_DS_UNKNOWN_ASSURED_MODE.get(String.valueOf(getServerId()), |
| | | msg.getAssuredMode().toString(), getBaseDNString(), |
| | | msg.toString())); |
| | | } |
| | | // Nothing to do in Assured safe data mode, only RS ack updates. |
| | | } |
| | |
| | | */ |
| | | protected void prepareWaitForAckIfAssuredEnabled(UpdateMsg msg) |
| | | { |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | /* |
| | | * If assured configured, set message accordingly to request an ack in the |
| | | * right assured mode. |
| | | * No ack requested for a RS with a different group id. Assured |
| | | * replication supported for the same locality, i.e: a topology working in |
| | | * the same |
| | | * geographical location). If we are connected to a RS which is not in our |
| | | * locality, no need to ask for an ack. |
| | | * No ack requested for a RS with a different group id. |
| | | * Assured replication supported for the same locality, |
| | | * i.e: a topology working in the same geographical location). |
| | | * If we are connected to a RS which is not in our locality, |
| | | * no need to ask for an ack. |
| | | */ |
| | | if (assured && rsGroupId == groupId) |
| | | if (needsAck()) |
| | | { |
| | | msg.setAssured(true); |
| | | msg.setAssuredMode(assuredMode); |
| | | if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | | msg.setAssuredMode(getAssuredMode()); |
| | | if (getAssuredMode() == AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | msg.setSafeDataLevel(assuredSdLevel); |
| | | msg.setSafeDataLevel(getAssuredSdLevel()); |
| | | } |
| | | |
| | | // Add the assured message to the list of update that are waiting for acks |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean needsAck() |
| | | { |
| | | return isAssured() && broker.getRsGroupId() == getGroupId(); |
| | | } |
| | | |
| | | /** |
| | | * Wait for the processing of an assured message after it has been sent, if |
| | | * assured replication is configured, otherwise, do nothing. |
| | |
| | | protected void waitForAckIfAssuredEnabled(UpdateMsg msg) |
| | | throws TimeoutException |
| | | { |
| | | byte rsGroupId = broker.getRsGroupId(); |
| | | |
| | | // If assured mode configured, wait for acknowledgment for the just sent |
| | | // message |
| | | if (assured && rsGroupId == groupId) |
| | | if (needsAck()) |
| | | { |
| | | // Increment assured replication monitoring counters |
| | | switch (assuredMode) |
| | | switch (getAssuredMode()) |
| | | { |
| | | case SAFE_READ_MODE: |
| | | assuredSrSentUpdates.incrementAndGet(); |
| | |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("waitForAck method interrupted for replication " + |
| | | "baseDN: " + baseDN); |
| | | "baseDN: " + getBaseDN()); |
| | | } |
| | | break; |
| | | } |
| | | // Timeout ? |
| | | if ( (System.currentTimeMillis() - startTime) >= assuredTimeout ) |
| | | if ((System.currentTimeMillis() - startTime) >= getAssuredTimeout()) |
| | | { |
| | | /* |
| | | Timeout occurred, be sure that ack is not being received and if so, |
| | |
| | | } |
| | | |
| | | throw new TimeoutException("No ack received for message csn: " + csn |
| | | + " and replication domain: " + baseDN + " after " |
| | | + assuredTimeout + " ms."); |
| | | + " and replication domain: " + getBaseDN() + " after " |
| | | + getAssuredTimeout() + " ms."); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | // This exception may only be raised if assured replication is enabled |
| | | logError(NOTE_DS_ACK_TIMEOUT.get(getBaseDNString(), |
| | | Long.toString(assuredTimeout), update.toString())); |
| | | Long.toString(getAssuredTimeout()), update.toString())); |
| | | } |
| | | } |
| | | |
| | |
| | | * |
| | | * @return The GenerationID. |
| | | */ |
| | | public abstract long getGenerationID(); |
| | | public long getGenerationID() |
| | | { |
| | | return generationId; |
| | | } |
| | | |
| | | /** |
| | | * Subclasses should use this method to add additional monitoring |
| | | * information in the ReplicationDomain. |
| | | * Sets the generationId for this replication domain. |
| | | * |
| | | * @param generationId |
| | | * the generationId to set |
| | | */ |
| | | public void setGenerationID(long generationId) |
| | | { |
| | | this.generationId = generationId; |
| | | } |
| | | |
| | | /** |
| | | * Subclasses should use this method to add additional monitoring information |
| | | * in the ReplicationDomain. |
| | | * |
| | | * @return Additional monitoring attributes that will be added in the |
| | | * ReplicationDomain monitoring entry. |
| | |
| | | */ |
| | | public CSN getLastLocalChange() |
| | | { |
| | | return state.getCSN(serverID); |
| | | return state.getCSN(getServerId()); |
| | | } |
| | | |
| | | /** |
| | | * Gets and stores the assured replication configuration parameters. Returns a |
| | | * boolean indicating if the passed configuration has changed compared to |
| | | * previous values and the changes require a reconnection. |
| | | * |
| | | * @param config |
| | | * The configuration object |
| | | * @param allowReconnection |
| | | * Tells if one must reconnect if significant changes occurred |
| | | */ |
| | | protected void readAssuredConfig(ReplicationDomainCfg config, |
| | | boolean allowReconnection) |
| | | { |
| | | // Disconnect if required: changing configuration values before |
| | | // disconnection would make assured replication used immediately and |
| | | // disconnection could cause some timeouts error. |
| | | if (needReconnection(config) && allowReconnection) |
| | | { |
| | | disableService(); |
| | | |
| | | assuredConfig = config; |
| | | |
| | | enableService(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Compares the passed configuration with the assured settings currently in |
| | | * use and tells whether the difference requires a reconnection. |
| | | * |
| | | * @param cfg |
| | | * the candidate configuration |
| | | * @return true if the assured type changes, or if safe data mode is in use |
| | | * and the safe data level changes; false otherwise |
| | | */ |
| | | private boolean needReconnection(ReplicationDomainCfg cfg) |
| | | { |
| | | final AssuredMode currentMode = getAssuredMode(); |
| | | |
| | | // First question: does the assured type itself change? |
| | | final boolean typeChanged; |
| | | switch (cfg.getAssuredType()) |
| | | { |
| | | case NOT_ASSURED: |
| | | typeChanged = isAssured(); |
| | | break; |
| | | case SAFE_DATA: |
| | | typeChanged = !isAssured() || currentMode == SAFE_READ_MODE; |
| | | break; |
| | | case SAFE_READ: |
| | | typeChanged = !isAssured() || currentMode == SAFE_DATA_MODE; |
| | | break; |
| | | default: |
| | | typeChanged = false; |
| | | break; |
| | | } |
| | | if (typeChanged) |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | // Same type: only a different safe data level matters, and only when |
| | | // safe data mode is the one currently in use. |
| | | if (!isAssured() || currentMode != SAFE_DATA_MODE) |
| | | { |
| | | return false; |
| | | } |
| | | return cfg.getAssuredSdLevel() != getAssuredSdLevel(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " " + this.baseDN + " " + this.serverID; |
| | | return getClass().getSimpleName() + " " + getBaseDN() + " " + getServerId(); |
| | | } |
| | | } |
| | |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.io.InputStream; |
| | | import java.io.OutputStream; |
| | | import java.util.Set; |
| | | import java.util.TreeSet; |
| | | |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | |
| | | public class DummyReplicationDomain extends ReplicationDomain |
| | | { |
| | | |
| | | private final long generationId; |
| | | |
| | | public DummyReplicationDomain(long generationId) |
| | | { |
| | | super(null, -1, 0); |
| | | this.generationId = generationId; |
| | | super(new DomainFakeCfg(null, -1, new TreeSet<String>()), generationId); |
| | | } |
| | | |
| | | @Override |
| | |
| | | return false; |
| | | } |
| | | |
| | | @Override |
| | | public long getGenerationID() |
| | | { |
| | | return this.generationId; |
| | | } |
| | | |
| | | } |
| | |
| | | replicationServer = new ReplicationServer(conf); |
| | | } |
| | | |
| | | private static DomainFakeCfg newConfig(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval) |
| | | { |
| | | DomainFakeCfg fakeCfg = |
| | | new DomainFakeCfg(baseDN, serverID, replicationServers); |
| | | fakeCfg.setHeartbeatInterval(heartbeatInterval); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | return fakeCfg; |
| | | } |
| | | |
| | | /** |
| | | * This class is the minimum implementation of a Concrete ReplicationDomain |
| | | * used to be able to connect to the RS with a known genid. Also to be able |
| | |
| | | */ |
| | | private StringBuilder importString; |
| | | private int exportedEntryCount; |
| | | private long generationID = -1; |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | long generationId) throws ConfigException |
| | | { |
| | | super(baseDN, serverID, 100); |
| | | generationID = generationId; |
| | | DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers); |
| | | fakeCfg.setHeartbeatInterval(heartbeatInterval); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | startPublishService(fakeCfg); |
| | | super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval), |
| | | generationId); |
| | | startPublishService(getConfig()); |
| | | startListenService(); |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | @Override |
| | | public long getGenerationID() |
| | | { |
| | | return generationID; |
| | | } |
| | | |
| | | @Override |
| | | protected void importBackend(InputStream input) throws DirectoryException |
| | | { |
| | | byte[] buffer = new byte[1000]; |
| | |
| | | */ |
| | | public class TopologyViewTest extends ReplicationTestCase |
| | | { |
| | | // Server id definitions |
| | | /** Server id definitions */ |
| | | private static final int DS1_ID = 1; |
| | | private static final int DS2_ID = 2; |
| | | private static final int DS3_ID = 3; |
| | |
| | | private static final int RS2_ID = 52; |
| | | private static final int RS3_ID = 53; |
| | | |
| | | // Group id definitions |
| | | /** Group id definitions */ |
| | | private static final int DS1_GID = 1; |
| | | private static final int DS2_GID = 1; |
| | | private static final int DS3_GID = 2; |
| | |
| | | private static final int RS2_GID = 2; |
| | | private static final int RS3_GID = 3; |
| | | |
| | | // Assured conf definitions |
| | | /** Assured conf definitions */ |
| | | private static final AssuredType DS1_AT = AssuredType.NOT_ASSURED; |
| | | private static final int DS1_SDL = -1; |
| | | private static SortedSet<String> DS1_RU = new TreeSet<String>(); |
| | |
| | | private ReplicationServer rs2 = null; |
| | | private ReplicationServer rs3 = null; |
| | | |
| | | // The tracer object for the debug logger |
| | | /** The tracer object for the debug logger */ |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | private void debugInfo(String s) |
| | |
| | | return replicationDomain; |
| | | } |
| | | |
| | | // Definitions of steps for the test case |
| | | /** Definitions of steps for the test case */ |
| | | private static final int STEP_1 = 1; |
| | | private static final int STEP_2 = 2; |
| | | private static final int STEP_3 = 3; |
| | |
| | | } |
| | | |
| | | // Perform necessary conversions |
| | | boolean assuredFlag = (assuredType != AssuredType.NOT_ASSURED); |
| | | AssuredMode assMode = ( (assuredType == AssuredType.SAFE_READ) ? |
| | | AssuredMode.SAFE_READ_MODE : AssuredMode.SAFE_DATA_MODE); |
| | | List<String> urls = new ArrayList<String>(); |
| | | for(String str : refUrls) |
| | | { |
| | | urls.add(str); |
| | | } |
| | | boolean assuredFlag = assuredType != AssuredType.NOT_ASSURED; |
| | | AssuredMode assMode = assuredType == AssuredType.SAFE_READ |
| | | ? AssuredMode.SAFE_READ_MODE |
| | | : AssuredMode.SAFE_DATA_MODE; |
| | | |
| | | return new DSInfo(dsId, "dummy:1234", rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assMode, |
| | | (byte)assuredSdLevel, groupId, urls, eclIncludes, eclIncludes, protocolVersion); |
| | | (byte)assuredSdLevel, groupId, refUrls, eclIncludes, eclIncludes, protocolVersion); |
| | | } |
| | | |
| | | /** |
| | |
| | | /** |
| | | * Get the topo view of the current analyzed DS |
| | | */ |
| | | List<DSInfo> internalDsList = rd.getReplicasList(); |
| | | // Add info for DS itself: |
| | | // we need to clone the list as we don't want to modify the list kept |
| | | // inside the DS. |
| | | List<DSInfo> dsList = new ArrayList<DSInfo>(); |
| | | for (DSInfo aDsInfo : internalDsList) |
| | | { |
| | | dsList.add(aDsInfo); |
| | | } |
| | | int dsId = rd.getServerId(); |
| | | int rsId = rd.getRsServerId(); |
| | | ServerStatus status = rd.getStatus(); |
| | | boolean assuredFlag = rd.isAssured(); |
| | | AssuredMode assuredMode = rd.getAssuredMode(); |
| | | byte safeDataLevel = rd.getAssuredSdLevel(); |
| | | byte groupId = rd.getGroupId(); |
| | | List<String> refUrls = rd.getRefUrls(); |
| | | Set<String> eclInclude = rd.getEclIncludes(); |
| | | Set<String> eclIncludeForDeletes = rd.getEclIncludesForDeletes(); |
| | | short protocolVersion = ProtocolVersion.getCurrentVersion(); |
| | | DSInfo dsInfo = new DSInfo(dsId, "dummy:1234", rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, status, assuredFlag, assuredMode, |
| | | safeDataLevel, groupId, refUrls, eclInclude, eclIncludeForDeletes, protocolVersion); |
| | | dsList.add(dsInfo); |
| | | final DSInfo dsInfo = new DSInfo(rd.getServerId(), "dummy:1234", rd.getRsServerId(), |
| | | TEST_DN_WITH_ROOT_ENTRY_GENID, |
| | | rd.getStatus(), |
| | | rd.isAssured(), rd.getAssuredMode(), rd.getAssuredSdLevel(), |
| | | rd.getGroupId(), rd.getRefUrls(), |
| | | rd.getEclIncludes(), rd.getEclIncludesForDeletes(), |
| | | ProtocolVersion.getCurrentVersion()); |
| | | final List<DSInfo> dsList = new ArrayList<DSInfo>(rd.getReplicasList()); |
| | | dsList.add(dsInfo); |
| | | |
| | | TopoView dsTopoView = new TopoView(dsList, rd.getRsList()); |
| | | |
| | | /** |
| | | * Compare to what is the expected view |
| | | */ |
| | | |
| | | assertEquals(dsTopoView, theoricalTopoView, " in DSid=" + currentDsId); |
| | | } |
| | | } |
| | |
| | | */ |
| | | private class TopoView |
| | | { |
| | | private List<DSInfo> dsList = null; |
| | | private List<RSInfo> rsList = null; |
| | | private List<DSInfo> dsList; |
| | | private List<RSInfo> rsList; |
| | | |
| | | public TopoView(List<DSInfo> dsList, List<RSInfo> rsList) |
| | | { |
| | |
| | | @Override |
| | | public boolean equals(Object obj) |
| | | { |
| | | assertNotNull(obj); |
| | | assertFalse(obj.getClass() != this.getClass()); |
| | | |
| | | TopoView topoView = (TopoView) obj; |
| | | |
| | | // Check dsList |
| | | if (topoView.dsList.size() != dsList.size()) |
| | | if (obj == null || getClass() != obj.getClass()) |
| | | return false; |
| | | for (DSInfo dsInfo : topoView.dsList) |
| | | TopoView other = (TopoView) obj; |
| | | return checkLists(dsList, other.dsList) |
| | | && checkLists(rsList, other.rsList); |
| | | } |
| | | |
| | | private boolean checkLists(List<?> list, List<?> otherList) |
| | | { |
| | | if (otherList.size() != list.size()) |
| | | return false; |
| | | for (Object otherObj : otherList) |
| | | { |
| | | int found = 0; |
| | | for (DSInfo thisDsInfo : dsList) |
| | | for (Object thisObj : list) |
| | | { |
| | | if (thisDsInfo.equals(dsInfo)) |
| | | if (thisObj.equals(otherObj)) |
| | | found++; |
| | | } |
| | | // Not found |
| | |
| | | assertFalse(found > 1); |
| | | // Ok, found exactly once in the list, examine next structure |
| | | } |
| | | |
| | | // Check rsList |
| | | if (topoView.rsList.size() != rsList.size()) |
| | | return false; |
| | | for (RSInfo rsInfo : topoView.rsList) |
| | | { |
| | | int found = 0; |
| | | for (RSInfo thisRsInfo : rsList) |
| | | { |
| | | if (thisRsInfo.equals(rsInfo)) |
| | | found++; |
| | | } |
| | | // Not found |
| | | if (found == 0) |
| | | return false; |
| | | // Should never see twice as rsInfo structure in a dsList |
| | | assertFalse(found > 1); |
| | | // Ok, found exactly once in the list, examine next structure |
| | | } |
| | | |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | String dsStr = ""; |
| | | final StringBuilder sb = new StringBuilder("TopoView:"); |
| | | sb.append("\n----------------------------\n"); |
| | | sb.append("CONNECTED DS SERVERS:\n"); |
| | | for (DSInfo dsInfo : dsList) |
| | | { |
| | | dsStr += dsInfo.toString() + "\n----------------------------\n"; |
| | | sb.append(dsInfo).append("\n----------------------------\n"); |
| | | } |
| | | |
| | | String rsStr = ""; |
| | | sb.append("CONNECTED RS SERVERS:\n"); |
| | | for (RSInfo rsInfo : rsList) |
| | | { |
| | | rsStr += rsInfo.toString() + "\n----------------------------\n"; |
| | | sb.append(rsInfo).append("\n----------------------------\n"); |
| | | } |
| | | return sb.toString(); |
| | | } |
| | | |
| | | return ("TopoView:" + |
| | | "\n----------------------------\n" + "CONNECTED DS SERVERS:\n" + dsStr + |
| | | "CONNECTED RS SERVERS:\n" + rsStr); |
| | | /** |
| | | * Returns the enclosing {@code TopologyViewTest} instance of this inner |
| | | * class instance. |
| | | * |
| | | * @return the enclosing test instance |
| | | */ |
| | | private TopologyViewTest getOuterType() |
| | | { |
| | | return TopologyViewTest.this; |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private String getHostPort(int port) |
| | | { |
| | | return LOCAL_HOST_NAME + ":" + port; |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.AssuredType; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static java.util.Arrays.*; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
| | | * (no server state constructor version) |
| | | */ |
| | | private FakeReplicationDomain createFakeReplicationDomain(int serverId, |
| | | int groupId, int rsId, long generationId, boolean assured, |
| | | AssuredMode assuredMode, int safeDataLevel, long assuredTimeout, |
| | | int scenario) |
| | | throws Exception |
| | | int groupId, int rsId, long generationId, AssuredMode assuredMode, |
| | | int safeDataLevel, long assuredTimeout, int scenario) throws Exception |
| | | { |
| | | ReplicationDomainCfg config = newFakeCfg(serverId, getRsPort(rsId), groupId); |
| | | return createFakeReplicationDomain(config, groupId, rsId, generationId, assured, |
| | | assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState(), true); |
| | | return createFakeReplicationDomain(serverId, groupId, rsId, generationId, |
| | | assuredMode, safeDataLevel, assuredTimeout, scenario, |
| | | new ServerState(), true); |
| | | } |
| | | |
| | | private int getRsPort(int rsId) |
| | |
| | | * @throws Exception |
| | | */ |
| | | private FakeReplicationDomain createFakeReplicationDomain( |
| | | ReplicationDomainCfg config, int groupId, int rsId, long generationId, |
| | | boolean assured, AssuredMode assuredMode, int safeDataLevel, |
| | | int serverId, int groupId, int rsId, long generationId, |
| | | AssuredMode assuredMode, int safeDataLevel, |
| | | long assuredTimeout, int scenario, ServerState serverState, |
| | | boolean startListen) throws Exception |
| | | { |
| | | // Set port to right real RS according to its id |
| | | int rsPort = getRsPort(rsId); |
| | | final DomainFakeCfg config = newDomainConfig(serverId, groupId, rsId, |
| | | assuredMode, safeDataLevel, assuredTimeout); |
| | | return createFakeReplicationDomain(config, rsId, generationId, scenario, |
| | | serverState, startListen); |
| | | } |
| | | |
| | | FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain( |
| | | config.getBaseDN(), config.getServerId(), generationId, (byte) groupId, |
| | | assured, assuredMode, (byte) safeDataLevel, assuredTimeout, scenario, serverState); |
| | | private FakeReplicationDomain createFakeReplicationDomain( |
| | | ReplicationDomainCfg config, int rsId, long generationId, int scenario, |
| | | ServerState serverState, boolean startListen) throws Exception |
| | | { |
| | | FakeReplicationDomain fakeReplicationDomain = |
| | | new FakeReplicationDomain(config, generationId, scenario, serverState); |
| | | |
| | | fakeReplicationDomain.startPublishService(config); |
| | | if (startListen) |
| | |
| | | assertTrue(fakeReplicationDomain.isConnected()); |
| | | // Check connected server port |
| | | HostPort rd = HostPort.valueOf(fakeReplicationDomain.getReplicationServer()); |
| | | assertEquals(rd.getPort(), rsPort); |
| | | assertEquals(rd.getPort(), getRsPort(rsId)); |
| | | |
| | | return fakeReplicationDomain; |
| | | } |
| | | |
| | | private DomainFakeCfg newFakeCfg(int serverId, int rsPort, int groupId) throws Exception |
| | | private DomainFakeCfg newDomainConfig(int serverId, int groupId, int rsId, |
| | | AssuredMode assuredMode, int safeDataLevel, long assuredTimeout) |
| | | throws DirectoryException |
| | | { |
| | | DomainFakeCfg fakeCfg = new DomainFakeCfg( |
| | | DN.valueOf(TEST_ROOT_DN_STRING), serverId, newSortedSet("localhost:" + rsPort), groupId); |
| | | final int rsPort = getRsPort(rsId); |
| | | final DomainFakeCfg fakeCfg = new DomainFakeCfg( |
| | | DN.valueOf(TEST_ROOT_DN_STRING), serverId, newSortedSet("localhost:" + rsPort), |
| | | getAssuredType(assuredMode), |
| | | safeDataLevel, groupId, assuredTimeout, new TreeSet<String>()); |
| | | fakeCfg.setHeartbeatInterval(1000); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | return fakeCfg; |
| | | } |
| | | |
| | | /** |
| | | * Converts an assured mode into the matching configuration assured type. |
| | | * |
| | | * @param assuredMode |
| | | * the assured mode to convert, null being mapped to NOT_ASSURED |
| | | * @return the assured type matching the passed mode |
| | | */ |
| | | private AssuredType getAssuredType(AssuredMode assuredMode) |
| | | { |
| | | if (assuredMode == null) |
| | | { |
| | | return AssuredType.NOT_ASSURED; |
| | | } |
| | | else if (assuredMode == AssuredMode.SAFE_READ_MODE) |
| | | { |
| | | return AssuredType.SAFE_READ; |
| | | } |
| | | else if (assuredMode == AssuredMode.SAFE_DATA_MODE) |
| | | { |
| | | return AssuredType.SAFE_DATA; |
| | | } |
| | | // Any other mode is not handled by this helper. |
| | | throw new RuntimeException("Not implemented for " + assuredMode); |
| | | } |
| | | |
| | | /** |
| | | * Creates and connects a new fake replication server, using the passed scenario. |
| | | */ |
| | |
| | | { |
| | | /** The scenario this DS is expecting */ |
| | | private int scenario = -1; |
| | | private long generationId = -1; |
| | | |
| | | private CSNGenerator gen = null; |
| | | private CSNGenerator gen; |
| | | |
| | | /** False if a received update had assured parameters not as expected */ |
| | | private boolean everyUpdatesAreOk = true; |
| | |
| | | * behavior upon reception of updates) |
| | | * @throws org.opends.server.config.ConfigException |
| | | */ |
| | | public FakeReplicationDomain( |
| | | DN baseDN, |
| | | int serverID, |
| | | long generationId, |
| | | byte groupId, |
| | | boolean assured, |
| | | AssuredMode assuredMode, |
| | | byte safeDataLevel, |
| | | long assuredTimeout, |
| | | int scenario, |
| | | ServerState serverState) throws ConfigException |
| | | public FakeReplicationDomain(ReplicationDomainCfg config, |
| | | long generationId, int scenario, ServerState serverState) |
| | | throws ConfigException |
| | | { |
| | | super(baseDN, serverID, serverState); |
| | | this.generationId = generationId; |
| | | setGroupId(groupId); |
| | | setAssured(assured); |
| | | setAssuredMode(assuredMode); |
| | | setAssuredSdLevel(safeDataLevel); |
| | | setAssuredTimeout(assuredTimeout); |
| | | super(config, generationId, serverState); |
| | | this.scenario = scenario; |
| | | |
| | | gen = new CSNGenerator(serverID, 0L); |
| | | gen = new CSNGenerator(config.getServerId(), 0L); |
| | | } |
| | | |
| | | public boolean receivedUpdatesOk() |
| | |
| | | } |
| | | |
| | | @Override |
| | | public long getGenerationID() |
| | | { |
| | | return generationId; |
| | | } |
| | | |
| | | @Override |
| | | protected void importBackend(InputStream input) throws DirectoryException |
| | | { |
| | | // Not needed for this test |
| | |
| | | debugInfo("Fake DS " + getServerId() + " received update assured flag is wrong: " + updateMsg); |
| | | ok = false; |
| | | } |
| | | if (updateMsg.getAssuredMode() != getAssuredMode()) |
| | | { |
| | | if (isAssured() && updateMsg.getAssuredMode() != getAssuredMode()) |
| | | { // it is meaningless to have different assured mode when UpdateMsg is not assured |
| | | debugInfo("Fake DS " + getServerId() + " received update assured mode is wrong: " + updateMsg); |
| | | ok = false; |
| | | } |
| | |
| | | * @param assuredMode the expected assured mode of the incoming updates (also used for outgoing updates) |
| | | * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates) |
| | | * @param groupId our group id |
| | | * @param baseDN the basedn we connect with, to the real RS |
| | | * @param baseDN the baseDN we connect with, to the real RS |
| | | * @param generationId the generation id we use at connection to real RS |
| | | */ |
| | | public FakeReplicationServer(int port, int serverId, boolean assured, |
| | |
| | | */ |
| | | |
| | | // Create real RS 1 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, 0); |
| | | assertNotNull(rs1); |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0); |
| | | |
| | | /* |
| | | * Start main DS (the one which sends updates) |
| | |
| | | // Create and connect fake domain 1 to RS 1 |
| | | // Assured mode: SD, level 1 |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, mainDsGid, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | /* |
| | | * Start one other fake DS |
| | |
| | | // this would timeout. If main DS group id is not the same as the real RS one, |
| | | // the update will even not come to real RS as assured |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID, |
| | | DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | } |
| | | |
| | | /* |
| | |
| | | */ |
| | | |
| | | // Create real RS 1 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, 0); |
| | | assertNotNull(rs1); |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0); |
| | | |
| | | /* |
| | | * Start main DS (the one which sends updates) |
| | |
| | | |
| | | // Create and connect fake domain 1 to RS 1 |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | /* |
| | | * Start one other fake DS |
| | |
| | | if (otherFakeDS) |
| | | { |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID, |
| | | otherFakeDsGenId, false, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | otherFakeDsGenId, null, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | } |
| | | |
| | | /* |
| | |
| | | */ |
| | | |
| | | // Create real RS 1 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, 0); |
| | | assertNotNull(rs1); |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0); |
| | | |
| | | /* |
| | | * Start fake RS to make the RS have the default generation id |
| | |
| | | */ |
| | | int numberOfRealRSs = 3; |
| | | |
| | | // Create real RS 1 |
| | | // Create real RS 1, 2, 3 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs1); |
| | | |
| | | // Create real RS 2 |
| | | rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs2); |
| | | |
| | | // Create real RS 3 |
| | | rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs3); |
| | | |
| | | /* |
| | | * Start DS that will send updates |
| | |
| | | // Wait for RSs to connect together |
| | | // Create and connect fake domain 1 to RS 1 |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // Wait for RSs connections to be finished |
| | | // DS must see expected numbers of RSs |
| | |
| | | */ |
| | | |
| | | // Create real RS 1 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, 0); |
| | | assertNotNull(rs1); |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0); |
| | | |
| | | /******************* |
| | | * Start main DS 1 (the one which sends updates) |
| | |
| | | // Create and connect DS 1 to RS 1 |
| | | // Assured mode: SR |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | /* |
| | | * Send a first assured safe read update |
| | |
| | | // Create and connect DS 2 to RS 1 |
| | | // Assured mode: SR |
| | | ServerState serverState = fakeRd1.getServerState(); |
| | | ReplicationDomainCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID), DEFAULT_GID); |
| | | fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO, serverState, true); |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO, serverState, true); |
| | | |
| | | // Wait for connections to be established |
| | | waitForStableTopo(fakeRd1, 1, 1); |
| | |
| | | */ |
| | | |
| | | // Create real RS 1 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, 0); |
| | | assertNotNull(rs1); |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, testCase, 0); |
| | | |
| | | /* |
| | | * Start main DS 1 (the one which sends updates) |
| | | */ |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | /* |
| | | * Start another fake DS 2 connected to RS |
| | | */ |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO); |
| | | |
| | | /* |
| | | * Start another fake DS 3 connected to RS |
| | | */ |
| | | fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID, |
| | | otherFakeDsGenId, (otherFakeDsGid == DEFAULT_GID), |
| | | AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | otherFakeDsScen); |
| | | otherFakeDsGenId, otherFakeDsGid == DEFAULT_GID ? AssuredMode.SAFE_READ_MODE : null, |
| | | 1, LONG_TIMEOUT, otherFakeDsScen); |
| | | |
| | | /* |
| | | * Start fake RS (RS 1) connected to RS |
| | |
| | | */ |
| | | int numberOfRealRSs = 4; |
| | | |
| | | // Create real RS 1 |
| | | // Create real RS 1, 2, 3 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs1); |
| | | |
| | | // Create real RS 2 |
| | | rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs2); |
| | | |
| | | // Create real RS 3 |
| | | rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs3); |
| | | |
| | | // Create real RS 4 (different GID 2) |
| | | rs4 = createReplicationServer(RS4_ID, OTHER_GID_BIS, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs4); |
| | | |
| | | /* |
| | | * Start DS 1 that will send assured updates |
| | |
| | | // Wait for RSs to connect together |
| | | // Create and connect fake domain 1 to RS 1 |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // Wait for connections to be finished |
| | | // DS must see expected numbers of DSs/RSs |
| | |
| | | |
| | | // DS 2 connected to RS 1 |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO); |
| | | |
| | | // DS 3 connected to RS 2 |
| | | fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, DEFAULT_GID, RS2_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO); |
| | | |
| | | // DS 4 connected to RS 3 |
| | | fakeRDs[4] = createFakeReplicationDomain(FDS4_ID, DEFAULT_GID, RS3_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO); |
| | | |
| | | // DS 5 connected to RS 3 |
| | | fakeRDs[5] = createFakeReplicationDomain(FDS5_ID, DEFAULT_GID, RS3_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO); |
| | | |
| | | /* |
| | | * Start DSs that will not receive updates from DS 1 as assured because |
| | |
| | | |
| | | // DS 6 connected to RS 1 |
| | | fakeRDs[6] = createFakeReplicationDomain(FDS6_ID, OTHER_GID, RS1_ID, |
| | | DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 7 connected to RS 2 |
| | | fakeRDs[7] = createFakeReplicationDomain(FDS7_ID, OTHER_GID, RS2_ID, |
| | | DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 8 connected to RS 3 |
| | | fakeRDs[8] = createFakeReplicationDomain(FDS8_ID, OTHER_GID, RS3_ID, |
| | | DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 9 (GID 2) connected to RS 4 |
| | | fakeRDs[9] = createFakeReplicationDomain(FDS9_ID, OTHER_GID_BIS, RS4_ID, |
| | | DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | /* |
| | | * Start DSs that will not receive updates from DS 1 because |
| | |
| | | |
| | | // DS 10 connected to RS 1 |
| | | fakeRDs[10] = createFakeReplicationDomain(FDS10_ID, DEFAULT_GID, RS1_ID, |
| | | OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 11 connected to RS 2 |
| | | fakeRDs[11] = createFakeReplicationDomain(FDS11_ID, DEFAULT_GID, RS2_ID, |
| | | OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 12 connected to RS 3 |
| | | fakeRDs[12] = createFakeReplicationDomain(FDS12_ID, DEFAULT_GID, RS3_ID, |
| | | OTHER_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | OTHER_GENID, null, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // Wait for connections to be finished |
| | | // DS must see expected numbers of DSs/RSs |
| | |
| | | */ |
| | | int numberOfRealRSs = 2; |
| | | |
| | | // Create real RS 1 |
| | | // Create real RS 1, 2 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs1); |
| | | |
| | | // Create real RS 2 |
| | | rs2 = createReplicationServer(RS2_ID, OTHER_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs2); |
| | | |
| | | /* |
| | | * Start DSs with GID=DEFAULT_GID, connected to RS1 |
| | |
| | | |
| | | // DS 1 connected to RS 1 |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 2 connected to RS 1 |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO); |
| | | |
| | | /* |
| | | * Start DSs with GID=OTHER_GID, connected to RS2 |
| | |
| | | |
| | | // DS 3 connected to RS 2 |
| | | fakeRDs[3] = createFakeReplicationDomain(FDS3_ID, OTHER_GID, RS2_ID, |
| | | DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO); |
| | | DEFAULT_GENID, null, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO); |
| | | |
| | | // DS 4 connected to RS 2 |
| | | fakeRDs[4] = createFakeReplicationDomain(FDS4_ID, OTHER_GID, RS2_ID, |
| | | DEFAULT_GENID, false, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO); |
| | | DEFAULT_GENID, null, 1, LONG_TIMEOUT, REPLY_OK_DS_SCENARIO); |
| | | |
| | | // Wait for connections to be finished |
| | | // DS must see expected numbers of DSs/RSs |
| | |
| | | */ |
| | | int numberOfRealRSs = 2; |
| | | |
| | | // Create real RS 1 |
| | | // Create real RS 1, 2 |
| | | rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT + 1000, // Be sure DS2 timeout is seen from DS1 |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs1); |
| | | |
| | | // Create real RS 2 |
| | | rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT, |
| | | testCase, numberOfRealRSs); |
| | | assertNotNull(rs2); |
| | | |
| | | /* |
| | | * Start 2 fake DSs |
| | |
| | | |
| | | // DS 1 connected to RS 1 |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 2 connected to RS 2 |
| | | fakeRDs[2] = createFakeReplicationDomain(FDS2_ID, fakeDsGid, RS2_ID, |
| | | fakeDsGenId, (fakeDsGid == DEFAULT_GID), |
| | | AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, fakeDsScen); |
| | | fakeDsGenId, fakeDsGid == DEFAULT_GID ? AssuredMode.SAFE_READ_MODE : null, |
| | | 1, LONG_TIMEOUT, fakeDsScen); |
| | | |
| | | // Wait for connections to be finished |
| | | // DS must see expected numbers of DSs/RSs |
| | |
| | | |
| | | // DS 1 connected to RS 1 |
| | | fakeRDs[1] = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | TIMEOUT_DS_SCENARIO); |
| | | DEFAULT_GENID, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, TIMEOUT_DS_SCENARIO); |
| | | |
| | | // DS 2 connected to RS 1 with low window to easily put it in DEGRADED status |
| | | DomainFakeCfg config = newFakeCfg(FDS2_ID, getRsPort(RS1_ID), DEFAULT_GID); |
| | | final DomainFakeCfg config = newDomainConfig(FDS2_ID, DEFAULT_GID, RS1_ID, |
| | | AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT); |
| | | config.setWindowSize(2); |
| | | fakeRDs[2] = createFakeReplicationDomain(config, DEFAULT_GID, RS1_ID, |
| | | DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, |
| | | REPLY_OK_DS_SCENARIO, new ServerState(), false); |
| | | fakeRDs[2] = createFakeReplicationDomain(config, RS1_ID, DEFAULT_GENID, |
| | | REPLY_OK_DS_SCENARIO, new ServerState(), false); |
| | | |
| | | // Wait for connections to be finished |
| | | // DS must see expected numbers of DSs/RSs |
| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2013 ForgeRock AS |
| | | * Portions Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | |
| | | private int exportedEntryCount; |
| | | |
| | | private long generationID = 1; |
| | | |
| | | private FakeReplicationDomain(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, int window, long heartbeatInterval) |
| | | throws ConfigException |
| | | { |
| | | super(baseDN, serverID, 100); |
| | | super(newConfig(baseDN, serverID, replicationServers, window, |
| | | heartbeatInterval), 1); |
| | | startPublishService(getConfig()); |
| | | startListenService(); |
| | | } |
| | | |
| | | private static DomainFakeCfg newConfig(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, int window, long heartbeatInterval) |
| | | { |
| | | DomainFakeCfg fakeCfg = new DomainFakeCfg(baseDN, serverID, replicationServers); |
| | | fakeCfg.setHeartbeatInterval(heartbeatInterval); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | fakeCfg.setWindowSize(window); |
| | | startPublishService(fakeCfg); |
| | | startListenService(); |
| | | return fakeCfg; |
| | | } |
| | | |
| | | public FakeReplicationDomain(DN baseDN, int serverID, |
| | |
| | | } |
| | | |
| | | @Override |
| | | public long getGenerationID() |
| | | { |
| | | return generationID; |
| | | } |
| | | |
| | | @Override |
| | | protected void importBackend(InputStream input) throws DirectoryException |
| | | { |
| | | byte[] buffer = new byte[1000]; |
| | |
| | | return true; |
| | | } |
| | | |
| | | public void setGenerationID(long newGenerationID) |
| | | { |
| | | generationID = newGenerationID; |
| | | } |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2008-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2013 ForgeRock AS |
| | | * Portions Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.service; |
| | | |
| | |
| | | SortedSet<String> replicationServers, long heartbeatInterval, |
| | | BlockingQueue<UpdateMsg> queue) throws ConfigException |
| | | { |
| | | super(baseDN, serverID, 100); |
| | | super(newConfig(baseDN, serverID, replicationServers, heartbeatInterval), 1); |
| | | startPublishService(getConfig()); |
| | | startListenService(); |
| | | this.queue = queue; |
| | | } |
| | | |
| | | private static DomainFakeCfg newConfig(DN baseDN, int serverID, |
| | | SortedSet<String> replicationServers, long heartbeatInterval) |
| | | { |
| | | final DomainFakeCfg fakeCfg = |
| | | new DomainFakeCfg(baseDN, serverID, replicationServers); |
| | | fakeCfg.setHeartbeatInterval(heartbeatInterval); |
| | | fakeCfg.setChangetimeHeartbeatInterval(500); |
| | | startPublishService(fakeCfg); |
| | | startListenService(); |
| | | this.queue = queue; |
| | | return fakeCfg; |
| | | } |
| | | |
| | | private static final int IMPORT_SIZE = 100000000; |
| | |
| | | } |
| | | |
| | | @Override |
| | | public long getGenerationID() |
| | | { |
| | | return 1; |
| | | } |
| | | |
| | | @Override |
| | | protected void importBackend(InputStream input) throws DirectoryException |
| | | { |
| | | long startDate = System.currentTimeMillis(); |