From 13819a2e81db0422a7c8c186f838c7b243173170 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 03 Sep 2014 06:30:37 +0000
Subject: [PATCH] OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation
---
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 154 ++++++++++----------------------------------------
1 files changed, 32 insertions(+), 122 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 6dd0fa4..67195ef 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,10 +27,16 @@
package org.opends.server.replication.server;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -43,12 +49,33 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.*;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
-import org.opends.server.types.*;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.DebugLogLevel;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.HostPort;
+import org.opends.server.types.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -114,9 +141,6 @@
private final Map<Integer, ReplicationServerHandler> connectedRSs =
new ConcurrentHashMap<Integer, ReplicationServerHandler>();
- private final Queue<MessageHandler> otherHandlers =
- new ConcurrentLinkedQueue<MessageHandler>();
-
private final ReplicationDomainDB domainDB;
/** The ReplicationServer that created the current instance. */
private final ReplicationServer localReplicationServer;
@@ -368,11 +392,6 @@
addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
}
}
-
- // Push the message to the other subscribing handlers
- for (MessageHandler mHandler : otherHandlers) {
- mHandler.add(updateMsg);
- }
}
private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
@@ -1097,10 +1116,6 @@
{
unregisterServerHandler(sHandler, shutdown, true);
}
- else if (otherHandlers.contains(sHandler))
- {
- unregisterOtherHandler(sHandler);
- }
}
catch(Exception e)
{
@@ -1117,12 +1132,6 @@
}
}
- private void unregisterOtherHandler(MessageHandler mHandler)
- {
- unRegisterHandler(mHandler);
- mHandler.shutdown();
- }
-
private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
boolean isDirectoryServer)
{
@@ -1149,60 +1158,6 @@
}
/**
- * Stop the handler.
- * @param mHandler The handler to stop.
- */
- public void stopServer(MessageHandler mHandler)
- {
- // TODO JNR merge with stopServer(ServerHandler, boolean)
- if (debugEnabled())
- {
- debug("stopServer() on the message handler " + mHandler);
- }
- /*
- * We must prevent deadlock on replication server domain lock, when for
- * instance this code is called from dying ServerReader but also dying
- * ServerWriter at the same time, or from a thread that wants to shut down
- * the handler. So use a thread safe flag to know if the job must be done
- * or not (is already being processed or not).
- */
- if (!mHandler.engageShutdown())
- // Only do this once (prevent other thread to enter here again)
- {
- try
- {
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- }
- catch (InterruptedException ex)
- {
- // We can't deal with this here, so re-interrupt thread so that it is
- // caught during subsequent IO.
- Thread.currentThread().interrupt();
- return;
- }
-
- try
- {
- if (otherHandlers.contains(mHandler))
- {
- unregisterOtherHandler(mHandler);
- }
- }
- catch(Exception e)
- {
- logError(Message.raw(Category.SYNC, Severity.NOTICE,
- stackTraceToSingleLineString(e)));
- }
- finally
- {
- release();
- }
- }
- }
-
- /**
* Unregister this handler from the list of handlers registered to this
* domain.
* @param sHandler the provided handler to unregister.
@@ -2427,39 +2382,6 @@
return attributes;
}
- /**
- * Register in the domain an handler that subscribes to changes.
- * @param mHandler the provided subscribing handler.
- */
- public void registerHandler(MessageHandler mHandler)
- {
- this.otherHandlers.add(mHandler);
- }
-
- /**
- * Unregister from the domain an handler.
- * @param mHandler the provided unsubscribing handler.
- * @return Whether this handler has been unregistered with success.
- */
- public boolean unRegisterHandler(MessageHandler mHandler)
- {
- return this.otherHandlers.remove(mHandler);
- }
-
- /**
- * Returns the oldest known state for the domain, made of the oldest CSN
- * stored for each serverId.
- * <p>
- * Note: Because the replication changelogDB trimming always keep one change
- * whatever its date, the CSN contained in the returned state can be very old.
- *
- * @return the start state of the domain.
- */
- public ServerState getOldestState()
- {
- return domainDB.getDomainOldestCSNs(baseDN);
- }
-
private void sendTopologyMsg(String type, ServerHandler handler,
TopologyMsg msg)
{
@@ -2524,18 +2446,6 @@
}
}
-
-
- /**
- * Get the latest (more recent) trim date of the changelog dbs associated
- * to this domain.
- * @return The latest trim date.
- */
- public long getLatestDomainTrimDate()
- {
- return domainDB.getDomainLatestTrimDate(baseDN);
- }
-
/**
* Return the monitor instance name of the ReplicationServer that created the
* current instance.
--
Gitblit v1.10.0