From 2a3c23e2c86bdd75b1fe6e2caaf20bf9ec077e2e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Thu, 20 Dec 2007 09:55:58 +0000
Subject: [PATCH] The code checked in with revision 3561 cause some problems with hanging tests and failing tests.
---
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java | 737 +++++++++++++++++++-------------------------------------
1 files changed, 255 insertions(+), 482 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
index 049910b..94fddf0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
@@ -25,7 +25,6 @@
* Portions Copyright 2006-2007 Sun Microsystems, Inc.
*/
package org.opends.server.replication.plugin;
-
import org.opends.messages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -42,8 +41,6 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
@@ -64,16 +61,17 @@
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
+
/**
* The broker for Multi-master Replication.
*/
public class ReplicationBroker implements InternalSearchListener
{
-
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
+
private boolean shutdown = false;
private Collection<String> servers;
private boolean connected = false;
@@ -97,23 +95,23 @@
private long generationId = -1;
private ReplSessionSecurity replSessionSecurity;
- // Trick for avoiding a inner class for many parameters return for
- // performHandshake method.
- private String tmpReadableServerName = null;
-
/**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
private long heartbeatInterval = 0;
+
+
/**
* A thread to monitor heartbeats on the session.
*/
private HeartbeatMonitor heartbeatMonitor = null;
+
/**
* The number of times the connection was lost.
*/
private int numLostConnections = 0;
+
/**
* When the broker cannot connect to any replication server
* it log an error and keeps continuing every second.
@@ -123,6 +121,7 @@
* finally succeed to connect.
*/
private boolean connectionError = false;
+
private final Object connectPhaseLock = new Object();
/**
@@ -150,9 +149,9 @@
* @param replSessionSecurity The session security configuration.
*/
public ReplicationBroker(ServerState state, DN baseDn, short serverID,
- int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay, int window, long heartbeatInterval,
- long generationId, ReplSessionSecurity replSessionSecurity)
+ int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+ int maxSendDelay, int window, long heartbeatInterval,
+ long generationId, ReplSessionSecurity replSessionSecurity)
{
this.baseDn = baseDn;
this.serverID = serverID;
@@ -165,7 +164,7 @@
new TreeSet<FakeOperation>(new FakeOperationComparator());
this.rcvWindow = window;
this.maxRcvWindow = window;
- this.halfRcvWindow = window / 2;
+ this.halfRcvWindow = window/2;
this.heartbeatInterval = heartbeatInterval;
this.protocolVersion = ProtocolVersion.currentVersion();
this.generationId = generationId;
@@ -195,6 +194,7 @@
this.connect();
}
+
/**
* Connect to a ReplicationServer.
*
@@ -202,7 +202,7 @@
*/
private void connect()
{
- HashMap<String, ServerState> rsStates = new HashMap<String, ServerState>();
+ ReplServerStartMessage replServerStartMsg = null;
// Stop any existing heartbeat monitor from a previous session.
if (heartbeatMonitor != null)
@@ -211,121 +211,187 @@
heartbeatMonitor = null;
}
+ // checkState is true for the first loop on all replication servers
+ // looking for one already up-to-date.
+ // If we found some responding replication servers but none up-to-date
+ // then we set check-state to false and do a second loop where the first
+ // found will be the one elected and then we will update this replication
+ // server.
+ boolean checkState = true;
+ boolean receivedResponse = true;
+
+ // TODO: We are doing here 2 loops opening , closing , reopening session to
+ // the same servers .. risk to have 'same server id' erros.
+ // Would be better to do only one loop, keeping the best candidate while
+ // traversing the list of replication servers to connect to.
+ if (servers.size()==1)
+ {
+ checkState = false;
+ }
+
synchronized (connectPhaseLock)
{
- /*
- * Connect to each replication server and get their ServerState then find
- * out which one is the best to connect to.
- */
- for (String server : servers)
+ while ((!connected) && (!shutdown) && (receivedResponse))
{
- // Connect to server and get reply message
- ReplServerStartMessage replServerStartMsg =
- performHandshake(server, false);
- tmpReadableServerName = null; // Not needed now
-
- // Store reply message in list
- if (replServerStartMsg != null)
+ receivedResponse = false;
+ for (String server : servers)
{
- ServerState rsState = replServerStartMsg.getServerState();
- rsStates.put(server, rsState);
- }
- } // for servers
+ int separator = server.lastIndexOf(':');
+ String port = server.substring(separator + 1);
+ String hostname = server.substring(0, separator);
- ReplServerStartMessage replServerStartMsg = null;
-
- if (rsStates.size() > 0)
- {
-
- // At least one server answered, find the best one.
- String bestServer = computeBestReplicationServer(state, rsStates,
- serverID, baseDn);
-
- // Best found, now connect to this one
- replServerStartMsg = performHandshake(bestServer, true);
-
- if (replServerStartMsg != null)
- {
try
{
/*
+ * Open a socket connection to the next candidate.
+ */
+ InetSocketAddress ServerAddr = new InetSocketAddress(
+ InetAddress.getByName(hostname), Integer.parseInt(port));
+ Socket socket = new Socket();
+ socket.setReceiveBufferSize(1000000);
+ socket.setTcpNoDelay(true);
+ socket.connect(ServerAddr, 500);
+ session = replSessionSecurity.createClientSession(server, socket);
+ boolean isSslEncryption =
+ replSessionSecurity.isSslEncryption(server);
+ /*
+ * Send our ServerStartMessage.
+ */
+ ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
+ maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
+ halfRcvWindow*2, heartbeatInterval, state,
+ protocolVersion, generationId, isSslEncryption);
+ session.publish(msg);
+
+
+ /*
+ * Read the ReplServerStartMessage that should come back.
+ */
+ session.setSoTimeout(1000);
+ replServerStartMsg = (ReplServerStartMessage) session.receive();
+ receivedResponse = true;
+
+ /*
+ * We have sent our own protocol version to the replication server.
+ * The replication server will use the same one (or an older one
+ * if it is an old replication server).
+ */
+ protocolVersion = ProtocolVersion.minWithCurrent(
+ replServerStartMsg.getVersion());
+ session.setSoTimeout(timeout);
+
+ if (!isSslEncryption)
+ {
+ session.stopEncryption();
+ }
+
+ /*
* We must not publish changes to a replicationServer that has not
* seen all our previous changes because this could cause some
* other ldap servers to miss those changes.
* Check that the ReplicationServer has seen all our previous
* changes.
+ * If not, try another replicationServer.
+ * If no other replicationServer has seen all our changes, recover
+ * those changes and send them again to any replicationServer.
*/
ChangeNumber replServerMaxChangeNumber =
replServerStartMsg.getServerState().getMaxChangeNumber(serverID);
-
if (replServerMaxChangeNumber == null)
- {
replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
- }
ChangeNumber ourMaxChangeNumber =
state.getMaxChangeNumber(serverID);
-
- if ((ourMaxChangeNumber != null) &&
- (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
+ if ((ourMaxChangeNumber == null) ||
+ (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
{
-
- // Replication server is missing some of our changes: let's send
- // them to him.
- replayOperations.clear();
-
- Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
- logError(message);
-
- /*
- * Get all the changes that have not been seen by this
- * replication server and populate the replayOperations
- * list.
- */
- InternalSearchOperation op = seachForChangedEntries(
- baseDn, replServerMaxChangeNumber, this);
- if (op.getResultCode() != ResultCode.SUCCESS)
+ replicationServer = ServerAddr.toString();
+ maxSendWindow = replServerStartMsg.getWindowSize();
+ connected = true;
+ startHeartBeat();
+ break;
+ }
+ else
+ {
+ if (checkState == true)
{
- /*
- * An error happened trying to search for the updates
- * This server will start acepting again new updates but
- * some inconsistencies will stay between servers.
- * Log an error for the repair tool
- * that will need to resynchronize the servers.
+ /* This replicationServer is missing some
+ * of our changes, we are going to try another server
+ * but before log a notice message
*/
- message = ERR_CANNOT_RECOVER_CHANGES.get(
- baseDn.toNormalizedString());
- logError(message);
- } else
- {
- for (FakeOperation replayOp : replayOperations)
- {
- message = DEBUG_SENDING_CHANGE.get(replayOp.getChangeNumber().
- toString());
- logError(message);
- session.publish(replayOp.generateMessage());
- }
- message = DEBUG_CHANGES_SENT.get();
+ Message message = NOTE_CHANGELOG_MISSING_CHANGES.get(server,
+ baseDn.toNormalizedString());
logError(message);
}
- }
+ else
+ {
+ replayOperations.clear();
- replicationServer = tmpReadableServerName;
- maxSendWindow = replServerStartMsg.getWindowSize();
- connected = true;
- startHeartBeat();
- } catch (IOException e)
+ // TODO: i18n
+ logError(Message.raw("going to search for changes"));
+
+ /*
+ * Get all the changes that have not been seen by this
+ * replicationServer and populate the replayOperations
+ * list.
+ */
+ InternalSearchOperation op = seachForChangedEntries(
+ baseDn, replServerMaxChangeNumber, this);
+ if (op.getResultCode() != ResultCode.SUCCESS)
+ {
+ /*
+ * An error happened trying to search for the updates
+ * This server will start acepting again new updates but
+ * some inconsistencies will stay between servers.
+ * Log an error for the repair tool
+ * that will need to resynchronize the servers.
+ */
+ Message message = ERR_CANNOT_RECOVER_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ replicationServer = ServerAddr.toString();
+ maxSendWindow = replServerStartMsg.getWindowSize();
+ connected = true;
+ startHeartBeat();
+ }
+ else
+ {
+ replicationServer = ServerAddr.toString();
+ maxSendWindow = replServerStartMsg.getWindowSize();
+ connected = true;
+ for (FakeOperation replayOp : replayOperations)
+ {
+ logError(Message.raw("sendingChange")); // TODO: i18n
+ session.publish(replayOp.generateMessage());
+ }
+ startHeartBeat();
+ logError(Message.raw("changes sent")); // TODO: i18n
+ break;
+ }
+ }
+ }
+ }
+ catch (ConnectException e)
{
- Message message = ERR_PUBLISHING_FAKE_OPS.get(
- baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
- stackTraceToSingleLineString(e));
- logError(message);
- } catch (Exception e)
+ /*
+ * There was no server waiting on this host:port
+ * Log a notice and try the next replicationServer in the list
+ */
+ if (!connectionError )
+ {
+ // the error message is only logged once to avoid overflowing
+ // the error log
+ Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
+ logError(message);
+ }
+ }
+ catch (Exception e)
{
- Message message = ERR_COMPUTING_FAKE_OPS.get(
- baseDn.toNormalizedString(), bestServer, e.getLocalizedMessage() +
- stackTraceToSingleLineString(e));
+ Message message = ERR_EXCEPTION_STARTING_SESSION.get(
+ baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
+ stackTraceToSingleLineString(e));
logError(message);
- } finally
+ }
+ finally
{
if (connected == false)
{
@@ -336,60 +402,81 @@
session.close();
} catch (IOException e)
{
- // The session was already closed, just ignore.
+ // The session was already closed, just ignore.
}
session = null;
}
}
}
- } // Could perform handshake with best
- } // Reached some servers
+ } // for servers
+
+ // We have traversed all the replication servers
+
+ if ((!connected) && (checkState == true) && receivedResponse)
+ {
+ /*
+ * We could not find a replicationServer that has seen all the
+ * changes that this server has already processed, start again
+ * the loop looking for any replicationServer.
+ */
+ Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
+ baseDn.toNormalizedString());
+ logError(message);
+ checkState = false;
+ }
+ }
+
+ // We have traversed all the replication servers as many times as needed
+ // to find one if one is up and running.
if (connected)
{
+ // This server has connected correctly.
// Log a message to let the administrator know that the failure was
// resolved.
- // Wakeup all the thread that were waiting on the window
+ // wakeup all the thread that were waiting on the window
// on the previous connection.
connectionError = false;
if (sendWindow != null)
- {
sendWindow.release(Integer.MAX_VALUE);
- }
this.sendWindow = new Semaphore(maxSendWindow);
connectPhaseLock.notify();
if ((replServerStartMsg.getGenerationId() == this.generationId) ||
- (replServerStartMsg.getGenerationId() == -1))
+ (replServerStartMsg.getGenerationId() == -1))
{
Message message =
NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
- baseDn.toString(),
- replicationServer,
- Long.toString(this.generationId));
+ baseDn.toString(),
+ replicationServer,
+ Long.toString(this.generationId));
logError(message);
- } else
+ }
+ else
{
Message message =
NOTE_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
- baseDn.toString(),
- replicationServer,
- Long.toString(this.generationId),
- Long.toString(replServerStartMsg.getGenerationId()));
+ baseDn.toString(),
+ replicationServer,
+ Long.toString(this.generationId),
+ Long.toString(replServerStartMsg.getGenerationId()));
logError(message);
}
- } else
+ }
+ else
{
/*
- * This server could not find any replicationServer. It's going to start
- * in degraded mode. Log a message.
+ * This server could not find any replicationServer
+ * It's going to start in degraded mode.
+ * Log a message
*/
if (!connectionError)
{
+ checkState = false;
connectionError = true;
connectPhaseLock.notify();
Message message =
- NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
+ NOTE_COULD_NOT_FIND_CHANGELOG.get(baseDn.toString());
logError(message);
}
}
@@ -397,315 +484,6 @@
}
/**
- * Connect to the provided server performing the handshake (start messages
- * exchange) and return the reply message from the replication server.
- *
- * @param server Server to connect to.
- * @param keepConnection Do we keep session opened or not after handshake.
- * @return The ReplServerStartMessage the server replied. Null if could not
- * get an answer.
- */
- public ReplServerStartMessage performHandshake(String server,
- boolean keepConnection)
- {
- ReplServerStartMessage replServerStartMsg = null;
-
- // Parse server string.
- int separator = server.lastIndexOf(':');
- String port = server.substring(separator + 1);
- String hostname = server.substring(0, separator);
-
- boolean error = false;
- try
- {
- /*
- * Open a socket connection to the next candidate.
- */
- int intPort = Integer.parseInt(port);
- InetSocketAddress serverAddr = new InetSocketAddress(
- InetAddress.getByName(hostname), intPort);
- tmpReadableServerName = serverAddr.toString();
- Socket socket = new Socket();
- socket.setReceiveBufferSize(1000000);
- socket.setTcpNoDelay(true);
- socket.connect(serverAddr, 500);
- session = replSessionSecurity.createClientSession(server, socket);
- boolean isSslEncryption =
- replSessionSecurity.isSslEncryption(server);
- /*
- * Send our ServerStartMessage.
- */
- ServerStartMessage msg = new ServerStartMessage(serverID, baseDn,
- maxReceiveDelay, maxReceiveQueue, maxSendDelay, maxSendQueue,
- halfRcvWindow * 2, heartbeatInterval, state,
- protocolVersion, generationId, isSslEncryption);
- session.publish(msg);
-
- /*
- * Read the ReplServerStartMessage that should come back.
- */
- session.setSoTimeout(1000);
- replServerStartMsg = (ReplServerStartMessage) session.receive();
-
- /*
- * We have sent our own protocol version to the replication server.
- * The replication server will use the same one (or an older one
- * if it is an old replication server).
- */
- protocolVersion = ProtocolVersion.minWithCurrent(
- replServerStartMsg.getVersion());
- session.setSoTimeout(timeout);
-
- if (!isSslEncryption)
- {
- session.stopEncryption();
- }
- } catch (ConnectException e)
- {
- /*
- * There was no server waiting on this host:port
- * Log a notice and try the next replicationServer in the list
- */
- if (!connectionError)
- {
- // the error message is only logged once to avoid overflowing
- // the error log
- Message message = NOTE_NO_CHANGELOG_SERVER_LISTENING.get(server);
- logError(message);
- }
- error = true;
- } catch (Exception e)
- {
- Message message = ERR_EXCEPTION_STARTING_SESSION.get(
- baseDn.toNormalizedString(), server, e.getLocalizedMessage() +
- stackTraceToSingleLineString(e));
- logError(message);
- error = true;
- }
-
- // Close session if requested
- if (!keepConnection || error)
- {
- if (session != null)
- {
- try
- {
- session.close();
- } catch (IOException e)
- {
- // The session was already closed, just ignore.
- }
- session = null;
- }
- if (error)
- {
- replServerStartMsg = null;
- } // Be sure to return null.
- }
-
- return replServerStartMsg;
- }
-
- /**
- * Returns the replication server that best fits our need so that we can
- * connect to it.
- *
- * Note: this method put as public static for unit testing purpose.
- *
- * @param myState The local server state.
- * @param rsStates The list of available replication servers and their
- * associated server state.
- * @param serverId The server id for the suffix we are working for.
- * @param baseDn The suffix for which we are working for.
- * @return The computed best replication server.
- */
- public static String computeBestReplicationServer(ServerState myState,
- HashMap<String, ServerState> rsStates, short serverId, DN baseDn)
- {
-
- /*
- * Find replication servers who are up to date (or more up to date than us,
- * if for instance we failed and restarted, having sent some changes to the
- * RS but without having time to store our own state) regarding our own
- * server id. Then, among them, choose the server that is the most up to
- * date regarding the whole topology.
- *
- * If no server is up to date regarding our own server id, find the one who
- * is the most up to date regarding our server id.
- */
-
- // Should never happen (sanity check)
- if ((myState == null) || (rsStates == null) || (rsStates.size() < 1) ||
- (baseDn == null))
- {
- return null;
- }
-
- String bestServer = null;
- // Servers up to dates with regard to our changes
- HashMap<String, ServerState> upToDateServers =
- new HashMap<String, ServerState>();
- // Servers late with regard to our changes
- HashMap<String, ServerState> lateOnes = new HashMap<String, ServerState>();
-
- /*
- * Start loop to differenciate up to date servers from late ones.
- */
- ChangeNumber myChangeNumber = myState.getMaxChangeNumber(serverId);
- if (myChangeNumber == null)
- {
- myChangeNumber = new ChangeNumber(0, 0, serverId);
- }
- for (String repServer : rsStates.keySet())
- {
-
- ServerState rsState = rsStates.get(repServer);
- ChangeNumber rsChangeNumber = rsState.getMaxChangeNumber(serverId);
- if (rsChangeNumber == null)
- {
- rsChangeNumber = new ChangeNumber(0, 0, serverId);
- }
-
- // Store state in right list
- if (myChangeNumber.olderOrEqual(rsChangeNumber))
- {
- upToDateServers.put(repServer, rsState);
- } else
- {
- lateOnes.put(repServer, rsState);
- }
- }
-
- if (upToDateServers.size() > 0)
- {
-
- /*
- * Some up to date servers, among them, choose the one that has the
- * maximum number of changes to send us. This is the most up to date one
- * regarding the whole topology. This server is the one which has the less
- * difference with the topology server state. For comparison, we need to
- * compute the difference for each server id with the topology server
- * state.
- */
-
- Message message = NOTE_FOUND_CHANGELOGS_WITH_MY_CHANGES.get(
- upToDateServers.size(),
- baseDn.toNormalizedString());
- logError(message);
-
- /*
- * First of all, compute the virtual server state for the whole topology,
- * which is composed of the most up to date change numbers for
- * each server id in the topology.
- */
- ServerState topoState = new ServerState();
- for (ServerState curState : upToDateServers.values())
- {
-
- Iterator<Short> it = curState.iterator();
- while (it.hasNext())
- {
- Short sId = it.next();
- ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
- if (curSidCn == null)
- {
- curSidCn = new ChangeNumber(0, 0, sId);
- }
- // Update topology state
- topoState.update(curSidCn);
- }
- } // For up to date servers
-
- // Min of the max shifts
- long minShift = -1L;
- for (String upServer : upToDateServers.keySet())
- {
-
- /*
- * Compute the maximum difference between the time of a server id's
- * change number and the time of the matching server id's change
- * number in the topology server state.
- *
- * Note: we could have used the sequence number here instead of the
- * timestamp, but this would have caused a problem when the sequence
- * number loops and comes back to 0 (computation would have becomen
- * meaningless).
- */
- long shift = -1L;
- ServerState curState = upToDateServers.get(upServer);
- Iterator<Short> it = curState.iterator();
- while (it.hasNext())
- {
- Short sId = it.next();
- ChangeNumber curSidCn = curState.getMaxChangeNumber(sId);
- if (curSidCn == null)
- {
- curSidCn = new ChangeNumber(0, 0, sId);
- }
- // Cannot be null as checked at construction time
- ChangeNumber topoCurSidCn = topoState.getMaxChangeNumber(sId);
- // Cannot be negative as topoState computed as being the max CN
- // for each server id in the topology
- long tmpShift = topoCurSidCn.getTime() - curSidCn.getTime();
- if (tmpShift > shift)
- {
- shift = tmpShift;
- }
- }
-
- if ((minShift < 0) // First time in loop
- || (shift < minShift))
- {
- // This sever is even closer to topo state
- bestServer = upServer;
- minShift = shift;
- }
- } // For up to date servers
-
- } else
- {
- /*
- * We could not find a replication server that has seen all the
- * changes that this server has already processed,
- */
- // lateOnes cannot be empty
- Message message = NOTE_COULD_NOT_FIND_CHANGELOG_WITH_MY_CHANGES.get(
- baseDn.toNormalizedString(), lateOnes.size());
- logError(message);
-
- // Min of the shifts
- long minShift = -1L;
- for (String lateServer : lateOnes.keySet())
- {
-
- /*
- * Choose the server who is the closest to us regarding our server id
- * (this is the most up to date regarding our server id).
- */
- ServerState curState = lateOnes.get(lateServer);
- ChangeNumber ourSidCn = curState.getMaxChangeNumber(serverId);
- if (ourSidCn == null)
- {
- ourSidCn = new ChangeNumber(0, 0, serverId);
- }
- // Cannot be negative as our Cn for our server id is strictly
- // greater than those of the servers in late server list
- long tmpShift = myChangeNumber.getTime() - ourSidCn.getTime();
-
- if ((minShift < 0) // First time in loop
- || (tmpShift < minShift))
- {
- // This sever is even closer to topo state
- bestServer = lateServer;
- minShift = tmpShift;
- }
- } // For late servers
- }
-
- return bestServer;
- }
-
- /**
* Search for the changes that happened since fromChangeNumber
* based on the historical attribute.
* @param baseDn the base DN
@@ -715,26 +493,26 @@
* @throws Exception when raised.
*/
public static InternalSearchOperation seachForChangedEntries(
- DN baseDn,
- ChangeNumber fromChangeNumber,
- InternalSearchListener resultListener)
- throws Exception
+ DN baseDn,
+ ChangeNumber fromChangeNumber,
+ InternalSearchListener resultListener)
+ throws Exception
{
InternalClientConnection conn =
InternalClientConnection.getRootConnection();
LDAPFilter filter = LDAPFilter.decode(
- "(" + Historical.HISTORICALATTRIBUTENAME +
- ">=dummy:" + fromChangeNumber + ")");
+ "("+ Historical.HISTORICALATTRIBUTENAME +
+ ">=dummy:" + fromChangeNumber + ")");
LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
attrs.add(Historical.HISTORICALATTRIBUTENAME);
attrs.add(Historical.ENTRYUIDNAME);
return conn.processSearch(
- new ASN1OctetString(baseDn.toString()),
- SearchScope.WHOLE_SUBTREE,
- DereferencePolicy.NEVER_DEREF_ALIASES,
- 0, 0, false, filter,
- attrs,
- resultListener);
+ new ASN1OctetString(baseDn.toString()),
+ SearchScope.WHOLE_SUBTREE,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false, filter,
+ attrs,
+ resultListener);
}
/**
@@ -746,13 +524,14 @@
if (heartbeatInterval > 0)
{
heartbeatMonitor =
- new HeartbeatMonitor("Replication Heartbeat Monitor on " +
- baseDn + " with " + getReplicationServer(),
- session, heartbeatInterval);
+ new HeartbeatMonitor("Replication Heartbeat Monitor on " +
+ baseDn + " with " + getReplicationServer(),
+ session, heartbeatInterval);
heartbeatMonitor.start();
}
}
+
/**
* restart the ReplicationBroker.
*/
@@ -777,7 +556,7 @@
}
} catch (IOException e1)
{
- // ignore
+ // ignore
}
if (failingSession == session)
@@ -793,7 +572,7 @@
{
MessageBuilder mb = new MessageBuilder();
mb.append(NOTE_EXCEPTION_RESTARTING_SESSION.get(
- baseDn.toNormalizedString(), e.getLocalizedMessage()));
+ baseDn.toNormalizedString(), e.getLocalizedMessage()));
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
}
@@ -804,12 +583,13 @@
Thread.sleep(500);
} catch (InterruptedException e)
{
- // ignore
+ // ignore
}
}
}
}
+
/**
* Publish a message to the other servers.
* @param msg the message to publish
@@ -818,7 +598,7 @@
{
boolean done = false;
- while (!done && !shutdown)
+ while (!done)
{
if (connectionError)
{
@@ -831,7 +611,7 @@
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() Publishing a " +
- " message is not possible due to existing connection error.");
+ " message is not possible due to existing connection error.");
}
return;
@@ -862,8 +642,9 @@
// want to hold off reconnection in case the connection dropped.
credit =
currentWindowSemaphore.tryAcquire(
- (long) 500, TimeUnit.MILLISECONDS);
- } else
+ (long) 500, TimeUnit.MILLISECONDS);
+ }
+ else
{
credit = true;
}
@@ -904,22 +685,24 @@
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() " +
- "IO exception raised : " + e.getLocalizedMessage());
+ "IO exception raised : " + e.getLocalizedMessage());
}
}
}
- } catch (InterruptedException e)
+ }
+ catch (InterruptedException e)
{
// just loop.
if (debugEnabled())
{
debugInfo("ReplicationBroker.publish() " +
- "Interrupted exception raised." + e.getLocalizedMessage());
+ "Interrupted exception raised." + e.getLocalizedMessage());
}
}
}
}
+
/**
* Receive a message.
* This method is not multithread safe and should either always be
@@ -947,7 +730,8 @@
{
WindowMessage windowMsg = (WindowMessage) msg;
sendWindow.release(windowMsg.getNumAck());
- } else
+ }
+ else
{
if (msg instanceof UpdateMessage)
{
@@ -968,11 +752,11 @@
if (shutdown == false)
{
Message message =
- NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
+ NOTE_DISCONNECTED_FROM_CHANGELOG.get(replicationServer);
logError(message);
debugInfo("ReplicationBroker.receive() " + baseDn +
- " Exception raised." + e + e.getLocalizedMessage());
+ " Exception raised." + e + e.getLocalizedMessage());
this.reStart(failingSession);
}
}
@@ -980,6 +764,7 @@
return null;
}
+
/**
* stop the server.
*/
@@ -988,10 +773,8 @@
replicationServer = "stopped";
shutdown = true;
connected = false;
- if (heartbeatMonitor != null)
- {
+ if (heartbeatMonitor!= null)
heartbeatMonitor.shutdown();
- }
try
{
if (debugEnabled())
@@ -1001,12 +784,9 @@
}
if (session != null)
- {
session.close();
- }
} catch (IOException e)
- {
- }
+ {}
}
/**
@@ -1054,13 +834,12 @@
{
return replicationServer;
}
-
/**
* {@inheritDoc}
*/
public void handleInternalSearchEntry(
- InternalSearchOperation searchOperation,
- SearchResultEntry searchEntry)
+ InternalSearchOperation searchOperation,
+ SearchResultEntry searchEntry)
{
/*
* Only deal with modify operation so far
@@ -1083,10 +862,10 @@
* {@inheritDoc}
*/
public void handleInternalSearchReference(
- InternalSearchOperation searchOperation,
- SearchResultReference searchReference)
+ InternalSearchOperation searchOperation,
+ SearchResultReference searchReference)
{
- // TODO to be implemented
+ // TODO to be implemented
}
/**
@@ -1127,12 +906,9 @@
public int getCurrentSendWindow()
{
if (connected)
- {
return sendWindow.availablePermits();
- } else
- {
+ else
return 0;
- }
}
/**
@@ -1144,6 +920,7 @@
return numLostConnections;
}
+
/**
* Change some config parameters.
*
@@ -1156,8 +933,8 @@
* @param heartbeatInterval The heartbeat interval.
*/
public void changeConfig(Collection<String> replicationServers,
- int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
- int maxSendDelay, int window, long heartbeatInterval)
+ int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
+ int maxSendDelay, int window, long heartbeatInterval)
{
this.servers = replicationServers;
this.maxRcvWindow = window;
@@ -1166,9 +943,9 @@
this.maxReceiveQueue = maxReceiveQueue;
this.maxSendDelay = maxSendDelay;
this.maxSendQueue = maxSendQueue;
- // TODO : Changing those parameters requires to either restart a new
- // session with the replicationServer or renegociate the parameters that
- // were sent in the ServerStart message
+ // TODO : Changing those parameters requires to either restart a new
+ // session with the replicationServer or renegociate the parameters that
+ // were sent in the ServerStart message
}
/**
@@ -1191,11 +968,7 @@
return !connectionError;
}
- private boolean debugEnabled()
- {
- return true;
- }
-
+ private boolean debugEnabled() { return true; }
private static final void debugInfo(String s)
{
logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
--
Gitblit v1.10.0