From b4f8838b15342670c31753a484abf0129e3c9653 Mon Sep 17 00:00:00 2001
From: jcduff <jcduff@localhost>
Date: Thu, 23 Oct 2008 14:04:24 +0000
Subject: [PATCH] The commit will bring the following features : - An updated version of the underlying database. BDB JE 3.3 is now used. - Attribute API refactoring providing a better abstraction and offering improved performances. - A new GUI called the Control-Panel to replace the Status-Panel: the specifications for this GUI are available on OpenDS Wiki and contains a link to a mockup. See <https://www.opends.org/wiki/page/ControlPanelUISpecification>. - Some changes in the replication protocol to implement "Assured Replication Mode". The specifications are on OpenDS Wiki at <https://www.opends.org/wiki/page/AssuredMode> and section 7 described some of the replication changes required to support this. Assured Replication is not finished, but the main replication protocol changes to support it are done. As explained by Gilles on an email on the Dev mailing list (http://markmail.org/message/46rgo3meq3vriy4a), with these changes the newer versions of OpenDS may not be able to replicate with OpenDS 1.0 instances. - Support for Service Tags on the platforms where the functionality is available and enabled. Specifications are published at <https://www.opends.org/wiki/page/OpenDSServiceTagEnabled>. For more information on Service Tags see <http://wikis.sun.com/display/ServiceTag/Sun+Service+Tag+FAQ>. - The Admin Connector service. In order to provide agentry of the OpenDS server at any time, a new service has been added, dedicated to the administration, configuration and monitoring of the server. An overview of the Admin Connector service and it's use is available on the OpenDS wiki <https://www.opends.org/wiki/page/ManagingAdministrationTrafficToTheServer> - Updates to the various command line tools to support the Admin Connector service. - Some internal re-architecting of the server to put the foundation of future developments such as virtual directory services. The new NetworkGroups and WorkFlow internal services which have been specified in <https://www.opends.org/wiki/page/BasicOperationRoutingThroughNetworkGroup> are now implemented. - Many bug fixes...
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 143 ++++++++++++++++++++++++++++++++---------------
1 files changed, 96 insertions(+), 47 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 5fe6a1e..0175414 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -44,7 +44,6 @@
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@@ -66,8 +65,8 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
+import org.opends.server.types.AttributeBuilder;
+import org.opends.server.types.Attributes;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
@@ -114,7 +113,6 @@
private String localURL = "null";
private boolean shutdown = false;
- private short replicationServerId;
private ReplicationDbEnv dbEnv;
private int rcvWindow;
private int queueSize;
@@ -139,6 +137,19 @@
private boolean connectedInTopology = false;
private final Object connectedInTopologyLock = new Object();
+ /*
+ * Assured mode properties
+ */
+ // Timeout (in milliseconds) when waiting for acknowledgments
+ private long assuredTimeout = 1000;
+
+ // Group id
+ private byte groupId = (byte)1;
+
+ // Number of pending changes for a DS, considered as threshold value to put
+ // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
+ private int degradedStatusThreshold = 5000;
+
/**
* The tracer object for the debug logger.
*/
@@ -156,7 +167,7 @@
super("Replication Server" + configuration.getReplicationPort());
replicationPort = configuration.getReplicationPort();
- replicationServerId = (short) configuration.getReplicationServerId();
+ serverId = (short) configuration.getReplicationServerId();
replicationServers = configuration.getReplicationServer();
if (replicationServers == null)
replicationServers = new ArrayList<String>();
@@ -187,9 +198,12 @@
Message msg = ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString());
throw new ConfigException(msg, e);
}
+ groupId = (byte)configuration.getGroupId();
+ assuredTimeout = configuration.getAssuredTimeout();
+ degradedStatusThreshold = configuration.getDegradedStatusThreshold();
replSessionSecurity = new ReplSessionSecurity(configuration);
- initialize(replicationServerId, replicationPort);
+ initialize(replicationPort);
configuration.addChangeListener(this);
DirectoryServer.registerMonitorProvider(this);
@@ -246,11 +260,11 @@
try
{
newSocket = listenSocket.accept();
- newSocket.setReceiveBufferSize(1000000);
newSocket.setTcpNoDelay(true);
newSocket.setKeepAlive(true);
ProtocolSession session =
- replSessionSecurity.createServerSession(newSocket);
+ replSessionSecurity.createServerSession(newSocket,
+ ReplSessionSecurity.HANDSHAKE_TIMEOUT);
if (session == null) // Error, go back to accept
continue;
ServerHandler handler = new ServerHandler(session, queueSize);
@@ -362,12 +376,12 @@
InetSocketAddress ServerAddr = new InetSocketAddress(
InetAddress.getByName(hostname), Integer.parseInt(port));
Socket socket = new Socket();
- socket.setReceiveBufferSize(1000000);
socket.setTcpNoDelay(true);
socket.connect(ServerAddr, 500);
ServerHandler handler = new ServerHandler(
- replSessionSecurity.createClientSession(serverURL, socket),
+ replSessionSecurity.createClientSession(serverURL, socket,
+ ReplSessionSecurity.HANDSHAKE_TIMEOUT),
queueSize);
handler.start(baseDn, serverId, this.serverURL, rcvWindow,
sslEncryption, this);
@@ -382,12 +396,11 @@
/**
* initialization function for the replicationServer.
*
- * @param changelogId The unique identifier for this replicationServer.
* @param changelogPort The port on which the replicationServer should
* listen.
*
*/
- private void initialize(short changelogId, int changelogPort)
+ private void initialize(int changelogPort)
{
shutdown = false;
@@ -400,11 +413,6 @@
this);
/*
- * create replicationServer replicationServerDomain
- */
- serverId = changelogId;
-
- /*
* Open replicationServer socket
*/
String localhostname = InetAddress.getLocalHost().getHostName();
@@ -412,7 +420,6 @@
serverURL = localhostname + ":" + String.valueOf(changelogPort);
localURL = localAdddress + ":" + String.valueOf(changelogPort);
listenSocket = new ServerSocket();
- listenSocket.setReceiveBufferSize(1000000);
listenSocket.bind(new InetSocketAddress(changelogPort));
/*
@@ -421,9 +428,10 @@
*/
if (debugEnabled())
TRACER.debugInfo("RS " +getMonitorInstanceName()+
- " creates connect threads");
+ " creates connect thread");
connectThread =
- new ReplicationServerConnectThread("Replication Server Connect", this);
+ new ReplicationServerConnectThread("Replication Server Connect " +
+ serverId , this);
connectThread.start();
// FIXME : Is it better to have the time to receive the ReplServerInfo
@@ -432,10 +440,11 @@
try { Thread.sleep(300);} catch(Exception e) {}
if (debugEnabled())
TRACER.debugInfo("RS " +getMonitorInstanceName()+
- " creates listen threads");
+ " creates listen thread");
listenThread =
- new ReplicationServerListenThread("Replication Server Listener", this);
+ new ReplicationServerListenThread("Replication Server Listener " +
+ serverId , this);
listenThread.start();
if (debugEnabled())
@@ -546,7 +555,7 @@
* DN given in parameter.
*
* @param id The serverId for which the dbHandler must be created.
- * @param baseDn The DN for which the dbHandler muste be created.
+ * @param baseDn The DN for which the dbHandler must be created.
* @return The new DB handler for this ReplicationServer and the serverId and
* DN given in parameter.
* @throws DatabaseException in case of underlying database problem.
@@ -594,7 +603,7 @@
* @param configuration The configuration to check.
* @param unacceptableReasons When the configuration is not acceptable, this
* table is use to return the reasons why this
- * configuration is not acceptbale.
+ * configuration is not acceptable.
*
* @return true if the configuration is acceptable, false other wise.
*/
@@ -666,7 +675,6 @@
serverURL = localhostname + ":" + String.valueOf(replicationPort);
localURL = localAdddress + ":" + String.valueOf(replicationPort);
listenSocket = new ServerSocket();
- listenSocket.setReceiveBufferSize(1000000);
listenSocket.bind(new InetSocketAddress(replicationPort));
listenThread =
@@ -690,6 +698,31 @@
}
}
+ // Update threshold value for status analyzers (stop them if requested
+ // value is 0)
+ if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
+ {
+ int oldThresholdValue = degradedStatusThreshold;
+ degradedStatusThreshold = configuration.getDegradedStatusThreshold();
+ for(ReplicationServerDomain rsd : baseDNs.values())
+ {
+ if (degradedStatusThreshold == 0)
+ {
+ // Requested to stop analyzers
+ rsd.stopStatusAnalyzer();
+ } else if (rsd.isRunningStatusAnalyzer())
+ {
+ // Update the threshold value for this running analyzer
+ rsd.updateStatusAnalyzer(degradedStatusThreshold);
+ } else if (oldThresholdValue == 0)
+ {
+ // Requested to start analyzers with provided threshold value
+ if (rsd.getConnectedDSs().size() > 0)
+ rsd.startStatusAnalyzer();
+ }
+ }
+ }
+
if ((configuration.getReplicationDBDirectory() != null) &&
(!dbDirname.equals(configuration.getReplicationDBDirectory())))
{
@@ -724,7 +757,7 @@
public String getMonitorInstanceName()
{
return "Replication Server " + this.replicationPort + " "
- + replicationServerId;
+ + serverId;
}
/**
@@ -757,31 +790,23 @@
* publish the server id and the port number.
*/
ArrayList<Attribute> attributes = new ArrayList<Attribute>();
- attributes.add(new Attribute("replication server id",
+ attributes.add(Attributes.create("replication server id",
String.valueOf(serverId)));
- attributes.add(new Attribute("replication server port",
+ attributes.add(Attributes.create("replication server port",
String.valueOf(replicationPort)));
/*
* Add all the base DNs that are known by this replication server.
*/
- AttributeType baseType=
- DirectoryServer.getAttributeType("base-dn", true);
- LinkedHashSet<AttributeValue> baseValues =
- new LinkedHashSet<AttributeValue>();
+ AttributeBuilder builder = new AttributeBuilder("base-dn");
for (DN base : baseDNs.keySet())
{
- baseValues.add(new AttributeValue(baseType, base. toString()));
+ builder.add(base.toString());
}
-
- Attribute bases = new Attribute(baseType, "base-dn", baseValues);
- attributes.add(bases);
+ attributes.add(builder.toAttribute());
// Publish to monitor the generation ID by replicationServerDomain
- AttributeType generationIdType=
- DirectoryServer.getAttributeType("base-dn-generation-id", true);
- LinkedHashSet<AttributeValue> generationIdValues =
- new LinkedHashSet<AttributeValue>();
+ builder = new AttributeBuilder("base-dn-generation-id");
for (DN base : baseDNs.keySet())
{
long generationId=-1;
@@ -789,12 +814,9 @@
getReplicationServerDomain(base, false);
if (replicationServerDomain != null)
generationId = replicationServerDomain.getGenerationId();
- generationIdValues.add(new AttributeValue(generationIdType,
- base.toString() + " " + generationId));
+ builder.add(base.toString() + " " + generationId);
}
- Attribute generationIds = new Attribute(generationIdType, "generation-id",
- generationIdValues);
- attributes.add(generationIds);
+ attributes.add(builder.toAttribute());
return attributes;
}
@@ -916,7 +938,7 @@
{
try
{
- if (!DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
+ if (DirectoryServer.getConfigHandler().entryExists(backendConfigEntryDN))
{
// Delete the replication backend
DirectoryServer.getConfigHandler().deleteEntry(backendConfigEntryDN,
@@ -964,7 +986,7 @@
boolean successful)
{
if (backend.getBackendID().equals(backendId))
- initialize(this.replicationServerId, this.replicationPort);
+ initialize(this.replicationPort);
}
/**
@@ -1041,6 +1063,33 @@
}
/**
+ * Get the assured mode timeout.
+ * @return The assured mode timeout.
+ */
+ public long getAssuredTimeout()
+ {
+ return assuredTimeout;
+ }
+
+ /**
+ * Get The replication server group id.
+ * @return The replication server group id.
+ */
+ public byte getGroupId()
+ {
+ return groupId;
+ }
+
+ /**
+ * Get the threshold value for status analyzer.
+ * @return The threshold value for status analyzer.
+ */
+ public int getDegradedStatusThreshold()
+ {
+ return degradedStatusThreshold;
+ }
+
+ /**
* Compute the list of replication servers that are not any
* more connected to this Replication Server and stop the
* corresponding handlers.
--
Gitblit v1.10.0