From a592fe71c4c2e29a136f9700a2981f3dcbd7e114 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 22 Sep 2014 19:47:33 +0000
Subject: [PATCH] OPENDJ-1205 (CR-4428) Remove network layer from External ChangeLog implementation
---
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 140 ++++++++++------------------------------------
1 files changed, 31 insertions(+), 109 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index bca5601..5fec6a9 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -27,10 +27,16 @@
package org.opends.server.replication.server;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
@@ -41,12 +47,31 @@
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
-import org.opends.server.replication.common.*;
-import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.MonitorMsg;
+import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
-import org.opends.server.types.*;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.HostPort;
import org.forgerock.opendj.ldap.ResultCode;
import static org.opends.messages.ReplicationMessages.*;
@@ -112,9 +137,6 @@
private final Map<Integer, ReplicationServerHandler> connectedRSs =
new ConcurrentHashMap<Integer, ReplicationServerHandler>();
- private final Queue<MessageHandler> otherHandlers =
- new ConcurrentLinkedQueue<MessageHandler>();
-
private final ReplicationDomainDB domainDB;
/** The ReplicationServer that created the current instance. */
private final ReplicationServer localReplicationServer;
@@ -368,11 +390,6 @@
addUpdate(dsHandler, updateMsg, notAssuredUpdateMsg, assuredServers);
}
}
-
- // Push the message to the other subscribing handlers
- for (MessageHandler mHandler : otherHandlers) {
- mHandler.add(updateMsg);
- }
}
private boolean isDifferentGenerationId(ReplicationServerHandler rsHandler,
@@ -1086,10 +1103,6 @@
{
unregisterServerHandler(sHandler, shutdown, true);
}
- else if (otherHandlers.contains(sHandler))
- {
- unregisterOtherHandler(sHandler);
- }
}
catch(Exception e)
{
@@ -1105,12 +1118,6 @@
}
}
- private void unregisterOtherHandler(MessageHandler mHandler)
- {
- unRegisterHandler(mHandler);
- mHandler.shutdown();
- }
-
private void unregisterServerHandler(ServerHandler sHandler, boolean shutdown,
boolean isDirectoryServer)
{
@@ -1137,59 +1144,6 @@
}
/**
- * Stop the handler.
- * @param mHandler The handler to stop.
- */
- public void stopServer(MessageHandler mHandler)
- {
- // TODO JNR merge with stopServer(ServerHandler, boolean)
- if (logger.isTraceEnabled())
- {
- debug("stopServer() on the message handler " + mHandler);
- }
- /*
- * We must prevent deadlock on replication server domain lock, when for
- * instance this code is called from dying ServerReader but also dying
- * ServerWriter at the same time, or from a thread that wants to shut down
- * the handler. So use a thread safe flag to know if the job must be done
- * or not (is already being processed or not).
- */
- if (!mHandler.engageShutdown())
- // Only do this once (prevent other thread to enter here again)
- {
- try
- {
- // Acquire lock on domain (see more details in comment of start() method
- // of ServerHandler)
- lock();
- }
- catch (InterruptedException ex)
- {
- // We can't deal with this here, so re-interrupt thread so that it is
- // caught during subsequent IO.
- Thread.currentThread().interrupt();
- return;
- }
-
- try
- {
- if (otherHandlers.contains(mHandler))
- {
- unregisterOtherHandler(mHandler);
- }
- }
- catch(Exception e)
- {
- logger.error(LocalizableMessage.raw(stackTraceToSingleLineString(e)));
- }
- finally
- {
- release();
- }
- }
- }
-
- /**
* Unregister this handler from the list of handlers registered to this
* domain.
* @param sHandler the provided handler to unregister.
@@ -2395,25 +2349,6 @@
}
/**
- * Register in the domain an handler that subscribes to changes.
- * @param mHandler the provided subscribing handler.
- */
- public void registerHandler(MessageHandler mHandler)
- {
- this.otherHandlers.add(mHandler);
- }
-
- /**
- * Unregister from the domain an handler.
- * @param mHandler the provided unsubscribing handler.
- * @return Whether this handler has been unregistered with success.
- */
- public boolean unRegisterHandler(MessageHandler mHandler)
- {
- return this.otherHandlers.remove(mHandler);
- }
-
- /**
* Returns the oldest known state for the domain, made of the oldest CSN
* stored for each serverId.
* <p>
@@ -2427,8 +2362,7 @@
return domainDB.getDomainOldestCSNs(baseDN);
}
- private void sendTopologyMsg(String type, ServerHandler handler,
- TopologyMsg msg)
+ private void sendTopologyMsg(String type, ServerHandler handler, TopologyMsg msg)
{
for (int i = 1; i <= 2; i++)
{
@@ -2491,18 +2425,6 @@
}
}
-
-
- /**
- * Get the latest (more recent) trim date of the changelog dbs associated
- * to this domain.
- * @return The latest trim date.
- */
- public long getLatestDomainTrimDate()
- {
- return domainDB.getDomainLatestTrimDate(baseDN);
- }
-
/**
* Return the monitor instance name of the ReplicationServer that created the
* current instance.
--
Gitblit v1.10.0