From 5d1c7d62d688c64af2627ceb8b1556ef313954ec Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 14 Aug 2013 14:22:52 +0000
Subject: [PATCH] ReplicationServerDomain.java Renamed directoryServers to connectedDSs. Renamed replicationServers to connectedRSs. Removed the useless getConnectedLDAPservers(), replaced with getConnectedDSs(). Renamed a few local variables.
---
opends/src/server/org/opends/server/replication/server/DataServerHandler.java | 179 +++++-------
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 172 +++++-------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java | 422 ++++++++++++--------------------
3 files changed, 311 insertions(+), 462 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 2dbda07..8ec6405 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.util.*;
-import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.replication.common.*;
@@ -39,6 +38,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
@@ -92,76 +92,10 @@
*/
public void changeStatusForResetGenId(long newGenId) throws IOException
{
- final int localRsServerId = replicationServer.getServerId();
-
- StatusMachineEvent event;
- if (newGenId == -1)
+ StatusMachineEvent event = getStatusMachineEvent(newGenId);
+ if (event == null)
{
- // The generation id is being made invalid, let's put the DS
- // into BAD_GEN_ID_STATUS
- event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
- } else
- {
- if (newGenId == generationId)
- {
- if (status == ServerStatus.BAD_GEN_ID_STATUS)
- {
- // This server has the good new reference generation id.
- // Close connection with him to force his reconnection: DS will
- // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
-
- if (debugEnabled())
- {
- TRACER.debugInfo(
- "In RS " + localRsServerId +
- ", closing connection to DS " + getServerId() +
- " for baseDn " + getBaseDN() +
- " to force reconnection as new local" +
- " generationId and remote one match and DS is in bad gen id: " +
- newGenId);
- }
-
- // Connection closure must not be done calling RSD.stopHandler() as it
- // would rewait the RSD lock that we already must have entering this
- // method. This would lead to a reentrant lock which we do not want.
- // So simply close the session, this will make the hang up appear
- // after the reader thread that took the RSD lock releases it.
- if (session != null
- // V4 protocol introduced a StopMsg to properly close the
- // connection between servers
- && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- try
- {
- session.publish(new StopMsg());
- }
- catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
-
- // NOT_CONNECTED_STATUS is the last one in RS session life: handler
- // will soon disappear after this method call...
- status = ServerStatus.NOT_CONNECTED_STATUS;
- return;
- } else
- {
- if (debugEnabled())
- {
- TRACER.debugInfo("In RS " + localRsServerId + ". DS "
- + getServerId() + " for baseDn " + getBaseDN()
- + " has already generation id " + newGenId
- + " so no ChangeStatusMsg sent to him.");
- }
- return;
- }
- } else
- {
- // This server has a bad generation id compared to new reference one,
- // let's put it into BAD_GEN_ID_STATUS
- event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
- }
+ return;
}
if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
@@ -170,7 +104,7 @@
// Prevent useless error message (full update status cannot lead to bad
// gen status)
Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
- Integer.toString(localRsServerId),
+ Integer.toString(replicationServer.getServerId()),
getBaseDN(),
Integer.toString(serverId),
Long.toString(generationId),
@@ -179,30 +113,73 @@
return;
}
- ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
+ changeStatus(event, "for reset gen id");
+ }
- if (newStatus == ServerStatus.INVALID_STATUS)
+ private StatusMachineEvent getStatusMachineEvent(long newGenId)
+ {
+ if (newGenId == -1)
{
- Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
- Integer.toString(serverId), status.toString(), event.toString());
- logError(msg);
- return;
+ // The generation id is being made invalid, let's put the DS
+ // into BAD_GEN_ID_STATUS
+ return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
+ }
+ if (newGenId != generationId)
+ {
+ // This server has a bad generation id compared to new reference one,
+ // let's put it into BAD_GEN_ID_STATUS
+ return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
}
- // Send message requesting to change the DS status
- ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
- ServerStatus.INVALID_STATUS);
+ if (status != ServerStatus.BAD_GEN_ID_STATUS)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("In RS " + replicationServer.getServerId()
+ + ", DS " + getServerId() + " for baseDn " + getBaseDN()
+ + " has already generation id " + newGenId
+ + " so no ChangeStatusMsg sent to him.");
+ }
+ return null;
+ }
+
+ // This server has the good new reference generation id.
+ // Close connection with him to force his reconnection: DS will
+ // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
if (debugEnabled())
{
- TRACER.debugInfo("In RS " + localRsServerId
- + " Sending change status for reset gen id to " + getServerId()
- + " for baseDn " + getBaseDN() + ":\n" + csMsg);
+ TRACER.debugInfo("In RS " + replicationServer.getServerId()
+ + ", closing connection to DS " + getServerId() + " for baseDn "
+ + getBaseDN() + " to force reconnection as new local"
+ + " generationId and remote one match and DS is in bad gen id: "
+ + newGenId);
}
- session.publish(csMsg);
+ // Connection closure must not be done calling RSD.stopHandler() as it
+ // would rewait the RSD lock that we already must have entering this
+ // method. This would lead to a reentrant lock which we do not want.
+ // So simply close the session, this will make the hang up appear
+ // after the reader thread that took the RSD lock releases it.
+ if (session != null
+ // V4 protocol introduced a StopMsg to properly close the
+ // connection between servers
+ && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ try
+ {
+ session.publish(new StopMsg());
+ }
+ catch (IOException ioe)
+ {
+ // Anyway, going to close session, so nothing to do
+ }
+ }
- status = newStatus;
+ // NOT_CONNECTED_STATUS is the last one in RS session life: handler
+ // will soon disappear after this method call...
+ status = ServerStatus.NOT_CONNECTED_STATUS;
+ return null;
}
/**
@@ -215,6 +192,12 @@
public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
throws IOException
{
+ return changeStatus(event, "from status analyzer");
+ }
+
+ private ServerStatus changeStatus(StatusMachineEvent event, String origin)
+ throws IOException
+ {
// Check state machine allows this new status (Sanity check)
ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
if (newStatus == ServerStatus.INVALID_STATUS)
@@ -222,22 +205,20 @@
Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
Integer.toString(serverId), status.toString(), event.toString());
logError(msg);
- // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
- // and vice versa. We may are being trying to change the status while for
- // instance another status has just been entered: e.g a full update has
- // just been engaged. In that case, just ignore attempt to change the
- // status
+ // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice
+ // versa. We may be trying to change the status while another status has
+ // just been entered: e.g a full update has just been engaged.
+ // In that case, just ignore attempt to change the status
return newStatus;
}
// Send message requesting to change the DS status
- ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
- ServerStatus.INVALID_STATUS);
+ ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS);
if (debugEnabled())
{
TRACER.debugInfo("In RS " + replicationServer.getServerId()
- + " Sending change status from status analyzer to " + getServerId()
+ + " Sending change status " + origin + " to " + getServerId()
+ " for baseDn " + getBaseDN() + ":\n" + csMsg);
}
@@ -589,7 +570,7 @@
localGenerationId, sslEncryption, getLocalGroupId(),
replicationServer.getDegradedStatusThreshold(),
replicationServer.getWeight(),
- replicationServerDomain.getConnectedLDAPservers().size());
+ replicationServerDomain.getConnectedDSs().size());
}
send(startMsg);
@@ -626,16 +607,10 @@
* receiving a StopMsg to properly stop the handshake procedure.
* @return the startSessionMsg received or null DS sent a stop message to
* not finish the handshake.
- * @throws DirectoryException
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws DataFormatException
- * @throws NotSupportedOldVersionPDUException
+ * @throws Exception
*/
private StartSessionMsg waitAndProcessStartSessionFromRemoteDS()
- throws DirectoryException, IOException, ClassNotFoundException,
- DataFormatException,
- NotSupportedOldVersionPDUException
+ throws Exception
{
ReplicationMsg msg = session.receive();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 940192b..31156f3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -92,7 +92,7 @@
* we are currently publishing the first update in the balanced tree is the
* next change that we must push to this particular server.
*/
- private final Map<Integer, DataServerHandler> directoryServers =
+ private final Map<Integer, DataServerHandler> connectedDSs =
new ConcurrentHashMap<Integer, DataServerHandler>();
/**
@@ -101,7 +101,7 @@
* in the balanced tree is the next change that we must push to this
* particular server.
*/
- private final Map<Integer, ReplicationServerHandler> replicationServers =
+ private final Map<Integer, ReplicationServerHandler> connectedRSs =
new ConcurrentHashMap<Integer, ReplicationServerHandler>();
private final Queue<MessageHandler> otherHandlers =
@@ -358,12 +358,10 @@
*/
NotAssuredUpdateMsg notAssuredUpdate = null;
- /*
- * Push the message to the replication servers
- */
+ // Push the message to the replication servers
if (sourceHandler.isDataServer())
{
- for (ReplicationServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : connectedRSs.values())
{
/**
* Ignore updates to RS with bad gen id
@@ -392,10 +390,8 @@
}
}
- /*
- * Push the message to the LDAP servers
- */
- for (DataServerHandler handler : directoryServers.values())
+ // Push the message to the LDAP servers
+ for (DataServerHandler handler : connectedDSs.values())
{
// Don't forward the change to the server that just sent it
if (handler == sourceHandler)
@@ -576,7 +572,7 @@
}
// Look for DS eligible for assured
- for (DataServerHandler handler : directoryServers.values())
+ for (DataServerHandler handler : connectedDSs.values())
{
// Don't forward the change to the server that just sent it
if (handler == sourceHandler)
@@ -735,7 +731,7 @@
private void collectRSsEligibleForAssuredReplication(byte groupId,
List<Integer> expectedServers)
{
- for (ReplicationServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : connectedRSs.values())
{
if (handler.getGroupId() == groupId
// No ack expected from a RS with different group id
@@ -908,9 +904,8 @@
List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
for (Integer serverId : serversInTimeout)
{
- ServerHandler expectedDSInTimeout = directoryServers.get(serverId);
- ServerHandler expectedRSInTimeout =
- replicationServers.get(serverId);
+ ServerHandler expectedDSInTimeout = connectedDSs.get(serverId);
+ ServerHandler expectedRSInTimeout = connectedRSs.get(serverId);
if (expectedDSInTimeout != null)
{
if (safeRead)
@@ -950,7 +945,7 @@
*/
public void stopReplicationServers(Collection<String> replServers)
{
- for (ReplicationServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : connectedRSs.values())
{
if (replServers.contains(handler.getServerAddressURL()))
{
@@ -968,13 +963,13 @@
public void stopAllServers(boolean shutdown)
{
// Close session with other replication servers
- for (ReplicationServerHandler serverHandler : replicationServers.values())
+ for (ReplicationServerHandler serverHandler : connectedRSs.values())
{
stopServer(serverHandler, shutdown);
}
// Close session with other LDAP servers
- for (DataServerHandler serverHandler : directoryServers.values())
+ for (DataServerHandler serverHandler : connectedDSs.values())
{
stopServer(serverHandler, shutdown);
}
@@ -988,12 +983,12 @@
*/
public boolean checkForDuplicateDS(DataServerHandler handler)
{
- if (directoryServers.containsKey(handler.getServerId()))
+ if (connectedDSs.containsKey(handler.getServerId()))
{
// looks like two connected LDAP servers have the same serverId
Message message = ERR_DUPLICATE_SERVER_ID.get(
localReplicationServer.getMonitorInstanceName(),
- directoryServers.get(handler.getServerId()).toString(),
+ connectedDSs.get(handler.getServerId()).toString(),
handler.toString(), handler.getServerId());
logError(message);
return false;
@@ -1047,7 +1042,7 @@
try
{
// Stop useless monitoring publisher if no more RS or DS in domain
- if ( (directoryServers.size() + replicationServers.size() )== 1)
+ if ( (connectedDSs.size() + connectedRSs.size() )== 1)
{
if (debugEnabled())
{
@@ -1062,7 +1057,7 @@
if (handler.isReplicationServer())
{
- if (replicationServers.containsKey(handler.getServerId()))
+ if (connectedRSs.containsKey(handler.getServerId()))
{
unregisterServerHandler(handler);
handler.shutdown();
@@ -1076,11 +1071,11 @@
buildAndSendTopoInfoToDSs(null);
}
}
- } else if (directoryServers.containsKey(handler.getServerId()))
+ } else if (connectedDSs.containsKey(handler.getServerId()))
{
// If this is the last DS for the domain,
// shutdown the status analyzer
- if (directoryServers.size() == 1)
+ if (connectedDSs.size() == 1)
{
if (debugEnabled())
{
@@ -1193,11 +1188,11 @@
{
if (handler.isReplicationServer())
{
- replicationServers.remove(handler.getServerId());
+ connectedRSs.remove(handler.getServerId());
}
else
{
- directoryServers.remove(handler.getServerId());
+ connectedDSs.remove(handler.getServerId());
}
}
@@ -1224,9 +1219,9 @@
// topology and the generationId has never been saved, then we can reset
// it and the next LDAP server to connect will become the new reference.
boolean lDAPServersConnectedInTheTopology = false;
- if (directoryServers.isEmpty())
+ if (connectedDSs.isEmpty())
{
- for (ReplicationServerHandler rsh : replicationServers.values())
+ for (ReplicationServerHandler rsh : connectedRSs.values())
{
if (generationId != rsh.getGenerationId())
{
@@ -1283,7 +1278,7 @@
throws DirectoryException
{
ReplicationServerHandler oldHandler =
- replicationServers.get(handler.getServerId());
+ connectedRSs.get(handler.getServerId());
if (oldHandler != null)
{
if (oldHandler.getServerAddressURL().equals(
@@ -1339,7 +1334,7 @@
public Set<String> getChangelogs()
{
Set<String> results = new LinkedHashSet<String>();
- for (ReplicationServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : connectedRSs.values())
{
results.add(handler.getServerAddressURL());
}
@@ -1358,22 +1353,6 @@
}
/**
- * Returns as a set of String the list of LDAP servers connected to us.
- * Each string is the serverID of a connected LDAP server.
- *
- * @return The set of connected LDAP servers
- */
- public List<String> getConnectedLDAPservers()
- {
- List<String> results = new ArrayList<String>(0);
- for (DataServerHandler handler : directoryServers.values())
- {
- results.add(String.valueOf(handler.getServerId()));
- }
- return results;
- }
-
- /**
* Creates and returns an iterator.
* When the iterator is not used anymore, the caller MUST call the
* ReplicationIterator.releaseCursor() method to free the resources
@@ -1493,7 +1472,7 @@
{
// Send to all replication servers with a least one remote
// server connected
- for (ReplicationServerHandler rsh : replicationServers.values())
+ for (ReplicationServerHandler rsh : connectedRSs.values())
{
if (rsh.hasRemoteLDAPServers())
{
@@ -1503,7 +1482,7 @@
}
// Sends to all connected LDAP servers
- for (DataServerHandler destinationHandler : directoryServers.values())
+ for (DataServerHandler destinationHandler : connectedDSs.values())
{
// Don't loop on the sender
if (destinationHandler == senderHandler)
@@ -1516,7 +1495,7 @@
{
// Destination is one server
DataServerHandler destinationHandler =
- directoryServers.get(msg.getDestination());
+ connectedDSs.get(msg.getDestination());
if (destinationHandler != null)
{
servers.add(destinationHandler);
@@ -1527,13 +1506,13 @@
// have the targeted server connected.
if (senderHandler.isDataServer())
{
- for (ReplicationServerHandler h : replicationServers.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
// Send to all replication servers with a least one remote
// server connected
- if (h.isRemoteLDAPServer(msg.getDestination()))
+ if (rsHandler.isRemoteLDAPServer(msg.getDestination()))
{
- servers.add(h);
+ servers.add(rsHandler);
}
}
}
@@ -1808,14 +1787,14 @@
// from the states stored in the serverHandler.
// - the server state
// - the older missing change
- for (DataServerHandler lsh : this.directoryServers.values())
+ for (DataServerHandler lsh : this.connectedDSs.values())
{
monitorMsg.setServerState(lsh.getServerId(),
lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
}
// Same for the connected RS
- for (ReplicationServerHandler rsh : this.replicationServers.values())
+ for (ReplicationServerHandler rsh : this.connectedRSs.values())
{
monitorMsg.setServerState(rsh.getServerId(),
rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
@@ -1891,7 +1870,7 @@
*/
public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
{
- for (DataServerHandler handler : directoryServers.values())
+ for (DataServerHandler handler : connectedDSs.values())
{
if (notThisOne == null || handler != notThisOne)
// All except passed one
@@ -1932,7 +1911,7 @@
public void buildAndSendTopoInfoToRSs()
{
TopologyMsg topoMsg = createTopologyMsgForRS();
- for (ReplicationServerHandler handler : replicationServers.values())
+ for (ReplicationServerHandler handler : connectedRSs.values())
{
for (int i=1; i<=2; i++)
{
@@ -1976,7 +1955,7 @@
List<DSInfo> dsInfos = new ArrayList<DSInfo>();
// Go through every DSs
- for (DataServerHandler serverHandler : directoryServers.values())
+ for (DataServerHandler serverHandler : connectedDSs.values())
{
dsInfos.add(serverHandler.toDSInfo());
}
@@ -2001,7 +1980,7 @@
{
// Go through every DSs (except recipient of msg)
List<DSInfo> dsInfos = new ArrayList<DSInfo>();
- for (DataServerHandler serverHandler : directoryServers.values())
+ for (DataServerHandler serverHandler : connectedDSs.values())
{
if (serverHandler.getServerId() == destDsId)
{
@@ -2017,7 +1996,7 @@
// Go through every peer RSs (and get their connected DSs), also add info
// for RSs
- for (ReplicationServerHandler serverHandler : replicationServers.values())
+ for (ReplicationServerHandler serverHandler : connectedRSs.values())
{
rsInfos.add(serverHandler.toRSInfo());
@@ -2150,7 +2129,7 @@
// If we are the first replication server warned,
// then forwards the reset message to the remote replication servers
- for (ServerHandler rsHandler : replicationServers.values())
+ for (ServerHandler rsHandler : connectedRSs.values())
{
try
{
@@ -2170,7 +2149,7 @@
// Change status of the connected DSs according to the requested new
// reference generation id
- for (DataServerHandler dsHandler : directoryServers.values())
+ for (DataServerHandler dsHandler : connectedDSs.values())
{
try
{
@@ -2362,8 +2341,7 @@
// TODO: i18n
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
- e.getMessage() + " " +
- stackTraceToSingleLineString(e)));
+ e.getMessage() + " " + stackTraceToSingleLineString(e)));
logError(mb.toMessage());
}
}
@@ -2400,10 +2378,10 @@
+ " given local generation Id=" + this.generationId);
}
- ServerHandler handler = replicationServers.get(serverId);
+ ServerHandler handler = connectedRSs.get(serverId);
if (handler == null)
{
- handler = directoryServers.get(serverId);
+ handler = connectedDSs.get(serverId);
if (handler == null)
{
return false;
@@ -2549,7 +2527,7 @@
initializePendingMonitorData();
// Send the monitor requests to the connected replication servers.
- for (ReplicationServerHandler rs : replicationServers.values())
+ for (ReplicationServerHandler rs : connectedRSs.values())
{
// Add server ID to pending table.
int serverId = rs.getServerId();
@@ -2657,13 +2635,12 @@
// So for a given DS connected we can take the state and the max from
// the DS/state.
- for (ServerHandler ds : directoryServers.values())
+ for (ServerHandler ds : connectedDSs.values())
{
int serverID = ds.getServerId();
// the state comes from the state stored in the SH
- ServerState dsState = ds.getServerState()
- .duplicate();
+ ServerState dsState = ds.getServerState().duplicate();
// the max CN sent by that LS also comes from the SH
ChangeNumber maxcn = dsState.getChangeNumber(serverID);
@@ -2725,46 +2702,44 @@
pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
// Store the remote LDAP servers states
- Iterator<Integer> lsidIterator = msg.ldapIterator();
- while (lsidIterator.hasNext())
+ Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
+ while (dsServerIdIterator.hasNext())
{
- int sid = lsidIterator.next();
- ServerState dsServerState = msg.getLDAPServerState(sid);
+ int dsServerId = dsServerIdIterator.next();
+ ServerState dsServerState = msg.getLDAPServerState(dsServerId);
pendingMonitorData.setMaxCNs(dsServerState);
- pendingMonitorData.setLDAPServerState(sid, dsServerState);
- pendingMonitorData.setFirstMissingDate(sid,
- msg.getLDAPApproxFirstMissingDate(sid));
+ pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
+ pendingMonitorData.setFirstMissingDate(dsServerId,
+ msg.getLDAPApproxFirstMissingDate(dsServerId));
}
// Process the latency reported by the remote RSi on its connections
// to the other RSes
- Iterator<Integer> rsidIterator = msg.rsIterator();
- while (rsidIterator.hasNext())
+ Iterator<Integer> rsServerIdIterator = msg.rsIterator();
+ while (rsServerIdIterator.hasNext())
{
- int rsid = rsidIterator.next();
- if (rsid == localReplicationServer.getServerId())
+ int rsServerId = rsServerIdIterator.next();
+ long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
+ if (rsServerId == localReplicationServer.getServerId())
{
// this is the latency of the remote RSi regarding the current RS
// let's update the fmd of my connected LS
- for (ServerHandler connectedlsh : directoryServers
- .values())
+ for (DataServerHandler connectedDS : connectedDSs.values())
{
- int connectedlsid = connectedlsh.getServerId();
- Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
- pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+ int connectedServerId = connectedDS.getServerId();
+ pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
}
}
else
{
// this is the latency of the remote RSi regarding another RSj
// let's update the latency of the LSes connected to RSj
- ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
+ ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
if (rsjHdr != null)
{
- for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
+ for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
{
- Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
- pendingMonitorData.setFirstMissingDate(remotelsid, newfmd);
+ pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
}
}
}
@@ -2773,8 +2748,8 @@
catch (RuntimeException e)
{
// FIXME: do we really expect these???
- logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e
- .getMessage() + stackTraceToSingleLineString(e)));
+ logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
+ e.getMessage() + stackTraceToSingleLineString(e)));
}
finally
{
@@ -2808,7 +2783,7 @@
*/
public Map<Integer, DataServerHandler> getConnectedDSs()
{
- return directoryServers;
+ return connectedDSs;
}
/**
@@ -2817,7 +2792,7 @@
*/
public Map<Integer, ReplicationServerHandler> getConnectedRSs()
{
- return replicationServers;
+ return connectedRSs;
}
@@ -3131,9 +3106,8 @@
result.update(mostRecentDbCN);
}
} catch (Exception e) {
- Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
- " " + stackTraceToSingleLineString(e));
- logError(errMessage);
+ logError(ERR_WRITER_UNEXPECTED_EXCEPTION
+ .get(stackTraceToSingleLineString(e)));
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
}
@@ -3236,13 +3210,13 @@
private boolean isServerConnected(int serverId)
{
- if (directoryServers.containsKey(serverId))
+ if (connectedDSs.containsKey(serverId))
{
return true;
}
// not directly connected
- for (ReplicationServerHandler rsHandler : replicationServers.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
if (rsHandler.isRemoteLDAPServer(serverId))
{
@@ -3283,7 +3257,7 @@
{
// If we are the first replication server warned,
// then forwards the message to the remote replication servers
- for (ReplicationServerHandler rsHandler : replicationServers.values())
+ for (ReplicationServerHandler rsHandler : connectedRSs.values())
{
try
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index 68601a4..47976f4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -27,23 +27,17 @@
*/
package org.opends.server.replication;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.messages.TaskMessages.*;
-import static org.opends.server.config.ConfigConstants.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-import static org.testng.Assert.*;
-
import java.io.File;
import java.net.SocketTimeoutException;
import java.util.*;
+import org.assertj.core.api.Assertions;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
+import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
@@ -63,6 +57,14 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.TaskMessages.*;
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.testng.Assert.*;
+
/**
* Tests contained here:
*
@@ -202,7 +204,7 @@
"ds-task-initialize-replica-server-id: all");
}
- // Tests that entries have been written in the db
+ /** Tests that entries have been written in the db */
private void testEntriesInDb()
{
log("TestEntriesInDb");
@@ -260,62 +262,19 @@
* @param expectedDone The expected number of entries to be processed.
*/
private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
- long expectedLeft, long expectedDone)
+ long expectedLeft, long expectedDone) throws Exception
{
log("waitTaskCompleted " + taskEntry.toLDIFString());
- try
+
{
- // FIXME - Factorize with TasksTestCase
- // Wait until the task completes.
- int timeout = 2000;
-
- AttributeType completionTimeType = DirectoryServer.getAttributeType(
- ATTR_TASK_COMPLETION_TIME.toLowerCase());
- SearchFilter filter =
- SearchFilter.createFilterFromString("(objectclass=*)");
- Entry resultEntry = null;
- String completionTime = null;
- long startMillisecs = System.currentTimeMillis();
- do
- {
- InternalSearchOperation searchOperation =
- connection.processSearch(taskEntry.getDN(),
- SearchScope.BASE_OBJECT,
- filter);
- try
- {
- resultEntry = searchOperation.getSearchEntries().getFirst();
- } catch (Exception e)
- {
- // FIXME How is this possible? Must be issue 858.
- fail("Task entry was not returned from the search.");
- continue;
- }
- completionTime =
- resultEntry.getAttributeValue(completionTimeType,
- DirectoryStringSyntax.DECODER);
-
- if (completionTime == null)
- {
- if (System.currentTimeMillis() - startMillisecs > 1000*timeout)
- {
- break;
- }
- Thread.sleep(100);
- }
- } while (completionTime == null);
-
- if (completionTime == null)
- {
- fail("The task had not completed after " + timeout + " seconds.");
- }
+ Entry resultEntry = getCompletionTime(taskEntry);
// Check that the task state is as expected.
AttributeType taskStateType =
- DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
+ DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
String stateString =
- resultEntry.getAttributeValue(taskStateType,
- DirectoryStringSyntax.DECODER);
+ resultEntry.getAttributeValue(taskStateType,
+ DirectoryStringSyntax.DECODER);
TaskState taskState = TaskState.fromString(stateString);
assertEquals(taskState, expectedState,
"The task completed in an unexpected state");
@@ -323,7 +282,7 @@
// Check that the task contains some log messages.
AttributeType logMessagesType = DirectoryServer.getAttributeType(
ATTR_TASK_LOG_MESSAGES.toLowerCase());
- ArrayList<String> logMessages = new ArrayList<String>();
+ List<String> logMessages = new ArrayList<String>();
resultEntry.getAttributeValues(logMessagesType,
DirectoryStringSyntax.DECODER,
logMessages);
@@ -333,88 +292,94 @@
fail("No log messages were written to the task entry on a failed task");
}
- try
- {
- // Check that the task state is as expected.
- taskStateType =
- DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
- stateString =
- resultEntry.getAttributeValue(taskStateType,
- DirectoryStringSyntax.DECODER);
+ // Check that the task state is as expected.
+ assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_LEFT,
+ expectedLeft, "The number of entries to process is not correct.");
- assertEquals(Long.decode(stateString).longValue(),expectedLeft,
- "The number of entries to process is not correct.");
-
- // Check that the task state is as expected.
- taskStateType =
- DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
- stateString =
- resultEntry.getAttributeValue(taskStateType,
- DirectoryStringSyntax.DECODER);
-
- assertEquals(Long.decode(stateString).longValue(),expectedDone,
- "The number of entries processed is not correct.");
-
- }
- catch(Exception e)
- {
- fail("Exception"+ e.getMessage()+e.getStackTrace());
- }
-
+ // Check that the task state is as expected.
+ assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_DONE,
+ expectedDone, "The number of entries processed is not correct.");
}
- catch(Exception e)
+ }
+
+ private Entry getCompletionTime(Entry taskEntry) throws Exception
+ {
+ // FIXME - Factorize with TasksTestCase
+ // Wait until the task completes.
+ int timeout = 2000;
+
+ AttributeType completionTimeType = DirectoryServer.getAttributeType(
+ ATTR_TASK_COMPLETION_TIME.toLowerCase());
+ SearchFilter filter =
+ SearchFilter.createFilterFromString("(objectclass=*)");
+
+ long startMillisecs = System.currentTimeMillis();
+ do
{
- fail("Exception"+ e.getMessage()+e.getStackTrace());
+ InternalSearchOperation searchOperation = connection.processSearch(
+ taskEntry.getDN(), SearchScope.BASE_OBJECT, filter);
+ Entry resultEntry = searchOperation.getSearchEntries().getFirst();
+
+ String completionTime = resultEntry.getAttributeValue(
+ completionTimeType, DirectoryStringSyntax.DECODER);
+
+ if (completionTime != null)
+ {
+ return resultEntry;
+ }
+
+ if (System.currentTimeMillis() - startMillisecs > 1000 * timeout)
+ {
+ fail("The task had not completed after " + timeout + " seconds.");
+ }
+ Thread.sleep(100);
}
+ while (true);
+ }
+
+ private void assertAttributeValue(Entry resultEntry, String lowerAttrName,
+ long expected, String message) throws DirectoryException
+ {
+ AttributeType type = DirectoryServer.getAttributeType(lowerAttrName, true);
+ String value = resultEntry.getAttributeValue(type, DirectoryStringSyntax.DECODER);
+ assertEquals(Long.decode(value).longValue(), expected, message);
}
/**
* Add to the current DB the entries necessary to the test.
*/
- private void addTestEntriesToDB()
+ private void addTestEntriesToDB() throws Exception
{
- try
+ for (String ldifEntry : updatedEntries)
{
- for (String ldifEntry : updatedEntries)
- {
- Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
- addTestEntryToDB(entry);
- // They will be removed at the end of the test
- entryList.addLast(entry.getDN());
- }
- log("addTestEntriesToDB : " + updatedEntries.length + " successfully added to DB");
+ Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
+ addTestEntryToDB(entry);
+ // They will be removed at the end of the test
+ entryList.addLast(entry.getDN());
}
- catch(Exception e)
- {
- fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
- }
+ log("addTestEntriesToDB : " + updatedEntries.length
+ + " successfully added to DB");
}
private void addTestEntryToDB(Entry entry)
{
- try
+ AddOperation addOp =
+ new AddOperationBasis(connection, InternalClientConnection
+ .nextOperationID(), InternalClientConnection.nextMessageID(), null,
+ entry.getDN(), entry.getObjectClasses(), entry.getUserAttributes(),
+ entry.getOperationalAttributes());
+ addOp.setInternalOperation(true);
+ addOp.run();
+ if (addOp.getResultCode() != ResultCode.SUCCESS)
{
- AddOperationBasis addOp = new AddOperationBasis(connection,
- InternalClientConnection.nextOperationID(), InternalClientConnection
- .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
- entry.getUserAttributes(), entry.getOperationalAttributes());
- addOp.setInternalOperation(true);
- addOp.run();
- if (addOp.getResultCode() != ResultCode.SUCCESS)
- {
- log("addEntry: Failed" + addOp.getResultCode());
- }
+ log("addEntry: Failed" + addOp.getResultCode());
+ }
- // They will be removed at the end of the test
- entryList.addLast(entry.getDN());
- }
- catch(Exception e)
- {
- fail("addTestEntryToDB Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
- }
+ // They will be removed at the end of the test
+ entryList.addLast(entry.getDN());
}
- /*
+ /**
* Creates entries necessary to the test.
*/
private String[] newLDIFEntries(int entriesCnt)
@@ -426,8 +391,6 @@
bigAttributeValue[i] = Integer.toString(i).charAt(0);
String[] entries = new String[entriesCnt + 2];
- String filler = "000000000000000000000000000000000000";
-
entries[0] = "dn: " + EXAMPLE_DN + "\n"
+ "objectClass: top\n"
+ "objectClass: domain\n"
@@ -441,6 +404,7 @@
+ "entryUUID: 21111111-1111-1111-1111-111111111112\n"
+ "\n";
+ String filler = "000000000000000000000000000000000000";
for (int i=0; i<entriesCnt; i++)
{
String useri="0000"+i;
@@ -472,36 +436,25 @@
private void makeBrokerPublishEntries(ReplicationBroker broker,
int senderID, int destinationServerID, int requestorID)
{
- // Send entries
- try
+ RoutableMsg initTargetMessage =
+ new InitializeTargetMsg(EXAMPLE_DN, server2ID, destinationServerID,
+ requestorID, updatedEntries.length, initWindow);
+ broker.publish(initTargetMessage);
+
+ int cnt = 0;
+ for (String entry : updatedEntries)
{
- RoutableMsg initTargetMessage =
- new InitializeTargetMsg(
- EXAMPLE_DN, server2ID, destinationServerID, requestorID,
- updatedEntries.length, initWindow);
- broker.publish(initTargetMessage);
+ log("Broker will publish 1 entry: bytes:" + entry.length());
- int cnt = 0;
- for (String entry : updatedEntries)
- {
- log("Broker will publish 1 entry: bytes:"+ entry.length());
-
- EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
- entry.getBytes(), ++cnt);
- broker.publish(entryMsg);
- }
-
- DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
- broker.publish(doneMsg);
-
- log("Broker " + senderID + " published entries");
-
+ EntryMsg entryMsg =
+ new EntryMsg(senderID, destinationServerID, entry.getBytes(), ++cnt);
+ broker.publish(entryMsg);
}
- catch(Exception e)
- {
- fail("makeBrokerPublishEntries Exception:"+ e.getMessage() + " "
- + stackTraceToSingleLineString(e));
- }
+
+ DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
+ broker.publish(doneMsg);
+
+ log("Broker " + senderID + " published entries");
}
void receiveUpdatedEntries(ReplicationBroker broker, int serverID,
@@ -565,8 +518,7 @@
broker.setGenerationID(EMPTY_DN_GENID);
broker.reStart(true);
- try { Thread.sleep(500); } catch(Exception e) {}
-
+ sleep(500);
}
/**
@@ -598,19 +550,18 @@
* @param changelogId The serverID of the replicationServer to create.
* @return The new replicationServer.
*/
- private ReplicationServer createChangelogServer(int changelogId, String testCase)
+ private ReplicationServer createChangelogServer(int changelogId,
+ String testCase) throws Exception
{
SortedSet<String> servers = new TreeSet<String>();
- try
- {
- if (changelogId != changelog1ID)
- servers.add("localhost:" + getChangelogPort(changelog1ID));
- if (changelogId != changelog2ID)
- servers.add("localhost:" + getChangelogPort(changelog2ID));
- if (changelogId != changelog3ID)
- servers.add("localhost:" + getChangelogPort(changelog3ID));
+ if (changelogId != changelog1ID)
+ servers.add("localhost:" + getChangelogPort(changelog1ID));
+ if (changelogId != changelog2ID)
+ servers.add("localhost:" + getChangelogPort(changelog2ID));
+ if (changelogId != changelog3ID)
+ servers.add("localhost:" + getChangelogPort(changelog3ID));
- ReplServerFakeConfiguration conf =
+ ReplServerFakeConfiguration conf =
new ReplServerFakeConfiguration(
getChangelogPort(changelogId),
"initOnlineTest" + getChangelogPort(changelogId) + testCase + "Db",
@@ -619,16 +570,10 @@
0,
100,
servers);
- ReplicationServer replicationServer = new ReplicationServer(conf);
- Thread.sleep(1000);
+ ReplicationServer replicationServer = new ReplicationServer(conf);
+ Thread.sleep(1000);
- return replicationServer;
- }
- catch (Exception e)
- {
- fail("createChangelog" + stackTraceToSingleLineString(e));
- }
- return null;
+ return replicationServer;
}
/**
@@ -636,52 +581,42 @@
* replication Server ID.
* @param changelogID
*/
- private void connectServer1ToChangelog(int changelogID)
+ private void connectServer1ToChangelog(int changelogID) throws Exception
{
connectServer1ToChangelog(changelogID, 0);
}
- private void connectServer1ToChangelog(int changelogID, int heartbeat)
+ private void connectServer1ToChangelog(int changelogID, int heartbeat) throws Exception
{
- // Connect DS to the replicationServer
- try
- {
- // suffix synchronized
- String testName = "initOnLineTest";
- String synchroServerLdif =
- "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
- + "objectClass: top\n"
- + "objectClass: ds-cfg-synchronization-provider\n"
- + "objectClass: ds-cfg-replication-domain\n"
- + "cn: " + testName + "\n"
- + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
- + "ds-cfg-replication-server: localhost:"
- + getChangelogPort(changelogID)+"\n"
- + "ds-cfg-server-id: " + server1ID + "\n"
- + "ds-cfg-receive-status: true\n"
- + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
- + "ds-cfg-window-size: " + WINDOW_SIZE;
+ // suffix synchronized
+ String testName = "initOnLineTest";
+ String synchroServerLdif =
+ "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
+ + "objectClass: top\n"
+ + "objectClass: ds-cfg-synchronization-provider\n"
+ + "objectClass: ds-cfg-replication-domain\n"
+ + "cn: " + testName + "\n"
+ + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
+ + "ds-cfg-replication-server: localhost:"
+ + getChangelogPort(changelogID)+"\n"
+ + "ds-cfg-server-id: " + server1ID + "\n"
+ + "ds-cfg-receive-status: true\n"
+ + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
+ + "ds-cfg-window-size: " + WINDOW_SIZE;
+ // Clear the backend
+ LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
- // Clear the backend
- LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
-
- synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
- DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
- assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+ synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+ DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+ assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
"Unable to add the synchronized server");
- configEntryList.add(synchroServerEntry.getDN());
+ configEntryList.add(synchroServerEntry.getDN());
- replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
+ replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
- assertTrue(!replDomain.ieRunning(),
+ assertTrue(!replDomain.ieRunning(),
"ReplicationDomain: Import/Export is not expected to be running");
- }
- catch(Exception e)
- {
- log("connectServer1ToChangelog", e);
- fail("connectServer1ToChangelog", e);
- }
}
private int getChangelogPort(int changelogID) throws Exception
@@ -748,10 +683,6 @@
testEntriesInDb();
log("Successfully ending " + testCase);
- }
- catch(Exception e)
- {
- fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
} finally
{
afterTest(testCase);
@@ -967,10 +898,6 @@
testEntriesInDb();
log("Successfully ending " + testCase);
- }
- catch(Exception e)
- {
- fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
} finally
{
afterTest(testCase);
@@ -1022,10 +949,6 @@
// createTask(taskInitTargetS2);
log("Successfully ending " + testCase);
- }
- catch(Exception e)
- {
- fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
} finally
{
afterTest(testCase);
@@ -1094,10 +1017,6 @@
// createTask(taskInitTargetS2);
log("Successfully ending " + testCase);
- }
- catch(Exception e)
- {
- fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
} finally
{
afterTest(testCase);
@@ -1155,40 +1074,32 @@
// Check that the list of connected LDAP servers is correct
// in each replication servers
- List<String> l1 = changelog1.getReplicationServerDomain(
- baseDn.toNormalizedString(), false).
- getConnectedLDAPservers();
- assertEquals(l1.size(), 1);
- assertEquals(l1.get(0), String.valueOf(server1ID));
+ Set<Integer> l1 = changelog1.getReplicationServerDomain(
+ baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+ Assertions.assertThat(l1).containsExactly(server1ID);
- List<String> l2;
- l2 = changelog2.getReplicationServerDomain(
- baseDn.toNormalizedString(), false).getConnectedLDAPservers();
- assertEquals(l2.size(), 2);
- assertTrue(l2.contains(String.valueOf(server2ID)));
- assertTrue(l2.contains(String.valueOf(server3ID)));
+ Set<Integer> l2 = changelog2.getReplicationServerDomain(
+ baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+ Assertions.assertThat(l2).containsExactly(server2ID, server3ID);
- List<String> l3;
- l3 = changelog3.getReplicationServerDomain(
- baseDn.toNormalizedString(), false).getConnectedLDAPservers();
- assertEquals(l3.size(), 0);
+ Set<Integer> l3 = changelog3.getReplicationServerDomain(
+ baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+ Assertions.assertThat(l3).isEmpty();
// Test updates
broker3.stop();
Thread.sleep(1000);
- l2 = changelog2.getReplicationServerDomain(
- baseDn.toNormalizedString(), false).getConnectedLDAPservers();
- assertEquals(l2.size(), 1);
- assertEquals(l2.get(0), String.valueOf(server2ID));
+ l2 = changelog2.getReplicationServerDomain(
+ baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+ Assertions.assertThat(l2).containsExactly(server2ID);
broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
broker2.stop();
Thread.sleep(1000);
- l2 = changelog2.getReplicationServerDomain(
- baseDn.toNormalizedString(), false).getConnectedLDAPservers();
- assertEquals(l2.size(), 1);
- assertEquals(l2.get(0), String.valueOf(server3ID));
+ l2 = changelog2.getReplicationServerDomain(
+ baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+ Assertions.assertThat(l2).containsExactly(server3ID);
// TODO Test ReplicationServerDomain.getDestinationServers method.
@@ -1262,10 +1173,6 @@
log("Successfully ending " + testCase);
}
- catch(Exception e)
- {
- log(testCase + e.getLocalizedMessage());
- }
finally
{
afterTest(testCase);
@@ -1567,18 +1474,11 @@
// in those cases, loop for a while waiting for completion.
for (int i = 0; i< 10; i++)
{
- if (replDomain.ieRunning())
- {
- try
- {
- Thread.sleep(500);
- } catch (InterruptedException e)
- { }
- }
- else
+ if (!replDomain.ieRunning())
{
break;
}
+ sleep(500);
}
assertTrue(!replDomain.ieRunning(),
"ReplicationDomain: Import/Export is not expected to be running");
@@ -1591,14 +1491,14 @@
if (server2 != null)
{
server2.stop();
- TestCaseUtils.sleep(100); // give some time to the broker to disconnect
+ sleep(100); // give some time to the broker to disconnect
// from the replicationServer.
server2 = null;
}
if (server3 != null)
{
server3.stop();
- TestCaseUtils.sleep(100); // give some time to the broker to disconnect
+ sleep(100); // give some time to the broker to disconnect
// from the replicationServer.
server3 = null;
}
@@ -1639,7 +1539,7 @@
log("Successfully cleaned " + testCase);
}
- /**
+ /**
* Clean up the environment.
*
* @throws Exception If the environment could not be set up.
--
Gitblit v1.10.0