OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation
After OPENDJ-1206, I could remove all old External ChangeLog (or ECL for short) related classes. I could also remove annoyingly strange APIs from various places of replication code.
ECLUpdateMsg.java, ServerStartECLMsg.java, StartECLSessionMsg.java, ECLServerHandler.java, ECLServerWriter.java, ECLSearchOperation.java, ECLWorkflowElement.java, ExternalChangeLogTest.java: REMOVED
Also removed package src/server/org/opends/server/workflowelement/externalchangelog as part of these deletes.
replication*.properties:
Removed now unused SEVERE_ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.
ReplicationMsg.java:
Deprecated old ECL message types.
ReplicationBroker.java, ReplicationServer.java:
Removed all ECL related code.
ReplicationServerDomain.java:
Removed the now unused otherHandlers field, stopServer(MessageHandler), registerHandler(MessageHandler), unRegisterHandler(MessageHandler), unregisterOtherHandler(), getOldestState() and getLatestDomainTrimDate()
ServerHandler.java:
Reduced visibility as much as possible after removing ECLServerHandler.
MultimasterReplication.java:
Renamed getECLDisabledDomains() to getExcludedChangelogDomains(), included "cn=changelog" by default + changed client code to not add it anymore.
LastCookieVirtualProvider.java:
Consequence of the change to MultimasterReplication.
ChangelogBackend.java:
Consequence of the change to MultimasterReplication.getECLDisabledDomains().
Removed SearchParams.requestType and replaced its usage with the new isCookieBasedSearch()
MultiDomainServerState.java, ServerState.java, ReplicationDomainDB.java, FileChangelogDB.java, JEChangelogDB.java:
Removed now unused methods / fields.
Made private methods / fields that are now only used inside their declaration classes.
ChangelogBackendTestCase.java, SynchronizationMsgTest.java, FileReplicaDBTest.java:
Updated or removed tests as a consequence of the whole change.
9 files deleted
16 files modified
| | |
| | | import static org.opends.server.config.ConfigConstants.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*; |
| | | import static org.opends.server.replication.plugin.MultimasterReplication.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.LDIFWriter.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.LDAPUpdateMsg; |
| | | import org.opends.server.replication.protocol.ModifyCommonMsg; |
| | | import org.opends.server.replication.protocol.ModifyDNMsg; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | |
| | | { |
| | | return 1; |
| | | } |
| | | // Search with cookie mode to count all update messages |
| | | final SearchParams params = new SearchParams(getExcludedDomains()); |
| | | params.requestType = REQUEST_TYPE_FROM_COOKIE; |
| | | |
| | | // Search with cookie mode to count all update messages cross replica |
| | | final SearchParams params = new SearchParams(getExcludedChangelogDomains()); |
| | | params.cookie = new MultiDomainServerState(); |
| | | NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation(); |
| | | try |
| | | { |
| | | final NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation(); |
| | | search0(params, searchOp); |
| | | return searchOp.numSubordinates; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get( |
| | | CHANGELOG_BASE_DN.toString(), stackTraceToSingleLineString(e))); |
| | | } |
| | | return searchOp.numSubordinates; |
| | | } |
| | | |
| | | /** |
| | | * Builds the set of base DNs to leave out of changelog searches: the domains |
| | | * returned by MultimasterReplication.getECLDisabledDomains(), plus the |
| | | * external changelog root DN itself. |
| | | * |
| | | * @return the set of excluded base DNs |
| | | */ |
| | | private Set<String> getExcludedDomains() |
| | | { |
| | | final Set<String> excludedBaseDNs = MultimasterReplication.getECLDisabledDomains(); |
| | | excludedBaseDNs.add(DN_EXTERNAL_CHANGELOG_ROOT); |
| | | return excludedBaseDNs; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException |
| | | { |
| | | final SearchParams params = new SearchParams(getExcludedDomains()); |
| | | final SearchParams params = new SearchParams(getExcludedChangelogDomains()); |
| | | final ExternalChangelogRequestControl eclRequestControl = |
| | | searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER); |
| | | if (eclRequestControl == null) |
| | | if (eclRequestControl != null) |
| | | { |
| | | params.requestType = REQUEST_TYPE_FROM_CHANGE_NUMBER; |
| | | } |
| | | else |
| | | { |
| | | params.requestType = REQUEST_TYPE_FROM_COOKIE; |
| | | params.cookie = eclRequestControl.getCookie(); |
| | | } |
| | | return params; |
| | |
| | | */ |
| | | static class SearchParams |
| | | { |
| | | private ECLRequestType requestType; |
| | | private final Set<String> excludedBaseDNs; |
| | | private long lowestChangeNumber = -1; |
| | | private long highestChangeNumber = -1; |
| | |
| | | */ |
| | | SearchParams() |
| | | { |
| | | this.excludedBaseDNs = Collections.emptySet(); |
| | | this(Collections.<String> emptySet()); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns whether this search is cookie based. |
| | | * |
| | | * @return true if this search is cookie-based, false if this search is |
| | | * change number-based. |
| | | */ |
| | | private boolean isCookieBasedSearch() |
| | | { |
| | | return cookie != null; |
| | | } |
| | | |
| | | /** |
| | | * Indicates if provided change number is compatible with last change |
| | | * number. |
| | | * |
| | |
| | | private void search0(final SearchParams searchParams, final SearchOperation searchOperation) |
| | | throws DirectoryException, ChangelogException |
| | | { |
| | | switch (searchParams.requestType) |
| | | if (searchParams.isCookieBasedSearch()) |
| | | { |
| | | case REQUEST_TYPE_FROM_CHANGE_NUMBER: |
| | | searchFromChangeNumber(searchParams, searchOperation); |
| | | break; |
| | | case REQUEST_TYPE_FROM_COOKIE: |
| | | searchFromCookie(searchParams, searchOperation); |
| | | break; |
| | | default: |
| | | // not handled |
| | | } |
| | | else |
| | | { |
| | | searchFromChangeNumber(searchParams, searchOperation); |
| | | } |
| | | } |
| | | |
| | |
| | | import org.opends.server.api.VirtualAttributeProvider; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.plugin.MultimasterReplication.*; |
| | | |
| | | /** |
| | | * This class implements a virtual attribute provider in the root-dse entry |
| | |
| | | { |
| | | if (replicationServer != null) |
| | | { |
| | | // Set a list of excluded domains (also exclude 'cn=changelog' itself) |
| | | Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | |
| | | String newestCookie = replicationServer.getNewestECLCookie(excludedDomains).toString(); |
| | | String newestCookie = replicationServer.getNewestECLCookie(getExcludedChangelogDomains()).toString(); |
| | | final ByteString cookie = ByteString.valueOf(newestCookie); |
| | | return Collections.singleton(AttributeValues.create(cookie, cookie)); |
| | | } |
| | |
| | | * when an error occurs |
| | | * @return the split state. |
| | | */ |
| | | public static Map<DN, ServerState> splitGenStateToServerStates( |
| | | private static Map<DN, ServerState> splitGenStateToServerStates( |
| | | String multiDomainServerState) throws DirectoryException |
| | | { |
| | | Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>(); |
| | |
| | | return saved; |
| | | } |
| | | |
| | | /** |
| | | * Creates a new ServerState holding only the CSNs of this state that are |
| | | * older than the CSN built from the provided timestamp. Used when building |
| | | * the initial Cookie in the External Changelog, to cope with purged changes. |
| | | * |
| | | * @param timestamp |
| | | * The timestamp to compare the ServerState against |
| | | * @return a new ServerState which only contains the CSNs older than the |
| | | * provided timestamp. |
| | | */ |
| | | public ServerState duplicateOnlyOlderThan(long timestamp) |
| | | { |
| | | final CSN limit = new CSN(timestamp, 0, 0); |
| | | final ServerState olderState = new ServerState(); |
| | | for (CSN candidate : serverIdToCSN.values()) |
| | | { |
| | | if (!candidate.isOlderThan(limit)) |
| | | { |
| | | // Not older than the limit: leave it out of the copy. |
| | | continue; |
| | | } |
| | | olderState.serverIdToCSN.put(candidate.getServerId(), candidate); |
| | | } |
| | | return olderState; |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.util.*; |
| | | import java.util.ArrayList; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | |
| | | import org.opends.server.admin.server.ConfigurationDeleteListener; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; |
| | | import org.opends.server.api.*; |
| | | import org.opends.server.api.Backend; |
| | | import org.opends.server.api.BackupTaskListener; |
| | | import org.opends.server.api.ExportTaskListener; |
| | | import org.opends.server.api.ImportTaskListener; |
| | | import org.opends.server.api.RestoreTaskListener; |
| | | import org.opends.server.api.SynchronizationProvider; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.service.DSRSShutdownSync; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.types.operation.*; |
| | | import org.opends.server.types.BackupConfig; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.Control; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.LDIFExportConfig; |
| | | import org.opends.server.types.LDIFImportConfig; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.Operation; |
| | | import org.opends.server.types.RestoreConfig; |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.types.SynchronizationProviderResult; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | | import org.opends.server.types.operation.PostOperationAddOperation; |
| | | import org.opends.server.types.operation.PostOperationDeleteOperation; |
| | | import org.opends.server.types.operation.PostOperationModifyDNOperation; |
| | | import org.opends.server.types.operation.PostOperationModifyOperation; |
| | | import org.opends.server.types.operation.PostOperationOperation; |
| | | import org.opends.server.types.operation.PreOperationAddOperation; |
| | | import org.opends.server.types.operation.PreOperationDeleteOperation; |
| | | import org.opends.server.types.operation.PreOperationModifyDNOperation; |
| | | import org.opends.server.types.operation.PreOperationModifyOperation; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets the Set of baseDN of the domains which are disabled for the external |
| | | * changelog. |
| | | * Gets the Set of domain baseDN which are disabled for the external changelog. |
| | | * |
| | | * @return The Set of baseDNs which are disabled for the external changelog. |
| | | * @return The Set of domain baseDNs which are disabled for the external changelog. |
| | | */ |
| | | public static Set<String> getECLDisabledDomains() |
| | | public static Set<String> getExcludedChangelogDomains() |
| | | { |
| | | final Set<String> disabledBaseDNs = new HashSet<String>(domains.size()); |
| | | final Set<String> disabledBaseDNs = new HashSet<String>(domains.size() + 1); |
| | | disabledBaseDNs.add(DN_EXTERNAL_CHANGELOG_ROOT); |
| | | for (LDAPReplicationDomain domain : domains.values()) |
| | | { |
| | | if (!domain.isECLEnabled()) |
| | |
| | | static final byte MSG_TYPE_GENERIC_UPDATE = 29; |
| | | |
| | | // Added for protocol version 3 |
| | | @Deprecated |
| | | static final byte MSG_TYPE_START_ECL = 30; |
| | | @Deprecated |
| | | static final byte MSG_TYPE_START_ECL_SESSION = 31; |
| | | @Deprecated |
| | | static final byte MSG_TYPE_ECL_UPDATE = 32; |
| | | static final byte MSG_TYPE_CT_HEARTBEAT = 33; |
| | | |
| | |
| | | case MSG_TYPE_GENERIC_UPDATE: |
| | | return new UpdateMsg(buffer); |
| | | case MSG_TYPE_START_ECL: |
| | | return new ServerStartECLMsg(buffer); |
| | | case MSG_TYPE_START_ECL_SESSION: |
| | | return new StartECLSessionMsg(buffer); |
| | | case MSG_TYPE_ECL_UPDATE: |
| | | return new ECLUpdateMsg(buffer); |
| | | // Legacy versions never sent such messages to other instances (other JVMs). |
| | | // They were only used in the combined DS-RS case. |
| | | // It is safe to totally ignore these values since code now uses the ChangelogBackend. |
| | | return null; |
| | | case MSG_TYPE_CT_HEARTBEAT: |
| | | return new ChangeTimeHeartbeatMsg(buffer, protocolVersion); |
| | | case MSG_TYPE_REPL_SERVER_START_DS: |
| | |
| | | session, queueSize, this, rcvWindow); |
| | | rsHandler.startFromRemoteRS((ReplServerStartMsg) msg); |
| | | } |
| | | else if (msg instanceof ServerStartECLMsg) |
| | | { |
| | | ECLServerHandler eclHandler = new ECLServerHandler( |
| | | session, queueSize, this, rcvWindow); |
| | | eclHandler.startFromRemoteServer((ServerStartECLMsg) msg); |
| | | } |
| | | else |
| | | { |
| | | // We did not recognize the message, close session as what |
| | |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | | import org.opends.server.replication.protocol.MonitorRequestMsg; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.HostPort; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
| | | private final Map<Integer, ReplicationServerHandler> connectedRSs = |
| | | new ConcurrentHashMap<Integer, ReplicationServerHandler>(); |
| | | |
| | | private final Queue<MessageHandler> otherHandlers = |
| | | new ConcurrentLinkedQueue<MessageHandler>(); |
| | | |
| | | private final ReplicationDomainDB domainDB; |
| | | /** The ReplicationServer that created the current instance. */ |
| | | private final ReplicationServer localReplicationServer; |
| | |
| | | addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers); |
| | | } |
| | | } |
| | | |
| | | // Push the message to the other subscribing handlers |
| | | for (MessageHandler mHandler : otherHandlers) { |
| | | mHandler.add(updateMsg); |
| | | } |
| | | } |
| | | |
| | | private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler, |
| | |
| | | { |
| | | unregisterServerHandler(sHandler, shutdown, true); |
| | | } |
| | | else if (otherHandlers.contains(sHandler)) |
| | | { |
| | | unregisterOtherHandler(sHandler); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Removes the provided handler from the handlers registered in this domain, |
| | | * then calls its shutdown() method. |
| | | * |
| | | * @param mHandler the handler to unregister and shut down. |
| | | */ |
| | | private void unregisterOtherHandler(MessageHandler mHandler) |
| | | { |
| | | // The boolean result of unRegisterHandler() is deliberately not checked here. |
| | | unRegisterHandler(mHandler); |
| | | mHandler.shutdown(); |
| | | } |
| | | |
| | | private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown, |
| | | boolean isDirectoryServer) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Stop the handler: once its shutdown has been engaged, takes the domain |
| | | * lock and, when the handler is one of the other subscribing handlers, |
| | | * unregisters it and shuts it down. |
| | | * |
| | | * @param mHandler The handler to stop. |
| | | */ |
| | | public void stopServer(MessageHandler mHandler) |
| | | { |
| | | // TODO JNR merge with stopServer(ServerHandler, boolean) |
| | | if (debugEnabled()) |
| | | { |
| | | debug("stopServer() on the message handler " + mHandler); |
| | | } |
| | | |
| | | /* |
| | | * This code may be called from a dying ServerReader and a dying |
| | | * ServerWriter at the same time, or from a thread that wants to shut down |
| | | * the handler. To prevent a deadlock on the replication server domain |
| | | * lock, a thread safe flag tells whether the job is already being |
| | | * processed, so that it is only done once. |
| | | */ |
| | | if (mHandler.engageShutdown()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | // Acquire lock on domain (see more details in comment of start() method |
| | | // of ServerHandler) |
| | | lock(); |
| | | } |
| | | catch (InterruptedException ex) |
| | | { |
| | | // Cannot be handled here: restore the interrupt flag so that it is |
| | | // caught during subsequent IO. |
| | | Thread.currentThread().interrupt(); |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | if (otherHandlers.contains(mHandler)) |
| | | { |
| | | unregisterOtherHandler(mHandler); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | stackTraceToSingleLineString(e))); |
| | | } |
| | | finally |
| | | { |
| | | // Always give the domain lock back, whatever happened above. |
| | | release(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Unregister this handler from the list of handlers registered to this |
| | | * domain. |
| | | * @param sHandler the provided handler to unregister. |
| | |
| | | return attributes; |
| | | } |
| | | |
| | | /** |
| | | * Registers in the domain a handler that subscribes to changes. |
| | | * |
| | | * @param mHandler the subscribing handler to add. |
| | | */ |
| | | public void registerHandler(MessageHandler mHandler) |
| | | { |
| | | otherHandlers.add(mHandler); |
| | | } |
| | | |
| | | /** |
| | | * Unregisters a handler from the domain. |
| | | * |
| | | * @param mHandler the unsubscribing handler to remove. |
| | | * @return Whether this handler has been unregistered with success. |
| | | */ |
| | | public boolean unRegisterHandler(MessageHandler mHandler) |
| | | { |
| | | final boolean removed = otherHandlers.remove(mHandler); |
| | | return removed; |
| | | } |
| | | |
| | | /** |
| | | * Returns the oldest known state for the domain, made of the oldest CSN |
| | | * stored for each serverId, as reported by the domain DB. |
| | | * <p> |
| | | * Note: Because the replication changelogDB trimming always keeps one change |
| | | * whatever its date, the CSNs contained in the returned state can be very old. |
| | | * |
| | | * @return the start state of the domain. |
| | | */ |
| | | public ServerState getOldestState() |
| | | { |
| | | final ServerState oldestCSNs = domainDB.getDomainOldestCSNs(baseDN); |
| | | return oldestCSNs; |
| | | } |
| | | |
| | | private void sendTopologyMsg(String type, ServerHandler handler, |
| | | TopologyMsg msg) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the latest (most recent) trim date of the changelog DBs associated |
| | | * to this domain, as reported by the domain DB. |
| | | * |
| | | * @return The latest trim date. |
| | | */ |
| | | public long getLatestDomainTrimDate() |
| | | { |
| | | final long latestTrimDate = domainDB.getDomainLatestTrimDate(baseDN); |
| | | return latestTrimDate; |
| | | } |
| | | |
| | | /** |
| | | * Return the monitor instance name of the ReplicationServer that created the |
| | | * current instance. |
| | |
| | | /** |
| | | * The associated ServerWriter that sends messages to the remote server. |
| | | */ |
| | | protected ServerWriter writer; |
| | | private ServerWriter writer; |
| | | |
| | | /** |
| | | * The associated ServerReader that receives messages from the remote server. |
| | | */ |
| | | protected ServerReader reader; |
| | | private ServerReader reader; |
| | | |
| | | // window |
| | | private int rcvWindow; |
| | |
| | | /** |
| | | * Semaphore that the writer uses to control the flow to the remote server. |
| | | */ |
| | | protected Semaphore sendWindow; |
| | | private Semaphore sendWindow; |
| | | /** |
| | | * The initial size of the sending window. |
| | | */ |
| | | protected int sendWindowSize; |
| | | private int sendWindowSize; |
| | | /** |
| | | * remote generation id. |
| | | */ |
| | |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | protected long heartbeatInterval = 0; |
| | | protected long heartbeatInterval; |
| | | |
| | | /** |
| | | * The thread that will send heartbeats. |
| | |
| | | /** |
| | | * Set when ServerWriter is stopping. |
| | | */ |
| | | protected volatile boolean shutdownWriter = false; |
| | | private volatile boolean shutdownWriter; |
| | | |
| | | /** |
| | | * Weight of this remote server. |
| | |
| | | } |
| | | |
| | | // Window stats |
| | | attributes.add(Attributes.create("max-send-window", String |
| | | .valueOf(sendWindowSize))); |
| | | attributes.add(Attributes.create("current-send-window", String |
| | | .valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(Attributes.create("max-rcv-window", String |
| | | .valueOf(maxRcvWindow))); |
| | | attributes.add(Attributes.create("current-rcv-window", String |
| | | .valueOf(rcvWindow))); |
| | | attributes.add(Attributes.create("max-send-window", String.valueOf(sendWindowSize))); |
| | | attributes.add(Attributes.create("current-send-window", String.valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(Attributes.create("max-rcv-window", String.valueOf(maxRcvWindow))); |
| | | attributes.add(Attributes.create("current-rcv-window", String.valueOf(rcvWindow))); |
| | | |
| | | // Encryption |
| | | attributes.add(Attributes.create("ssl-encryption", String |
| | | .valueOf(session.isEncrypted()))); |
| | | attributes.add(Attributes.create("ssl-encryption", String.valueOf(session.isEncrypted()))); |
| | | |
| | | // Data generation |
| | | attributes.add(Attributes.create("generation-id", String |
| | | .valueOf(generationId))); |
| | | attributes.add(Attributes.create("generation-id", String.valueOf(generationId))); |
| | | |
| | | return attributes; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Requests the writer to shut down by raising the shutdownWriter flag. |
| | | */ |
| | | protected void shutdownWriter() |
| | | { |
| | | this.shutdownWriter = true; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown This ServerHandler. |
| | | */ |
| | | @Override |
| | | public void shutdown() |
| | | { |
| | | shutdownWriter(); |
| | | shutdownWriter = true; |
| | | setConsumerActive(false); |
| | | super.shutdown(); |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Log the messages involved in the Topology/StartSession handshake. |
| | | * @param inStartECLSessionMsg The message received first. |
| | | */ |
| | | protected void logStartECLSessionHandshake( |
| | | StartECLSessionMsg inStartECLSessionMsg) |
| | | { |
| | | if (debugEnabled()) |
| | | { |
| | | TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + " :" |
| | | + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Process a Ack message received. |
| | | * @param ack the message received. |
| | | */ |
| | |
| | | { |
| | | |
| | | /** |
| | | * Returns the oldest {@link CSN}s from the replicaDBs for each serverId in |
| | | * the specified replication domain. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @return a new ServerState object holding the {serverId => oldest CSN} |
| | | * mapping. If a replica DB is empty or closed, the oldest CSN will be |
| | | * null for that replica. The caller owns the generated ServerState. |
| | | */ |
| | | ServerState getDomainOldestCSNs(DN baseDN); |
| | | |
| | | /** |
| | | * Returns the newest {@link CSN}s from the replicaDBs for each serverId in |
| | | * the specified replication domain. |
| | | * |
| | |
| | | ServerState getDomainNewestCSNs(DN baseDN); |
| | | |
| | | /** |
| | | * Retrieves the latest trim date for the specified replication domain. |
| | | * <p> |
| | | * FIXME will be removed when ECLServerHandler will not be responsible anymore |
| | | * for lazily building the ChangeNumberIndexDB. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @return the domain latest trim date |
| | | */ |
| | | long getDomainLatestTrimDate(DN baseDN); |
| | | |
| | | /** |
| | | * Removes all the data relating to the specified replication domain and |
| | | * shutdown all its replica databases. In particular, it will: |
| | | * <ol> |
| | |
| | | */ |
| | | private long purgeDelayInMillis; |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); |
| | | private volatile long latestPurgeDate; |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY); |
| | | |
| | | /** |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ServerState getDomainOldestCSNs(DN baseDN) |
| | | { |
| | | // Fold the oldest CSN of every replica DB of the domain into a new state. |
| | | final ServerState oldestCSNs = new ServerState(); |
| | | for (FileReplicaDB db : getDomainMap(baseDN).values()) |
| | | { |
| | | final CSN oldestCSN = db.getOldestCSN(); |
| | | oldestCSNs.update(oldestCSN); |
| | | } |
| | | return oldestCSNs; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ServerState getDomainNewestCSNs(DN baseDN) |
| | | { |
| | | final ServerState result = new ServerState(); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(final DN baseDN) |
| | | { |
| | | // The provided baseDN is not used: a single purge date is kept by this DB. |
| | | final long trimDate = latestPurgeDate; |
| | | return trimDate; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexDB getChangeNumberIndexDB() |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | |
| | | } |
| | | } |
| | | |
| | | latestPurgeDate = purgeTimestamp; |
| | | |
| | | jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | } |
| | | catch (InterruptedException e) |
| | |
| | | */ |
| | | private long purgeDelayInMillis; |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); |
| | | private volatile long latestPurgeDate; |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ServerState getDomainOldestCSNs(DN baseDN) |
| | | { |
| | | // Fold the oldest CSN of every replica DB of the domain into a new state. |
| | | final ServerState oldestCSNs = new ServerState(); |
| | | for (JEReplicaDB db : getDomainMap(baseDN).values()) |
| | | { |
| | | final CSN oldestCSN = db.getOldestCSN(); |
| | | oldestCSNs.update(oldestCSN); |
| | | } |
| | | return oldestCSNs; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ServerState getDomainNewestCSNs(DN baseDN) |
| | | { |
| | | final ServerState result = new ServerState(); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(final DN baseDN) |
| | | { |
| | | // The provided baseDN is not used: a single purge date is kept by this DB. |
| | | final long trimDate = latestPurgeDate; |
| | | return trimDate; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexDB getChangeNumberIndexDB() |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | |
| | | } |
| | | } |
| | | |
| | | latestPurgeDate = purgeTimestamp; |
| | | |
| | | jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | } |
| | | catch (InterruptedException e) |
| | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.HostPort; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | connect(); |
| | | connectAsDataServer(); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private void connect() |
| | | { |
| | | if (getBaseDN().toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(); |
| | | } |
| | | else |
| | | { |
| | | connectAsDataServer(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Contacts all replication servers to get information from them and being |
| | | * able to choose the more suitable. |
| | |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | // Connect to server + get and store info about it |
| | | final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false); |
| | | final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false); |
| | | final ReplicationServerInfo rsInfo = rs.rsInfo; |
| | | if (rsInfo != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /**
| | | * Connects this broker to a replication server on behalf of the External
| | | * Change Log (ECL).
| | | * <p>
| | | * Differences with connecting as a data server:
| | | * <ul>
| | | * <li>only the first replication server URL is used, so no preferred RS is
| | | * chosen</li>
| | | * <li>heartbeat handling is left as an open question</li>
| | | * <li>the start handshake is:
| | | *
| | | * <pre>
| | | * Broker ---> StartECLMsg ---> RS
| | | * <---- ReplServerStartMsg ---
| | | * ---> StartSessionECLMsg --> RS
| | | * </pre>
| | | *
| | | * </li>
| | | * </ul>
| | | * Phase two is only attempted when phase one reports a connected RS.
| | | */
| | | private void connectAsECL()
| | | {
| | |   // FIXME:ECL List of RS to connect is for now limited to one RS only
| | |   final String rsURL = getReplicationServerUrls().iterator().next();
| | |   final ConnectedRS phaseOneResult = performPhaseOneHandshake(rsURL, true, true);
| | |   if (!phaseOneResult.isConnected())
| | |   {
| | |     return;
| | |   }
| | |   performECLPhaseTwoHandshake(rsURL, phaseOneResult);
| | | }
| | | |
| | | /** |
| | | * Connect to a ReplicationServer. |
| | | * |
| | | * Handshake sequences between a DS and a RS is divided into 2 logical |
| | |
| | | + evals.getBestRS()); |
| | | |
| | | final ConnectedRS electedRS = performPhaseOneHandshake( |
| | | evals.getBestRS().getServerURL(), true, false); |
| | | evals.getBestRS().getServerURL(), true); |
| | | final ReplicationServerInfo electedRsInfo = electedRS.rsInfo; |
| | | if (electedRsInfo != null) |
| | | { |
| | |
| | | * Do we keep session opened or not after handshake. Use true if want |
| | | * to perform handshake phase 2 with the same session and keep the |
| | | * session to create as the current one. |
| | | * @param isECL |
| | | * Indicates whether or not the an ECL handshake is to be performed. |
| | | * @return The answer from the server . Null if could not get an answer. |
| | | */ |
| | | private ConnectedRS performPhaseOneHandshake(String serverURL, |
| | | boolean keepSession, boolean isECL) |
| | | private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession) |
| | | { |
| | | Session newSession = null; |
| | | Socket socket = null; |
| | |
| | | socket.setReceiveBufferSize(1000000); |
| | | socket.setTcpNoDelay(true); |
| | | int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); |
| | | socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), |
| | | timeoutMS); |
| | | socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS); |
| | | newSession = replSessionSecurity.createClientSession(socket, timeoutMS); |
| | | boolean isSslEncryption = replSessionSecurity.isSslEncryption(); |
| | | |
| | |
| | | final HostPort hp = new HostPort( |
| | | socket.getLocalAddress().getHostName(), socket.getLocalPort()); |
| | | final String url = hp.toString(); |
| | | final StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | | final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | else |
| | | { |
| | | serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0, |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | newSession.publish(serverStartMsg); |
| | | |
| | | // Read the ReplServerStartMsg or ReplServerStartDSMsg that should |
| | |
| | | return setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | |
| | | |
| | | |
| | | /**
| | | * Performs the second phase of the External Change Log handshake: publishes
| | | * a StartECLSessionMsg on the session held by {@code rs} and, when that
| | | * succeeds, records {@code rs} as the connected replication server.
| | | * <p>
| | | * If anything throws, a warning is logged, the session is closed and the
| | | * connected RS is reset to {@code ConnectedRS.noConnectedRS()}.
| | | *
| | | * @param server
| | | *          Server we are connecting with (used in the warning message).
| | | * @param rs
| | | *          The connection on which the session message is published.
| | | */
| | | private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
| | | {
| | |   try
| | |   {
| | |     // Publish the ECL start session message
| | |     final StartECLSessionMsg sessionMsg = new StartECLSessionMsg();
| | |     sessionMsg.setOperationId("-1");
| | |     rs.session.publish(sessionMsg);
| | | 
| | |     // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
| | |     if (debugEnabled())
| | |     {
| | |       debugInfo("RB HANDSHAKE SENT:\n" + sessionMsg);
| | |     }
| | | 
| | |     // Handshake is over: apply the broker timeout to the session
| | |     rs.session.setSoTimeout(timeout);
| | |     setConnectedRS(rs);
| | |   }
| | |   catch (Exception ex)
| | |   {
| | |     logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
| | |         getServerId(), server, getBaseDN().toNormalizedString(),
| | |         stackTraceToSingleLineString(ex)));
| | | 
| | |     rs.session.close();
| | |     setConnectedRS(ConnectedRS.noConnectedRS());
| | |   }
| | | }
| | | |
| | | /** |
| | | * Performs the second phase handshake (send StartSessionMsg and receive |
| | | * TopologyMsg messages exchange) and return the reply message from the |
| | |
| | | |
| | | try |
| | | { |
| | | connect(); |
| | | connectAsDataServer(); |
| | | rs = connectedRS.get(); |
| | | } |
| | | catch (Exception e) |
| | |
| | | Map<Integer, ReplicationServerInfo> previousRsInfos) |
| | | { |
| | | this.rsServerId = rsServerId; |
| | | this.replicaInfos = dsInfosToKeep; |
| | | this.replicaInfos = dsInfosToKeep == null |
| | | ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep; |
| | | this.rsInfos = computeRSInfos(dsServerId, newRSInfos, |
| | | previousRsInfos, configuredReplicationServerUrls); |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos |
| | | return getClass().getSimpleName() |
| | | + " rsServerId=" + rsServerId |
| | | + ", replicaInfos=" + replicaInfos.values() |
| | | + ", rsInfos=" + rsInfos.values(); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.DomainFakeCfg; |
| | | import org.opends.server.replication.plugin.ExternalChangelogDomainFakeCfg; |
| | | import org.opends.server.replication.plugin.LDAPReplicationDomain; |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; |
| | | import org.opends.server.replication.service.DSRSShutdownSync; |
| | | import org.opends.server.replication.service.ReplicationBroker; |
| | |
| | | generateDeleteMsg(TEST_ROOT_DN_STRING, csn9, test, 9)); |
| | | |
| | | // ensure oldest state is correct for each suffix and for each server id |
| | | final ServerState oldestState = getDomainOldestState(DN_OTEST); |
| | | assertEquals(oldestState.getCSN(SERVER_ID_1), csn1); |
| | | assertEquals(oldestState.getCSN(serverId22), csn7); |
| | | isOldestCSNForReplica(DN_OTEST, csn1); |
| | | isOldestCSNForReplica(DN_OTEST, csn7); |
| | | |
| | | final ServerState oldestState2 = getDomainOldestState(DN_OTEST2); |
| | | assertEquals(oldestState2.getCSN(SERVER_ID_2), csn2); |
| | | assertEquals(oldestState2.getCSN(serverId11), csn6); |
| | | isOldestCSNForReplica(DN_OTEST2, csn2); |
| | | isOldestCSNForReplica(DN_OTEST2, csn6); |
| | | |
| | | // test last cookie on root DSE |
| | | MultiDomainServerState expectedLastCookie = |
| | |
| | | finally |
| | | { |
| | | removeBackend(backendForSecondSuffix); |
| | | //replicationServer.getChangelogDB().getReplicationDomainDB().removeDomain(ROOT_DN_OTEST2); |
| | | } |
| | | } |
| | | |
| | | /**
| | | * Asserts that the first change returned by a cursor opened on the replica DB
| | | * identified by {@code baseDN} and the server id of {@code csn} carries
| | | * exactly {@code csn}.
| | | *
| | | * @param baseDN
| | | *          the domain whose replica DB is read
| | | * @param csn
| | | *          the expected CSN; its server id selects the replica DB
| | | * @throws Exception
| | | *           if the cursor cannot be opened or read
| | | */
| | | private void isOldestCSNForReplica(DN baseDN, CSN csn) throws Exception
| | | {
| | |   final ReplicationDomainDB domainDB = replicationServer.getChangelogDB().getReplicationDomainDB();
| | |   final DBCursor<UpdateMsg> cursor =
| | |       domainDB.getCursorFrom(baseDN, csn.getServerId(), null, PositionStrategy.ON_MATCHING_KEY);
| | |   try
| | |   {
| | |     assertTrue(cursor.next(),
| | |         "Expected to find at least one change in replicaDB(" + baseDN + " " + csn.getServerId() + ")");
| | |     assertEquals(cursor.getRecord().getCSN(), csn);
| | |   }
| | |   finally
| | |   {
| | |     // always release the cursor, even when an assertion fails
| | |     close(cursor);
| | |   }
| | | }
| | | |
| | |
| | | return results; |
| | | } |
| | | |
| | | private ServerState getDomainOldestState(DN baseDN) // test helper: asks the ReplicationServerDomain of baseDN for its oldest state
| | | {
| | | return replicationServer.getReplicationServerDomain(baseDN).getOldestState();
| | | }
| | | |
| | | private void assertSearchParameters(SearchParams searchParams, long firstChangeNumber, |
| | | long lastChangeNumber, CSN csn) throws Exception |
| | | { |
| | |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg.Persistent; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.workflowelement.localbackend.LocalBackendAddOperation; |
| | |
| | | msg1.getBytes(getCurrentVersion()), getCurrentVersion()); |
| | | } |
| | | |
| | | @Test(enabled=true) |
| | | public void eclUpdateMsg() |
| | | throws Exception |
| | | { |
| | | // create a msg to put in the eclupdatemsg |
| | | InternalClientConnection connection = |
| | | InternalClientConnection.getRootConnection(); |
| | | DeleteOperation deleteOp = |
| | | new DeleteOperationBasis(connection, 1, 1,null, DN.decode("cn=t1")); |
| | | LocalBackendDeleteOperation op = new LocalBackendDeleteOperation(deleteOp); |
| | | CSN csn = new CSN(TimeThread.getTime(), 123, 45); |
| | | op.setAttachment(SYNCHROCONTEXT, new DeleteContext(csn, "uniqueid")); |
| | | DeleteMsg delmsg = new DeleteMsg(op); |
| | | long changeNumber = 21; |
| | | |
| | | DN baseDN = DN.decode("dc=example,dc=com"); |
| | | |
| | | // create a cookie |
| | | MultiDomainServerState cookie = |
| | | new MultiDomainServerState( |
| | | "o=test:000001210b6f21e904b100000001 000001210b6f21e904b200000001;" + |
| | | "o=test2:000001210b6f21e904b100000002 000001210b6f21e904b200000002;"); |
| | | |
| | | // Constructor test |
| | | ECLUpdateMsg msg1 = new ECLUpdateMsg(delmsg, cookie, baseDN, changeNumber); |
| | | assertTrue(msg1.getCookie().equalsTo(cookie)); |
| | | assertEquals(msg1.getBaseDN(), baseDN); |
| | | assertEquals(msg1.getChangeNumber(), changeNumber); |
| | | DeleteMsg delmsg2 = (DeleteMsg)msg1.getUpdateMsg(); |
| | | assertEquals(delmsg.compareTo(delmsg2), 0); |
| | | |
| | | // Constructor test (with byte[]) |
| | | ECLUpdateMsg msg2 = new ECLUpdateMsg(msg1.getBytes(getCurrentVersion())); |
| | | assertTrue(msg2.getCookie().equalsTo(msg2.getCookie())); |
| | | assertTrue(msg2.getCookie().equalsTo(cookie)); |
| | | assertEquals(msg2.getBaseDN(), msg1.getBaseDN()); |
| | | assertEquals(msg2.getBaseDN(), baseDN); |
| | | assertEquals(msg2.getChangeNumber(), msg1.getChangeNumber()); |
| | | assertEquals(msg2.getChangeNumber(), changeNumber); |
| | | |
| | | DeleteMsg delmsg1 = (DeleteMsg)msg1.getUpdateMsg(); |
| | | delmsg2 = (DeleteMsg)msg2.getUpdateMsg(); |
| | | assertEquals(delmsg2.compareTo(delmsg), 0); |
| | | assertEquals(delmsg2.compareTo(delmsg1), 0); |
| | | } |
| | | |
| | | @DataProvider(name="createServerStartData") |
| | | public Object[][] createServerStartData() throws Exception |
| | | { |
| | |
| | | newMsg.getDegradedStatusThreshold()); |
| | | } |
| | | |
| | | /**
| | | * Provides three rows of arguments for the ReplServerStartDSMsg
| | | * encoding/decoding test, each with its own single-CSN server state.
| | | */
| | | @DataProvider(name="createReplServerStartDSData")
| | | public Object[][] createReplServerStartDSData() throws Exception
| | | {
| | |   final ServerState zeroState = new ServerState();
| | |   zeroState.update(new CSN(0, 0, 0));
| | | 
| | |   final ServerState secondState = new ServerState();
| | |   secondState.update(new CSN(75, 5, 263));
| | | 
| | |   final ServerState thirdState = new ServerState();
| | |   thirdState.update(new CSN(123, 5, 98));
| | | 
| | |   final DN domainDN = TEST_ROOT_DN;
| | | 
| | |   final Object[][] rows =
| | |   {
| | |     {1, domainDN, 0, "localhost:8989", zeroState, 0L, (byte)0, 0, 0, 0},
| | |     {16, domainDN, 100, "anotherHost:1025", secondState, 1245L, (byte)25, 3456, 3, 31512},
| | |     {36, domainDN, 100, "anotherHostAgain:8017", thirdState, 6841L, (byte)32, 2496, 630, 9524},
| | |   };
| | |   return rows;
| | | }
| | | |
| | | /** |
| | | * Test that ReplServerStartDSMsg encoding and decoding works |
| | | * by checking that : msg == new ReplServerStartMsg(msg.getBytes()). |
| | | */ |
| | | @Test(dataProvider="createReplServerStartDSData") |
| | | public void replServerStartDSMsgTest(int serverId, DN baseDN, int window, |
| | | String url, ServerState state, long genId, byte groupId, int degTh, |
| | | int weight, int connectedDSNumber) throws Exception |
| | | { |
| | | ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId, |
| | | url, baseDN, window, state, genId, |
| | | true, groupId, degTh, weight, connectedDSNumber); |
| | | ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes(getCurrentVersion())); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getServerURL(), newMsg.getServerURL()); |
| | | assertEquals(msg.getBaseDN(), newMsg.getBaseDN()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getServerState().getCSN(1), |
| | | newMsg.getServerState().getCSN(1)); |
| | | assertEquals(newMsg.getVersion(), getCurrentVersion()); |
| | | assertEquals(msg.getGenerationId(), newMsg.getGenerationId()); |
| | | assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption()); |
| | | assertEquals(msg.getGroupId(), newMsg.getGroupId()); |
| | | assertEquals(msg.getDegradedStatusThreshold(), |
| | | newMsg.getDegradedStatusThreshold()); |
| | | assertEquals(msg.getWeight(), newMsg.getWeight()); |
| | | assertEquals(msg.getConnectedDSNumber(), newMsg.getConnectedDSNumber()); |
| | | } |
| | | |
| | | /** |
| | | * Test that StopMsg encoding and decoding works |
| | | * by checking that : msg == new StopMsg(msg.getBytes()). |
| | |
| | | assertEquals(test.getBytes(), newMsg.getPayload()); |
| | | } |
| | | |
| | | /** |
| | | * Test that ServerStartMsg encoding and decoding works |
| | | * by checking that : msg == new ServerStartMsg(msg.getBytes()). |
| | | */ |
| | | @Test(enabled=true,dataProvider="createServerStartData") |
| | | public void startECLMsgTest(int serverId, DN baseDN, int window, |
| | | ServerState state, long genId, boolean sslEncryption, byte groupId) throws Exception |
| | | { |
| | | ServerStartECLMsg msg = new ServerStartECLMsg( |
| | | "localhost:1234", window, window, window, window, window, window, state, |
| | | genId, sslEncryption, groupId); |
| | | ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes(getCurrentVersion())); |
| | | assertEquals(msg.getServerURL(), newMsg.getServerURL()); |
| | | assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay()); |
| | | assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue()); |
| | | assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay()); |
| | | assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval()); |
| | | assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption()); |
| | | assertEquals(msg.getServerState().getCSN(1), |
| | | newMsg.getServerState().getCSN(1)); |
| | | assertEquals(newMsg.getVersion(), getCurrentVersion()); |
| | | assertEquals(msg.getGenerationId(), newMsg.getGenerationId()); |
| | | assertEquals(msg.getGroupId(), newMsg.getGroupId()); |
| | | } |
| | | |
| | | /**
| | | * Test StartECLSessionMsg encoding and decoding: every field set on the
| | | * original message must be found again on the message rebuilt from its bytes.
| | | */
| | | @Test()
| | | public void startECLSessionMsgTest() throws Exception
| | | {
| | |   // data
| | |   final CSN sessionCSN = new CSN(TimeThread.getTime(), 123, 45);
| | |   final ServerState serverState = new ServerState();
| | |   assertTrue(serverState.update(new CSN(75, 5,263)));
| | |   final String adminDataDN = "cn=admin data";
| | |   final String configDN = "cn=config";
| | | 
| | |   // create original
| | |   final StartECLSessionMsg original = new StartECLSessionMsg();
| | |   original.setCSN(sessionCSN);
| | |   original.setCrossDomainServerState("fakegenstate");
| | |   original.setPersistent(Persistent.PERSISTENT);
| | |   original.setFirstChangeNumber(13);
| | |   original.setLastChangeNumber(14);
| | |   original.setECLRequestType(ECLRequestType.REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER);
| | |   original.setOperationId("fakeopid");
| | |   original.setExcludedDNs(newSet(adminDataDN, configDN));
| | | 
| | |   // create copy
| | |   final StartECLSessionMsg decoded =
| | |       new StartECLSessionMsg(original.getBytes(getCurrentVersion()));
| | | 
| | |   // test equality between the two copies
| | |   assertEquals(original.getCSN(), decoded.getCSN());
| | |   assertEquals(original.getPersistent(), decoded.getPersistent());
| | |   assertEquals(original.getFirstChangeNumber(), decoded.getFirstChangeNumber());
| | |   assertEquals(original.getECLRequestType(), decoded.getECLRequestType());
| | |   assertEquals(original.getLastChangeNumber(), decoded.getLastChangeNumber());
| | |   assertTrue(original.getCrossDomainServerState().equalsIgnoreCase(decoded.getCrossDomainServerState()));
| | |   assertTrue(original.getOperationId().equalsIgnoreCase(decoded.getOperationId()));
| | |   Assertions.assertThat(decoded.getExcludedBaseDNs()).containsOnly(adminDataDN, configDN);
| | | }
| | | |
| | | private int perfRep = 100000; |
| | | |
| | | |
| | |
| | | |
| | | assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]); |
| | | assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); |
| | | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[0]); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2]); |
| | | assertLimits(replicaDB, csns[0], csns[2]); |
| | | |
| | | DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid"); |
| | | replicaDB.add(update4); |
| | |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid")); |
| | | |
| | | assertEquals(csns[0], replicaDB.getOldestCSN()); |
| | | assertEquals(csns[2], replicaDB.getNewestCSN()); |
| | | assertLimits(replicaDB, csns[0], csns[2]); |
| | | |
| | | // Clear DB and check it is cleared. |
| | | replicaDB.clear(); |
| | | |
| | | assertEquals(null, replicaDB.getOldestCSN()); |
| | | assertEquals(null, replicaDB.getNewestCSN()); |
| | | assertLimits(replicaDB, null, null); |
| | | } |
| | | finally |
| | | { |
| | |
| | | mySeqnum+=2; |
| | | } |
| | | waitChangesArePersisted(replicaDB, max, counterWindow); |
| | | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | assertLimits(replicaDB, csns[1], csns[max]); |
| | | |
| | | // Now we want to test that after closing and reopening the db, the |
| | | // counting algo is well reinitialized and when new messages are added |
| | |
| | | replicaDB.shutdown(); |
| | | |
| | | replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv); |
| | | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | assertLimits(replicaDB, csns[1], csns[max]); |
| | | |
| | | // Populate the db with 'max' msg |
| | | for (int i=max+1; i<=2 * max; i++) |
| | |
| | | mySeqnum+=2; |
| | | } |
| | | waitChangesArePersisted(replicaDB, 2 * max, counterWindow); |
| | | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN"); |
| | | assertLimits(replicaDB, csns[1], csns[2 * max]); |
| | | |
| | | replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | /**
| | | * Checks both ends of the replica DB in one go: the oldest and the newest
| | | * CSN are each compared with the expected value, and failures are reported
| | | * together by the final {@code assertAll()}.
| | | */
| | | private void assertLimits(FileReplicaDB replicaDB, CSN oldestCSN, CSN newestCSN)
| | | {
| | |   final SoftAssertions checks = new SoftAssertions();
| | |   checks.assertThat(replicaDB.getOldestCSN())
| | |       .as("Wrong oldest CSN")
| | |       .isEqualTo(oldestCSN);
| | |   checks.assertThat(replicaDB.getNewestCSN())
| | |       .as("Wrong newest CSN")
| | |       .isEqualTo(newestCSN);
| | |   checks.assertAll();
| | | }
| | | |
| | | private void shutdown(FileReplicaDB replicaDB) |
| | | { |
| | | if (replicaDB != null) |