| | |
| | | package org.opends.server.replication.server; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Collections; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.*; |
| | | import org.opends.server.replication.protocol.*; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.DSInfo; |
| | | import org.opends.server.replication.common.RSInfo; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.common.ServerStatus; |
| | | import org.opends.server.replication.common.StatusMachineEvent; |
| | | import org.opends.server.replication.protocol.AckMsg; |
| | | import org.opends.server.replication.protocol.ChangeStatusMsg; |
| | | import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg; |
| | | import org.opends.server.replication.protocol.ErrorMsg; |
| | | import org.opends.server.replication.protocol.MonitorMsg; |
| | | import org.opends.server.replication.protocol.MonitorRequestMsg; |
| | | import org.opends.server.replication.protocol.ReplicaOfflineMsg; |
| | | import org.opends.server.replication.protocol.ResetGenerationIdMsg; |
| | | import org.opends.server.replication.protocol.RoutableMsg; |
| | | import org.opends.server.replication.protocol.TopologyMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.HostPort; |
| | | import org.opends.server.types.ResultCode; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
/**
 * Replication server handlers currently connected to this domain, keyed by an
 * Integer (presumably the remote replication server id - confirm against the
 * code that populates this map).
 */
| | | private final Map<Integer, ReplicationServerHandler> connectedRSs = |
| | | new ConcurrentHashMap<Integer, ReplicationServerHandler>(); |
| | | |
/**
 * Additional message handlers subscribed to this domain. Entries are added by
 * registerHandler(), removed by unRegisterHandler(), and each of them is handed
 * every update message that is pushed to the domain.
 */
| | | private final Queue<MessageHandler> otherHandlers = |
| | | new ConcurrentLinkedQueue<MessageHandler>(); |
| | | |
/**
 * Changelog database view used by this domain; it is queried with the domain
 * baseDN for the oldest CSNs and for the latest trim date.
 */
| | | private final ReplicationDomainDB domainDB; |
| | | /** The ReplicationServer that created the current instance. */ |
| | | private final ReplicationServer localReplicationServer; |
| | |
| | | addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers); |
| | | } |
| | | } |
| | | |
| | | // Push the message to the other subscribing handlers |
| | | for (MessageHandler mHandler : otherHandlers) { |
| | | mHandler.add(updateMsg); |
| | | } |
| | | } |
| | | |
| | | private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler, |
| | |
| | | { |
| | | unregisterServerHandler(sHandler, shutdown, true); |
| | | } |
| | | else if (otherHandlers.contains(sHandler)) |
| | | { |
| | | unregisterOtherHandler(sHandler); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | private void unregisterOtherHandler(MessageHandler mHandler) |
| | | { |
| | | unRegisterHandler(mHandler); |
| | | mHandler.shutdown(); |
| | | } |
| | | |
| | | private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown, |
| | | boolean isDirectoryServer) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Stop the handler. |
| | | * @param mHandler The handler to stop. |
| | | */ |
| | | public void stopServer(MessageHandler mHandler) |
| | | { |
| | | // TODO JNR merge with stopServer(ServerHandler, boolean) |
| | | if (debugEnabled()) |
| | | { |
| | | debug("stopServer() on the message handler " + mHandler); |
| | | } |
| | | /* |
| | | * We must prevent deadlock on replication server domain lock, when for |
| | | * instance this code is called from dying ServerReader but also dying |
| | | * ServerWriter at the same time, or from a thread that wants to shut down |
| | | * the handler. So use a thread safe flag to know if the job must be done |
| | | * or not (is already being processed or not). |
| | | */ |
| | | if (!mHandler.engageShutdown()) |
| | | // Only do this once (prevent other thread to enter here again) |
| | | { |
| | | try |
| | | { |
| | | // Acquire lock on domain (see more details in comment of start() method |
| | | // of ServerHandler) |
| | | lock(); |
| | | } |
| | | catch (InterruptedException ex) |
| | | { |
| | | // We can't deal with this here, so re-interrupt thread so that it is |
| | | // caught during subsequent IO. |
| | | Thread.currentThread().interrupt(); |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | if (otherHandlers.contains(mHandler)) |
| | | { |
| | | unregisterOtherHandler(mHandler); |
| | | } |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | logError(Message.raw(Category.SYNC, Severity.NOTICE, |
| | | stackTraceToSingleLineString(e))); |
| | | } |
| | | finally |
| | | { |
| | | release(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Unregister this handler from the list of handlers registered to this |
| | | * domain. |
| | | * @param sHandler the provided handler to unregister. |
| | |
| | | return attributes; |
| | | } |
| | | |
| | | /** |
| | | * Register in the domain an handler that subscribes to changes. |
| | | * @param mHandler the provided subscribing handler. |
| | | */ |
| | | public void registerHandler(MessageHandler mHandler) |
| | | { |
| | | this.otherHandlers.add(mHandler); |
| | | } |
| | | |
| | | /** |
| | | * Unregister from the domain an handler. |
| | | * @param mHandler the provided unsubscribing handler. |
| | | * @return Whether this handler has been unregistered with success. |
| | | */ |
| | | public boolean unRegisterHandler(MessageHandler mHandler) |
| | | { |
| | | return this.otherHandlers.remove(mHandler); |
| | | } |
| | | |
| | | /** |
| | | * Returns the oldest known state for the domain, made of the oldest CSN |
| | | * stored for each serverId. |
| | | * <p> |
| | | * Note: Because the replication changelogDB trimming always keep one change |
| | | * whatever its date, the CSN contained in the returned state can be very old. |
| | | * |
| | | * @return the start state of the domain. |
| | | */ |
| | | public ServerState getOldestState() |
| | | { |
| | | return domainDB.getDomainOldestCSNs(baseDN); |
| | | } |
| | | |
| | | private void sendTopologyMsg(String type, ServerHandler handler, |
| | | TopologyMsg msg) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Get the latest (more recent) trim date of the changelog dbs associated |
| | | * to this domain. |
| | | * @return The latest trim date. |
| | | */ |
| | | public long getLatestDomainTrimDate() |
| | | { |
| | | return domainDB.getDomainLatestTrimDate(baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Return the monitor instance name of the ReplicationServer that created the |
| | | * current instance. |