OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation
After OPENDJ-1206, I could remove all old External ChangeLog (or ECL for short) related classes. I could also remove annoyingly strange APIs from various places of replication code.
ECLUpdateMsg.java, ServerStartECLMsg.java, StartECLSessionMsg.java, ECLServerHandler.java, ECLServerWriter.java, ECLSearchOperation.java, ECLWorkflowElement.java, ExternalChangeLogTest.java: REMOVED
Also removed package src/server/org/opends/server/workflowelement/externalchangelog as part of these deletes.
replication*.properties:
Removed now unused SEVERE_ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.
ReplicationMsg.java:
Deprecated old ECL message types.
ReplicationBroker.java, ReplicationServer.java:
Removed all ECL related code.
ReplicationServerDomain.java:
Removed the now unused otherHandlers field, stopServer(MessageHandler), registerHandler(MessageHandler), unRegisterHandler(MessageHandler), unregisterOtherHandler(), getOldestState() and getLatestDomainTrimDate()
ServerHandler.java:
Reduced visibility as much as possible after removing ECLServerHandler.
MultimasterReplication.java:
Renamed getECLDisabledDomains() to getExcludedChangelogDomains(), included "cn=changelog" by default + changed client code to not add it anymore.
LastCookieVirtualProvider.java:
Consequence of the change to MultimasterReplication.
ChangelogBackend.java:
Consequence of the change to MultimasterReplication.getECLDisabledDomains().
Removed SearchParams.requestType and replaced its usage with the new isCookieBasedSearch()
MultiDomainServerState.java, ServerState.java, ReplicationDomainDB.java, FileChangelogDB.java, JEChangelogDB.java:
Removed now unused methods / fields.
Made private methods / fields that are now only used inside their declaration classes.
ChangelogBackendTestCase.java, SynchronizationMsgTest.java, FileReplicaDBTest.java:
Updated or removed tests as a consequence of the whole change.
9 files deleted
14 files modified
| | |
| | | import org.forgerock.opendj.ldap.ModificationType; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.forgerock.opendj.ldap.SearchScope; |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Severity; |
| | | import org.opends.server.admin.Configuration; |
| | | import org.opends.server.api.Backend; |
| | | import org.opends.server.config.ConfigConstants; |
| | |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.config.ConfigConstants.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.replication.plugin.MultimasterReplication.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | |
| | | private static Set<DN> getExcludedBaseDNs() throws DirectoryException |
| | | { |
| | | final Set<DN> excludedDNs = new HashSet<DN>(); |
| | | for (String dn : getECLDisabledDomains()) |
| | | for (String dn : getExcludedChangelogDomains()) |
| | | { |
| | | excludedDNs.add(DN.valueOf(dn)); |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.common; |
| | | |
| | | import java.util.Set; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; |
| | | import org.opends.server.api.VirtualAttributeProvider; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.VirtualAttributeRule; |
| | | |
| | | import static org.opends.messages.ExtensionMessages.*; |
| | | import static org.opends.server.replication.plugin.MultimasterReplication.*; |
| | | |
| | | /** |
| | | * This class implements a virtual attribute provider in the root-dse entry |
| | |
| | | { |
| | | if (replicationServer != null) |
| | | { |
| | | // Set a list of excluded domains (also exclude 'cn=changelog' itself) |
| | | Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT); |
| | | |
| | | String newestCookie = replicationServer.getNewestECLCookie(excludedDomains).toString(); |
| | | String newestCookie = replicationServer.getNewestECLCookie(getExcludedChangelogDomains()).toString(); |
| | | return Attributes.create(rule.getAttributeType(), newestCookie); |
| | | } |
| | | } |
| | |
| | | * when an error occurs |
| | | * @return the split state. |
| | | */ |
| | | public static Map<DN, ServerState> splitGenStateToServerStates( |
| | | private static Map<DN, ServerState> splitGenStateToServerStates( |
| | | String multiDomainServerState) throws DirectoryException |
| | | { |
| | | Map<DN, ServerState> startStates = new TreeMap<DN, ServerState>(); |
| | |
| | | return saved; |
| | | } |
| | | |
| | | /** |
| | | * Build a copy of the ServerState with only CSNs older than a provided |
| | | * timestamp. This is used when building the initial Cookie in the External |
| | | * Changelog, to cope with purged changes. |
| | | * |
| | | * @param timestamp |
| | | * The timestamp to compare the ServerState against |
| | | * @return a copy of the ServerState which only contains the CSNs older than |
| | | * csn. |
| | | */ |
| | | public ServerState duplicateOnlyOlderThan(long timestamp) |
| | | { |
| | | final CSN csn = new CSN(timestamp, 0, 0); |
| | | final ServerState newState = new ServerState(); |
| | | for (CSN change : serverIdToCSN.values()) |
| | | { |
| | | if (change.isOlderThan(csn)) |
| | | { |
| | | newState.serverIdToCSN.put(change.getServerId(), change); |
| | | } |
| | | } |
| | | return newState; |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.util.*; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.BlockingQueue; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.opends.server.admin.server.ConfigurationAddListener; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.server.ConfigurationDeleteListener; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; |
| | | import org.opends.server.api.*; |
| | | import org.opends.server.api.Backend; |
| | | import org.opends.server.api.BackupTaskListener; |
| | | import org.opends.server.api.ExportTaskListener; |
| | | import org.opends.server.api.ImportTaskListener; |
| | | import org.opends.server.api.RestoreTaskListener; |
| | | import org.opends.server.api.SynchronizationProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.service.DSRSShutdownSync; |
| | | import org.opends.server.types.*; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.opends.server.types.operation.*; |
| | | import org.opends.server.types.BackupConfig; |
| | | import org.opends.server.types.ConfigChangeResult; |
| | | import org.opends.server.types.Control; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.LDIFExportConfig; |
| | | import org.opends.server.types.LDIFImportConfig; |
| | | import org.opends.server.types.Modification; |
| | | import org.opends.server.types.Operation; |
| | | import org.opends.server.types.RestoreConfig; |
| | | import org.opends.server.types.SynchronizationProviderResult; |
| | | import org.opends.server.types.operation.PluginOperation; |
| | | import org.opends.server.types.operation.PostOperationAddOperation; |
| | | import org.opends.server.types.operation.PostOperationDeleteOperation; |
| | | import org.opends.server.types.operation.PostOperationModifyDNOperation; |
| | | import org.opends.server.types.operation.PostOperationModifyOperation; |
| | | import org.opends.server.types.operation.PostOperationOperation; |
| | | import org.opends.server.types.operation.PreOperationAddOperation; |
| | | import org.opends.server.types.operation.PreOperationDeleteOperation; |
| | | import org.opends.server.types.operation.PreOperationModifyDNOperation; |
| | | import org.opends.server.types.operation.PreOperationModifyOperation; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Gets the Set of baseDN of the domains which are disabled for the external |
| | | * changelog. |
| | | * Gets the Set of domain baseDN which are disabled for the external changelog. |
| | | * |
| | | * @return The Set of baseDNs which are disabled for the external changelog. |
| | | * @return The Set of domain baseDNs which are disabled for the external changelog. |
| | | */ |
| | | public static Set<String> getECLDisabledDomains() |
| | | public static Set<String> getExcludedChangelogDomains() |
| | | { |
| | | final Set<String> disabledBaseDNs = new HashSet<String>(domains.size()); |
| | | final Set<String> disabledBaseDNs = new HashSet<String>(domains.size() + 1); |
| | | disabledBaseDNs.add(DN_EXTERNAL_CHANGELOG_ROOT); |
| | | for (LDAPReplicationDomain domain : domains.values()) |
| | | { |
| | | if (!domain.isECLEnabled()) |
| | |
| | | static final byte MSG_TYPE_GENERIC_UPDATE = 29; |
| | | |
| | | // Added for protocol version 3 |
| | | @Deprecated |
| | | static final byte MSG_TYPE_START_ECL = 30; |
| | | @Deprecated |
| | | static final byte MSG_TYPE_START_ECL_SESSION = 31; |
| | | @Deprecated |
| | | static final byte MSG_TYPE_ECL_UPDATE = 32; |
| | | static final byte MSG_TYPE_CT_HEARTBEAT = 33; |
| | | |
| | |
| | | case MSG_TYPE_GENERIC_UPDATE: |
| | | return new UpdateMsg(buffer); |
| | | case MSG_TYPE_START_ECL: |
| | | return new ServerStartECLMsg(buffer); |
| | | case MSG_TYPE_START_ECL_SESSION: |
| | | return new StartECLSessionMsg(buffer); |
| | | case MSG_TYPE_ECL_UPDATE: |
| | | return new ECLUpdateMsg(buffer); |
| | | // Legacy versions never sent such messages to other instances (other JVMs). |
| | | // They were only used in the combined DS-RS case. |
| | | // It is safe to totally ignore these values since code now uses the ChangelogBackend. |
| | | return null; |
| | | case MSG_TYPE_CT_HEARTBEAT: |
| | | return new ChangeTimeHeartbeatMsg(buffer, protocolVersion); |
| | | case MSG_TYPE_REPL_SERVER_START_DS: |
| | |
| | | session, queueSize, this, rcvWindow); |
| | | rsHandler.startFromRemoteRS((ReplServerStartMsg) msg); |
| | | } |
| | | else if (msg instanceof ServerStartECLMsg) |
| | | { |
| | | ECLServerHandler eclHandler = new ECLServerHandler( |
| | | session, queueSize, this, rcvWindow); |
| | | eclHandler.startFromRemoteServer((ServerStartECLMsg) msg); |
| | | } |
| | | else |
| | | { |
| | | // We did not recognize the message, close session as what |
| | |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | | import org.opends.server.replication.protocol.MonitorRequestMsg; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.HostPort; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | private final Map<Integer, ReplicationServerHandler> connectedRSs = |
| | | new ConcurrentHashMap<Integer, ReplicationServerHandler>(); |
| | | |
| | | private final Queue<MessageHandler> otherHandlers = |
| | | new ConcurrentLinkedQueue<MessageHandler>(); |
| | | |
| | | private final ReplicationDomainDB domainDB; |
| | | /** The ReplicationServer that created the current instance. */ |
| | | private final ReplicationServer localReplicationServer; |
| | |
| | | addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers); |
| | | } |
| | | } |
| | | |
| | | // Push the message to the other subscribing handlers |
| | | for (MessageHandler mHandler : otherHandlers) { |
| | | mHandler.add(updateMsg); |
| | | } |
| | | } |
| | | |
| | | private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler, |
| | |
| | | { |
| | | unregisterServerHandler(sHandler, shutdown, true); |
| | | } |
| | | else if (otherHandlers.contains(sHandler)) |
| | | { |
| | | unregisterOtherHandler(sHandler); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private void unregisterOtherHandler(MessageHandler mHandler) |
| | | { |
| | | unRegisterHandler(mHandler); |
| | | mHandler.shutdown(); |
| | | } |
| | | |
| | | private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown, |
| | | boolean isDirectoryServer) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Stop the handler. |
| | | * @param mHandler The handler to stop. |
| | | */ |
| | | public void stopServer(MessageHandler mHandler) |
| | | { |
| | | // TODO JNR merge with stopServer(ServerHandler, boolean) |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debug("stopServer() on the message handler " + mHandler); |
| | | } |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | | * instance this code is called from dying ServerReader but also dying |
| | | * ServerWriter at the same time, or from a thread that wants to shut down |
| | | * the handler. So use a thread safe flag to know if the job must be done |
| | | * or not (is already being processed or not). |
| | | */ |
| | | if (!mHandler.engageShutdown()) |
| | | // Only do this once (prevent other thread to enter here again) |
| | | { |
| | | try |
| | | { |
| | | // Acquire lock on domain (see more details in comment of start() method |
| | | // of ServerHandler) |
| | | lock(); |
| | | } |
| | | catch (InterruptedException ex) |
| | | { |
| | | // We can't deal with this here, so re-interrupt thread so that it is |
| | | // caught during subsequent IO. |
| | | Thread.currentThread().interrupt(); |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | if (otherHandlers.contains(mHandler)) |
| | | { |
| | | unregisterOtherHandler(mHandler); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e))); |
| | | } |
| | | finally |
| | | { |
| | | release(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Unregister this handler from the list of handlers registered to this |
| | | * domain. |
| | | * @param sHandler the provided handler to unregister. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Register in the domain an handler that subscribes to changes. |
| | | * @param mHandler the provided subscribing handler. |
| | | */ |
| | | public void registerHandler(MessageHandler mHandler) |
| | | { |
| | | this.otherHandlers.add(mHandler); |
| | | } |
| | | |
| | | /** |
| | | * Unregister from the domain an handler. |
| | | * @param mHandler the provided unsubscribing handler. |
| | | * @return Whether this handler has been unregistered with success. |
| | | */ |
| | | public boolean unRegisterHandler(MessageHandler mHandler) |
| | | { |
| | | return this.otherHandlers.remove(mHandler); |
| | | } |
| | | |
| | | /** |
| | | * Returns the oldest known state for the domain, made of the oldest CSN |
| | | * stored for each serverId. |
| | | * <p> |
| | |
| | | return domainDB.getDomainOldestCSNs(baseDN); |
| | | } |
| | | |
| | | private void sendTopologyMsg(String type, ServerHandler handler, |
| | | TopologyMsg msg) |
| | | private void sendTopologyMsg(String type, ServerHandler handler, TopologyMsg msg) |
| | | { |
| | | for (int i = 1; i <= 2; i++) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the latest (more recent) trim date of the changelog dbs associated |
| | | * to this domain. |
| | | * @return The latest trim date. |
| | | */ |
| | | public long getLatestDomainTrimDate() |
| | | { |
| | | return domainDB.getDomainLatestTrimDate(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Return the monitor instance name of the ReplicationServer that created the |
| | | * current instance. |
| | |
| | | /** |
| | | * The associated ServerWriter that sends messages to the remote server. |
| | | */ |
| | | protected ServerWriter writer; |
| | | private ServerWriter writer; |
| | | |
| | | /** |
| | | * The associated ServerReader that receives messages from the remote server. |
| | | */ |
| | | protected ServerReader reader; |
| | | private ServerReader reader; |
| | | |
| | | // window |
| | | private int rcvWindow; |
| | |
| | | /** |
| | | * Semaphore that the writer uses to control the flow to the remote server. |
| | | */ |
| | | protected Semaphore sendWindow; |
| | | private Semaphore sendWindow; |
| | | /** |
| | | * The initial size of the sending window. |
| | | */ |
| | | protected int sendWindowSize; |
| | | private int sendWindowSize; |
| | | /** |
| | | * remote generation id. |
| | | */ |
| | |
| | | * The time in milliseconds between heartbeats from the replication |
| | | * server. Zero means heartbeats are off. |
| | | */ |
| | | protected long heartbeatInterval = 0; |
| | | protected long heartbeatInterval; |
| | | |
| | | /** |
| | | * The thread that will send heartbeats. |
| | |
| | | /** |
| | | * Set when ServerWriter is stopping. |
| | | */ |
| | | protected volatile boolean shutdownWriter = false; |
| | | private volatile boolean shutdownWriter; |
| | | |
| | | /** |
| | | * Weight of this remote server. |
| | |
| | | } |
| | | |
| | | // Window stats |
| | | attributes.add(Attributes.create("max-send-window", String |
| | | .valueOf(sendWindowSize))); |
| | | attributes.add(Attributes.create("current-send-window", String |
| | | .valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(Attributes.create("max-rcv-window", String |
| | | .valueOf(maxRcvWindow))); |
| | | attributes.add(Attributes.create("current-rcv-window", String |
| | | .valueOf(rcvWindow))); |
| | | attributes.add(Attributes.create("max-send-window", String.valueOf(sendWindowSize))); |
| | | attributes.add(Attributes.create("current-send-window", String.valueOf(sendWindow.availablePermits()))); |
| | | attributes.add(Attributes.create("max-rcv-window", String.valueOf(maxRcvWindow))); |
| | | attributes.add(Attributes.create("current-rcv-window", String.valueOf(rcvWindow))); |
| | | |
| | | // Encryption |
| | | attributes.add(Attributes.create("ssl-encryption", String |
| | | .valueOf(session.isEncrypted()))); |
| | | attributes.add(Attributes.create("ssl-encryption", String.valueOf(session.isEncrypted()))); |
| | | |
| | | // Data generation |
| | | attributes.add(Attributes.create("generation-id", String |
| | | .valueOf(generationId))); |
| | | attributes.add(Attributes.create("generation-id", String.valueOf(generationId))); |
| | | |
| | | return attributes; |
| | | } |
| | |
| | | } |
| | | |
| | | /** |
| | | * Requests to shutdown the writer. |
| | | */ |
| | | protected void shutdownWriter() |
| | | { |
| | | shutdownWriter = true; |
| | | } |
| | | |
| | | /** |
| | | * Shutdown This ServerHandler. |
| | | */ |
| | | @Override |
| | | public void shutdown() |
| | | { |
| | | shutdownWriter(); |
| | | shutdownWriter = true; |
| | | setConsumerActive(false); |
| | | super.shutdown(); |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Log the messages involved in the Topology/StartSession handshake. |
| | | * @param inStartECLSessionMsg The message received first. |
| | | */ |
| | | protected void logStartECLSessionHandshake( |
| | | StartECLSessionMsg inStartECLSessionMsg) |
| | | { |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | logger.trace("In " + this.replicationServer.getMonitorInstanceName() |
| | | + ", " + getClass().getSimpleName() + " " + this + " :" |
| | | + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Process a Ack message received. |
| | | * @param ack the message received. |
| | | */ |
| | |
| | | ServerState getDomainNewestCSNs(DN baseDN); |
| | | |
| | | /** |
| | | * Retrieves the latest trim date for the specified replication domain. |
| | | * <p> |
| | | * FIXME will be removed when ECLServerHandler will not be responsible anymore |
| | | * for lazily building the ChangeNumberIndexDB. |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @return the domain latest trim date |
| | | */ |
| | | long getDomainLatestTrimDate(DN baseDN); |
| | | |
| | | /** |
| | | * Removes all the data relating to the specified replication domain and |
| | | * shutdown all its replica databases. In particular, it will: |
| | | * <ol> |
| | |
| | | */ |
| | | public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB |
| | | { |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | |
| | | */ |
| | | private long purgeDelayInMillis; |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); |
| | | private volatile long latestPurgeDate; |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(final DN baseDN) |
| | | { |
| | | return latestPurgeDate; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexDB getChangeNumberIndexDB() |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | |
| | | } |
| | | } |
| | | |
| | | latestPurgeDate = purgeTimestamp; |
| | | |
| | | jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); |
| | | } |
| | | catch (InterruptedException e) |
| | |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.HostPort; |
| | | import org.opends.server.util.ServerConstants; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | |
| | | { |
| | | shutdown = false; |
| | | this.rcvWindow = getMaxRcvWindow(); |
| | | connect(); |
| | | connectAsDataServer(); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private void connect() |
| | | { |
| | | if (getBaseDN().toNormalizedString().equalsIgnoreCase( |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT)) |
| | | { |
| | | connectAsECL(); |
| | | } |
| | | else |
| | | { |
| | | connectAsDataServer(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Contacts all replication servers to get information from them and being |
| | | * able to choose the more suitable. |
| | |
| | | for (String serverUrl : getReplicationServerUrls()) |
| | | { |
| | | // Connect to server + get and store info about it |
| | | final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false); |
| | | final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false); |
| | | final ReplicationServerInfo rsInfo = rs.rsInfo; |
| | | if (rsInfo != null) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Special aspects of connecting as ECL (External Change Log) compared to |
| | | * connecting as data server are : |
| | | * <ul> |
| | | * <li>1 single RS configured</li> |
| | | * <li>so no choice of the preferred RS</li> |
| | | * <li>?? Heartbeat</li> |
| | | * <li>Start handshake is : |
| | | * |
| | | * <pre> |
| | | * Broker ---> StartECLMsg ---> RS |
| | | * <---- ReplServerStartMsg --- |
| | | * ---> StartSessionECLMsg --> RS |
| | | * </pre> |
| | | * |
| | | * </li> |
| | | * </ul> |
| | | */ |
| | | private void connectAsECL() |
| | | { |
| | | // FIXME:ECL List of RS to connect is for now limited to one RS only |
| | | final String bestServerURL = getReplicationServerUrls().iterator().next(); |
| | | final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true); |
| | | if (rs.isConnected()) |
| | | { |
| | | performECLPhaseTwoHandshake(bestServerURL, rs); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Connect to a ReplicationServer. |
| | | * |
| | | * Handshake sequences between a DS and a RS is divided into 2 logical |
| | |
| | | + evals.getBestRS()); |
| | | |
| | | final ConnectedRS electedRS = performPhaseOneHandshake( |
| | | evals.getBestRS().getServerURL(), true, false); |
| | | evals.getBestRS().getServerURL(), true); |
| | | final ReplicationServerInfo electedRsInfo = electedRS.rsInfo; |
| | | if (electedRsInfo != null) |
| | | { |
| | |
| | | * Do we keep session opened or not after handshake. Use true if want |
| | | * to perform handshake phase 2 with the same session and keep the |
| | | * session to create as the current one. |
| | | * @param isECL |
| | | * Indicates whether or not the an ECL handshake is to be performed. |
| | | * @return The answer from the server . Null if could not get an answer. |
| | | */ |
| | | private ConnectedRS performPhaseOneHandshake(String serverURL, |
| | | boolean keepSession, boolean isECL) |
| | | private ConnectedRS performPhaseOneHandshake(String serverURL, boolean keepSession) |
| | | { |
| | | Session newSession = null; |
| | | Socket socket = null; |
| | |
| | | socket.bind(local); |
| | | } |
| | | int timeoutMS = MultimasterReplication.getConnectionTimeoutMS(); |
| | | socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), |
| | | timeoutMS); |
| | | socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(), timeoutMS); |
| | | newSession = replSessionSecurity.createClientSession(socket, timeoutMS); |
| | | boolean isSslEncryption = replSessionSecurity.isSslEncryption(); |
| | | |
| | |
| | | final HostPort hp = new HostPort( |
| | | socket.getLocalAddress().getHostName(), socket.getLocalPort()); |
| | | final String url = hp.toString(); |
| | | final StartMsg serverStartMsg; |
| | | if (!isECL) |
| | | { |
| | | serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | else |
| | | { |
| | | serverStartMsg = new ServerStartECLMsg(url, 0, 0, 0, 0, |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | } |
| | | final StartMsg serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(), |
| | | getMaxRcvWindow(), config.getHeartbeatInterval(), state, |
| | | getGenerationID(), isSslEncryption, getGroupId()); |
| | | newSession.publish(serverStartMsg); |
| | | |
| | | // Read the ReplServerStartMsg or ReplServerStartDSMsg that should |
| | |
| | | return setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Performs the second phase handshake for External Change Log: publishes a |
| | | * StartECLSessionMsg (with operation id "-1") on the session of the supplied |
| | | * replication server, then sets the socket timeout of that session to the |
| | | * {@code timeout} value and records the server through setConnectedRS(). |
| | | * <p> |
| | | * No reply message is read here and nothing is returned (see the FIXME in the |
| | | * body). If any exception is thrown, a warning is logged, the session is |
| | | * closed and the connected replication server is reset to |
| | | * ConnectedRS.noConnectedRS(). |
| | | * |
| | | * @param server Server we are connecting with; only used in the warning that |
| | | * is logged when the handshake fails. |
| | | * @param rs The replication server whose session is used for the handshake. |
| | | */ |
| | | private void performECLPhaseTwoHandshake(String server, ConnectedRS rs) |
| | | { |
| | | try |
| | | { |
| | | // Send our Start Session |
| | | final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg(); |
| | | startECLSessionMsg.setOperationId("-1"); |
| | | rs.session.publish(startECLSessionMsg); |
| | | |
| | | // FIXME ECL In the handshake phase two, should RS send back a topo msg ? |
| | | if (logger.isTraceEnabled()) |
| | | { |
| | | debugInfo("RB HANDSHAKE SENT:\n" + startECLSessionMsg); |
| | | } |
| | | |
| | | // Alright set the timeout to the desired value |
| | | rs.session.setSoTimeout(timeout); |
| | | setConnectedRS(rs); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logger.warn(WARN_EXCEPTION_STARTING_SESSION_PHASE, getServerId(), server, getBaseDN().toNormalizedString(), |
| | | stackTraceToSingleLineString(e)); |
| | | |
| | | rs.session.close(); |
| | | setConnectedRS(ConnectedRS.noConnectedRS()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Performs the second phase handshake (send StartSessionMsg and receive |
| | | * TopologyMsg messages exchange) and return the reply message from the |
| | |
| | | |
| | | try |
| | | { |
| | | connect(); |
| | | connectAsDataServer(); |
| | | rs = connectedRS.get(); |
| | | } |
| | | catch (Exception e) |
| | |
| | | Map<Integer, ReplicationServerInfo> previousRsInfos) |
| | | { |
| | | this.rsServerId = rsServerId; |
| | | this.replicaInfos = dsInfosToKeep; |
| | | this.replicaInfos = dsInfosToKeep == null |
| | | ? Collections.<Integer, DSInfo>emptyMap() : dsInfosToKeep; |
| | | this.rsInfos = computeRSInfos(dsServerId, newRSInfos, |
| | | previousRsInfos, configuredReplicationServerUrls); |
| | | } |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "rsServerId=" + rsServerId + ", replicaInfos=" + replicaInfos |
| | | return getClass().getSimpleName() |
| | | + " rsServerId=" + rsServerId |
| | | + ", replicaInfos=" + replicaInfos.values() |
| | | + ", rsInfos=" + rsInfos.values(); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.protocols.internal.InternalClientConnection; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType; |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg.Persistent; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.opends.server.workflowelement.localbackend.LocalBackendAddOperation; |
| | |
| | | msg1.getBytes(getCurrentVersion()), getCurrentVersion()); |
| | | } |
| | | |
| | | /** |
| | | * Test that ECLUpdateMsg encoding and decoding works: the cookie, base DN, |
| | | * change number and wrapped update message must be the same on a message |
| | | * built with the plain constructor and on its copy rebuilt from getBytes(). |
| | | */ |
| | | @Test(enabled=true) |
| | | public void eclUpdateMsg() |
| | | throws Exception |
| | | { |
| | | // create a msg to put in the eclupdatemsg |
| | | InternalClientConnection connection = |
| | | InternalClientConnection.getRootConnection(); |
| | | DeleteOperation deleteOp = |
| | | new DeleteOperationBasis(connection, 1, 1,null, DN.valueOf("cn=t1")); |
| | | LocalBackendDeleteOperation op = new LocalBackendDeleteOperation(deleteOp); |
| | | CSN csn = new CSN(TimeThread.getTime(), 123, 45); |
| | | op.setAttachment(SYNCHROCONTEXT, new DeleteContext(csn, "uniqueid")); |
| | | DeleteMsg delmsg = new DeleteMsg(op); |
| | | long changeNumber = 21; |
| | | |
| | | DN baseDN = DN.valueOf("dc=example,dc=com"); |
| | | |
| | | // create a cookie |
| | | MultiDomainServerState cookie = |
| | | new MultiDomainServerState( |
| | | "o=test:000001210b6f21e904b100000001 000001210b6f21e904b200000001;" + |
| | | "o=test2:000001210b6f21e904b100000002 000001210b6f21e904b200000002;"); |
| | | |
| | | // Constructor test |
| | | ECLUpdateMsg msg1 = new ECLUpdateMsg(delmsg, cookie, baseDN, changeNumber); |
| | | assertTrue(msg1.getCookie().equalsTo(cookie)); |
| | | assertEquals(msg1.getBaseDN(), baseDN); |
| | | assertEquals(msg1.getChangeNumber(), changeNumber); |
| | | DeleteMsg delmsg2 = (DeleteMsg)msg1.getUpdateMsg(); |
| | | assertEquals(delmsg.compareTo(delmsg2), 0); |
| | | |
| | | // Constructor test (with byte[]) |
| | | ECLUpdateMsg msg2 = new ECLUpdateMsg(msg1.getBytes(getCurrentVersion())); |
| | | // Compare the decoded cookie with the one of the source message; comparing |
| | | // msg2's cookie with itself could never fail. |
| | | assertTrue(msg2.getCookie().equalsTo(msg1.getCookie())); |
| | | assertTrue(msg2.getCookie().equalsTo(cookie)); |
| | | assertEquals(msg2.getBaseDN(), msg1.getBaseDN()); |
| | | assertEquals(msg2.getBaseDN(), baseDN); |
| | | assertEquals(msg2.getChangeNumber(), msg1.getChangeNumber()); |
| | | assertEquals(msg2.getChangeNumber(), changeNumber); |
| | | |
| | | DeleteMsg delmsg1 = (DeleteMsg)msg1.getUpdateMsg(); |
| | | delmsg2 = (DeleteMsg)msg2.getUpdateMsg(); |
| | | assertEquals(delmsg2.compareTo(delmsg), 0); |
| | | assertEquals(delmsg2.compareTo(delmsg1), 0); |
| | | } |
| | | |
| | | @DataProvider(name="createServerStartData") |
| | | public Object[][] createServerStartData() throws Exception |
| | | { |
| | |
| | | newMsg.getDegradedStatusThreshold()); |
| | | } |
| | | |
| | | /** |
| | | * Supplies three data rows for replServerStartDSMsgTest. Each row holds, in |
| | | * order: serverId, baseDN, window, url, state, genId, groupId, degTh, weight |
| | | * and connectedDSNumber. |
| | | */ |
| | | @DataProvider(name="createReplServerStartDSData") |
| | | public Object[][] createReplServerStartDSData() throws Exception |
| | | { |
| | | // One server state per row, each updated with a single CSN. |
| | | final ServerState firstState = new ServerState(); |
| | | firstState.update(new CSN(0, 0, 0)); |
| | | |
| | | final ServerState secondState = new ServerState(); |
| | | secondState.update(new CSN(75, 5, 263)); |
| | | |
| | | final ServerState thirdState = new ServerState(); |
| | | thirdState.update(new CSN(123, 5, 98)); |
| | | |
| | | // All rows share the same base DN. |
| | | final DN domainDN = TEST_ROOT_DN; |
| | | final Object[] firstRow = |
| | | {1, domainDN, 0, "localhost:8989", firstState, 0L, (byte)0, 0, 0, 0}; |
| | | final Object[] secondRow = |
| | | {16, domainDN, 100, "anotherHost:1025", secondState, 1245L, (byte)25, 3456, 3, 31512}; |
| | | final Object[] thirdRow = |
| | | {36, domainDN, 100, "anotherHostAgain:8017", thirdState, 6841L, (byte)32, 2496, 630, 9524}; |
| | | |
| | | return new Object[][] { firstRow, secondRow, thirdRow }; |
| | | } |
| | | |
| | | /** |
| | | * Test that ReplServerStartDSMsg encoding and decoding works |
| | | * by checking that : msg == new ReplServerStartMsg(msg.getBytes()). |
| | | */ |
| | | @Test(dataProvider="createReplServerStartDSData") |
| | | public void replServerStartDSMsgTest(int serverId, DN baseDN, int window, |
| | | String url, ServerState state, long genId, byte groupId, int degTh, |
| | | int weight, int connectedDSNumber) throws Exception |
| | | { |
| | | ReplServerStartDSMsg msg = new ReplServerStartDSMsg(serverId, |
| | | url, baseDN, window, state, genId, |
| | | true, groupId, degTh, weight, connectedDSNumber); |
| | | ReplServerStartDSMsg newMsg = new ReplServerStartDSMsg(msg.getBytes(getCurrentVersion())); |
| | | assertEquals(msg.getServerId(), newMsg.getServerId()); |
| | | assertEquals(msg.getServerURL(), newMsg.getServerURL()); |
| | | assertEquals(msg.getBaseDN(), newMsg.getBaseDN()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getServerState().getCSN(1), |
| | | newMsg.getServerState().getCSN(1)); |
| | | assertEquals(newMsg.getVersion(), getCurrentVersion()); |
| | | assertEquals(msg.getGenerationId(), newMsg.getGenerationId()); |
| | | assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption()); |
| | | assertEquals(msg.getGroupId(), newMsg.getGroupId()); |
| | | assertEquals(msg.getDegradedStatusThreshold(), |
| | | newMsg.getDegradedStatusThreshold()); |
| | | assertEquals(msg.getWeight(), newMsg.getWeight()); |
| | | assertEquals(msg.getConnectedDSNumber(), newMsg.getConnectedDSNumber()); |
| | | } |
| | | |
| | | /** |
| | | * Test that StopMsg encoding and decoding works |
| | | * by checking that : msg == new StopMsg(msg.getBytes()). |
| | |
| | | assertEquals(test.getBytes(), newMsg.getPayload()); |
| | | } |
| | | |
| | | /** |
| | | * Test that ServerStartMsg encoding and decoding works |
| | | * by checking that : msg == new ServerStartMsg(msg.getBytes()). |
| | | */ |
| | | @Test(enabled=true,dataProvider="createServerStartData") |
| | | public void startECLMsgTest(int serverId, DN baseDN, int window, |
| | | ServerState state, long genId, boolean sslEncryption, byte groupId) throws Exception |
| | | { |
| | | ServerStartECLMsg msg = new ServerStartECLMsg( |
| | | "localhost:1234", window, window, window, window, window, window, state, |
| | | genId, sslEncryption, groupId); |
| | | ServerStartECLMsg newMsg = new ServerStartECLMsg(msg.getBytes(getCurrentVersion())); |
| | | assertEquals(msg.getServerURL(), newMsg.getServerURL()); |
| | | assertEquals(msg.getMaxReceiveDelay(), newMsg.getMaxReceiveDelay()); |
| | | assertEquals(msg.getMaxReceiveQueue(), newMsg.getMaxReceiveQueue()); |
| | | assertEquals(msg.getMaxSendDelay(), newMsg.getMaxSendDelay()); |
| | | assertEquals(msg.getMaxSendQueue(), newMsg.getMaxSendQueue()); |
| | | assertEquals(msg.getWindowSize(), newMsg.getWindowSize()); |
| | | assertEquals(msg.getHeartbeatInterval(), newMsg.getHeartbeatInterval()); |
| | | assertEquals(msg.getSSLEncryption(), newMsg.getSSLEncryption()); |
| | | assertEquals(msg.getServerState().getCSN(1), |
| | | newMsg.getServerState().getCSN(1)); |
| | | assertEquals(newMsg.getVersion(), getCurrentVersion()); |
| | | assertEquals(msg.getGenerationId(), newMsg.getGenerationId()); |
| | | assertEquals(msg.getGroupId(), newMsg.getGroupId()); |
| | | } |
| | | |
| | | /** |
| | | * Test StartECLSessionMsg encoding and decoding: every property set on the |
| | | * original message must be readable from the copy rebuilt from its bytes. |
| | | */ |
| | | @Test() |
| | | public void startECLSessionMsgTest() throws Exception |
| | | { |
| | | // Input values |
| | | final CSN sessionCSN = new CSN(TimeThread.getTime(), 123, 45); |
| | | final ServerState serverState = new ServerState(); |
| | | assertTrue(serverState.update(new CSN(75, 5,263))); |
| | | |
| | | // Build the message that gets encoded |
| | | final StartECLSessionMsg original = new StartECLSessionMsg(); |
| | | original.setCSN(sessionCSN); |
| | | original.setCrossDomainServerState("fakegenstate"); |
| | | original.setPersistent(Persistent.PERSISTENT); |
| | | original.setFirstChangeNumber(13); |
| | | original.setLastChangeNumber(14); |
| | | original.setECLRequestType(ECLRequestType.REQUEST_TYPE_EQUALS_REPL_CHANGE_NUMBER); |
| | | original.setOperationId("fakeopid"); |
| | | final String adminDataDN = "cn=admin data"; |
| | | final String configDN = "cn=config"; |
| | | original.setExcludedDNs(newSet(adminDataDN, configDN)); |
| | | |
| | | // Rebuild a second message from the encoded bytes |
| | | final StartECLSessionMsg decoded = |
| | | new StartECLSessionMsg(original.getBytes(getCurrentVersion())); |
| | | |
| | | // Both messages must carry the same values |
| | | assertEquals(original.getCSN(), decoded.getCSN()); |
| | | assertEquals(original.getPersistent(), decoded.getPersistent()); |
| | | assertEquals(original.getFirstChangeNumber(), decoded.getFirstChangeNumber()); |
| | | assertEquals(original.getECLRequestType(), decoded.getECLRequestType()); |
| | | assertEquals(original.getLastChangeNumber(), decoded.getLastChangeNumber()); |
| | | assertTrue(original.getCrossDomainServerState().equalsIgnoreCase(decoded.getCrossDomainServerState())); |
| | | assertTrue(original.getOperationId().equalsIgnoreCase(decoded.getOperationId())); |
| | | Assertions.assertThat(decoded.getExcludedBaseDNs()).containsOnly(adminDataDN, configDN); |
| | | } |
| | | |
| | | private int perfRep = 100000; |
| | | |
| | | |