From 7b84e53457bce1f0733afa87797afc9928568c52 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 09 Oct 2006 14:15:55 +0000
Subject: [PATCH] - Change the synchronization code so that the changelog server can now be used directly through the ChangelogBroker class as a client API.
---
opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java | 81 ++++++++++++++++++++++++++++++++--------
1 files changed, 65 insertions(+), 16 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
index 3a4d466..b265bb3 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/synchronization/ChangelogBroker.java
@@ -39,6 +39,8 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
import org.opends.server.changelog.ProtocolSession;
import org.opends.server.changelog.SocketSession;
@@ -47,6 +49,7 @@
import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
@@ -63,43 +66,64 @@
{
private boolean shutdown = false;
private List<String> servers;
- private Short identifier;
private boolean connected = false;
- private SynchronizationDomain domain;
private final Object lock = new Object();
private String changelogServer = "Not connected";
private TreeSet<FakeOperation> replayOperations;
private ProtocolSession session = null;
+ private final ServerState state;
+ private final DN baseDn;
+ private final short serverID;
+ private int maxSendDelay;
+ private int maxReceiveDelay;
+ private int maxSendQueue;
+ private int maxReceiveQueue;
/**
* Creates a new Changelog Broker for a particular SynchronizationDomain.
*
- * @param domain The SynchronizationDomain for which the borker is created.
+ * @param state The ServerState that should be used by this broker
+ * when negociating the session with the changelog servers.
+ * @param baseDn The base DN that should be used by this broker
+ * when negociating the session with the changelog servers.
+ * @param serverID The server ID that should be used by this broker
+ * when negociating the session with the changelog servers.
+ * @param maxReceiveQueue The maximum size of the receive queue to use on
+ * the changelog server.
+ * @param maxReceiveDelay The maximum replication delay to use on the
+ * changelog server.
+ * @param maxSendQueue The maximum size of the send queue to use on
+ * the changelog server.
+ * @param maxSendDelay The maximum send delay to use on the changelog server.
*/
- public ChangelogBroker(SynchronizationDomain domain)
+ public ChangelogBroker(ServerState state, DN baseDn, short serverID,
+ int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+ int maxSendDelay )
{
- this.domain = domain;
+ this.baseDn = baseDn;
+ this.serverID = serverID;
+ this.maxReceiveDelay = maxReceiveDelay;
+ this.maxSendDelay = maxSendDelay;
+ this.maxReceiveQueue = maxReceiveQueue;
+ this.maxSendQueue = maxSendQueue;
+ this.state = state;
replayOperations =
new TreeSet<FakeOperation>(new FakeOperationComparator());
}
-
/**
* Start the ChangelogBroker.
*
- * @param identifier identifier of the changelog
* @param servers list of servers used
* @throws Exception : in case of errors
*/
- public void start(Short identifier,
- List<String> servers)
+ public void start(List<String> servers)
throws Exception
{
/*
* Open Socket to the Changelog
* Send the Start message
*/
- this.identifier = identifier;
this.servers = servers;
if (servers.size() < 1)
{
@@ -147,7 +171,9 @@
/*
* Send our ServerStartMessage.
*/
- ServerStartMessage msg = domain.newServerStartMessage();
+ ServerStartMessage msg = new ServerStartMessage( serverID, baseDn,
+ maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+ state);
session.publish(msg);
@@ -167,10 +193,10 @@
* those changes and send them again to any changelog server.
*/
ChangeNumber changelogMaxChangeNumber =
- startMsg.getServerState().getMaxChangeNumber(identifier);
+ startMsg.getServerState().getMaxChangeNumber(serverID);
if (changelogMaxChangeNumber == null)
- changelogMaxChangeNumber = new ChangeNumber(0, 0, identifier);
- ChangeNumber ourMaxChangeNumber = domain.getMaxChangeNumber();
+ changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID);
+ ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverID);
if ((ourMaxChangeNumber == null) ||
(ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
{
@@ -206,7 +232,7 @@
LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
attrs.add(Historical.HISTORICALATTRIBUTENAME);
InternalSearchOperation op = conn.processSearch(
- new ASN1OctetString(domain.getBaseDN().toString()),
+ new ASN1OctetString(baseDn.toString()),
SearchScope.WHOLE_SUBTREE,
DereferencePolicy.NEVER_DEREF_ALIASES,
0, 0, false, filter,
@@ -381,8 +407,10 @@
/**
* Receive a message.
* @return the received message
+ * @throws SocketTimeoutException if the tiemout set by setSoTimeout
+ * has expired
*/
- public SynchronizationMessage receive()
+ public SynchronizationMessage receive() throws SocketTimeoutException
{
while (shutdown == false)
{
@@ -392,6 +420,11 @@
return session.receive();
} catch (Exception e)
{
+ if (e instanceof SocketTimeoutException)
+ {
+ SocketTimeoutException e1 = (SocketTimeoutException) e;
+ throw e1;
+ }
if (shutdown == false)
{
synchronized (lock)
@@ -439,6 +472,22 @@
}
/**
+ * Set a timeout value.
+ * With this option set to a non-zero value, calls to the receive() method
+ * block for only this amount of time after which a
+ * java.net.SocketTimeoutException is raised.
+ * The Broker is valid and useable even after such an Exception is raised.
+ *
+ * @param timeout the specified timeout, in milliseconds.
+ * @throws SocketException if there is an error in the underlying protocol,
+ * such as a TCP error.
+ */
+ public void setSoTimeout(int timeout) throws SocketException
+ {
+ session.setSoTimeout(timeout);
+ }
+
+ /**
* Get the name of the changelog server to which this broker is currently
* connected.
*
--
Gitblit v1.10.0